dubbo的连接机制
这里直接上结论了,dubbo默认是使用单一长连接,即消费者与每个服务提供者建立一个单一长连接,即如果有消费者soa-user1,soa-user2,提供者soa-account三台,则每台消费者user都会与3台account建立一个连接,结果是每台消费者user有3个长连接到分别到3台提供者,每台提供者account维持到soa-user1和soa-user2的2个长连接。
为什么这么做
dubbo这么设计的原因是,一般情况下因为消费者是在请求链路的上游,消费者承担的连接数以及并发量都是最高的,他需要承担更多其他的连接请求,而对提供者来说,承担的连接只来于消费者,所以每台提供者只需要承接消费者数量的连接就可以了,dubbo面向的就是消费者数量远大于服务提供者的情况。所以说,现在很多项目使用的都是消费者和提供者不分的情况,这种情况并没有很好的利用这个机制。
dubbo同步转异步
dubbo的底层是使用netty,netty之前介绍过是非阻塞的,但是dubbo调用我们大多数时候都是使用的同步调用,那么这里是怎么异步转同步的呢?这里其实延伸下,不只是dubbo,大多数在web场景下,还是同步请求为主,那么netty中要如何将异步转同步?我这边描述一下关键步骤。
dubbo的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| protected Result doInvoke(final Invocation invocation) throws Throwable { if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout); FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future); RpcContext.getContext().setFuture(futureAdapter); Result result; if (isAsyncFuture) { result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false); } else { result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false); } return result; } else { RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout) .get(); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public ResponseFuture request(Object request, int timeout) throws RemotingException { Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); req.setData(request); DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout); try { channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
|
public Object get(int timeout) throws RemotingException { if (timeout <= 0) { timeout = 1000; } if (!this.isDone()) { long start = System.currentTimeMillis(); this.lock.lock();
try { while(!this.isDone()) { this.done.await((long)timeout, TimeUnit.MILLISECONDS); if (this.isDone() || System.currentTimeMillis() - start > (long)timeout) { break; } } } catch (InterruptedException var8) { throw new RuntimeException(var8); } finally { this.lock.unlock(); }
if (!this.isDone()) { throw new TimeoutException(this.sent > 0L, this.channel, this.getTimeoutMessage(false)); } }
return this.returnFromResponse(); }
public static void received(Channel channel, Response response) { try { DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { future.doReceived(response); } else { } } finally { CHANNELS.remove(response.getId()); } } private void doReceived(Response res) { lock.lock(); try { response = res; if (done != null) { done.signal(); } } finally { lock.unlock(); } if (callback != null) { invokeCallback(callback); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
|
public class HeaderExchangeHandler implements ChannelHandlerDelegate { public void received(Channel channel, Object message) throws RemotingException { channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis()); HeaderExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try { if (message instanceof Request) { Request request = (Request)message; if (request.isEvent()) { this.handlerEvent(channel, request); } else if (request.isTwoWay()) { Response response = this.handleRequest(exchangeChannel, request); channel.send(response); } else { this.handler.received(exchangeChannel, request.getData()); } } else if (message instanceof Response) { handleResponse(channel, (Response)message); } else if (message instanceof String) { if (isClientSide(channel)) { Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl()); logger.error(e.getMessage(), e); } else { String echo = this.handler.telnet(channel, (String)message); if (echo != null && echo.length() > 0) { channel.send(echo); } } } else { this.handler.received(exchangeChannel, message); } } finally { HeaderExchangeChannel.removeChannelIfDisconnected(channel); }
} static void handleResponse(Channel channel, Response response) throws RemotingException { if (response != null && !response.isHeartbeat()) { DefaultFuture.received(channel, response); }
} }
|
纯netty的简单实现
纯netty的简单实现,其实也很简单,在创建handler时,构造时将外部的FutureTask对象构造到hanlder中,外面使用FutureTask对象get方法阻塞,handler中在最后有结果时,将FutureTask的结果set一下,外部就取消了阻塞。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| public SettableTask<String> sendAndSync(FullHttpRequest httpContent){ final SettableTask<String> responseFuture = new SettableTask<>(); ChannelFutureListener connectionListener = future -> { if (future.isSuccess()) { Channel channel = future.channel(); channel.pipeline().addLast(new SpidersResultHandler(responseFuture)); } else { responseFuture.setExceptionResult(future.cause()); } }; try { Channel channel = channelPool.acquire().syncUninterruptibly().getNow(); log.info("channel status:{}",channel.isActive()); channel.writeAndFlush(httpContent).addListener(connectionListener); } catch (Exception e) { log.error("netty写入异常!", e); } return responseFuture; } public class SettableTask<T> extends FutureTask<T> { public SettableTask() { super(() -> { throw new IllegalStateException("Should never be called"); }); }
public void setResultValue(T value) { this.set(value);
}
public void setExceptionResult(Throwable exception) { this.setException(exception); }
@Override protected void done() { super.done(); } } public class SpidersResultHandler extends SimpleChannelInboundHandler<String> { private SettableTask<String> future; public SpidersResultHandler(SettableTask<String> future){ this.future=future; } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String httpContent) throws Exception { log.info("result={}",httpContent); future.setResultValue(httpContent); }
@Override public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception { log.error("{}异常", this.getClass().getName(), throwable);
} }
|
总结
dubbo的高性能,也源于他对每个点不断的优化,最早的时候我记得看到一篇文章写到:dubbo的异步转同步机制,是使用的CountDownLatch实现的。现在想来,可能是在乱说。一些框架的原理,还是要自己多思考多翻看,才能掌握。