博客
关于我
Tomcat 7 中 NIO 处理分析
阅读量:105 次
发布时间:2019-02-25

本文共 31317 字,大约阅读时间需要 104 分钟。

Tomcat 的 Connector 有三种运行模式 bio、nio、apr ,先了解一下这三种的区别。

  1. bio(blocking I/O),顾名思义即阻塞式 I/O 操作,表示 Tomcat 使用的是传统的 Java I/O 操作(即java.io包及其子包)。Tomcat 在默认情况下,就是以 bio 模式运行的。一般而言 bio 模式是三种运行模式中性能最低的一种。
  2. nio(new I/O),是 Java SE 1.4 及后续版本提供的一种新的 I/O 操作方式(即java.nio包及其子包)。Java nio 是一个基于缓冲区并能提供非阻塞 I/O 操作的 Java API ,因此 nio 也被看成是 non-blocking I/O 的缩写。它拥有比传统 I/O 操作( bio )更好的并发运行性能。要让 Tomcat 以 nio 模式来运行只需要在 Tomcat 安装目录/conf/server.xml文件中将 Connector 节点的 protocol 配置成org.apache.coyote.http11.Http11NioProtocol即可。
  3. apr( Apache Portable Runtime/Apache 可移植运行时),是 Apache HTTP 服务器的支持库。可以简单地理解为 Tomcat 将以 JNI 的形式调 用Apache HTTP 服务器的核心动态链接库来处理文件读取或网络传输操作,从而大大提高 Tomcat 对静态文件的处理性能。 Tomcat apr 也是在 Tomcat 上运行高并发应用的首选模式。

写个 BIO 的 Socket 服务器还是比较容易的,无非是每 accept 一个 socket 之后就扔到一个线程中处理请求生成响应,这种方式可以改进的点就是增加线程池的支持,本文主要分析一下 Tomcat 中 NIO 处理方式的相关代码逻辑。

关键代码都是在org.apache.tomcat.util.net.NioEndpoint这个类里面,它是 Http11NioProtocol 中负责接收处理 socket 的主要组件,别看代码很长,仔细阅读会发现有很多共通的地方,如:

  1. 都会对JDK中原有的API做一下扩展或者包装,如 ThreadPoolExecutor 是对java.util.concurrent.ThreadPoolExecutor的扩展,NioChannel 是对 ByteChannel 的扩展,KeyAttachment 则是对 NioChannel 的包装
  2. 很多类设计成非 GC 的,方便缓存和重复使用,实现方式都是通过 ConcurrentLinkedQueue 类构造一个队列。比如 NioEndpoint 类里面的 ConcurrentLinkedQueue processorCache、ConcurrentLinkedQueue keyCache、ConcurrentLinkedQueue eventCache、ConcurrentLinkedQueue nioChannels 。Poller 类里面的 ConcurrentLinkedQueue events

先看下整个 Connector 组件结构图:

看过之前 Tomcat 启动文章的应该都知道,Connector 的启动会调用 Connector 类的 startInternal 方法,里面调用了 protocolHandler 的 start() ,该方法中将调用抽象的 endpoint 的 start() 方法,这个方法会调用到具体 Endpoint 类的 startInternal() ,所以代码分析先从 NioEndpoint 类的 startInternal 看起。

1.NioEndpoint 类核心组件的初始化

