先从Okhttp3的请求实现说起(包括同步请求和异步请求)。

基本使用

同步请求:

1
2
3
4
5
6
private static final String DESTINATION_ADDRESS = "https://github.com/soulrelay";
OkHttpClient client = new OkHttpClient.Builder().build();
Request request = new Request.Builder()
.url(DESTINATION_ADDRESS)
.build();
Response response = client.newCall(request).execute();

异步请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private static final String DESTINATION_ADDRESS = "https://github.com/soulrelay";
OkHttpClient client = new OkHttpClient.Builder().build();
Request request = new Request.Builder()
.url(DESTINATION_ADDRESS)
.build();
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
}
@Override
public void onResponse(Call call, Response response) throws IOException {
}
});

小结:

  • 不管是同步请求还是异步请求,都需要初始化OkHttpClient以及创建一个Request,然后再调用OkHttpClinet的newCall方法,创建出一个RealCall对象
  • 对于同步请求,是调用RealCall的execute方法;而异步请求,则是调用RealCall的enqueue方法实现

简介

  • OkHttpCLient:在Okhttp库中,OkHttpClient处于一个中心者的地位,很多功能都需要通过它来转发或者实现的。在创建的时候,初始化了很多功能类,比如缓存,拦截器,网络连接池,分发器等类。 由于初始化比较复杂,OkHttpClient内部提供了Builder模式来创建,一般情况下,OkHttpClient是唯一的
  • Request:是用来构建一个请求对象的,符合Http请求的标准,包含了请求头,方法等等属性,较为复杂,因此同样提供Builder模式构建。
  • Response:是用来构建一个响应对象的,包含了响应头,响应码,数据等等属性,同样也提供Builder模式构建

RealCall

1
2
3
4
// 同步
client.newCall(request).execute()
// 异步
client.newCall(request).enqueue(Callback)

同步和异步请求,都是调用OkHttpClient的newCall方法创建一个RealCall对象,然后通过这个对象,执行请求的

1
2
3
4
@Override
public Call newCall(Request request) {
return new RealCall(this, request);
}

execute 同步请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Override
public Response execute() throws IOException {
// 方法检测,execute只能调用1次
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
try {
// 将RealCall添加到Dispatcher中的同步请求队列中
client.dispatcher().executed(this);
// 获取响应的数据
// Okhttp会对数据(请求流和响应流)进行拦截进行一些额外的处理,
// 比如失败重连,添加请求头,没网络优先返回缓存数据等等。
// 这一拦截过程,采用责任链模式来实现的,和Android中的事件传递机制差不多
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} finally {
// 请求完成或者取消,都会将RealCall从同步请求队列中移除
client.dispatcher().finished(this);
}
}

enqueue-异步请求

1
2
3
4
5
6
7
8
9
10
@Override
public void enqueue(Callback responseCallback) {
// 和execute一样,enqueue也只能调用1次
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
// 执行请求
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

AsyncCall

AsyncCall–实际上是一个Runnable

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
}

NamedRunnable,继承Runnable的抽象类,根据传入的名称来修改线程名称,抽象出执行任务的execute方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
final class AsyncCall extends NamedRunnable {
// 其他省略,execute方法式真正执行请求
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
}

AsyncCall中的execute方法,是请求任务执行的方法,获取相应数据最终也是调用了getResponseWithInterceptorChain方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!retryAndFollowUpInterceptor.isForWebSocket()) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(
retryAndFollowUpInterceptor.isForWebSocket()));
Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest);
return chain.proceed(originalRequest);
}

Dispatcher

