The WritePendingException smells like a bug in your handling of the WriteListener.
Take a look at the various gotchas in the Async I/O slide deck.
http://www.slideshare.net/SimoneBordet/servlet-31-async-io
The section on "Servlet 3.1 Async I/O" lists 6 gotchas for WriteListener alone.

Since you are using Scala, I really can't help much beyond that.

Joakim Erdfelt / [email protected]

On Wed, Sep 9, 2015 at 3:34 PM, Xiaodong Wang <[email protected]> wrote:

> Hi Joakim,
>
> Thanks for your response. I changed the code as you suggested, but it still
> does not work. *But it works in Tomcat.*
>
> This is the error I got.
> [qtp990355670-43] WARN org.eclipse.jetty.util.thread.QueuedThreadPool -
> java.lang.IllegalStateException
>     at org.eclipse.jetty.server.HttpOutput$AsyncICB.onCompleteSuccess(HttpOutput.java:990)
>     at org.eclipse.jetty.server.HttpOutput$AsyncWrite.onCompleteSuccess(HttpOutput.java:1126)
>     at org.eclipse.jetty.util.IteratingCallback.processing(IteratingCallback.java:325)
>     at org.eclipse.jetty.util.IteratingCallback.succeeded(IteratingCallback.java:365)
>     at org.eclipse.jetty.server.HttpConnection$SendCallback.onCompleteSuccess(HttpConnection.java:747)
>     at org.eclipse.jetty.util.IteratingCallback.processing(IteratingCallback.java:325)
>     at org.eclipse.jetty.util.IteratingCallback.succeeded(IteratingCallback.java:365)
>     at org.eclipse.jetty.io.WriteFlusher$PendingState.complete(WriteFlusher.java:269)
>     at org.eclipse.jetty.io.WriteFlusher.completeWrite(WriteFlusher.java:394)
>     at org.eclipse.jetty.io.SelectChannelEndPoint$3.run(SelectChannelEndPoint.java:89)
>     at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceAndRun(ExecuteProduceConsume.java:213)
>     at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:147)
>     at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:654)
>     at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
>     at java.lang.Thread.run(Thread.java:745)
> [qtp990355670-43] WARN org.eclipse.jetty.util.thread.QueuedThreadPool -
> Unexpected thread death: org.eclipse.jetty.util.thread.QueuedThreadPool$3@2b35b7fd in qtp990355670{STARTED,8<=21<=200,i=11,q=0}
> [qtp990355670-32] WARN org.eclipse.jetty.server.HttpChannel - //localhost:56447/async
> java.nio.channels.WritePendingException
>     at org.eclipse.jetty.server.HttpConnection$SendCallback.reset(HttpConnection.java:622)
>     at org.eclipse.jetty.server.HttpConnection$SendCallback.access$300(HttpConnection.java:584)
>     at org.eclipse.jetty.server.HttpConnection.send(HttpConnection.java:510)
>     at org.eclipse.jetty.server.HttpChannel.sendResponse(HttpChannel.java:650)
>     at org.eclipse.jetty.server.HttpChannel.write(HttpChannel.java:699)
>     at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:177)
>     at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:163)
>     at org.eclipse.jetty.server.HttpOutput.close(HttpOutput.java:211)
>     at org.eclipse.jetty.server.Response.closeOutput(Response.java:987)
>     at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:412)
>     at org.eclipse.jetty.server.HttpChannel.run(HttpChannel.java:262)
>     at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:654)
>     at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
>     at java.lang.Thread.run(Thread.java:745)
>
> On Wed, Sep 9, 2015 at 11:07 AM, Joakim Erdfelt <[email protected]> wrote:
>
>> I think AsyncListener.onError() is a terminal state/call.
>> The AsyncContext is no longer valid once that is called.
>> The response is committed/closed as well.
>> Moving to the complete() state isn't even possible.
>> (Note: I could be wrong in this assumption)
>>
>> Some other advice: don't rely on context.getRequest and
>> context.getResponse to always be there and be available.
>> The lifecycle of an AsyncContext and WriteListener can be longer than
>> the lifecycle of a Request/Response.
>> It's very possible for the Request/Response to be recycled / cleaned up
>> before the AsyncContext and WriteListener are done.
>>
>> Yank them out into variables and use them that way.
>> For example, your call to context.getRequest.getServletContext.log is
>> just asking for problems (especially in an onError)
>>
>> Joakim Erdfelt / [email protected]
>>
>> On Wed, Sep 9, 2015 at 10:54 AM, Xiaodong Wang <[email protected]> wrote:
>>
>>> I try to use the asynchronous servlet with Jetty 9, but I
>>> get IllegalStateException from HttpOutput class. The same code works well
>>> with Tomcat.
>>>
>>> This is my test code (in Scala).
>>>
>>> TestServers.scala
>>> ------------------------------------------------------------------------
>>> package playground.webserver
>>>
>>> import java.io.File
>>> import javax.servlet.Servlet
>>>
>>> import org.apache.catalina.startup.Tomcat
>>> import org.eclipse.jetty.server.Server
>>> import org.eclipse.jetty.servlet.{ServletHolder, ServletContextHandler}
>>>
>>> trait TestServer {
>>>   val port: Int
>>>   def start: Unit
>>>   def stop: Unit
>>>   def addServlet(servlet: Servlet, contextPath: String)
>>> }
>>>
>>> class TomcatTestServer(val port: Int) extends TestServer {
>>>   private val tomcat = new Tomcat()
>>>   tomcat.setPort(port)
>>>
>>>   override def start: Unit = tomcat.start()
>>>
>>>   override def stop: Unit = tomcat.stop()
>>>
>>>   override def addServlet(servlet: Servlet, contextPath: String): Unit = {
>>>     val base = new File(System.getProperty("java.io.tmpdir"));
>>>     val context = tomcat.addContext(contextPath, base.getAbsolutePath());
>>>
>>>     tomcat.addServlet(contextPath, contextPath, servlet)
>>>     context.addServletMapping("/", contextPath)
>>>   }
>>> }
>>>
>>> class JettyTestServer(val port: Int) extends TestServer {
>>>   private val jetty = new Server(port)
>>>   private val context = new ServletContextHandler(ServletContextHandler.SESSIONS)
>>>   context.setContextPath("/")
>>>   jetty.setHandler(context)
>>>
>>>   override def start: Unit = jetty.start()
>>>
>>>   override def stop: Unit = jetty.stop()
>>>
>>>   override def addServlet(servlet: Servlet, contextPath: String): Unit = {
>>>     context.addServlet(new ServletHolder(servlet), contextPath)
>>>   }
>>> }
>>>
>>> AsyncIoTest.scala
>>> ------------------------------------------------------------------------
>>> package playground.webserver
>>>
>>> import java.net.ServerSocket
>>> import javax.servlet.http.{HttpServlet, HttpServletRequest, HttpServletResponse}
>>> import javax.servlet.{AsyncContext, WriteListener}
>>>
>>> import org.apache.http.client.methods.HttpGet
>>> import org.apache.http.impl.client.HttpClients
>>> import org.scalatest.{BeforeAndAfterAll, FunSpec, Matchers}
>>>
>>> import scala.concurrent.ExecutionContext.Implicits.global
>>> import scala.concurrent.duration.Duration
>>> import scala.concurrent.{Await, Future}
>>>
>>> abstract class AsyncIoTest extends FunSpec with Matchers with BeforeAndAfterAll {
>>>   private val data = new Array[Byte](1024 * 1024 * 128)
>>>
>>>   class DataWriteListener(context: AsyncContext) extends WriteListener {
>>>     private[this] var pos = 0
>>>
>>>     override def onError(t: Throwable): Unit = {
>>>       context.getRequest.getServletContext.log("Async Error", t)
>>>       context.complete()
>>>     }
>>>
>>>     override def onWritePossible(): Unit = {
>>>       val out = context.getResponse.getOutputStream
>>>       while (out.isReady && pos < data.length) {
>>>         val toWrite = math.min(1024, data.length - pos)
>>>         out.write(data, pos, toWrite)
>>>         pos += toWrite
>>>       }
>>>
>>>       if (pos >= data.length) {
>>>         context.complete()
>>>       }
>>>     }
>>>   }
>>>
>>>   class AsyncServlet extends HttpServlet {
>>>     override def doGet(req: HttpServletRequest, resp: HttpServletResponse): Unit = {
>>>       req.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);
>>>
>>>       resp.setStatus(200)
>>>       resp.setContentLength(data.length)
>>>
>>>       val async = req.startAsync()
>>>       val out = resp.getOutputStream
>>>
>>>       out.setWriteListener(new DataWriteListener(async))
>>>     }
>>>   }
>>>
>>>   class SyncServlet extends HttpServlet {
>>>     override def doGet(req: HttpServletRequest, resp: HttpServletResponse): Unit = {
>>>       resp.setStatus(200)
>>>       resp.setContentLength(data.length)
>>>       val out = resp.getOutputStream
>>>       var pos = 0
>>>       while(pos < data.length) {
>>>         out.write(data, pos, 1024)
>>>         pos += 1024
>>>       }
>>>     }
>>>   }
>>>
>>>   protected val server: TestServer
>>>   protected lazy val port: Int = findAvailablePort()
>>>   private val requestNum = 100
>>>
>>>   private def findAvailablePort(): Int = {
>>>     val serverSocket = new ServerSocket(0)
>>>     val port = serverSocket.getLocalPort()
>>>     serverSocket.close()
>>>
>>>     port
>>>   }
>>>
>>>   describe("Sync IO") {
>>>     it("trigger some traffic") {
>>>       testWithUrl(s"http://localhost:${port}/sync")
>>>     }
>>>   }
>>>
>>>   describe("Async IO") {
>>>     it("trigger some traffic") {
>>>       testWithUrl(s"http://localhost:${port}/async")
>>>     }
>>>   }
>>>
>>>   override protected def beforeAll(): Unit = {
>>>     server.addServlet(new AsyncServlet, "/async")
>>>     server.addServlet(new SyncServlet, "/sync")
>>>     server.start
>>>   }
>>>
>>>   override protected def afterAll(): Unit = {
>>>     server.stop
>>>   }
>>>
>>>   private def testWithUrl(url: String): Unit = {
>>>     val start = System.currentTimeMillis
>>>
>>>     val futures = (1 to requestNum).map { i =>
>>>       Future {
>>>         val total = readData(url)
>>>         total should be(1024 * 1024 * 128)
>>>         total
>>>       }
>>>     }
>>>
>>>     Await.result(Future.sequence(futures), Duration.Inf)
>>>
>>>     println(s"Total millis used: ${System.currentTimeMillis - start}")
>>>   }
>>>
>>>   private def readData(url: String): Long = {
>>>     val httpclient = HttpClients.createDefault()
>>>     val httpGet = new HttpGet(url)
>>>     val resp = httpclient.execute(httpGet)
>>>     val is = resp.getEntity.getContent
>>>     val buf = new Array[Byte](1024 * 1024)
>>>     var read = 0
>>>     var total = 0L
>>>     while(read != -1) {
>>>       total += read
>>>       read = is.read(buf)
>>>     }
>>>
>>>     resp.close()
>>>     total
>>>   }
>>> }
>>>
>>> class AsyncIoWithJettyTest extends AsyncIoTest {
>>>   override protected val server: TestServer = new JettyTestServer(port)
>>> }
>>>
>>> class AsyncIoWithTomcatTest extends AsyncIoTest {
>>>   override protected val server: TestServer = new TomcatTestServer(port)
>>> }
>>>
>>> This is the error I got.
>>> [qtp990355670-39] WARN org.eclipse.jetty.util.thread.QueuedThreadPool -
>>> [qtp990355670-32] WARN org.eclipse.jetty.server.HttpChannel - //localhost:51809/async
>>> java.lang.IllegalStateException
>>>     at org.eclipse.jetty.server.HttpOutput$AsyncICB.onCompleteSuccess(HttpOutput.java:990)
>>>     at org.eclipse.jetty.server.HttpOutput$AsyncWrite.onCompleteSuccess(HttpOutput.java:1126)
>>>     at org.eclipse.jetty.util.IteratingCallback.processing(IteratingCallback.java:325)
>>>     at org.eclipse.jetty.util.IteratingCallback.succeeded(IteratingCallback.java:365)
>>>     at org.eclipse.jetty.server.HttpConnection$SendCallback.onCompleteSuccess(HttpConnection.java:747)
>>>     at org.eclipse.jetty.util.IteratingCallback.processing(IteratingCallback.java:325)
>>>     at org.eclipse.jetty.util.IteratingCallback.succeeded(IteratingCallback.java:365)
>>>     at org.eclipse.jetty.io.WriteFlusher$PendingState.complete(WriteFlusher.java:269)
>>>     at org.eclipse.jetty.io.WriteFlusher.completeWrite(WriteFlusher.java:394)
>>>     at org.eclipse.jetty.io.SelectChannelEndPoint$3.run(SelectChannelEndPoint.java:89)
>>>     at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceAndRun(ExecuteProduceConsume.java:213)
>>>     at org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:147)
>>>     at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:654)
>>>     at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> [qtp990355670-39] WARN org.eclipse.jetty.util.thread.QueuedThreadPool -
>>> Unexpected thread death: org.eclipse.jetty.util.thread.QueuedThreadPool$3@26b0a9dd in qtp990355670{STARTED,8<=20<=200,i=9,q=0}
>>> java.lang.NullPointerException
>>>     at playground.webserver.AsyncIoTest$DataWriteListener.onError(AsyncIoTest.scala:22)
>>>     at org.eclipse.jetty.server.HttpOutput.close(HttpOutput.java:201)
>>>     at org.eclipse.jetty.server.Response.closeOutput(Response.java:987)
>>>     at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:412)
>>>     at org.eclipse.jetty.server.HttpChannel.run(HttpChannel.java:262)
>>>     at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:654)
>>>     at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> ......
>>>
>>> _______________________________________________
>>> jetty-users mailing list
>>> [email protected]
>>> To change your delivery options, retrieve your password, or unsubscribe
>>> from this list, visit
>>> https://dev.eclipse.org/mailman/listinfo/jetty-users
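For what it's worth, here is a rough Scala sketch of one way the write loop in the quoted DataWriteListener might go wrong, and a shape that avoids it. In the quoted loop, if the final write leaves isReady returning false, the while exits and complete() is then called with that write still pending, which may be what the WritePendingException is pointing at. The sketch only calls complete right after isReady has returned true, and it takes the output and the completion callback as constructor arguments rather than looking them up through the context later. To keep it runnable without a servlet container, NonBlockingOut, FakeOut, ChunkWriter and Demo are all hypothetical names invented here; real code would implement javax.servlet.WriteListener against a ServletOutputStream and call AsyncContext.complete().

```scala
// Hypothetical stand-in for the two ServletOutputStream methods the loop needs.
trait NonBlockingOut {
  def isReady: Boolean
  def write(buf: Array[Byte], off: Int, len: Int): Unit
}

// Same idea as DataWriteListener, but `out` and `complete` are captured up front,
// and complete() is only reached immediately after isReady has returned true.
class ChunkWriter(data: Array[Byte], out: NonBlockingOut,
                  complete: () => Unit, chunk: Int = 1024) {
  private[this] var pos = 0
  private[this] var done = false

  def onWritePossible(): Unit = {
    while (!done && out.isReady) {
      if (pos >= data.length) {
        // Nothing left to write and no write is pending: safe to finish.
        done = true
        complete()
      } else {
        val toWrite = math.min(chunk, data.length - pos)
        out.write(data, pos, toWrite)
        pos += toWrite
      }
    }
    // If isReady returned false we simply return; the container is expected
    // to call onWritePossible again once the pending write has drained.
  }
}

// Toy output (an assumption, not Jetty's behaviour in detail): every write
// stays "pending" until flush() is called, so isReady is false after each write.
class FakeOut extends NonBlockingOut {
  var written = 0
  private var pending = false
  def isReady: Boolean = !pending
  def hasPending: Boolean = pending
  def write(buf: Array[Byte], off: Int, len: Int): Unit = {
    if (pending) throw new IllegalStateException("write pending")
    written += len
    pending = true
  }
  def flush(): Unit = { pending = false }
}

object Demo {
  // Returns (bytes written, number of complete() calls, completed while a write was pending?).
  def run(size: Int): (Int, Int, Boolean) = {
    val out = new FakeOut
    var completions = 0
    var completedWhilePending = false
    val writer = new ChunkWriter(new Array[Byte](size), out, () => {
      completions += 1
      if (out.hasPending) completedWhilePending = true
    })
    writer.onWritePossible()
    // Simulate the container draining the write and calling back.
    while (completions == 0) {
      out.flush()
      writer.onWritePossible()
    }
    (out.written, completions, completedWhilePending)
  }
}
```

Calling Demo.run(2500) drives three writes (1024, 1024 and 452 bytes) and then one completion on the following callback, once no write is pending. Whether this is the actual cause of the Jetty traces above is a guess; the slide deck linked at the top is the better authority on the WriteListener rules.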
