On Fri, 2007-01-12 at 14:06 +0530, Asankha C. Perera wrote:
> Hi Oleg
> >
> > Can it be you are not calling ContentEncoder#complete()?
> >
> > http://jakarta.apache.org/httpcomponents/httpcore/jakarta-httpcore-nio/xref/org/apache/http/nio/impl/codecs/ChunkEncoder.html#69
> >
> >
> >
> I am calling it, and when I debug, it steps into the complete() method
> of the ChunkEncoder as expected. I am not sure how much this will help
> as you will not be able to run this attached code.. but maybe you could
> advise me if I am doing something wrong...
>
Hi Asankha,
I am unable to spot an obvious problem with this code. All looks sane to
me. I can only speculate this is because the HTTP worker fails to flush
the output buffer. Having said that, I have never worked with Pipes in
Java, so I can easily be wrong about it.
It is entirely possible this is a bug in HttpCore NIO, so if you managed
to create a self-contained application that I could you run locally to
reproduce the problem, I would make sure HttpCore works as advertised.
I would also like to suggest the following. Could you simply buffer the
entire content in memory for the time being and get the HTTP protocol
layer work as expected as a first step? Once you are reasonably sure the
protocol layer is all right, I would happily help you implement content
streaming and optimize other transport aspects.
Oleg
> public class ServiceHandler implements NHttpServiceHandler {
>
> private final HttpParams params;
> private final HttpResponseFactory responseFactory;
> private final ByteBuffer inbuf;
> private final ByteBuffer outbuf;
> private final HttpProcessor httpProcessor;
> private final ConnectionReuseStrategy connStrategy;
>
> ConfigurationContext cfgCtx = null;
>
> private Executor workerPool = null;
> private static final int WORKERS_MAX_THREADS = 40;
> private static final long WORKER_KEEP_ALIVE = 100L;
>
> public ServiceHandler(final ConfigurationContext cfgCtx, final
> HttpParams params) {
> super();
> this.cfgCtx = cfgCtx;
> this.params = params;
>
> responseFactory = new DefaultHttpResponseFactory();
> inbuf = ByteBuffer.allocateDirect(2048);
> outbuf = ByteBuffer.allocateDirect(2048);
> BasicHttpProcessor httpProcessor = new BasicHttpProcessor();
> httpProcessor.addInterceptor(new ResponseDate());
> httpProcessor.addInterceptor(new ResponseServer());
> httpProcessor.addInterceptor(new ResponseContent());
> httpProcessor.addInterceptor(new ResponseConnControl());
> this.httpProcessor = httpProcessor;
> connStrategy = new DefaultConnectionReuseStrategy();
>
> workerPool = new ThreadPoolExecutor(
> 1, WORKERS_MAX_THREADS, WORKER_KEEP_ALIVE, TimeUnit.SECONDS,
> new LinkedBlockingQueue(),
> new DefaultThreadFactory(new ThreadGroup("Worker thread
> group"), "HttpWorker"));
> }
>
> private void shutdownConnection(final HttpConnection conn) {
> try {
> conn.shutdown();
> } catch (IOException ignore) {
> }
> }
>
> public void requestReceived(final NHttpServerConnection conn) {
> HttpRequest request = conn.getHttpRequest();
> HttpContext context = conn.getContext();
> HttpVersion httpVersion = request.getRequestLine().getHttpVersion();
> HttpResponse response =
> responseFactory.newHttpResponse(httpVersion, HttpStatus.SC_OK);
> response.setParams(this.params);
>
> try {
> Pipe requestPipe = Pipe.open();
> Pipe responsePipe = Pipe.open();
> context.setAttribute("request-sink-channel",
> requestPipe.sink());
> context.setAttribute("response-source-channel",
> responsePipe.source());
>
> BasicHttpEntity entity = new BasicHttpEntity();
>
> entity.setContent(Channels.newInputStream(responsePipe.source()));
> if (httpVersion.greaterEquals(HttpVersion.HTTP_1_1)) {
> entity.setChunked(true);
> }
> response.setEntity(entity);
>
> context.setAttribute(HttpContext.HTTP_REQUEST, request);
>
> workerPool.execute(
> new Worker(cfgCtx, conn, this,
> request, Channels.newInputStream(requestPipe.source()),
> response,
> Channels.newOutputStream(responsePipe.sink())));
>
> } catch (IOException e) {
> e.printStackTrace();
> }
> }
>
> public void connected(final NHttpServerConnection conn) {
> System.out.println("New incoming connection");
> }
>
> public void closed(final NHttpServerConnection conn) {
> System.out.println("Connection closed");
> }
>
> public void exception(final NHttpServerConnection conn, final
> HttpException ex) {
> HttpRequest request = conn.getHttpRequest();
> HttpVersion ver = request.getRequestLine().getHttpVersion();
> HttpResponse response = this.responseFactory.newHttpResponse(
> ver, HttpStatus.SC_BAD_REQUEST);
> byte[] msg = EncodingUtils.getAsciiBytes(
> "Malformed HTTP request: " + ex.getMessage());
> ByteArrayEntity entity = new ByteArrayEntity(msg);
> entity.setContentType("text/plain; charset=US-ASCII");
> response.setEntity(entity);
> commitResponse(conn, response);
> }
>
> public void exception(NHttpServerConnection conn, IOException ex) {
> System.err.println("I/O error: " + ex.getMessage());
> shutdownConnection(conn);
> }
>
> public void inputReady(final NHttpServerConnection conn, final
> ContentDecoder decoder) {
>
> HttpContext context = conn.getContext();
> Pipe.SinkChannel sink = (Pipe.SinkChannel)
> context.getAttribute("request-sink-channel");
>
> try {
> while (decoder.read(inbuf) > 0) {
> inbuf.flip();
> sink.write(inbuf);
> inbuf.compact();
> }
>
> if (decoder.isCompleted()) {
> sink.close();
> }
>
> } catch (IOException ex) {
> shutdownConnection(conn);
> System.err.println("I/O error: " + ex.getMessage());
> }
> }
>
> public void outputReady(final NHttpServerConnection conn, final
> ContentEncoder encoder) {
>
> HttpContext context = conn.getContext();
> HttpResponse response = conn.getHttpResponse();
> Pipe.SourceChannel source = (Pipe.SourceChannel)
> context.getAttribute("response-source-channel");
>
> try {
> int bytesRead = source.read(outbuf);
> if (bytesRead == -1) {
> encoder.complete();
> } else {
> outbuf.flip();
> encoder.write(outbuf);
> outbuf.compact();
> }
>
> if (encoder.isCompleted()) {
> source.close();
> if (!connStrategy.keepAlive(response, context)) {
> conn.close();
> }
> }
>
> } catch (IOException ex) {
> shutdownConnection(conn);
> System.err.println("I/O error: " + ex.getMessage());
> }
>
> }
>
> public void timeout(final NHttpServerConnection conn) {
> System.err.println("Timeout");
> shutdownConnection(conn);
> }
>
> public void commitResponse(final NHttpServerConnection conn, final
> HttpResponse response) {
> try {
> httpProcessor.process(response, conn.getContext());
> conn.submitResponse(response);
> } catch (HttpException ex) {
> shutdownConnection(conn);
> System.err.println("Unexpected HTTP protocol error: " +
> ex.getMessage());
> } catch (IOException ex) {
> shutdownConnection(conn);
> System.err.println("I/O error: " + ex.getMessage());
> }
> }
> }
>
> thanks
> asankha
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [EMAIL PROTECTED]
> For additional commands, e-mail: [EMAIL PROTECTED]
>
>
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]