原理:

  • 对于异步请求来说,Dispatcher类内部持有一个正在执行(运行中)异步任务的队列和一个等待执行(就绪)异步请求的队列,以及一个线程池
  • 这个线程池,没有常存的核心线程,最多线程数为Integer.MAX_VALUE,线程空闲时存活时间为60秒,而SynchronousQueue #2 是不保存任务的,所以只要把任务添加进去就会执行
  • OkHttp不是在线程池中维护线程的个数,线程是一直在Dispatcher中直接控制。线程池中的请求都是运行中的请求。这也就是说线程的重用不是线程池控制的,通过源码我们发现线程重用的地方是请求结束的地方finished(AsyncCall call) ,而真正的控制是通过promoteCalls方法, 根据maxRequests和maxRequestsPerHost来调整runningAsyncCalls和readyAsyncCalls,使运行中的异步请求不超过两种最大值,并且如果队列有空闲,将就绪状态的请求归类为运行中
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
public final class Dispatcher {
//最大请求数
private int maxRequests = 64;
//相同host最大请求数
private int maxRequestsPerHost = 5;
//请求执行,懒加载
private ExecutorService executorService;
//就绪状态的异步请求队列
private final Deque<AsyncCall> readyCalls = new ArrayDeque<>();
//运行中的异步请求队列
private final Deque<AsyncCall> runningCalls = new ArrayDeque<>();
//进行中的同步请求,包括那些尚未完成被取消的请求
private final Deque<Call> executedCalls = new ArrayDeque<>();
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public Dispatcher() {
}
public synchronized ExecutorService getExecutorService() {
if (executorService == null) {
//corePoolSize 为 0表示,没有核心线程,所有执行请求的线程,使用完了如果过期了(keepAliveTime)就回收了,
//maximumPoolSize 无限大的线程池空间
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
public synchronized void setMaxRequests(int maxRequests) {
if (maxRequests < 1) {
throw new IllegalArgumentException("max < 1: " + maxRequests);
}
this.maxRequests = maxRequests;
promoteCalls();
}
public synchronized int getMaxRequests() {
return maxRequests;
}
public synchronized void setMaxRequestsPerHost(int maxRequestsPerHost) {
if (maxRequestsPerHost < 1) {
throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost);
}
this.maxRequestsPerHost = maxRequestsPerHost;
promoteCalls();
}
public synchronized int getMaxRequestsPerHost() {
return maxRequestsPerHost;
}
synchronized void enqueue(AsyncCall call) {
if (runningCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningCalls.add(call);
getExecutorService().execute(call);
} else {
readyCalls.add(call);
}
}
//取消带有tag的所有请求
public synchronized void cancel(Object tag) {
for (AsyncCall call : readyCalls) {
if (Util.equal(tag, call.tag())) {
call.cancel();
}
}
for (AsyncCall call : runningCalls) {
if (Util.equal(tag, call.tag())) {
call.get().canceled = true;
HttpEngine engine = call.get().engine;
if (engine != null) engine.cancel();
}
}
for (Call call : executedCalls) {
if (Util.equal(tag, call.tag())) {
call.cancel();
}
}
}
//异步请求结束
//当该异步请求结束的时候,会调用此方法,
//用于将运行中的异步请求队列中的该请求移除并调整请求队列
//此时就绪队列中的请求就可以进入运行中的队列
synchronized void finished(AsyncCall call) {
if (!runningCalls.remove(call)) throw new AssertionError("AsyncCall wasn't running!");
promoteCalls();
}
//根据maxRequests和maxRequestsPerHost来调整
//runningAsyncCalls和readyAsyncCalls
//使运行中的异步请求不超过两种最大值
//并且如果队列有空闲,将就绪状态的请求归类为运行中。
private void promoteCalls() {
//运行中的异步请求队列的请求数大于最大请求数,那么就没必要去将就绪状态的请求移动到运行中。
//其实就是说,如果有超过最大请求数的请求正在运行,是不需要将其移出队列的,继续运行完即可
if (runningCalls.size() >= maxRequests) return;
//如果就绪的队列为空,那就更没有必要移动了,因为都没有。
if (readyCalls.isEmpty()) return;
//遍历就绪队列
for (Iterator<AsyncCall> i = readyCalls.iterator(); i.hasNext(); ) {
//取出一个请求
AsyncCall call = i.next();
//如果当前请求对应的host下,没有超过maxRequestsPerHost
//那么将其从就绪队列中移除,并加入在运行队列。
if (runningCallsForHost(call) < maxRequestsPerHost) {
//移除
i.remove();
//加入运行队列
runningCalls.add(call);
//立即执行该请求
getExecutorService().execute(call);
}
//如果运行队列已经到达了最大请求数上限
//此时如果还有就绪中的请求,也不管了
if (runningCalls.size() >= maxRequests) return;
}
}
//对比已有的运行中的请求和当前请求的host
private int runningCallsForHost(AsyncCall call) {
int result = 0;
for (AsyncCall c : runningCalls) {
if (c.host().equals(call.host())) result++;
}
return result;
}
synchronized void executed(Call call) {
executedCalls.add(call);
}
//同步请求结束
//当该同步请求结束的时候,会调用此方法,
//用于将运行中的同步请求队列中的该请求移除
synchronized void finished(Call call) {
if (!executedCalls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
}
public synchronized int getRunningCallCount() {
return runningCalls.size();
}
public synchronized int getQueuedCallCount() {
return readyCalls.size();
}
}

.