// Loop until we receive a shutdown command while (running) {
// 省略代码
try { //if we have reached max connections, wait countUpOrAwaitConnection();
SocketChannel socket = null; try { // Accept the next incoming connection from the server // socket // 当有请求的时候,首先在这里打断点,这里会接受到请求 socket = serverSock.accept(); } catch (IOException ioe) { // We didn't get a socket countDownConnection(); if (running) { // Introduce delay if necessary errorDelay = handleExceptionWithDelay(errorDelay); // re-throw throw ioe; } else { break; } } // Successful accept, reset the error delay errorDelay = 0;
// Configure the socket if (running && !paused) { // setSocketOptions() will hand the socket off to // an appropriate processor if successful // 放到队列里,所以accept只是用来做接收请求的功能 if (!setSocketOptions(socket)) { closeSocket(socket); } } else { closeSocket(socket); } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error(sm.getString("endpoint.accept.fail"), t); } } state = AcceptorState.ENDED; } // 省略代码 }
protectedbooleansetSocketOptions(SocketChannel socket){ // Process the connection try { //disable blocking, APR style, we are gonna be polling it socket.configureBlocking(false); Socket sock = socket.socket(); socketProperties.setProperties(sock);
// 构造对象 NioChannel channel = nioChannels.pop(); if (channel == null) { SocketBufferHandler bufhandler = new SocketBufferHandler( socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer()); if (isSSLEnabled()) { channel = new SecureNioChannel(socket, bufhandler, selectorPool, this); } else { channel = new NioChannel(socket, bufhandler); } } else { channel.setIOChannel(socket); channel.reset(); } // 创建的对象放到队列里 getPoller0().register(channel); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); try { log.error("",t); } catch (Throwable tt) { ExceptionUtils.handleThrowable(tt); } // Tell to close the socket returnfalse; } returntrue; }
try { if (key != null) { if (socket.isHandshakeComplete()) { // No TLS handshaking required. Let the handler // process this socket / event combination. handshake = 0; } elseif (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT || event == SocketEvent.ERROR) { // Unable to complete the TLS handshake. Treat it as // if the handshake failed. handshake = -1; } else { handshake = socket.handshake(key.isReadable(), key.isWritable()); // The handshake process reads/writes from/to the // socket. status may therefore be OPEN_WRITE once // the handshake completes. However, the handshake // happens when the socket is opened so the status // must always be OPEN_READ after it completes. It // is OK to always set this as it is only used if // the handshake completes. event = SocketEvent.OPEN_READ; } } } catch (IOException x) { handshake = -1; if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x); } catch (CancelledKeyException ckx) { handshake = -1; } if (handshake == 0) { SocketState state = SocketState.OPEN; // Process the request from this socket if (event == null) { state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ); } else { // 调用这里 state = getHandler().process(socketWrapper, event); } if (state == SocketState.CLOSED) { close(socket, key); } } elseif (handshake == -1 ) { close(socket, key); } elseif (handshake == SelectionKey.OP_READ){ socketWrapper.registerReadInterest(); } elseif (handshake == SelectionKey.OP_WRITE){ socketWrapper.registerWriteInterest(); } } catch (CancelledKeyException cx) { socket.getPoller().cancelledKey(key); } catch (VirtualMachineError vme) { ExceptionUtils.handleThrowable(vme); } catch (Throwable t) { log.error("", t); socket.getPoller().cancelledKey(key); } finally { socketWrapper = null; event = null; //return to cache if (running && !paused) { processorCache.push(this); } } }
@Override public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status) throws IOException {
SocketState state = SocketState.CLOSED; Iterator<DispatchType> dispatches = null; do { if (dispatches != null) { DispatchType nextDispatch = dispatches.next(); state = dispatch(nextDispatch.getSocketStatus()); } elseif (status == SocketEvent.DISCONNECT) { // Do nothing here, just wait for it to get recycled } elseif (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) { state = dispatch(status); if (state == SocketState.OPEN) { // There may be pipe-lined data to read. If the data isn't // processed now, execution will exit this loop and call // release() which will recycle the processor (and input // buffer) deleting any pipe-lined data. To avoid this, // process it now. // 调用servie state = service(socketWrapper); } } elseif (status == SocketEvent.OPEN_WRITE) { // Extra write event likely after async, ignore state = SocketState.LONG; } elseif (status == SocketEvent.OPEN_READ){ state = service(socketWrapper); } else { // Default to closing the socket if the SocketEvent passed in // is not consistent with the current state of the Processor state = SocketState.CLOSED; }
if (getLog().isDebugEnabled()) { getLog().debug("Socket: [" + socketWrapper + "], Status in: [" + status + "], State out: [" + state + "]"); }
if (state != SocketState.CLOSED && isAsync()) { state = asyncPostProcess(); if (getLog().isDebugEnabled()) { getLog().debug("Socket: [" + socketWrapper + "], State after async post processing: [" + state + "]"); } }
if (dispatches == null || !dispatches.hasNext()) { // Only returns non-null iterator if there are // dispatches to process. dispatches = getIteratorAndClearDispatches(); } } while (state == SocketState.ASYNC_END || dispatches != null && state != SocketState.CLOSED);
// Select the Host to be used for this Request Host host = request.getHost(); if (host == null) { response.sendError (HttpServletResponse.SC_BAD_REQUEST, sm.getString("standardEngine.noHost", request.getServerName())); return; } if (request.isAsyncSupported()) { request.setAsyncSupported(host.getPipeline().isAsyncSupported()); }
// Ask this Host to process this request host.getPipeline().getFirst().invoke(request, response);
// Ask this Context to process this request. Requests that are // already in error must have been routed here to check for // application defined error pages so DO NOT forward them to the the // application for processing. try { if (!response.isErrorReportRequired()) { // 调用Context责任链 context.getPipeline().getFirst().invoke(request, response); } } catch (Throwable t) { }
// Initialize local variables we may need boolean unavailable = false; Throwable throwable = null; // This should be a Request attribute... long t1=System.currentTimeMillis(); requestCount.incrementAndGet(); StandardWrapper wrapper = (StandardWrapper) getContainer(); // 终于看到Servlet了 Servlet servlet = null; Context context = (Context) wrapper.getParent();
// Check for the application being marked unavailable if (!context.getState().isAvailable()) { response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, sm.getString("standardContext.isUnavailable")); unavailable = true; }
// Check for the servlet being marked unavailable if (!unavailable && wrapper.isUnavailable()) { container.getLogger().info(sm.getString("standardWrapper.isUnavailable", wrapper.getName())); long available = wrapper.getAvailable(); if ((available > 0L) && (available < Long.MAX_VALUE)) { response.setDateHeader("Retry-After", available); response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, sm.getString("standardWrapper.isUnavailable", wrapper.getName())); } elseif (available == Long.MAX_VALUE) { response.sendError(HttpServletResponse.SC_NOT_FOUND, sm.getString("standardWrapper.notFound", wrapper.getName())); } unavailable = true; }
// Allocate a servlet instance to process this request try { if (!unavailable) { // 把当前请求的Servlet赋值 servlet = wrapper.allocate(); } } // Create the filter chain for this request // 创建过滤器责任链 ApplicationFilterChain filterChain = ApplicationFilterFactory.createFilterChain(request, wrapper, servlet); // 调用责任链 filterChain.doFilter(request.getRequest(),response.getResponse()); }
// Call the next filter if there is one // 开始循环调用过滤器 if (pos < n) { ApplicationFilterConfig filterConfig = filters[pos++]; try { Filter filter = filterConfig.getFilter();
if (request.isAsyncSupported() && "false".equalsIgnoreCase( filterConfig.getFilterDef().getAsyncSupported())) { request.setAttribute(Globals.ASYNC_SUPPORTED_ATTR, Boolean.FALSE); } if( Globals.IS_SECURITY_ENABLED ) { final ServletRequest req = request; final ServletResponse res = response; Principal principal = ((HttpServletRequest) req).getUserPrincipal();
// We fell off the end of the chain -- call the servlet instance try { if (ApplicationDispatcher.WRAP_SAME_OBJECT) { lastServicedRequest.set(request); lastServicedResponse.set(response); }
if (request.isAsyncSupported() && !servletSupportsAsync) { request.setAttribute(Globals.ASYNC_SUPPORTED_ATTR, Boolean.FALSE); } // Use potentially wrapped request from this point if ((request instanceof HttpServletRequest) && (response instanceof HttpServletResponse) && Globals.IS_SECURITY_ENABLED ) { final ServletRequest req = request; final ServletResponse res = response; Principal principal = ((HttpServletRequest) req).getUserPrincipal(); Object[] args = new Object[]{req, res}; SecurityUtil.doAsPrivilege("service", servlet, classTypeUsedInService, args, principal); } else { // 执行完过滤器,就开始要调用HttpServlet的方法 servlet.service(request, response); } } catch (IOException | ServletException | RuntimeException e) { throw e; } catch (Throwable e) { e = ExceptionUtils.unwrapInvocationTargetException(e); ExceptionUtils.handleThrowable(e); thrownew ServletException(sm.getString("filterChain.servlet"), e); } finally { if (ApplicationDispatcher.WRAP_SAME_OBJECT) { lastServicedRequest.set(null); lastServicedResponse.set(null); } } }
if (method.equals(METHOD_GET)) { long lastModified = getLastModified(req); if (lastModified == -1) { // servlet doesn't support if-modified-since, no reason // to go through further expensive logic // 这里就开始调用我们自己写的Servlet的方法了 doGet(req, resp); } else {