连接器组件-NioEndpoint

2019-12-27

Tomcat 的 NioEndpoint 组件实现了 I/O 多路复用模型。
对于多路复用的使用,无非就是下面两步:

  1. 创建一个 Selector,在它身上注册感兴趣的事件,然后调用 select 方法,等待感兴趣的事情发生。
  2. 感兴趣的事情发生了,比如可以读了,这时便创建一个线程从 channel 读取数据。

NioEndpoint 它一共包含 LimitLatch、Acceptor、Poller、SocketProcessor 和 Executor 共 5 个组件,它们的工作过程如下图所示。
null

LimitLatch是用来控制连接个数的同步,在LimitLatch里有做详细介绍。
Acceptor是 Endpoint 的属性,由 Endpoint 完成初始化和端口绑定。初始化过程如下:


serverSock = ServerSocketChannel.open();
serverSock.socket().bind(addr,getAcceptCount());
serverSock.configureBlocking(true);

我们可以看到:

  1. bind 方法的第二个参数表示操作系统的等待队列长度,TCP 三次握手建立连接的过程中,内核通常会为每一个 Listen 状态的 Socket 维护两个队列:
  • SYN 队列(半连接队列):这些连接已经接到客户端 SYN;
  • ACCEPT 队列(全连接队列):这些连接已经接到客户端的 ACK,完成了三次握手,等待被 accept 系统调用取走。
    Acceptor 负责从 ACCEPT 队列中取出连接,当 Acceptor 处理不过来时,连接就堆积在 ACCEPT 队列中,这个队列长度也可以通过参数设置。当应用层面的连接数达到最大值时,操作系统可以继续接受连接的,那么操作系统能继续接受的最大连接数就是这个队列长度,可以通过 acceptCount 参数设置,默认大小是 100。
  1. ServerSocketChannel 被设置成阻塞模式,也就是说它是以阻塞的方式接收连接的。这里用阻塞是因为阻塞方式接受连接好实现,可以看到官方解释如下:
     * <p> If this channel is in non-blocking mode then this method will
     * immediately return <tt>null</tt> if there are no pending connections.
     * Otherwise it will block indefinitely until a new connection is available
     * or an I/O error occurs.

Acceptor

Acceptor 实现了Runnable,并且是一个守护线程。它的 run 方法的大体逻辑为:

public void run(){
	while(endpoint.isRunning()){
		 endpoint.countUpOrAwaitConnection();
		 SocketChannel socket = endpoint.serverSocketAccept();
		 endpoint.setSocketOptions(socket)
	}
}

protected boolean setSocketOptions(SocketChannel socket){
	socket.configureBlocking(false);
	NioChannel channel = new NioChannel(socket, bufhandler);
	getPoller0().register(channel);
}

public void register(final NioChannel socket){
	socketWrapper.interestOps(SelectionKey.OP_READ);
	PollerEvent r = new PollerEvent(socket, socketWrapper, OP_REGISTER);
	addEvent(r);
}

private void addEvent(PollerEvent event) {
	events.offer(event);
	if (wakeupCounter.incrementAndGet() == 0) {
		selector.wakeup();
	}
}

ServerSocketChannel 通过serverSock.accept()方法接受新的连接,accept()方法返回获得 SocketChannel 对象,然后将 SocketChannel 对象封装在一个 PollEvent 对象中,并将 PollEvent 对象压入 Poller 的 Queue 中,这是典型的“生产者-消费者”模式,Acceptor 与 Poller 线程之前通过 Queue 通信。对 Queue 的读写又定义为synchronized方法,从而保证了线程安全。Tomcat 的 Acceptor 数量默认设置为 1,即只有一个 Acceptor 线程来接受连接。

Poller

Poller 本质上是一个 Selector,它内部维护了一个 Queue,定义如下:

private final SynchronizedQueue<PollerEvent> events = new SynchronizedQueue<>();

SynchronizedQueue 的方法比如 offer、poll、size 和 clear 方法,都使用了 synchronized 关键字进行修饰,用来保证同一时刻只有一个 Acceptor 线程对 Queue 进行读写。
Poller 也实现了 Runnable 方法,在它的 Run 方法里通过死循环不断消费 events。如果有消费到数据,就会触发 PollerEvent 的 run 方法,PollerEvent 封装了注册感兴趣事件的逻辑。然后通过selector.select查询 channel 状态,一旦可读就生成任务类 SocketProcessor 交给 Executor 去处理。Poller 的另一个重要任务是循环遍历检查自己所管理的 SocketChannel 是否已经超时,如果有超时就关闭这个 SocketChannel。

大致逻辑如下:

while(true){
	hasEvents = events();
	keyCount = selector.selectNow();
	Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null;
	while(iterator != null && iterator.hasNext()){
		SelectionKey sk = iterator.next();
		NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment();
		iterator.remove();
		// 在这里会走到下面processSocket()方法里
		processKey(sk, attachment);
	        timeout(keyCount,hasEvents);             
	}
}

public boolean processSocket(SocketWrapperBase<S> socketWrapper,SocketEvent event, boolean dispatch){
	        SocketProcessorBase<S> sc = processorCache.pop();
            if (sc == null) {
                sc = createSocketProcessor(socketWrapper, event);
            } else {
                sc.reset(socketWrapper, event);
            }
            Executor executor = getExecutor();
            if (dispatch && executor != null) {
                executor.execute(sc);
            } else {
                sc.run();
            }
}

SocketProcessor

上面分析到 Poller 会创建 SocketProcessor 任务类交给线程池处理,而 SocketProcessor 实现了 Runnable 接口,用来定义 Executor 中线程所执行的任务。SocketProcessor 主要就是调用 Http11Processor 组件来处理请求。Http11Processor 读取 Channel 的数据来生成 ServletRequest 对象,这里请你注意:Http11Processor 并不是直接读取 Channel 的。这是因为 Tomcat 支持同步非阻塞 I/O 模型和异步 I/O 模型,在 Java API 中,相应的 Channel 类也是不一样的,比如有 AsynchronousSocketChannel 和 SocketChannel,为了对 Http11Processor 屏蔽这些差异,Tomcat 设计了一个包装类叫作 SocketWrapper,Http11Processor 只调用 SocketWrapper 的方法去读写数据。

Executor

Executor 是 Tomcat 定制版的线程池,Executor篇幅介绍了 Tomcat 怎么扩展和使用 Java 原生的线程池。


道路且长,行则将至。