Oh crud, this must be some EOL issue again: the commit below shows the whole file as changed. Any tips on how to fix this? Gary
---------- Forwarded message ---------- From: <[email protected]> Date: Wed, May 10, 2017 at 6:52 PM Subject: httpcomponents-core git commit: Remove unused ctor from this example. To: [email protected] Repository: httpcomponents-core Updated Branches: refs/heads/4.4.x 5b29a6e4a -> 0be867829 Remove unused ctor from this example. Project: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/repo Commit: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/commit/0be86782 Tree: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/tree/0be86782 Diff: http://git-wip-us.apache.org/repos/asf/httpcomponents-core/diff/0be86782 Branch: refs/heads/4.4.x Commit: 0be8678299f1cb7c9da34e2db90b737f13d40048 Parents: 5b29a6e Author: Gary Gregory <[email protected]> Authored: Wed May 10 18:52:13 2017 -0700 Committer: Gary Gregory <[email protected]> Committed: Wed May 10 18:52:13 2017 -0700 ---------------------------------------------------------------------- .../http/examples/nio/NHttpReverseProxy.java | 1794 +++++++++--------- 1 file changed, 894 insertions(+), 900 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/httpcomponents-core/blob/0be86782/httpcore-nio/src/examples/org/apache/http/examples/nio/NHttpReverseProxy.java ---------------------------------------------------------------------- diff --git a/httpcore-nio/src/examples/org/apache/http/examples/nio/NHttpReverseProxy.java b/httpcore-nio/src/examples/org/apache/http/examples/nio/NHttpReverseProxy.java index 82dfa23..632bd2c 100644 --- a/httpcore-nio/src/examples/org/apache/http/examples/nio/NHttpReverseProxy.java +++ b/httpcore-nio/src/examples/org/apache/http/examples/nio/NHttpReverseProxy.java @@ -1,900 +1,894 @@ -/* - * ==================================================================== - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * ==================================================================== - * - * This software consists of voluntary contributions made by many - * individuals on behalf of the Apache Software Foundation. For more - * information on the Apache Software Foundation, please see - * <http://www.apache.org/>. 
- * - */ -package org.apache.http.examples.nio; - -import java.io.IOException; -import java.io.InterruptedIOException; -import java.net.InetSocketAddress; -import java.net.URI; -import java.nio.ByteBuffer; -import java.security.cert.CertificateException; -import java.security.cert.X509Certificate; -import java.util.Locale; -import java.util.concurrent.atomic.AtomicLong; - -import javax.net.ssl.SSLContext; - -import org.apache.http.ConnectionReuseStrategy; -import org.apache.http.HttpEntityEnclosingRequest; -import org.apache.http.HttpException; -import org.apache.http.HttpHost; -import org.apache.http.HttpRequest; -import org.apache.http.HttpRequestInterceptor; -import org.apache.http.HttpResponse; -import org.apache.http.HttpResponseInterceptor; -import org.apache.http.HttpStatus; -import org.apache.http.HttpVersion; -import org.apache.http.config.ConnectionConfig; -import org.apache.http.entity.ContentType; -import org.apache.http.impl.DefaultConnectionReuseStrategy; -import org.apache.http.impl.EnglishReasonPhraseCatalog; -import org.apache.http.impl.nio.DefaultHttpClientIODispatch; -import org.apache.http.impl.nio.DefaultHttpServerIODispatch; -import org.apache.http.impl.nio.DefaultNHttpClientConnectionFactory; -import org.apache.http.impl.nio.SSLNHttpClientConnectionFactory; -import org.apache.http.impl.nio.pool.BasicNIOConnFactory; -import org.apache.http.impl.nio.pool.BasicNIOConnPool; -import org.apache.http.impl.nio.pool.BasicNIOPoolEntry; -import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor; -import org.apache.http.impl.nio.reactor.DefaultListeningIOReactor; -import org.apache.http.impl.nio.reactor.IOReactorConfig; -import org.apache.http.message.BasicHttpEntityEnclosingRequest; -import org.apache.http.message.BasicHttpRequest; -import org.apache.http.message.BasicHttpResponse; -import org.apache.http.nio.ContentDecoder; -import org.apache.http.nio.ContentEncoder; -import org.apache.http.nio.IOControl; -import 
org.apache.http.nio.NHttpClientConnection; -import org.apache.http.nio.NHttpConnection; -import org.apache.http.nio.NHttpServerConnection; -import org.apache.http.nio.entity.NStringEntity; -import org.apache.http.nio.pool.NIOConnFactory; -import org.apache.http.nio.protocol.BasicAsyncResponseProducer; -import org.apache.http.nio.protocol.HttpAsyncExchange; -import org.apache.http.nio.protocol.HttpAsyncRequestConsumer; -import org.apache.http.nio.protocol.HttpAsyncRequestExecutor; -import org.apache.http.nio.protocol.HttpAsyncRequestHandler; -import org.apache.http.nio.protocol.HttpAsyncRequestHandlerMapper; -import org.apache.http.nio.protocol.HttpAsyncRequestProducer; -import org.apache.http.nio.protocol.HttpAsyncRequester; -import org.apache.http.nio.protocol.HttpAsyncResponseConsumer; -import org.apache.http.nio.protocol.HttpAsyncResponseProducer; -import org.apache.http.nio.protocol.HttpAsyncService; -import org.apache.http.nio.protocol.UriHttpAsyncRequestHandlerMapper; -import org.apache.http.nio.reactor.ConnectingIOReactor; -import org.apache.http.nio.reactor.IOEventDispatch; -import org.apache.http.nio.reactor.ListeningIOReactor; -import org.apache.http.pool.PoolStats; -import org.apache.http.protocol.HttpContext; -import org.apache.http.protocol.HttpCoreContext; -import org.apache.http.protocol.HttpProcessor; -import org.apache.http.protocol.ImmutableHttpProcessor; -import org.apache.http.protocol.RequestConnControl; -import org.apache.http.protocol.RequestContent; -import org.apache.http.protocol.RequestExpectContinue; -import org.apache.http.protocol.RequestTargetHost; -import org.apache.http.protocol.RequestUserAgent; -import org.apache.http.protocol.ResponseConnControl; -import org.apache.http.protocol.ResponseContent; -import org.apache.http.protocol.ResponseDate; -import org.apache.http.protocol.ResponseServer; -import org.apache.http.ssl.SSLContextBuilder; -import org.apache.http.ssl.TrustStrategy; - -/** - * Asynchronous, fully streaming HTTP/1.1 
reverse proxy. - * <p> - * Supports SSL to origin servers which use self-signed certificates. - * </p> - */ -public class NHttpReverseProxy { - - public static void main(String[] args) throws Exception { - if (args.length < 2) { - System.out.println("Usage: NHttpReverseProxy <HostNameURI> <Port> [\"TrustSelfSignedStrategy\"]"); - System.exit(1); - } - // Extract command line arguments - URI uri = new URI(args[0]); - int port = Integer.parseInt(args[1]); - SSLContext sslContext = null; - if (args.length > 2 && args[2].equals("TrustSelfSignedStrategy")) { - System.out.println("Using TrustSelfSignedStrategy (not for production)"); - sslContext = SSLContextBuilder.create().loadTrustMaterial(new TrustStrategy() { - - @Override - public boolean isTrusted( - final X509Certificate[] chain, final String authType) throws CertificateException { - return chain.length == 1; - } - - }).build(); - } - - // Target host - HttpHost targetHost = new HttpHost( - uri.getHost(), - uri.getPort() > 0 ? uri.getPort() : 80, - uri.getScheme() != null ? 
uri.getScheme() : "http"); - - System.out.println("Reverse proxy to " + targetHost); - - IOReactorConfig config = IOReactorConfig.custom() - .setIoThreadCount(1) - .setSoTimeout(3000) - .setConnectTimeout(3000) - .build(); - final ConnectingIOReactor connectingIOReactor = new DefaultConnectingIOReactor(config); - final ListeningIOReactor listeningIOReactor = new DefaultListeningIOReactor(config); - - // Set up HTTP protocol processor for incoming connections - HttpProcessor inhttpproc = new ImmutableHttpProcessor( - new HttpResponseInterceptor[] { - new ResponseDate(), - new ResponseServer("Test/1.1"), - new ResponseContent(), - new ResponseConnControl() - }); - - // Set up HTTP protocol processor for outgoing connections - HttpProcessor outhttpproc = new ImmutableHttpProcessor( - new HttpRequestInterceptor[] { - new RequestContent(), - new RequestTargetHost(), - new RequestConnControl(), - new RequestUserAgent("Test/1.1"), - new RequestExpectContinue(true) - }); - - ProxyClientProtocolHandler clientHandler = new ProxyClientProtocolHandler(); - HttpAsyncRequester executor = new HttpAsyncRequester( - outhttpproc, new ProxyOutgoingConnectionReuseStrategy()); - - // Without SSL: ProxyConnPool connPool = new ProxyConnPool(connectingIOReactor, ConnectionConfig.DEFAULT); - ProxyConnPool connPool = new ProxyConnPool(connectingIOReactor, - new BasicNIOConnFactory(new DefaultNHttpClientConnectionFa ctory(ConnectionConfig.DEFAULT), - new SSLNHttpClientConnectionFactory(sslContext, null, ConnectionConfig.DEFAULT)), - 0); - connPool.setMaxTotal(100); - connPool.setDefaultMaxPerRoute(20); - - UriHttpAsyncRequestHandlerMapper handlerRegistry = new UriHttpAsyncRequestHandlerMapper(); - handlerRegistry.register("*", new ProxyRequestHandler(targetHost, executor, connPool)); - - ProxyServiceHandler serviceHandler = new ProxyServiceHandler( - inhttpproc, - new ProxyIncomingConnectionReuseStrategy(), - handlerRegistry); - - final IOEventDispatch connectingEventDispatch = 
DefaultHttpClientIODispatch.create( - clientHandler, sslContext, ConnectionConfig.DEFAULT); - - final IOEventDispatch listeningEventDispatch = new DefaultHttpServerIODispatch( - serviceHandler, ConnectionConfig.DEFAULT); - - Thread t = new Thread(new Runnable() { - - public void run() { - try { - connectingIOReactor.execute(connectingEventDispatch); - } catch (InterruptedIOException ex) { - System.err.println("Interrupted"); - } catch (IOException ex) { - ex.printStackTrace(); - } finally { - try { - listeningIOReactor.shutdown(); - } catch (IOException ex2) { - ex2.printStackTrace(); - } - } - } - - }); - t.start(); - try { - listeningIOReactor.listen(new InetSocketAddress(port)); - listeningIOReactor.execute(listeningEventDispatch); - } catch (InterruptedIOException ex) { - System.err.println("Interrupted"); - } catch (IOException ex) { - ex.printStackTrace(); - } finally { - try { - connectingIOReactor.shutdown(); - } catch (IOException ex2) { - ex2.printStackTrace(); - } - } - } - - static class ProxyHttpExchange { - - private final ByteBuffer inBuffer; - private final ByteBuffer outBuffer; - - private volatile String id; - private volatile HttpHost target; - private volatile HttpAsyncExchange responseTrigger; - private volatile IOControl originIOControl; - private volatile IOControl clientIOControl; - private volatile HttpRequest request; - private volatile boolean requestReceived; - private volatile HttpResponse response; - private volatile boolean responseReceived; - private volatile Exception ex; - - public ProxyHttpExchange() { - super(); - this.inBuffer = ByteBuffer.allocateDirect(10240); - this.outBuffer = ByteBuffer.allocateDirect(10240); - } - - public ByteBuffer getInBuffer() { - return this.inBuffer; - } - - public ByteBuffer getOutBuffer() { - return this.outBuffer; - } - - public String getId() { - return this.id; - } - - public void setId(final String id) { - this.id = id; - } - - public HttpHost getTarget() { - return this.target; - } - - public 
void setTarget(final HttpHost target) { - this.target = target; - } - - public HttpRequest getRequest() { - return this.request; - } - - public void setRequest(final HttpRequest request) { - this.request = request; - } - - public HttpResponse getResponse() { - return this.response; - } - - public void setResponse(final HttpResponse response) { - this.response = response; - } - - public HttpAsyncExchange getResponseTrigger() { - return this.responseTrigger; - } - - public void setResponseTrigger(final HttpAsyncExchange responseTrigger) { - this.responseTrigger = responseTrigger; - } - - public IOControl getClientIOControl() { - return this.clientIOControl; - } - - public void setClientIOControl(final IOControl clientIOControl) { - this.clientIOControl = clientIOControl; - } - - public IOControl getOriginIOControl() { - return this.originIOControl; - } - - public void setOriginIOControl(final IOControl originIOControl) { - this.originIOControl = originIOControl; - } - - public boolean isRequestReceived() { - return this.requestReceived; - } - - public void setRequestReceived() { - this.requestReceived = true; - } - - public boolean isResponseReceived() { - return this.responseReceived; - } - - public void setResponseReceived() { - this.responseReceived = true; - } - - public Exception getException() { - return this.ex; - } - - public void setException(final Exception ex) { - this.ex = ex; - } - - public void reset() { - this.inBuffer.clear(); - this.outBuffer.clear(); - this.target = null; - this.id = null; - this.responseTrigger = null; - this.clientIOControl = null; - this.originIOControl = null; - this.request = null; - this.requestReceived = false; - this.response = null; - this.responseReceived = false; - this.ex = null; - } - - } - - static class ProxyRequestHandler implements HttpAsyncRequestHandler<ProxyHttpExchange> { - - private final HttpHost target; - private final HttpAsyncRequester executor; - private final BasicNIOConnPool connPool; - private final 
AtomicLong counter; - - public ProxyRequestHandler( - final HttpHost target, - final HttpAsyncRequester executor, - final BasicNIOConnPool connPool) { - super(); - this.target = target; - this.executor = executor; - this.connPool = connPool; - this.counter = new AtomicLong(1); - } - - public HttpAsyncRequestConsumer<ProxyHttpExchange> processRequest( - final HttpRequest request, - final HttpContext context) { - ProxyHttpExchange httpExchange = (ProxyHttpExchange) context.getAttribute("http-exchange"); - if (httpExchange == null) { - httpExchange = new ProxyHttpExchange(); - context.setAttribute("http-exchange", httpExchange); - } - synchronized (httpExchange) { - httpExchange.reset(); - String id = String.format("%08X", this.counter.getAndIncrement()); - httpExchange.setId(id); - httpExchange.setTarget(this.target); - return new ProxyRequestConsumer(httpExchange, this.executor, this.connPool); - } - } - - public void handle( - final ProxyHttpExchange httpExchange, - final HttpAsyncExchange responseTrigger, - final HttpContext context) throws HttpException, IOException { - synchronized (httpExchange) { - Exception ex = httpExchange.getException(); - if (ex != null) { - System.out.println("[client<-proxy] " + httpExchange.getId() + " " + ex); - int status = HttpStatus.SC_INTERNAL_SERVER_ERROR; - HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_0, status, - EnglishReasonPhraseCatalog.INSTANCE.getReason(status, Locale.US)); - String message = ex.getMessage(); - if (message == null) { - message = "Unexpected error"; - } - response.setEntity(new NStringEntity(message, ContentType.DEFAULT_TEXT)); - responseTrigger.submitResponse(new BasicAsyncResponseProducer(response)); - System.out.println("[client<-proxy] " + httpExchange.getId() + " error response triggered"); - } - HttpResponse response = httpExchange.getResponse(); - if (response != null) { - responseTrigger.submitResponse(new ProxyResponseProducer(httpExchange)); - 
System.out.println("[client<-proxy] " + httpExchange.getId() + " response triggered"); - } - // No response yet. - httpExchange.setResponseTrigger(responseTrigger); - } - } - - } - - static class ProxyRequestConsumer implements HttpAsyncRequestConsumer<ProxyHttpExchange> { - - private final ProxyHttpExchange httpExchange; - private final HttpAsyncRequester executor; - private final BasicNIOConnPool connPool; - - private volatile boolean completed; - - public ProxyRequestConsumer( - final ProxyHttpExchange httpExchange, - final HttpAsyncRequester executor, - final BasicNIOConnPool connPool) { - super(); - this.httpExchange = httpExchange; - this.executor = executor; - this.connPool = connPool; - } - - public void close() throws IOException { - } - - public void requestReceived(final HttpRequest request) { - synchronized (this.httpExchange) { - System.out.println("[client->proxy] " + this.httpExchange.getId() + " " + request.getRequestLine()); - this.httpExchange.setRequest(request); - this.executor.execute( - new ProxyRequestProducer(this.httpExchange), - new ProxyResponseConsumer(this.httpExchange), - this.connPool); - } - } - - public void consumeContent( - final ContentDecoder decoder, final IOControl ioctrl) throws IOException { - synchronized (this.httpExchange) { - this.httpExchange.setClientIOControl(ioctrl); - // Receive data from the client - ByteBuffer buf = this.httpExchange.getInBuffer(); - int n = decoder.read(buf); - System.out.println("[client->proxy] " + this.httpExchange.getId() + " " + n + " bytes read"); - if (decoder.isCompleted()) { - System.out.println("[client->proxy] " + this.httpExchange.getId() + " content fully read"); - } - // If the buffer is full, suspend client input until there is free - // space in the buffer - if (!buf.hasRemaining()) { - ioctrl.suspendInput(); - System.out.println("[client->proxy] " + this.httpExchange.getId() + " suspend client input"); - } - // If there is some content in the input buffer make sure origin - // 
output is active - if (buf.position() > 0) { - if (this.httpExchange.getOriginIOControl() != null) { - this.httpExchange.getOriginIOControl(). requestOutput(); - System.out.println("[client->proxy] " + this.httpExchange.getId() + " request origin output"); - } - } - } - } - - public void requestCompleted(final HttpContext context) { - synchronized (this.httpExchange) { - this.completed = true;; - System.out.println("[client->proxy] " + this.httpExchange.getId() + " request completed"); - this.httpExchange.setRequestReceived(); - if (this.httpExchange.getOriginIOControl() != null) { - this.httpExchange.getOriginIOControl().requestOutput(); - System.out.println("[client->proxy] " + this.httpExchange.getId() + " request origin output"); - } - } - } - - public Exception getException() { - return null; - } - - public ProxyHttpExchange getResult() { - return this.httpExchange; - } - - public boolean isDone() { - return this.completed; - } - - public void failed(final Exception ex) { - System.out.println("[client->proxy] " + ex.toString()); - } - - } - - static class ProxyRequestProducer implements HttpAsyncRequestProducer { - - private final ProxyHttpExchange httpExchange; - - public ProxyRequestProducer(final ProxyHttpExchange httpExchange) { - super(); - this.httpExchange = httpExchange; - } - - public void close() throws IOException { - } - - public HttpHost getTarget() { - synchronized (this.httpExchange) { - return this.httpExchange.getTarget(); - } - } - - public HttpRequest generateRequest() { - synchronized (this.httpExchange) { - HttpRequest request = this.httpExchange.getRequest(); - System.out.println("[proxy->origin] " + this.httpExchange.getId() + " " + request.getRequestLine()); - // Rewrite request!!!! 
- if (request instanceof HttpEntityEnclosingRequest) { - BasicHttpEntityEnclosingRequest r = new BasicHttpEntityEnclosingRequest( - request.getRequestLine()); - r.setEntity(((HttpEntityEnclosingRequest) request).getEntity()); - return r; - } else { - return new BasicHttpRequest(request.getRequestLine()); - } - } - } - - public void produceContent( - final ContentEncoder encoder, final IOControl ioctrl) throws IOException { - synchronized (this.httpExchange) { - this.httpExchange.setOriginIOControl(ioctrl); - // Send data to the origin server - ByteBuffer buf = this.httpExchange.getInBuffer(); - buf.flip(); - int n = encoder.write(buf); - buf.compact(); - System.out.println("[proxy->origin] " + this.httpExchange.getId() + " " + n + " bytes written"); - // If there is space in the buffer and the message has not been - // transferred, make sure the client is sending more data - if (buf.hasRemaining() && !this.httpExchange.isRequestReceived()) { - if (this.httpExchange.getClientIOControl() != null) { - this.httpExchange.getClientIOControl(). requestInput(); - System.out.println("[proxy->origin] " + this.httpExchange.getId() + " request client input"); - } - } - if (buf.position() == 0) { - if (this.httpExchange.isRequestReceived()) { - encoder.complete(); - System.out.println("[proxy->origin] " + this.httpExchange.getId() + " content fully written"); - } else { - // Input buffer is empty. 
Wait until the client fills up - // the buffer - ioctrl.suspendOutput(); - System.out.println("[proxy->origin] " + this.httpExchange.getId() + " suspend origin output"); - } - } - } - } - - public void requestCompleted(final HttpContext context) { - synchronized (this.httpExchange) { - System.out.println("[proxy->origin] " + this.httpExchange.getId() + " request completed"); - } - } - - public boolean isRepeatable() { - return false; - } - - public void resetRequest() { - } - - public void failed(final Exception ex) { - System.out.println("[proxy->origin] " + ex.toString()); - } - - } - - static class ProxyResponseConsumer implements HttpAsyncResponseConsumer<ProxyHttpExchange> { - - private final ProxyHttpExchange httpExchange; - - private volatile boolean completed; - - public ProxyResponseConsumer(final ProxyHttpExchange httpExchange) { - super(); - this.httpExchange = httpExchange; - } - - public void close() throws IOException { - } - - public void responseReceived(final HttpResponse response) { - synchronized (this.httpExchange) { - System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " " + response.getStatusLine()); - this.httpExchange.setResponse(response); - HttpAsyncExchange responseTrigger = this.httpExchange. 
getResponseTrigger(); - if (responseTrigger != null && !responseTrigger.isCompleted()) { - System.out.println("[client<-proxy] " + this.httpExchange.getId() + " response triggered"); - responseTrigger.submitResponse(new ProxyResponseProducer(this.httpExchange)); - } - } - } - - public void consumeContent( - final ContentDecoder decoder, final IOControl ioctrl) throws IOException { - synchronized (this.httpExchange) { - this.httpExchange.setOriginIOControl(ioctrl); - // Receive data from the origin - ByteBuffer buf = this.httpExchange.getOutBuffer(); - int n = decoder.read(buf); - System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " " + n + " bytes read"); - if (decoder.isCompleted()) { - System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " content fully read"); - } - // If the buffer is full, suspend origin input until there is free - // space in the buffer - if (!buf.hasRemaining()) { - ioctrl.suspendInput(); - System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " suspend origin input"); - } - // If there is some content in the input buffer make sure client - // output is active - if (buf.position() > 0) { - if (this.httpExchange.getClientIOControl() != null) { - this.httpExchange.getClientIOControl(). 
requestOutput(); - System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " request client output"); - } - } - } - } - - public void responseCompleted(final HttpContext context) { - synchronized (this.httpExchange) { - if (this.completed) { - return; - } - this.completed = true; - System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " response completed"); - this.httpExchange.setResponseReceived(); - if (this.httpExchange.getClientIOControl() != null) { - this.httpExchange.getClientIOControl().requestOutput(); - System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " request client output"); - } - } - } - - public void failed(final Exception ex) { - synchronized (this.httpExchange) { - if (this.completed) { - return; - } - this.completed = true; - this.httpExchange.setException(ex); - HttpAsyncExchange responseTrigger = this.httpExchange. getResponseTrigger(); - if (responseTrigger != null && !responseTrigger.isCompleted()) { - System.out.println("[client<-proxy] " + this.httpExchange.getId() + " " + ex); - int status = HttpStatus.SC_INTERNAL_SERVER_ERROR; - HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_0, status, - EnglishReasonPhraseCatalog.INSTANCE.getReason(status, Locale.US)); - String message = ex.getMessage(); - if (message == null) { - message = "Unexpected error"; - } - response.setEntity(new NStringEntity(message, ContentType.DEFAULT_TEXT)); - responseTrigger.submitResponse(new BasicAsyncResponseProducer(response)); - } - } - } - - public boolean cancel() { - synchronized (this.httpExchange) { - if (this.completed) { - return false; - } - failed(new InterruptedIOException("Cancelled")); - return true; - } - } - - public ProxyHttpExchange getResult() { - return this.httpExchange; - } - - public Exception getException() { - return null; - } - - public boolean isDone() { - return this.completed; - } - - } - - static class ProxyResponseProducer implements HttpAsyncResponseProducer { - - private 
final ProxyHttpExchange httpExchange; - - public ProxyResponseProducer(final ProxyHttpExchange httpExchange) { - super(); - this.httpExchange = httpExchange; - } - - public void close() throws IOException { - this.httpExchange.reset(); - } - - public HttpResponse generateResponse() { - synchronized (this.httpExchange) { - HttpResponse response = this.httpExchange.getResponse(); - System.out.println("[client<-proxy] " + this.httpExchange.getId() + " " + response.getStatusLine()); - // Rewrite response!!!! - BasicHttpResponse r = new BasicHttpResponse(response. getStatusLine()); - r.setEntity(response.getEntity()); - return r; - } - } - - public void produceContent( - final ContentEncoder encoder, final IOControl ioctrl) throws IOException { - synchronized (this.httpExchange) { - this.httpExchange.setClientIOControl(ioctrl); - // Send data to the client - ByteBuffer buf = this.httpExchange.getOutBuffer(); - buf.flip(); - int n = encoder.write(buf); - buf.compact(); - System.out.println("[client<-proxy] " + this.httpExchange.getId() + " " + n + " bytes written"); - // If there is space in the buffer and the message has not been - // transferred, make sure the origin is sending more data - if (buf.hasRemaining() && !this.httpExchange.isResponseReceived()) { - if (this.httpExchange.getOriginIOControl() != null) { - this.httpExchange.getOriginIOControl(). requestInput(); - System.out.println("[client<-proxy] " + this.httpExchange.getId() + " request origin input"); - } - } - if (buf.position() == 0) { - if (this.httpExchange.isResponseReceived()) { - encoder.complete(); - System.out.println("[client<-proxy] " + this.httpExchange.getId() + " content fully written"); - } else { - // Input buffer is empty. 
Wait until the origin fills up - // the buffer - ioctrl.suspendOutput(); - System.out.println("[client<-proxy] " + this.httpExchange.getId() + " suspend client output"); - } - } - } - } - - public void responseCompleted(final HttpContext context) { - synchronized (this.httpExchange) { - System.out.println("[client<-proxy] " + this.httpExchange.getId() + " response completed"); - } - } - - public void failed(final Exception ex) { - System.out.println("[client<-proxy] " + ex.toString()); - } - - } - - static class ProxyIncomingConnectionReuseStrategy extends DefaultConnectionReuseStrategy { - - @Override - public boolean keepAlive(final HttpResponse response, final HttpContext context) { - NHttpConnection conn = (NHttpConnection) context.getAttribute( - HttpCoreContext.HTTP_CONNECTION); - boolean keepAlive = super.keepAlive(response, context); - if (keepAlive) { - System.out.println("[client->proxy] connection kept alive " + conn); - } - return keepAlive; - } - - }; - - static class ProxyOutgoingConnectionReuseStrategy extends DefaultConnectionReuseStrategy { - - @Override - public boolean keepAlive(final HttpResponse response, final HttpContext context) { - NHttpConnection conn = (NHttpConnection) context.getAttribute( - HttpCoreContext.HTTP_CONNECTION); - boolean keepAlive = super.keepAlive(response, context); - if (keepAlive) { - System.out.println("[proxy->origin] connection kept alive " + conn); - } - return keepAlive; - } - - }; - - static class ProxyServiceHandler extends HttpAsyncService { - - public ProxyServiceHandler( - final HttpProcessor httpProcessor, - final ConnectionReuseStrategy reuseStrategy, - final HttpAsyncRequestHandlerMapper handlerResolver) { - super(httpProcessor, reuseStrategy, null, handlerResolver, null); - } - - @Override - protected void log(final Exception ex) { - ex.printStackTrace(); - } - - @Override - public void connected(final NHttpServerConnection conn) { - System.out.println("[client->proxy] connection open " + conn); - 
super.connected(conn); - } - - @Override - public void closed(final NHttpServerConnection conn) { - System.out.println("[client->proxy] connection closed " + conn); - super.closed(conn); - } - - } - - static class ProxyClientProtocolHandler extends HttpAsyncRequestExecutor { - - public ProxyClientProtocolHandler() { - super(); - } - - @Override - protected void log(final Exception ex) { - ex.printStackTrace(); - } - - @Override - public void connected(final NHttpClientConnection conn, - final Object attachment) throws IOException, HttpException { - System.out.println("[proxy->origin] connection open " + conn); - super.connected(conn, attachment); - } - - @Override - public void closed(final NHttpClientConnection conn) { - System.out.println("[proxy->origin] connection closed " + conn); - super.closed(conn); - } - - } - - static class ProxyConnPool extends BasicNIOConnPool { - - public ProxyConnPool( - final ConnectingIOReactor ioreactor, - final ConnectionConfig config) { - super(ioreactor, config); - } - - public ProxyConnPool( - final ConnectingIOReactor ioreactor, - final NIOConnFactory<HttpHost, NHttpClientConnection> connFactory, - final int connectTimeout) { - super(ioreactor, connFactory, connectTimeout); - } - - @Override - public void release(final BasicNIOPoolEntry entry, boolean reusable) { - System.out.println("[proxy->origin] connection released " + entry.getConnection()); - super.release(entry, reusable); - StringBuilder buf = new StringBuilder(); - PoolStats totals = getTotalStats(); - buf.append("[total kept alive: ").append(totals.getAvailable()).append("; "); - buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable()); - buf.append(" of ").append(totals.getMax()).append("]"); - System.out.println("[proxy->origin] " + buf.toString()); - } - - } - -} +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * <http://www.apache.org/>. 
+ * + */ +package org.apache.http.examples.nio; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.InetSocketAddress; +import java.net.URI; +import java.nio.ByteBuffer; +import java.security.cert.CertificateException; +import java.security.cert.X509Certificate; +import java.util.Locale; +import java.util.concurrent.atomic.AtomicLong; + +import javax.net.ssl.SSLContext; + +import org.apache.http.ConnectionReuseStrategy; +import org.apache.http.HttpEntityEnclosingRequest; +import org.apache.http.HttpException; +import org.apache.http.HttpHost; +import org.apache.http.HttpRequest; +import org.apache.http.HttpRequestInterceptor; +import org.apache.http.HttpResponse; +import org.apache.http.HttpResponseInterceptor; +import org.apache.http.HttpStatus; +import org.apache.http.HttpVersion; +import org.apache.http.config.ConnectionConfig; +import org.apache.http.entity.ContentType; +import org.apache.http.impl.DefaultConnectionReuseStrategy; +import org.apache.http.impl.EnglishReasonPhraseCatalog; +import org.apache.http.impl.nio.DefaultHttpClientIODispatch; +import org.apache.http.impl.nio.DefaultHttpServerIODispatch; +import org.apache.http.impl.nio.DefaultNHttpClientConnectionFactory; +import org.apache.http.impl.nio.SSLNHttpClientConnectionFactory; +import org.apache.http.impl.nio.pool.BasicNIOConnFactory; +import org.apache.http.impl.nio.pool.BasicNIOConnPool; +import org.apache.http.impl.nio.pool.BasicNIOPoolEntry; +import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor; +import org.apache.http.impl.nio.reactor.DefaultListeningIOReactor; +import org.apache.http.impl.nio.reactor.IOReactorConfig; +import org.apache.http.message.BasicHttpEntityEnclosingRequest; +import org.apache.http.message.BasicHttpRequest; +import org.apache.http.message.BasicHttpResponse; +import org.apache.http.nio.ContentDecoder; +import org.apache.http.nio.ContentEncoder; +import org.apache.http.nio.IOControl; +import 
org.apache.http.nio.NHttpClientConnection; +import org.apache.http.nio.NHttpConnection; +import org.apache.http.nio.NHttpServerConnection; +import org.apache.http.nio.entity.NStringEntity; +import org.apache.http.nio.pool.NIOConnFactory; +import org.apache.http.nio.protocol.BasicAsyncResponseProducer; +import org.apache.http.nio.protocol.HttpAsyncExchange; +import org.apache.http.nio.protocol.HttpAsyncRequestConsumer; +import org.apache.http.nio.protocol.HttpAsyncRequestExecutor; +import org.apache.http.nio.protocol.HttpAsyncRequestHandler; +import org.apache.http.nio.protocol.HttpAsyncRequestHandlerMapper; +import org.apache.http.nio.protocol.HttpAsyncRequestProducer; +import org.apache.http.nio.protocol.HttpAsyncRequester; +import org.apache.http.nio.protocol.HttpAsyncResponseConsumer; +import org.apache.http.nio.protocol.HttpAsyncResponseProducer; +import org.apache.http.nio.protocol.HttpAsyncService; +import org.apache.http.nio.protocol.UriHttpAsyncRequestHandlerMapper; +import org.apache.http.nio.reactor.ConnectingIOReactor; +import org.apache.http.nio.reactor.IOEventDispatch; +import org.apache.http.nio.reactor.ListeningIOReactor; +import org.apache.http.pool.PoolStats; +import org.apache.http.protocol.HttpContext; +import org.apache.http.protocol.HttpCoreContext; +import org.apache.http.protocol.HttpProcessor; +import org.apache.http.protocol.ImmutableHttpProcessor; +import org.apache.http.protocol.RequestConnControl; +import org.apache.http.protocol.RequestContent; +import org.apache.http.protocol.RequestExpectContinue; +import org.apache.http.protocol.RequestTargetHost; +import org.apache.http.protocol.RequestUserAgent; +import org.apache.http.protocol.ResponseConnControl; +import org.apache.http.protocol.ResponseContent; +import org.apache.http.protocol.ResponseDate; +import org.apache.http.protocol.ResponseServer; +import org.apache.http.ssl.SSLContextBuilder; +import org.apache.http.ssl.TrustStrategy; + +/** + * Asynchronous, fully streaming HTTP/1.1 
reverse proxy. + * <p> + * Supports SSL to origin servers which use self-signed certificates. + * </p> + */ +public class NHttpReverseProxy { + + public static void main(String[] args) throws Exception { + if (args.length < 2) { + System.out.println("Usage: NHttpReverseProxy <HostNameURI> <Port> [\"TrustSelfSignedStrategy\"]"); + System.exit(1); + } + // Extract command line arguments + URI uri = new URI(args[0]); + int port = Integer.parseInt(args[1]); + SSLContext sslContext = null; + if (args.length > 2 && args[2].equals("TrustSelfSignedStrategy")) { + System.out.println("Using TrustSelfSignedStrategy (not for production)"); + sslContext = SSLContextBuilder.create().loadTrustMaterial(new TrustStrategy() { + + @Override + public boolean isTrusted( + final X509Certificate[] chain, final String authType) throws CertificateException { + return chain.length == 1; + } + + }).build(); + } + + // Target host + HttpHost targetHost = new HttpHost( + uri.getHost(), + uri.getPort() > 0 ? uri.getPort() : 80, + uri.getScheme() != null ? 
uri.getScheme() : "http"); + + System.out.println("Reverse proxy to " + targetHost); + + IOReactorConfig config = IOReactorConfig.custom() + .setIoThreadCount(1) + .setSoTimeout(3000) + .setConnectTimeout(3000) + .build(); + final ConnectingIOReactor connectingIOReactor = new DefaultConnectingIOReactor(config); + final ListeningIOReactor listeningIOReactor = new DefaultListeningIOReactor(config); + + // Set up HTTP protocol processor for incoming connections + HttpProcessor inhttpproc = new ImmutableHttpProcessor( + new HttpResponseInterceptor[] { + new ResponseDate(), + new ResponseServer("Test/1.1"), + new ResponseContent(), + new ResponseConnControl() + }); + + // Set up HTTP protocol processor for outgoing connections + HttpProcessor outhttpproc = new ImmutableHttpProcessor( + new HttpRequestInterceptor[] { + new RequestContent(), + new RequestTargetHost(), + new RequestConnControl(), + new RequestUserAgent("Test/1.1"), + new RequestExpectContinue(true) + }); + + ProxyClientProtocolHandler clientHandler = new ProxyClientProtocolHandler(); + HttpAsyncRequester executor = new HttpAsyncRequester( + outhttpproc, new ProxyOutgoingConnectionReuseStrategy()); + + // Without SSL: ProxyConnPool connPool = new ProxyConnPool(connectingIOReactor, ConnectionConfig.DEFAULT); + ProxyConnPool connPool = new ProxyConnPool(connectingIOReactor, + new BasicNIOConnFactory(new DefaultNHttpClientConnectionFa ctory(ConnectionConfig.DEFAULT), + new SSLNHttpClientConnectionFactory(sslContext, null, ConnectionConfig.DEFAULT)), + 0); + connPool.setMaxTotal(100); + connPool.setDefaultMaxPerRoute(20); + + UriHttpAsyncRequestHandlerMapper handlerRegistry = new UriHttpAsyncRequestHandlerMapper(); + handlerRegistry.register("*", new ProxyRequestHandler(targetHost, executor, connPool)); + + ProxyServiceHandler serviceHandler = new ProxyServiceHandler( + inhttpproc, + new ProxyIncomingConnectionReuseStrategy(), + handlerRegistry); + + final IOEventDispatch connectingEventDispatch = 
DefaultHttpClientIODispatch.create( + clientHandler, sslContext, ConnectionConfig.DEFAULT); + + final IOEventDispatch listeningEventDispatch = new DefaultHttpServerIODispatch( + serviceHandler, ConnectionConfig.DEFAULT); + + Thread t = new Thread(new Runnable() { + + public void run() { + try { + connectingIOReactor.execute(connectingEventDispatch); + } catch (InterruptedIOException ex) { + System.err.println("Interrupted"); + } catch (IOException ex) { + ex.printStackTrace(); + } finally { + try { + listeningIOReactor.shutdown(); + } catch (IOException ex2) { + ex2.printStackTrace(); + } + } + } + + }); + t.start(); + try { + listeningIOReactor.listen(new InetSocketAddress(port)); + listeningIOReactor.execute(listeningEventDispatch); + } catch (InterruptedIOException ex) { + System.err.println("Interrupted"); + } catch (IOException ex) { + ex.printStackTrace(); + } finally { + try { + connectingIOReactor.shutdown(); + } catch (IOException ex2) { + ex2.printStackTrace(); + } + } + } + + static class ProxyHttpExchange { + + private final ByteBuffer inBuffer; + private final ByteBuffer outBuffer; + + private volatile String id; + private volatile HttpHost target; + private volatile HttpAsyncExchange responseTrigger; + private volatile IOControl originIOControl; + private volatile IOControl clientIOControl; + private volatile HttpRequest request; + private volatile boolean requestReceived; + private volatile HttpResponse response; + private volatile boolean responseReceived; + private volatile Exception ex; + + public ProxyHttpExchange() { + super(); + this.inBuffer = ByteBuffer.allocateDirect(10240); + this.outBuffer = ByteBuffer.allocateDirect(10240); + } + + public ByteBuffer getInBuffer() { + return this.inBuffer; + } + + public ByteBuffer getOutBuffer() { + return this.outBuffer; + } + + public String getId() { + return this.id; + } + + public void setId(final String id) { + this.id = id; + } + + public HttpHost getTarget() { + return this.target; + } + + public 
void setTarget(final HttpHost target) { + this.target = target; + } + + public HttpRequest getRequest() { + return this.request; + } + + public void setRequest(final HttpRequest request) { + this.request = request; + } + + public HttpResponse getResponse() { + return this.response; + } + + public void setResponse(final HttpResponse response) { + this.response = response; + } + + public HttpAsyncExchange getResponseTrigger() { + return this.responseTrigger; + } + + public void setResponseTrigger(final HttpAsyncExchange responseTrigger) { + this.responseTrigger = responseTrigger; + } + + public IOControl getClientIOControl() { + return this.clientIOControl; + } + + public void setClientIOControl(final IOControl clientIOControl) { + this.clientIOControl = clientIOControl; + } + + public IOControl getOriginIOControl() { + return this.originIOControl; + } + + public void setOriginIOControl(final IOControl originIOControl) { + this.originIOControl = originIOControl; + } + + public boolean isRequestReceived() { + return this.requestReceived; + } + + public void setRequestReceived() { + this.requestReceived = true; + } + + public boolean isResponseReceived() { + return this.responseReceived; + } + + public void setResponseReceived() { + this.responseReceived = true; + } + + public Exception getException() { + return this.ex; + } + + public void setException(final Exception ex) { + this.ex = ex; + } + + public void reset() { + this.inBuffer.clear(); + this.outBuffer.clear(); + this.target = null; + this.id = null; + this.responseTrigger = null; + this.clientIOControl = null; + this.originIOControl = null; + this.request = null; + this.requestReceived = false; + this.response = null; + this.responseReceived = false; + this.ex = null; + } + + } + + static class ProxyRequestHandler implements HttpAsyncRequestHandler<ProxyHttpExchange> { + + private final HttpHost target; + private final HttpAsyncRequester executor; + private final BasicNIOConnPool connPool; + private final 
AtomicLong counter; + + public ProxyRequestHandler( + final HttpHost target, + final HttpAsyncRequester executor, + final BasicNIOConnPool connPool) { + super(); + this.target = target; + this.executor = executor; + this.connPool = connPool; + this.counter = new AtomicLong(1); + } + + public HttpAsyncRequestConsumer<ProxyHttpExchange> processRequest( + final HttpRequest request, + final HttpContext context) { + ProxyHttpExchange httpExchange = (ProxyHttpExchange) context.getAttribute("http-exchange"); + if (httpExchange == null) { + httpExchange = new ProxyHttpExchange(); + context.setAttribute("http-exchange", httpExchange); + } + synchronized (httpExchange) { + httpExchange.reset(); + String id = String.format("%08X", this.counter.getAndIncrement()); + httpExchange.setId(id); + httpExchange.setTarget(this.target); + return new ProxyRequestConsumer(httpExchange, this.executor, this.connPool); + } + } + + public void handle( + final ProxyHttpExchange httpExchange, + final HttpAsyncExchange responseTrigger, + final HttpContext context) throws HttpException, IOException { + synchronized (httpExchange) { + Exception ex = httpExchange.getException(); + if (ex != null) { + System.out.println("[client<-proxy] " + httpExchange.getId() + " " + ex); + int status = HttpStatus.SC_INTERNAL_SERVER_ERROR; + HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_0, status, + EnglishReasonPhraseCatalog.INSTANCE.getReason(status, Locale.US)); + String message = ex.getMessage(); + if (message == null) { + message = "Unexpected error"; + } + response.setEntity(new NStringEntity(message, ContentType.DEFAULT_TEXT)); + responseTrigger.submitResponse(new BasicAsyncResponseProducer(response)); + System.out.println("[client<-proxy] " + httpExchange.getId() + " error response triggered"); + } + HttpResponse response = httpExchange.getResponse(); + if (response != null) { + responseTrigger.submitResponse(new ProxyResponseProducer(httpExchange)); + 
System.out.println("[client<-proxy] " + httpExchange.getId() + " response triggered"); + } + // No response yet. + httpExchange.setResponseTrigger(responseTrigger); + } + } + + } + + static class ProxyRequestConsumer implements HttpAsyncRequestConsumer<ProxyHttpExchange> { + + private final ProxyHttpExchange httpExchange; + private final HttpAsyncRequester executor; + private final BasicNIOConnPool connPool; + + private volatile boolean completed; + + public ProxyRequestConsumer( + final ProxyHttpExchange httpExchange, + final HttpAsyncRequester executor, + final BasicNIOConnPool connPool) { + super(); + this.httpExchange = httpExchange; + this.executor = executor; + this.connPool = connPool; + } + + public void close() throws IOException { + } + + public void requestReceived(final HttpRequest request) { + synchronized (this.httpExchange) { + System.out.println("[client->proxy] " + this.httpExchange.getId() + " " + request.getRequestLine()); + this.httpExchange.setRequest(request); + this.executor.execute( + new ProxyRequestProducer(this.httpExchange), + new ProxyResponseConsumer(this.httpExchange), + this.connPool); + } + } + + public void consumeContent( + final ContentDecoder decoder, final IOControl ioctrl) throws IOException { + synchronized (this.httpExchange) { + this.httpExchange.setClientIOControl(ioctrl); + // Receive data from the client + ByteBuffer buf = this.httpExchange.getInBuffer(); + int n = decoder.read(buf); + System.out.println("[client->proxy] " + this.httpExchange.getId() + " " + n + " bytes read"); + if (decoder.isCompleted()) { + System.out.println("[client->proxy] " + this.httpExchange.getId() + " content fully read"); + } + // If the buffer is full, suspend client input until there is free + // space in the buffer + if (!buf.hasRemaining()) { + ioctrl.suspendInput(); + System.out.println("[client->proxy] " + this.httpExchange.getId() + " suspend client input"); + } + // If there is some content in the input buffer make sure origin + // 
output is active + if (buf.position() > 0) { + if (this.httpExchange.getOriginIOControl() != null) { + this.httpExchange.getOriginIOControl(). requestOutput(); + System.out.println("[client->proxy] " + this.httpExchange.getId() + " request origin output"); + } + } + } + } + + public void requestCompleted(final HttpContext context) { + synchronized (this.httpExchange) { + this.completed = true;; + System.out.println("[client->proxy] " + this.httpExchange.getId() + " request completed"); + this.httpExchange.setRequestReceived(); + if (this.httpExchange.getOriginIOControl() != null) { + this.httpExchange.getOriginIOControl().requestOutput(); + System.out.println("[client->proxy] " + this.httpExchange.getId() + " request origin output"); + } + } + } + + public Exception getException() { + return null; + } + + public ProxyHttpExchange getResult() { + return this.httpExchange; + } + + public boolean isDone() { + return this.completed; + } + + public void failed(final Exception ex) { + System.out.println("[client->proxy] " + ex.toString()); + } + + } + + static class ProxyRequestProducer implements HttpAsyncRequestProducer { + + private final ProxyHttpExchange httpExchange; + + public ProxyRequestProducer(final ProxyHttpExchange httpExchange) { + super(); + this.httpExchange = httpExchange; + } + + public void close() throws IOException { + } + + public HttpHost getTarget() { + synchronized (this.httpExchange) { + return this.httpExchange.getTarget(); + } + } + + public HttpRequest generateRequest() { + synchronized (this.httpExchange) { + HttpRequest request = this.httpExchange.getRequest(); + System.out.println("[proxy->origin] " + this.httpExchange.getId() + " " + request.getRequestLine()); + // Rewrite request!!!! 
+ if (request instanceof HttpEntityEnclosingRequest) { + BasicHttpEntityEnclosingRequest r = new BasicHttpEntityEnclosingRequest( + request.getRequestLine()); + r.setEntity(((HttpEntityEnclosingRequest) request).getEntity()); + return r; + } else { + return new BasicHttpRequest(request.getRequestLine()); + } + } + } + + public void produceContent( + final ContentEncoder encoder, final IOControl ioctrl) throws IOException { + synchronized (this.httpExchange) { + this.httpExchange.setOriginIOControl(ioctrl); + // Send data to the origin server + ByteBuffer buf = this.httpExchange.getInBuffer(); + buf.flip(); + int n = encoder.write(buf); + buf.compact(); + System.out.println("[proxy->origin] " + this.httpExchange.getId() + " " + n + " bytes written"); + // If there is space in the buffer and the message has not been + // transferred, make sure the client is sending more data + if (buf.hasRemaining() && !this.httpExchange.isRequestReceived()) { + if (this.httpExchange.getClientIOControl() != null) { + this.httpExchange.getClientIOControl(). requestInput(); + System.out.println("[proxy->origin] " + this.httpExchange.getId() + " request client input"); + } + } + if (buf.position() == 0) { + if (this.httpExchange.isRequestReceived()) { + encoder.complete(); + System.out.println("[proxy->origin] " + this.httpExchange.getId() + " content fully written"); + } else { + // Input buffer is empty. 
Wait until the client fills up + // the buffer + ioctrl.suspendOutput(); + System.out.println("[proxy->origin] " + this.httpExchange.getId() + " suspend origin output"); + } + } + } + } + + public void requestCompleted(final HttpContext context) { + synchronized (this.httpExchange) { + System.out.println("[proxy->origin] " + this.httpExchange.getId() + " request completed"); + } + } + + public boolean isRepeatable() { + return false; + } + + public void resetRequest() { + } + + public void failed(final Exception ex) { + System.out.println("[proxy->origin] " + ex.toString()); + } + + } + + static class ProxyResponseConsumer implements HttpAsyncResponseConsumer<ProxyHttpExchange> { + + private final ProxyHttpExchange httpExchange; + + private volatile boolean completed; + + public ProxyResponseConsumer(final ProxyHttpExchange httpExchange) { + super(); + this.httpExchange = httpExchange; + } + + public void close() throws IOException { + } + + public void responseReceived(final HttpResponse response) { + synchronized (this.httpExchange) { + System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " " + response.getStatusLine()); + this.httpExchange.setResponse(response); + HttpAsyncExchange responseTrigger = this.httpExchange. 
getResponseTrigger(); + if (responseTrigger != null && !responseTrigger.isCompleted()) { + System.out.println("[client<-proxy] " + this.httpExchange.getId() + " response triggered"); + responseTrigger.submitResponse(new ProxyResponseProducer(this.httpExchange)); + } + } + } + + public void consumeContent( + final ContentDecoder decoder, final IOControl ioctrl) throws IOException { + synchronized (this.httpExchange) { + this.httpExchange.setOriginIOControl(ioctrl); + // Receive data from the origin + ByteBuffer buf = this.httpExchange.getOutBuffer(); + int n = decoder.read(buf); + System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " " + n + " bytes read"); + if (decoder.isCompleted()) { + System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " content fully read"); + } + // If the buffer is full, suspend origin input until there is free + // space in the buffer + if (!buf.hasRemaining()) { + ioctrl.suspendInput(); + System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " suspend origin input"); + } + // If there is some content in the input buffer make sure client + // output is active + if (buf.position() > 0) { + if (this.httpExchange.getClientIOControl() != null) { + this.httpExchange.getClientIOControl(). 
requestOutput(); + System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " request client output"); + } + } + } + } + + public void responseCompleted(final HttpContext context) { + synchronized (this.httpExchange) { + if (this.completed) { + return; + } + this.completed = true; + System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " response completed"); + this.httpExchange.setResponseReceived(); + if (this.httpExchange.getClientIOControl() != null) { + this.httpExchange.getClientIOControl().requestOutput(); + System.out.println("[proxy<-origin] " + this.httpExchange.getId() + " request client output"); + } + } + } + + public void failed(final Exception ex) { + synchronized (this.httpExchange) { + if (this.completed) { + return; + } + this.completed = true; + this.httpExchange.setException(ex); + HttpAsyncExchange responseTrigger = this.httpExchange. getResponseTrigger(); + if (responseTrigger != null && !responseTrigger.isCompleted()) { + System.out.println("[client<-proxy] " + this.httpExchange.getId() + " " + ex); + int status = HttpStatus.SC_INTERNAL_SERVER_ERROR; + HttpResponse response = new BasicHttpResponse(HttpVersion.HTTP_1_0, status, + EnglishReasonPhraseCatalog.INSTANCE.getReason(status, Locale.US)); + String message = ex.getMessage(); + if (message == null) { + message = "Unexpected error"; + } + response.setEntity(new NStringEntity(message, ContentType.DEFAULT_TEXT)); + responseTrigger.submitResponse(new BasicAsyncResponseProducer(response)); + } + } + } + + public boolean cancel() { + synchronized (this.httpExchange) { + if (this.completed) { + return false; + } + failed(new InterruptedIOException("Cancelled")); + return true; + } + } + + public ProxyHttpExchange getResult() { + return this.httpExchange; + } + + public Exception getException() { + return null; + } + + public boolean isDone() { + return this.completed; + } + + } + + static class ProxyResponseProducer implements HttpAsyncResponseProducer { + + private 
final ProxyHttpExchange httpExchange; + + public ProxyResponseProducer(final ProxyHttpExchange httpExchange) { + super(); + this.httpExchange = httpExchange; + } + + public void close() throws IOException { + this.httpExchange.reset(); + } + + public HttpResponse generateResponse() { + synchronized (this.httpExchange) { + HttpResponse response = this.httpExchange.getResponse(); + System.out.println("[client<-proxy] " + this.httpExchange.getId() + " " + response.getStatusLine()); + // Rewrite response!!!! + BasicHttpResponse r = new BasicHttpResponse(response. getStatusLine()); + r.setEntity(response.getEntity()); + return r; + } + } + + public void produceContent( + final ContentEncoder encoder, final IOControl ioctrl) throws IOException { + synchronized (this.httpExchange) { + this.httpExchange.setClientIOControl(ioctrl); + // Send data to the client + ByteBuffer buf = this.httpExchange.getOutBuffer(); + buf.flip(); + int n = encoder.write(buf); + buf.compact(); + System.out.println("[client<-proxy] " + this.httpExchange.getId() + " " + n + " bytes written"); + // If there is space in the buffer and the message has not been + // transferred, make sure the origin is sending more data + if (buf.hasRemaining() && !this.httpExchange.isResponseReceived()) { + if (this.httpExchange.getOriginIOControl() != null) { + this.httpExchange.getOriginIOControl(). requestInput(); + System.out.println("[client<-proxy] " + this.httpExchange.getId() + " request origin input"); + } + } + if (buf.position() == 0) { + if (this.httpExchange.isResponseReceived()) { + encoder.complete(); + System.out.println("[client<-proxy] " + this.httpExchange.getId() + " content fully written"); + } else { + // Input buffer is empty. 
Wait until the origin fills up + // the buffer + ioctrl.suspendOutput(); + System.out.println("[client<-proxy] " + this.httpExchange.getId() + " suspend client output"); + } + } + } + } + + public void responseCompleted(final HttpContext context) { + synchronized (this.httpExchange) { + System.out.println("[client<-proxy] " + this.httpExchange.getId() + " response completed"); + } + } + + public void failed(final Exception ex) { + System.out.println("[client<-proxy] " + ex.toString()); + } + + } + + static class ProxyIncomingConnectionReuseStrategy extends DefaultConnectionReuseStrategy { + + @Override + public boolean keepAlive(final HttpResponse response, final HttpContext context) { + NHttpConnection conn = (NHttpConnection) context.getAttribute( + HttpCoreContext.HTTP_CONNECTION); + boolean keepAlive = super.keepAlive(response, context); + if (keepAlive) { + System.out.println("[client->proxy] connection kept alive " + conn); + } + return keepAlive; + } + + }; + + static class ProxyOutgoingConnectionReuseStrategy extends DefaultConnectionReuseStrategy { + + @Override + public boolean keepAlive(final HttpResponse response, final HttpContext context) { + NHttpConnection conn = (NHttpConnection) context.getAttribute( + HttpCoreContext.HTTP_CONNECTION); + boolean keepAlive = super.keepAlive(response, context); + if (keepAlive) { + System.out.println("[proxy->origin] connection kept alive " + conn); + } + return keepAlive; + } + + }; + + static class ProxyServiceHandler extends HttpAsyncService { + + public ProxyServiceHandler( + final HttpProcessor httpProcessor, + final ConnectionReuseStrategy reuseStrategy, + final HttpAsyncRequestHandlerMapper handlerResolver) { + super(httpProcessor, reuseStrategy, null, handlerResolver, null); + } + + @Override + protected void log(final Exception ex) { + ex.printStackTrace(); + } + + @Override + public void connected(final NHttpServerConnection conn) { + System.out.println("[client->proxy] connection open " + conn); + 
super.connected(conn); + } + + @Override + public void closed(final NHttpServerConnection conn) { + System.out.println("[client->proxy] connection closed " + conn); + super.closed(conn); + } + + } + + static class ProxyClientProtocolHandler extends HttpAsyncRequestExecutor { + + public ProxyClientProtocolHandler() { + super(); + } + + @Override + protected void log(final Exception ex) { + ex.printStackTrace(); + } + + @Override + public void connected(final NHttpClientConnection conn, + final Object attachment) throws IOException, HttpException { + System.out.println("[proxy->origin] connection open " + conn); + super.connected(conn, attachment); + } + + @Override + public void closed(final NHttpClientConnection conn) { + System.out.println("[proxy->origin] connection closed " + conn); + super.closed(conn); + } + + } + + static class ProxyConnPool extends BasicNIOConnPool { + + public ProxyConnPool( + final ConnectingIOReactor ioreactor, + final NIOConnFactory<HttpHost, NHttpClientConnection> connFactory, + final int connectTimeout) { + super(ioreactor, connFactory, connectTimeout); + } + + @Override + public void release(final BasicNIOPoolEntry entry, boolean reusable) { + System.out.println("[proxy->origin] connection released " + entry.getConnection()); + super.release(entry, reusable); + StringBuilder buf = new StringBuilder(); + PoolStats totals = getTotalStats(); + buf.append("[total kept alive: ").append(totals.getAvailable()).append("; "); + buf.append("total allocated: ").append(totals.getLeased() + totals.getAvailable()); + buf.append(" of ").append(totals.getMax()).append("]"); + System.out.println("[proxy->origin] " + buf.toString()); + } + + } + +} -- E-Mail: [email protected] | [email protected] Java Persistence with Hibernate, Second Edition <https://www.amazon.com/gp/product/1617290459/ref=as_li_tl?ie=UTF8&camp=1789&creative=9325&creativeASIN=1617290459&linkCode=as2&tag=garygregory-20&linkId=cadb800f39946ec62ea2b1af9fe6a2b8> 
<http:////ir-na.amazon-adsystem.com/e/ir?t=garygregory-20&l=am2&o=1&a=1617290459> JUnit in Action, Second Edition <https://www.amazon.com/gp/product/1935182021/ref=as_li_tl?ie=UTF8&camp=1789&creative=9325&creativeASIN=1935182021&linkCode=as2&tag=garygregory-20&linkId=31ecd1f6b6d1eaf8886ac902a24de418%22> <http:////ir-na.amazon-adsystem.com/e/ir?t=garygregory-20&l=am2&o=1&a=1935182021> Spring Batch in Action <https://www.amazon.com/gp/product/1935182951/ref=as_li_tl?ie=UTF8&camp=1789&creative=9325&creativeASIN=1935182951&linkCode=%7B%7BlinkCode%7D%7D&tag=garygregory-20&linkId=%7B%7Blink_id%7D%7D%22%3ESpring+Batch+in+Action> <http:////ir-na.amazon-adsystem.com/e/ir?t=garygregory-20&l=am2&o=1&a=1935182951> Blog: http://garygregory.wordpress.com Home: http://garygregory.com/ Tweet! http://twitter.com/GaryGregory