/** * Start the NIO endpoint, creating acceptor, poller threads. */@Overridepublic void startInternal() throws Exception {    if (!running) {        running = true;        paused = false;        // Create worker collection        if ( getExecutor() == null ) {            // 构造线程池,用于后续执行SocketProcessor线程,这就是上图中的Worker。            createExecutor();        }        initializeConnectionLatch();        // Start poller threads        // 根据处理器数量构造一定数目的轮询器,即上图中的Poller        pollers = new Poller[getPollerThreadCount()];        for (int i=0; i

startAcceptorThreads 调用的是父类org.apache.tomcat.util.net.AbstractEndpoint中的实现: 

protected final void startAcceptorThreads() {    int count = getAcceptorThreadCount();    acceptors = new Acceptor[count];    for (int i = 0; i < count; i++) {        // 调用子类的createAcceptor方法,本例中即NioEndpoint类的createAcceptor方法        acceptors[i] = createAcceptor();        String threadName = getName() + "-Acceptor-" + i;        acceptors[i].setThreadName(threadName);        Thread t = new Thread(acceptors[i], threadName);        t.setPriority(getAcceptorThreadPriority());        t.setDaemon(getDaemon());        t.start();    }}

以上就是 Acceptor、Poller、Worker 等核心组件的初始化过程。

2.请求接收

核心组件初始化之后接着就是 Acceptor 线程接收 socket 连接,看下 Acceptor 的源码:

// --------------------------------------------------- Acceptor Inner Class/** * 后台线程,用于监听TCP/IP连接以及将它们分发给相应的调度器处理。 * The background thread that listens for incoming TCP/IP connections and * hands them off to an appropriate processor. */protected class Acceptor extends AbstractEndpoint.Acceptor {    @Override    public void run() {        int errorDelay = 0;        // 循环遍历直到接收到关闭命令        // Loop until we receive a shutdown command        while (running) {            // Loop if endpoint is paused            while (paused && running) {                state = AcceptorState.PAUSED;                try {                    Thread.sleep(50);                } catch (InterruptedException e) {                    // Ignore                }            }            if (!running) {                break;            }            state = AcceptorState.RUNNING;            try {                // 如果已经达到最大连接数则让线程等待                //if we have reached max connections, wait                countUpOrAwaitConnection();                SocketChannel socket = null;                try {                    // 接收连接,这里用的阻塞模式。                    // Accept the next incoming connection from the server                    // socket                    socket = serverSock.accept();                } catch (IOException ioe) {                    //we didn't get a socket                    countDownConnection();                    // Introduce delay if necessary                    errorDelay = handleExceptionWithDelay(errorDelay);                    // re-throw                    throw ioe;                }                // Successful accept, reset the error delay                errorDelay = 0;                // 注意这个setSocketOptions方法                // 它将把上面接收到的socket添加到轮询器Poller中                // setSocketOptions() will add channel to the poller                // if successful                if (running && !paused) {                    if (!setSocketOptions(socket)) {                        countDownConnection();                        closeSocket(socket);                    }                } else {                    countDownConnection();                    closeSocket(socket);                }            } catch (SocketTimeoutException sx) {                // Ignore: Normal condition            } catch (IOException x) {                if (running) {                    log.error(sm.getString("endpoint.accept.fail"), x);                }            } catch (OutOfMemoryError oom) {                try {                    oomParachuteData = null;                    releaseCaches();                    log.error("", oom);                }catch ( Throwable oomt ) {                    try {                        try {                            System.err.println(oomParachuteMsg);                            oomt.printStackTrace();                        }catch (Throwable letsHopeWeDontGetHere){                            ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);                        }                    }catch (Throwable letsHopeWeDontGetHere){                        ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);                    }                }            } catch (Throwable t) {                ExceptionUtils.handleThrowable(t);                log.error(sm.getString("endpoint.accept.fail"), t);            }        }        state = AcceptorState.ENDED;    }}

3.Socket 参数设置

在 Acceptor 里接收到一个连接之后调用 setSocketOptions 方法设置 SocketChannel 的一些参数,然后将 SocketChannel 注册到 Poller 中。看下 setSocketOptions 的实现:

/** * Process the specified connection. */protected boolean setSocketOptions(SocketChannel socket) {    // Process the connection    try {        // 将SocketChannel配置为非阻塞模式        //disable blocking, APR style, we are gonna be polling it        socket.configureBlocking(false);        Socket sock = socket.socket();        // 设置Socket参数值(从server.xml的Connector节点上获取参数值)        // 比如Socket发送、接收的缓存大小、心跳检测等        socketProperties.setProperties(sock);        // 从NioChannel的缓存队列中取出一个NioChannel        // NioChannel是SocketChannel的一个的包装类        // 这里对上层屏蔽SSL和一般TCP连接的差异        NioChannel channel = nioChannels.poll();        // 缓存队列中没有则新建一个NioChannel        if ( channel == null ) {            // SSL setup            if (sslContext != null) {                SSLEngine engine = createSSLEngine();                int appbufsize = engine.getSession().getApplicationBufferSize();                NioBufferHandler bufhandler = new NioBufferHandler(Math.max(appbufsize,socketProperties.getAppReadBufSize()),                                                                   Math.max(appbufsize,socketProperties.getAppWriteBufSize()),                                                                   socketProperties.getDirectBuffer());                channel = new SecureNioChannel(socket, engine, bufhandler, selectorPool);            } else {                // normal tcp setup                NioBufferHandler bufhandler = new NioBufferHandler(socketProperties.getAppReadBufSize(),                                                                   socketProperties.getAppWriteBufSize(),                                                                   socketProperties.getDirectBuffer());                channel = new NioChannel(socket, bufhandler);            }        } else {            // 将SocketChannel关联到从缓存队列中获取的NioChannel上来            channel.setIOChannel(socket);            if ( channel instanceof SecureNioChannel ) {                SSLEngine engine = createSSLEngine();                ((SecureNioChannel)channel).reset(engine);            } else {                channel.reset();            }        }        // 将新接收到的SocketChannel注册到Poller中        getPoller0().register(channel);    } catch (Throwable t) {        ExceptionUtils.handleThrowable(t);        try {            log.error("",t);        } catch (Throwable tt) {            ExceptionUtils.handleThrowable(t);        }        // Tell to close the socket        return false;    }    return true;}

核心调用是最后的getPoller0().register(channel);它将配置好的 SocketChannel 包装成一个 PollerEvent ,然后加入到 Poller 的 events 缓存队列中。

4.读取事件注册

getPoller0 方法将轮询当前的 Poller 数组,从中取出一个 Poller 返回。( Poller 的初始化参见上述第1步:NioEndpoint 类核心组件的初始化)

/** * Return an available poller in true round robin fashion */public Poller getPoller0() {    // 最简单的轮询调度算法,poller的计数器不断加1再对poller数组取余数    int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length;    return pollers[idx];}

之后调用 Poller 对象的 register 方法: 

public void register(final NioChannel socket) {    // 设置socket的Poller引用,便于后续处理    socket.setPoller(this);    // 从NioEndpoint的keyCache缓存队列中取出一个KeyAttachment    KeyAttachment key = keyCache.poll();    // KeyAttachment实际是NioChannel的包装类    final KeyAttachment ka = key!=null?key:new KeyAttachment(socket);    // 重置KeyAttachment对象中Poller、NioChannel等成员变量的引用    ka.reset(this,socket,getSocketProperties().getSoTimeout());    ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());    ka.setSecure(isSSLEnabled());    // 从Poller的事件对象缓存中取出一个PollerEvent,并用socket初始化事件对象    PollerEvent r = eventCache.poll();    // 设置读操作为感兴趣的操作    ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.    if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);    else r.reset(socket,ka,OP_REGISTER);    // 加入到Poller对象里的事件队列    addEvent(r);}

看下 Poller 类里 addEvent 的代码: 

/** * Only used in this class. Will be made private in Tomcat 8.0.x * @deprecated */@Deprecatedpublic void addEvent(Runnable event) {    events.offer(event);    if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();}

就两行,第一行从 event 对象添加到缓存队列中,第二行如果当前事件队列中没有事件,则唤醒处于阻塞状态的 selector 。

5.Poller 处理流程

上面讲的是从 Acceptor 中接收到的 Socket 以 PollerEvent 的形式包装并添加到 Poller 的事件缓存队列中,接下来看看另外一个核心组件 Poller 的处理过程:

/** * Poller class. */public class Poller implements Runnable {    // 这就是NIO中用到的选择器,可以看出每一个Poller都会关联一个Selector    protected Selector selector;    // 待处理的事件队列    protected ConcurrentLinkedQueue events = new ConcurrentLinkedQueue();    // 唤醒多路复用器的条件阈值    protected AtomicLong wakeupCounter = new AtomicLong(0l);    public Poller() throws IOException {        // 对Selector的同步访问,通过调用Selector.open()方法创建一个Selector        synchronized (Selector.class) {            // Selector.open() isn't thread safe            // http://bugs.sun.com/view_bug.do?bug_id=6427854            // Affects 1.6.0_29, fixed in 1.7.0_01            this.selector = Selector.open();        }    }    // 通过addEvent方法将事件添加到Poller的事件队列中    /**     * Only used in this class. Will be made private in Tomcat 8.0.x     * @deprecated     */    @Deprecated    public void addEvent(Runnable event) {        events.offer(event);        // 如果队列中没有待处理的事件则唤醒处于阻塞状态的selector        if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup();    }    // 处理事件队列中的所有事件,如果事件队列是空的则返回false    /**     * Processes events in the event queue of the Poller.     *     * @return true if some events were processed,     *   false if queue was empty     */    public boolean events() {        boolean result = false;        Runnable r = null;        // 将Poller的事件队列中的事件逐个取出并执行相应的事件线程        while ( (r = events.poll()) != null ) {            result = true;            try {                // 执行事件处理逻辑                // 这里将事件设计成线程是将具体的事件处理逻辑和事件框架分开                r.run();                if ( r instanceof PollerEvent ) {                    ((PollerEvent)r).reset();                    // 事件处理完之后,将事件对象返回NIOEndpoint的事件对象缓存中                    eventCache.offer((PollerEvent)r);                }            } catch ( Throwable x ) {                log.error("",x);            }        }        return result;    }    // 将socket包装成统一的事件对象PollerEvent,加入到待处理事件队列中    public void register(final NioChannel socket) {        socket.setPoller(this);        KeyAttachment key = keyCache.poll();        final KeyAttachment ka = key!=null?key:new KeyAttachment(socket);        ka.reset(this,socket,getSocketProperties().getSoTimeout());        ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests());        ka.setSecure(isSSLEnabled());        // 从NIOEndpoint的事件对象缓存中取出一个事件对象        PollerEvent r = eventCache.poll();        ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into.        if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER);        else r.reset(socket,ka,OP_REGISTER);        // 将事件添加打Poller的事件队列中        addEvent(r);    }    // Poller是一个线程,该线程同Acceptor一样会监听TCP/IP连接并将它们交给合适的处理器处理    /**     * The background thread that listens for incoming TCP/IP connections and     * hands them off to an appropriate processor.     */    @Override    public void run() {        // Loop until destroy() is called        while (true) {            try {                // Loop if endpoint is paused                while (paused && (!close) ) {                    try {                        Thread.sleep(100);                    } catch (InterruptedException e) {                        // Ignore                    }                }                boolean hasEvents = false;                // Time to terminate?                if (close) {                    events();                    timeout(0, false);                    try {                        selector.close();                    } catch (IOException ioe) {                        log.error(sm.getString(                                "endpoint.nio.selectorCloseFail"), ioe);                    }                    break;                } else {                    // 执行事件队列中的事件线程                    hasEvents = events();                }                try {                    if ( !close ) {                        if (wakeupCounter.getAndSet(-1) > 0) {                            // 把wakeupCounter设成-1,这是与addEvent里的代码呼应,这样会唤醒selector                            //if we are here, means we have other stuff to do                            //do a non blocking select                            // 以非阻塞方式查看selector是否有事件发生                            keyCount = selector.selectNow();                        } else {                            // 查看selector是否有事件发生,超过指定时间则立即返回                            keyCount = selector.select(selectorTimeout);                        }                        wakeupCounter.set(0);                    }                    if (close) {                        // 执行事件队列中的事件线程                        events();                        timeout(0, false);                        try {                            selector.close();                        } catch (IOException ioe) {                            log.error(sm.getString(                                    "endpoint.nio.selectorCloseFail"), ioe);                        }                        break;                    }                } catch ( NullPointerException x ) {                    //sun bug 5076772 on windows JDK 1.5                    if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x);                    if ( wakeupCounter == null || selector == null ) throw x;                    continue;                } catch ( CancelledKeyException x ) {                    //sun bug 5076772 on windows JDK 1.5                    if ( log.isDebugEnabled() ) log.debug("Possibly encountered sun bug 5076772 on windows JDK 1.5",x);                    if ( wakeupCounter == null || selector == null ) throw x;                    continue;                } catch (Throwable x) {                    ExceptionUtils.handleThrowable(x);                    log.error("",x);                    continue;                }                //either we timed out or we woke up, process events first                if ( keyCount == 0 ) hasEvents = (hasEvents | events());                Iterator iterator =                    keyCount > 0 ? selector.selectedKeys().iterator() : null;                // 根据向selector中注册的key遍历channel中已经就绪的keys,并处理这些key                // Walk through the collection of ready keys and dispatch                // any active event.                while (iterator != null && iterator.hasNext()) {                    SelectionKey sk = iterator.next();                    // 这里的attachment方法返回的就是在register()方法中注册的                    // 而KeyAttachment对象是对socket的包装                    KeyAttachment attachment = (KeyAttachment)sk.attachment();                    // Attachment may be null if another thread has called                    // cancelledKey()                    if (attachment == null) {                        iterator.remove();                    } else {                        // 更新通道最近一次发生事件的时间                        // 防止因超时没有事件发生而被剔除出selector                        attachment.access();                        iterator.remove();                        // 具体处理通道的逻辑                        processKey(sk, attachment);                    }                }//while                //process timeouts                // 多路复用器每执行一遍完整的轮询便查看所有通道是否超时                // 对超时的通道将会被剔除出多路复用器                timeout(keyCount,hasEvents);                if ( oomParachute > 0 && oomParachuteData == null ) checkParachute();            } catch (OutOfMemoryError oom) {                try {                    oomParachuteData = null;                    releaseCaches();                    log.error("", oom);                }catch ( Throwable oomt ) {                    try {                        System.err.println(oomParachuteMsg);                        oomt.printStackTrace();                    }catch (Throwable letsHopeWeDontGetHere){                        ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);                    }                }            }        }//while        synchronized (this) {            this.notifyAll();        }        stopLatch.countDown();    }    // 处理selector检测到的通道事件    protected boolean processKey(SelectionKey sk, KeyAttachment attachment) {        boolean result = true;        try {            if ( close ) {                cancelledKey(sk, SocketStatus.STOP, attachment.comet);            } else if ( sk.isValid() && attachment != null ) {                // 确保通道不会因超时而被剔除                attachment.access();//make sure we don't time out valid sockets                sk.attach(attachment);//cant remember why this is here                NioChannel channel = attachment.getChannel();                // 处理通道发生的读写事件                if (sk.isReadable() || sk.isWritable() ) {                    if ( attachment.getSendfileData() != null ) {                        processSendfile(sk,attachment, false);                    } else {                        if ( isWorkerAvailable() ) {                            // 在通道上注销对已经发生事件的关注                            unreg(sk, attachment, sk.readyOps());                            boolean closeSocket = false;                            // Read goes before write                            if (sk.isReadable()) {                                // 具体的通道处理逻辑                                if (!processSocket(channel, SocketStatus.OPEN_READ, true)) {                                    closeSocket = true;                                }                            }                            if (!closeSocket && sk.isWritable()) {                                if (!processSocket(channel, SocketStatus.OPEN_WRITE, true)) {                                    closeSocket = true;                                }                            }                            if (closeSocket) {                                // 解除无效通道                                cancelledKey(sk,SocketStatus.DISCONNECT,false);                            }                        } else {                            result = false;                        }                    }                }            } else {                //invalid key                cancelledKey(sk, SocketStatus.ERROR,false);            }        } catch ( CancelledKeyException ckx ) {            cancelledKey(sk, SocketStatus.ERROR,false);        } catch (Throwable t) {            ExceptionUtils.handleThrowable(t);            log.error("",t);        }        return result;    }    // 这个unreg()很巧妙,防止了通道对同一个事件不断select的问题    protected void unreg(SelectionKey sk, KeyAttachment attachment, int readyOps) {        //this is a must, so that we don't have multiple threads messing with the socket        reg(sk,attachment,sk.interestOps()& (~readyOps));    }    // 向NioChannel注册感兴趣的事件,具体代码看下面的PollerEvent类的说明    protected void reg(SelectionKey sk, KeyAttachment attachment, int intops) {        sk.interestOps(intops);        attachment.interestOps(intops);        attachment.setCometOps(intops);    }}

6.PollerEvent 处理流程

Poller 处理的核心是启动执行事件队列中的 PollerEvent,接着从 selector 中遍历已经就绪的 key ,一旦发生了感兴趣的事件,则交由 processSocket 方法处理。PollerEvent 的作用是向 socket 注册或更新感兴趣的事件:

/**    *    * PollerEvent, cacheable object for poller events to avoid GC    */   public static class PollerEvent implements Runnable {// 每个PollerEvent都会保存NioChannel的引用       protected NioChannel socket;       protected int interestOps;       protected KeyAttachment key;       public PollerEvent(NioChannel ch, KeyAttachment k, int intOps) {           reset(ch, k, intOps);       }       public void reset(NioChannel ch, KeyAttachment k, int intOps) {           socket = ch;           interestOps = intOps;           key = k;       }       public void reset() {           reset(null, null, 0);       }       @Override       public void run() {           //socket第一次注册到selector中,完成对socket读事件的注册           if ( interestOps == OP_REGISTER ) {               try {                   socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);               } catch (Exception x) {                   log.error("", x);               }           } else {               // socket之前已经注册到了selector中,更新socket所感兴趣的事件               final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector());               try {                   boolean cancel = false;                   if (key != null) {                       final KeyAttachment att = (KeyAttachment) key.attachment();                       if ( att!=null ) {                           //handle callback flag                           if (att.isComet() && (interestOps & OP_CALLBACK) == OP_CALLBACK ) {                               att.setCometNotify(true);                           } else {                               att.setCometNotify(false);                           }                           interestOps = (interestOps & (~OP_CALLBACK));//remove the callback flag                           // 刷新事件的最后访问时间,防止事件超时                           att.access();//to prevent timeout                           //we are registering the key to start with, reset the fairness counter.                           int ops = key.interestOps() | interestOps;                           att.interestOps(ops);                           key.interestOps(ops);                       } else {                           cancel = true;                       }                   } else {                       cancel = true;                   }                   if ( cancel ) socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);               }catch (CancelledKeyException ckx) {                   try {                       socket.getPoller().cancelledKey(key,SocketStatus.DISCONNECT,true);                   }catch (Exception ignore) {}               }           }//end if       }//run       @Override       public String toString() {           return super.toString()+"[intOps="+this.interestOps+"]";       }   }

7.将 socket 交给 Worker 执行

在第5步的 Poller 处理流程的分析中看到它的 run 方法最后会调用 processKey() 处理 selector 检测到的通道事件,而在这个方法最后会调用 processSocket 来调用具体的通道处理逻辑,看下 processSocket 方法的实现:

public boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) {    try {        KeyAttachment attachment = (KeyAttachment)socket.getAttachment();        if (attachment == null) {            return false;        }        attachment.setCometNotify(false); //will get reset upon next reg        // 从SocketProcessor的缓存队列中取出一个来处理socket        SocketProcessor sc = processorCache.poll();        if ( sc == null ) sc = new SocketProcessor(socket,status);        else sc.reset(socket,status);        // 将有事件发生的socket交给Worker处理        if ( dispatch && getExecutor()!=null ) getExecutor().execute(sc);        else sc.run();    } catch (RejectedExecutionException rx) {        log.warn("Socket processing request was rejected for:"+socket,rx);        return false;    } catch (Throwable t) {        ExceptionUtils.handleThrowable(t);        // This means we got an OOM or similar creating a thread, or that        // the pool and its queue are full        log.error(sm.getString("endpoint.process.fail"), t);        return false;    }    return true;}

Poller 通过 NioEndpoint 的协调,将发生事件的 socket 交给工作者线程 Worker 来进一步处理。整个事件框架的工作就到此结束,下面就是 Worker 的处理。

8.从 socket 中处理请求

在 Tomcat 6 版本的 NIO 处理实现中有一个 Worker 类,在 Tomcat 7 中把它去掉了,但工作者的职责还在,只是交由了上面看到的 SocketProcessor 这个类来担当,看下这个类的实现代码:

// ---------------------------------------------- SocketProcessor Inner Class  // 这个类相当于一个工作者,但只会在一个外部线程池中简单使用。  /**   * This class is the equivalent of the Worker, but will simply use in an   * external Executor thread pool.   */  protected class SocketProcessor implements Runnable {      // 每个SocketProcessor保存一个NioChannel的引用      protected NioChannel socket = null;      protected SocketStatus status = null;      public SocketProcessor(NioChannel socket, SocketStatus status) {          reset(socket,status);      }      public void reset(NioChannel socket, SocketStatus status) {          this.socket = socket;          this.status = status;      }      @Override      public void run() {          // 从socket中获取SelectionKey          SelectionKey key = socket.getIOChannel().keyFor(                  socket.getPoller().getSelector());          KeyAttachment ka = null;          if (key != null) {              ka = (KeyAttachment)key.attachment();          }          // Upgraded connections need to allow multiple threads to access the          // connection at the same time to enable blocking IO to be used when          // NIO has been configured          if (ka != null && ka.isUpgraded() &&                  SocketStatus.OPEN_WRITE == status) {              synchronized (ka.getWriteThreadLock()) {                  doRun(key, ka);              }          } else {              synchronized (socket) {                  doRun(key, ka);              }          }      }      private void doRun(SelectionKey key, KeyAttachment ka) {          try {              int handshake = -1;              try {                  if (key != null) {                      // For STOP there is no point trying to handshake as the                      // Poller has been stopped.                      if (socket.isHandshakeComplete() ||                              status == SocketStatus.STOP) {                          handshake = 0;                      } else {                          handshake = socket.handshake(                                  key.isReadable(), key.isWritable());                          // The handshake process reads/writes from/to the                          // socket. status may therefore be OPEN_WRITE once                          // the handshake completes. However, the handshake                          // happens when the socket is opened so the status                          // must always be OPEN_READ after it completes. It                          // is OK to always set this as it is only used if                          // the handshake completes.                          status = SocketStatus.OPEN_READ;                      }                  }              }catch ( IOException x ) {                  handshake = -1;                  if ( log.isDebugEnabled() ) log.debug("Error during SSL handshake",x);              }catch ( CancelledKeyException ckx ) {                  handshake = -1;              }              if ( handshake == 0 ) {                  SocketState state = SocketState.OPEN;                  // Process the request from this socket                  if (status == null) {                      // 最关键的代码,这里将KeyAttachment(实际就是socket)交给Handler处理请求                      state = handler.process(ka, SocketStatus.OPEN_READ);                  } else {                      state = handler.process(ka, status);                  }                  if (state == SocketState.CLOSED) {                      // Close socket and pool                      try {                          close(ka, socket, key, SocketStatus.ERROR);                      } catch ( Exception x ) {                          log.error("",x);                      }                  }              } else if (handshake == -1 ) {                  close(ka, socket, key, SocketStatus.DISCONNECT);              } else {                  ka.getPoller().add(socket, handshake);              }          } catch (CancelledKeyException cx) {              socket.getPoller().cancelledKey(key, null, false);          } catch (OutOfMemoryError oom) {              try {                  oomParachuteData = null;                  log.error("", oom);                  if (socket != null) {                      socket.getPoller().cancelledKey(key,SocketStatus.ERROR, false);                  }                  releaseCaches();              }catch ( Throwable oomt ) {                  try {                      System.err.println(oomParachuteMsg);                      oomt.printStackTrace();                  }catch (Throwable letsHopeWeDontGetHere){                      ExceptionUtils.handleThrowable(letsHopeWeDontGetHere);                  }              }          } catch (VirtualMachineError vme) {              ExceptionUtils.handleThrowable(vme);          }catch ( Throwable t ) {              log.error("",t);              if (socket != null) {                  socket.getPoller().cancelledKey(key,SocketStatus.ERROR,false);              }          } finally {              socket = null;              status = null;              //return to cache              if (running && !paused) {                  processorCache.offer(this);              }          }      }      private void close(KeyAttachment ka, NioChannel socket, SelectionKey key,              SocketStatus socketStatus) {...      }  }

可以看到由 SocketProcessor 寻找合适的 Handler 处理器做最终 socket 转换处理。

可以用下面这幅图总结一下 NioEndpoint 的主要流程:

Acceptor 和 Poller 是线程数组,Worker 是一个线程池( Executor )

转载地址:http://vtn.baihongyu.com/

你可能感兴趣的文章
MySQL8,体验不一样的安装方式!
查看>>
MySQL: Host '127.0.0.1' is not allowed to connect to this MySQL server
查看>>
Mysql: 对换(替换)两条记录的同一个字段值
查看>>
mysql:Can‘t connect to local MySQL server through socket ‘/var/run/mysqld/mysqld.sock‘解决方法
查看>>
MYSQL:基础——3N范式的表结构设计
查看>>
MYSQL:基础——触发器
查看>>
Mysql:连接报错“closing inbound before receiving peer‘s close_notify”
查看>>
mysqlbinlog报错unknown variable ‘default-character-set=utf8mb4‘
查看>>
mysqldump 参数--lock-tables浅析
查看>>
mysqldump 导出中文乱码
查看>>
mysqldump 导出数据库中每张表的前n条
查看>>
mysqldump: Got error: 1044: Access denied for user ‘xx’@’xx’ to database ‘xx’ when using LOCK TABLES
查看>>
Mysqldump参数大全(参数来源于mysql5.5.19源码)
查看>>
mysqldump备份时忽略某些表
查看>>
mysqldump实现数据备份及灾难恢复
查看>>
mysqldump数据库备份无法进行操作只能查询 --single-transaction
查看>>
mysqldump的一些用法
查看>>
mysqli
查看>>
MySQLIntegrityConstraintViolationException异常处理
查看>>
mysqlreport分析工具详解
查看>>