博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
okHttp3源码简要分析
阅读量:4696 次
发布时间:2019-06-09

本文共 20171 字,大约阅读时间需要 67 分钟。

首先看一下使用,

public static void main(String[] args) throws IOException {	    OkHttpClient client = new OkHttpClient();  //创建client        Request request = new Request.Builder().url("http://www.xxx.com")                  .get().build();  //构建请求体        Call call = client.newCall(request);//产生一个请求call           //同步请求        Response response=call.execute();          System.out.println("result=:"+response.body().string());          //异步请求,enqueue方式          Call call2 = client.newCall(request);          call2.enqueue(new Callback() {            			@Override			public void onFailure(Call call, IOException e) {				// TODO Auto-generated method stub							}			@Override			public void onResponse(Call call, Response response)					throws IOException {				// TODO Auto-generated method stub							}        });  	}

 

这里主要分为一下几个步骤:

1,创建OKHttpClient,配置参数

2,创建指定Request对象

3,使用OKHttpClient与指定的Request产生Call,指定对于Callback接收请求反馈结果

4,调用执行,发起请求

Call对象分析:

从client.newCall(request)开始,client根据request创建请求Call,这个Call为一接口对象,真正实现的是RealCall

/**   * Prepares the {@code request} to be executed at some point in the future.   */  @Override public Call newCall(Request request) {    return new RealCall(this, request, false /* for web socket */);  }final class RealCall implements Call {  final OkHttpClient client;  final RetryAndFollowUpInterceptor retryAndFollowUpInterceptor;  /** The application's original request unadulterated by redirects or auth headers. */  final Request originalRequest;  final boolean forWebSocket;  // Guarded by this.  private boolean executed;  RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {    this.client = client;    this.originalRequest = originalRequest;    this.forWebSocket = forWebSocket;    this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);  }......

 

这里体现编程一个原则:这对接口编写,不针对具体对象。

继续看一下excute实现,

@Override public Response execute() throws IOException {    synchronized (this) {//判断call是否执行过,可以看出每个Call对象只能使用一次原则      if (executed) throw new IllegalStateException("Already Executed");      executed = true;    }    captureCallStackTrace();    try {      client.dispatcher().executed(this);//把请求任务加入dispatcher任务调度器等待队列      Response result = getResponseWithInterceptorChain();//使用拦截器处理reapose,类似JavaWeb Struct2拦截器功能      if (result == null) throw new IOException("Canceled");//调用cancel时,responese为null      return result;    } finally {      client.dispatcher().finished(this);    }  }

 

Dispatcher负责Request调度处理,我们看一下这个类实现

public final class Dispatcher {  private int maxRequests = 64;  private int maxRequestsPerHost = 5;  private Runnable idleCallback;  /** Executes calls. Created lazily. */  private ExecutorService executorService;  /** Ready async calls in the order they'll be run. */ //双端队列维护异步中等待执行任务  private final Deque
readyAsyncCalls = new ArrayDeque<>(); /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */ private final Deque
runningAsyncCalls = new ArrayDeque<>();//双端队列维护异步中正在运行中的任务 /** Running synchronous calls. Includes canceled calls that haven't finished yet. */ private final Deque
runningSyncCalls = new ArrayDeque<>();//同步等待执行任务 public Dispatcher(ExecutorService executorService) {//可以更换自定义的线程池 this.executorService = executorService; } public Dispatcher() { } public synchronized ExecutorService executorService() { if (executorService == null) {//异步请求线程池,无上限,自动回收限制60s的线程 executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue
(), Util.threadFactory("OkHttp Dispatcher", false)); } return executorService; } /** Used by {@code Call#execute} to signal it is in-flight. */ synchronized void executed(RealCall call) {//加入同步请求队列 runningSyncCalls.add(call); } synchronized void enqueue(AsyncCall call) {//加入异步请求队列,最多64个请求,相同host请求为5个 if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) { runningAsyncCalls.add(call); executorService().execute(call); } else {//超过最大同时请求数量,则加入等待队列 readyAsyncCalls.add(call); } } private
void finished(Deque
calls, T call, boolean promoteCalls) { ... synchronized (this) {//异步任务执行完成后,移除已经完成Call,调用加入等待执行任务到运行中的队列 if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!"); if (promoteCalls) promoteCalls(); runningCallsCount = runningCallsCount(); idleCallback = this.idleCallback; }... } private void promoteCalls() {//将等待执行的队列任务,放入执行的任务队列中,限制最大数量,继续执行 if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity. if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote. for (Iterator
i = readyAsyncCalls.iterator(); i.hasNext(); ) { AsyncCall call = i.next(); if (runningCallsForHost(call) < maxRequestsPerHost) { i.remove(); runningAsyncCalls.add(call); executorService().execute(call); } if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity. } }

 

AcyncCal为RealCall内部类,实现NamedRunnable,NamedRunnable为Runable实现,定义excute方法由run调用

final class AsyncCall extends NamedRunnable {    ......public abstract class NamedRunnable implements Runnable {  protected final String name;  public NamedRunnable(String format, Object... args) {    this.name = Util.format(format, args);  }  @Override public final void run() {    String oldName = Thread.currentThread().getName();    Thread.currentThread().setName(name);    try {      execute();    } finally {      Thread.currentThread().setName(oldName);    }  }  protected abstract void execute();

 

AsyncCal的excute实现:

@Override protected void execute() {      boolean signalledCallback = false;      try {        Response response = getResponseWithInterceptorChain();//response拦截器实现,通过拦截器处理Response        if (retryAndFollowUpInterceptor.isCanceled()) {//取消的任务调用onnfail          signalledCallback = true;          responseCallback.onFailure(RealCall.this, new IOException("Canceled"));        } else {          signalledCallback = true;          responseCallback.onResponse(RealCall.this, response);        }      } catch (IOException e) {        if (signalledCallback) {          // Do not signal the callback twice!          Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);        } else {          responseCallback.onFailure(RealCall.this, e);        }      } finally {        client.dispatcher().finished(this);      }    }

因为excute在run里调用,所以这里的callback方法是在线程里回调的!!

强大的Interceptor:okhttp内部设计了Interceptor,使其设计更加灵活,使得每个request,response对其进行相关拦截与修改,

上面分析看出其实对网络处理是以Interceptor处理,那我们看一下Response result = getResponseWithInterceptorChain();

Response getResponseWithInterceptorChain() throws IOException {    // Build a full stack of interceptors.    List
interceptors = new ArrayList<>(); interceptors.addAll(client.interceptors()); interceptors.add(retryAndFollowUpInterceptor);//RetryAndFollowUpInterceptor 负责失败重连,重定向处理 interceptors.add(new BridgeInterceptor(client.cookieJar()));//处理网络协议Request转换以及网络Response到本地Response转换处理 interceptors.add(new CacheInterceptor(client.internalCache()));//连接server Response缓存处理 interceptors.add(new ConnectInterceptor(client));//网络连接处理 if (!forWebSocket) { interceptors.addAll(client.networkInterceptors()); } interceptors.add(new CallServerInterceptor(forWebSocket));//http connect write read处理 Interceptor.Chain chain = new RealInterceptorChain( interceptors, null, null, null, 0, originalRequest); return chain.proceed(originalRequest); }

 

由此可以看出OKHttp本身很多功能也是采用Interceptor按功能划分来处理

这里面涉及到设计模式FilterChain(责任链)过滤器来处理(l类似Strcuct2拦截机制),整个请求过程通过递归调用由RealInterceptorChain的process()来完成,在每个Interceptor

递归下一个Interceptor 直到最后返回RealInterceptorChain interceptor完成整个流程响应处理

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,      Connection connection) throws IOException {    if (index >= interceptors.size()) throw new AssertionError();    ......    // Call the next interceptor in the chain.//每个Intercepter都是通过RealInterceptorChain封装后在进行调用下一个interceptor    RealInterceptorChain next = new RealInterceptorChain(        interceptors, streamAllocation, httpCodec, connection, index + 1, request);    Interceptor interceptor = interceptors.get(index);    Response response = interceptor.intercept(next);    ......    return response;  }

 

Http connect连接处理 ,ConnectInterceptor负责处理server连接。

public final class ConnectInterceptor implements Interceptor { ...  @Override public Response intercept(Chain chain) throws IOException {    RealInterceptorChain realChain = (RealInterceptorChain) chain;    Request request = realChain.request();    StreamAllocation streamAllocation = realChain.streamAllocation();    // We need the network to satisfy this request. Possibly for validating a conditional GET.    boolean doExtensiveHealthChecks = !request.method().equals("GET"); //HttpCodechttp request编码,response解码处理    HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks); //一个请求连接    RealConnection connection = streamAllocation.connection();    return realChain.proceed(request, streamAllocation, httpCodec, connection);  }}

 StreamAllocation负责维护:连接(Connection)创建,取消,释放(Connection)内部使用ConnectionPool方式存取Connection

 

public final class ConnectionPool {//与前面一样不限制最大个数  private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,      Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,      new SynchronousQueue
(), Util.threadFactory("OkHttp ConnectionPool", true)); //最大连接数量 private final int maxIdleConnections; //清理资源间隔 private final long keepAliveDurationNs;//清理资源执行任务 private final Runnable cleanupRunnable = new Runnable() { @Override public void run() { while (true) { long waitNanos = cleanup(System.nanoTime()); if (waitNanos == -1) return; if (waitNanos > 0) { long waitMillis = waitNanos / 1000000L; waitNanos -= (waitMillis * 1000000L); synchronized (ConnectionPool.this) { try { ConnectionPool.this.wait(waitMillis, (int) waitNanos); } catch (InterruptedException ignored) { } } } } } };//双端队列存储connection private final Deque
connections = new ArrayDeque<>(); final RouteDatabase routeDatabase = new RouteDatabase(); boolean cleanupRunning; //默认为最多5个connection,每5min定时清理无用的连接资源 public ConnectionPool() { this(5, 5, TimeUnit.MINUTES); } public ConnectionPool(int maxIdleConnections, long keepAliveDuration, TimeUnit timeUnit) { this.maxIdleConnections = maxIdleConnections; this.keepAliveDurationNs = timeUnit.toNanos(keepAliveDuration); // Put a floor on the keep alive duration, otherwise cleanup will spin loop. if (keepAliveDuration <= 0) { throw new IllegalArgumentException("keepAliveDuration <= 0: " + keepAliveDuration); } }

 

 

拦截器应用,看一下官方例子

OkHttpClient client = new OkHttpClient.Builder()    .addInterceptor(new LoggingInterceptor())    .build();Request request = new Request.Builder()    .url("http://www.publicobject.com/helloworld.txt")    .header("User-Agent", "OkHttp Example")    .build();Response response = client.newCall(request).execute(); class LoggingInterceptor implements Interceptor {          @Override public Response intercept(Interceptor.Chain chain) throws IOException {             //get request instance detail modify or add header and so on            Request request = chain.request();            long t1 = System.nanoTime();            logger.info(String.format("Sending request %s on %s%n%s",                request.url(), chain.connection(), request.headers()));            //get response instance detail modify body and so on            Response response = chain.proceed(request);            long t2 = System.nanoTime();            logger.info(String.format("Received response for %s in %.1fms%n%s",                response.request().url(), (t2 - t1) / 1e6d, response.headers()));            return response;          }        }

 

INFO: Sending request http://www.publicobject.com/helloworld.txt on nullUser-Agent: OkHttp ExampleINFO: Received response for https://publicobject.com/helloworld.txt in 1179.7msServer: nginx/1.4.6 (Ubuntu)Content-Type: text/plainContent-Length: 1759Connection: keep-alive

 

通过一个拦截器很容易实现网络每个请求日志记录使用,可以看到拦截器可以对同一个request操作,也可以对Response进行处理既支持双向处理

再来一个自己的实现

class MyIntercepter implements Interceptor {		@Override		public Response intercept(Chain chain) throws IOException {			Response response = null;			//指定debug下拦截指定的url 的response			if (BuildConfig.DEBUG					&& chain.request().url().uri().getPath()							.equals("/api/id")) {				// 这里读取我们需要返回的 Json 字符串				String responseString = "{...}";				//构建新的response				response = new Response.Builder()						.code(200)						.message(responseString)						.request(chain.request())						.protocol(Protocol.HTTP_1_0)						.body(ResponseBody.create(								MediaType.parse("application/json"),								responseString.getBytes()))						.addHeader("content-type", "application/json").build();			} else {				response = chain.proceed(chain.request());			}			return response;		}	}

 

从例子可以看出拦截器强大与灵活。

 

okhttp中分为两种拦截方式,

1,Application Interceptors
2,Network Interceptors

 

上一个例子是Application Interceptors,修改为Network Interceptors后再看

OkHttpClient client = new OkHttpClient.Builder()	    //.addInterceptor(new LoggingInterceptor())	    .addNetworkInterceptor(new LoggingInterceptor())	    .build();

 

INFO: Sending request http://www.publicobject.com/helloworld.txt on Connection{www.publicobject.com:80, proxy=DIRECT hostAddress=54.187.32.157 cipherSuite=none protocol=http/1.1}User-Agent: OkHttp ExampleHost: www.publicobject.comConnection: Keep-AliveAccept-Encoding: gzipINFO: Received response for http://www.publicobject.com/helloworld.txt in 115.6msServer: nginx/1.4.6 (Ubuntu)Content-Type: text/htmlContent-Length: 193Connection: keep-aliveLocation: https://publicobject.com/helloworld.txtINFO: Sending request https://publicobject.com/helloworld.txt on Connection{publicobject.com:443, proxy=DIRECT hostAddress=54.187.32.157 cipherSuite=TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA protocol=http/1.1}User-Agent: OkHttp ExampleHost: publicobject.comConnection: Keep-AliveAccept-Encoding: gzipINFO: Received response for https://publicobject.com/helloworld.txt in 80.9msServer: nginx/1.4.6 (Ubuntu)Content-Type: text/plainContent-Length: 1759Connection: keep-alive

这个是链接发生重定向日志结果,变为Network Interceptors时信息更全,网络中的具体情况redirect,retries等都会调用process(即多次执行process)

 

而Application Interceptors只会调用一次。

应用场景;通过intercepter我们可以更加方便灵活的方式实现不同方式,

修改header参数:

Request request = new Request.Builder()    .url("https://api.github.com/repos/square/okhttp/issues")    .header("User-Agent", "OkHttp Headers.java")    .addHeader("Accept", "application/json; q=0.5")    .addHeader("Accept", "application/vnd.github.v3+json")    .build();

 

通过intercepter实现

public Response intercept(Interceptor.Chain chain) throws IOException {        Request request = chain.request();        Request newRequest;        newRequest = request.newBuilder()                .addHeader(HeadersContract.HEADER_AUTHONRIZATION, O_AUTH_AUTHENTICATION)                .addHeader(HeadersContract.HEADER_X_CLIENT_ID, CLIENT_ID)                .build();        return chain.proceed(newRequest);    }

 

可以看出intercepterer大大增加程序可扩展性,像插件一样可以方便特性,强大的组合功能

简要总结:

1,OkHttp采用Dispatcher与线程池配合,实现高并发,低阻塞

2,采用Deque作为缓存,按照入队的顺序先进先出
3,使用try/finally中调用finished,而不是采用锁的方式实现任务调度,极大减少多并发锁竞争而带来的效率问题
4,通过Intercepter实现实现功能划分处理,可扩展,自定义配置十分灵活

 

okhttp3结合okio request response读写操作,

private boolean bodyHasUnsupportedEncoding(Headers headers) {
        String contentEncoding = headers.get("Content-Encoding");         return contentEncoding != null &&                 !contentEncoding.equalsIgnoreCase("identity") &&                 !contentEncoding.equalsIgnoreCase("gzip");     }     private boolean bodyGzipped(Headers headers) {
        String contentEncoding = headers.get("Content-Encoding");         return "gzip".equalsIgnoreCase(contentEncoding);     } private String readFromBuffer(Buffer buffer, Charset charset) { long bufferSize = buffer.size(); long maxBytes = Math.min(bufferSize, maxContentLength); String body = ""; try { body = buffer.readString(maxBytes, charset); } catch (EOFException e) { body += context.getString(R.string.chuck_body_unexpected_eof); } if (bufferSize > maxContentLength) { body += context.getString(R.string.chuck_body_content_truncated); } return body; } private BufferedSource getNativeSource(BufferedSource input, boolean isGzipped) { if (isGzipped) { GzipSource source = new GzipSource(input); return Okio.buffer(source); } else { return input; } } private BufferedSource getNativeSource(Response response) throws IOException { if (bodyGzipped(response.headers())) { BufferedSource source = response.peekBody(maxContentLength).source(); if (source.buffer().size() < maxContentLength) { return getNativeSource(source, true); } else { Log.w(LOG_TAG, "gzip encoded response was too long"); } } return response.body().source(); }

 

private static final Charset UTF8 = Charset.forName("UTF-8");private long maxContentLength = 250000L;/**     * Returns true if the body in question probably contains human readable text. Uses a small sample     * of code points to detect unicode control characters commonly used in binary file signatures.     */    private boolean isPlaintext(Buffer buffer) {        try {            Buffer prefix = new Buffer();            long byteCount = buffer.size() < 64 ? buffer.size() : 64;            buffer.copyTo(prefix, 0, byteCount);            for (int i = 0; i < 16; i++) {                if (prefix.exhausted()) {                    break;                }                int codePoint = prefix.readUtf8CodePoint();                if (Character.isISOControl(codePoint) && !Character.isWhitespace(codePoint)) {                    return false;                }            }            return true;        } catch (EOFException e) {            return false; // Truncated UTF-8 sequence.        }    }ResponseBody responseBody = response.body();        transaction.setRequestHeaders(response.request().headers()); // includes headers added later in the chain        transaction.setResponseDate(new Date());        transaction.setProtocol(response.protocol().toString());        transaction.setResponseCode(response.code());        transaction.setResponseMessage(response.message());        transaction.setResponseContentLength(responseBody.contentLength());        if (responseBody.contentType() != null) {            transaction.setResponseContentType(responseBody.contentType().toString());        }        transaction.setResponseHeaders(response.headers());        transaction.setResponseBodyIsPlainText(!bodyHasUnsupportedEncoding(response.headers()));        if (HttpHeaders.hasBody(response) && transaction.responseBodyIsPlainText()) {            BufferedSource source = getNativeSource(response);            source.request(Long.MAX_VALUE);            Buffer buffer = source.buffer();            Charset charset = UTF8;            MediaType contentType = responseBody.contentType();            if (contentType != null) {                try {                    charset = contentType.charset(UTF8);                } catch (UnsupportedCharsetException e) {                    update(transaction, transactionUri);                    return response;                }            }            if (isPlaintext(buffer)) {                transaction.setResponseBody(readFromBuffer(buffer.clone(), charset));            } else {                transaction.setResponseBodyIsPlainText(false);            }            transaction.setResponseContentLength(buffer.size());        }

 

 

转载于:https://www.cnblogs.com/happyxiaoyu02/p/6150736.html

你可能感兴趣的文章
洛谷 P1407 [国家集训队]稳定婚姻 解题报告
查看>>
Delphi10.2 Tokyo试用(1)
查看>>
基本数据类型的使用
查看>>
让元素水平和垂直居中的方法总结
查看>>
linux定时执行任务crontab命令用法
查看>>
条件判断_python
查看>>
第二十七天-nfs网络文件系统企业级深度讲解
查看>>
Linux下获取占用CPU内存资源最多的10个进程的方法
查看>>
Azure SQL Database (22) Azure SQL Database支持中文值
查看>>
python元类探究
查看>>
Titanium系列--利用js动态获取当前时间
查看>>
从上往下打印二叉树
查看>>
Python 中的self, cls, super的使用和理解
查看>>
java课堂测试2
查看>>
20145236《信息安全系统设计基础》第5周学习总结
查看>>
python中的jion
查看>>
【图论】[NOIP2014]联合权值
查看>>
嵌入式
查看>>
mysql 中文字段排序( UTF8按拼音首字母排序)
查看>>
iOS - 适配iOS 11
查看>>