the WritePendingException smells like a bug in your handling of the
WriteListener.

Take a look at the various gotchas in the Async I/O slide deck.
http://www.slideshare.net/SimoneBordet/servlet-31-async-io

The section on "Servlet 3.1 Async I/O" lists 6 gotchas for WriteListener
alone.
Since you are using scala, I really can't help much beyond that.


Joakim Erdfelt / [email protected]

On Wed, Sep 9, 2015 at 3:34 PM, Xiaodong Wang <[email protected]> wrote:

> Hi Joakim,
>
> Thanks for your response, I changed the code as you suggested but it still
> does not work. *But it works in Tomcat.*
>
> This is the error I got.
> [qtp990355670-43] WARN org.eclipse.jetty.util.thread.QueuedThreadPool -
> java.lang.IllegalStateException
> at
> org.eclipse.jetty.server.HttpOutput$AsyncICB.onCompleteSuccess(HttpOutput.java:990)
> at
> org.eclipse.jetty.server.HttpOutput$AsyncWrite.onCompleteSuccess(HttpOutput.java:1126)
> at
> org.eclipse.jetty.util.IteratingCallback.processing(IteratingCallback.java:325)
> at
> org.eclipse.jetty.util.IteratingCallback.succeeded(IteratingCallback.java:365)
> at
> org.eclipse.jetty.server.HttpConnection$SendCallback.onCompleteSuccess(HttpConnection.java:747)
> at
> org.eclipse.jetty.util.IteratingCallback.processing(IteratingCallback.java:325)
> at
> org.eclipse.jetty.util.IteratingCallback.succeeded(IteratingCallback.java:365)
> at
> org.eclipse.jetty.io.WriteFlusher$PendingState.complete(WriteFlusher.java:269)
> at org.eclipse.jetty.io.WriteFlusher.completeWrite(WriteFlusher.java:394)
> at
> org.eclipse.jetty.io.SelectChannelEndPoint$3.run(SelectChannelEndPoint.java:89)
> at
> org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceAndRun(ExecuteProduceConsume.java:213)
> at
> org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:147)
> at
> org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:654)
> at
> org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
> at java.lang.Thread.run(Thread.java:745)
> [qtp990355670-43] WARN org.eclipse.jetty.util.thread.QueuedThreadPool -
> Unexpected thread death:
> org.eclipse.jetty.util.thread.QueuedThreadPool$3@2b35b7fd in
> qtp990355670{STARTED,8<=21<=200,i=11,q=0}
> [qtp990355670-32] WARN org.eclipse.jetty.server.HttpChannel -
> //localhost:56447/async
> java.nio.channels.WritePendingException
> at
> org.eclipse.jetty.server.HttpConnection$SendCallback.reset(HttpConnection.java:622)
> at
> org.eclipse.jetty.server.HttpConnection$SendCallback.access$300(HttpConnection.java:584)
> at org.eclipse.jetty.server.HttpConnection.send(HttpConnection.java:510)
> at org.eclipse.jetty.server.HttpChannel.sendResponse(HttpChannel.java:650)
> at org.eclipse.jetty.server.HttpChannel.write(HttpChannel.java:699)
> at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:177)
> at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:163)
> at org.eclipse.jetty.server.HttpOutput.close(HttpOutput.java:211)
> at org.eclipse.jetty.server.Response.closeOutput(Response.java:987)
> at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:412)
> at org.eclipse.jetty.server.HttpChannel.run(HttpChannel.java:262)
> at
> org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:654)
> at
> org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
> at java.lang.Thread.run(Thread.java:745)
>
> On Wed, Sep 9, 2015 at 11:07 AM, Joakim Erdfelt <[email protected]>
> wrote:
>
>> I think AsyncListener.onError() is a terminal state/call.
>> The AsyncContext is no longer valid once that is called.
>> The response is committed/closed as well.
>> Moving to the complete() state isn't even possible.
>> (Note: I could be wrong in this assumption)
>>
>> Some other advice, don't rely on context.getRequest and
>> context.getResponse to always be there and be available.
>> The lifecycle of an AsyncContext, and WriteListener can be longer than
>> the lifecycle of a Request/Response.
>> Its very possible for the Request/Response to be recycled / cleaned up
>> before the AsyncContext and WriteListener are done.
>>
>> Yank them out into variables and use them that way.
>> For example, your call to context.getRequest.getServletContext.log is
>> just asking for problems (especially in an onError)
>>
>>
>>
>> Joakim Erdfelt / [email protected]
>>
>> On Wed, Sep 9, 2015 at 10:54 AM, Xiaodong Wang <[email protected]>
>> wrote:
>>
>>> I try to use the asynchronous servlet with Jetty 9, but I
>>> get IllegalStateException from HttpOutput class. The same code works well
>>> with Tomcat.
>>>
>>> This is my test code (in Scala).
>>>
>>> TestServers.scala
>>> ------------------------------------------------------------------------------------------------------------------------
>>> package playground.webserver
>>>
>>> import java.io.File
>>> import javax.servlet.Servlet
>>>
>>> import org.apache.catalina.startup.Tomcat
>>> import org.eclipse.jetty.server.Server
>>> import org.eclipse.jetty.servlet.{ServletHolder, ServletContextHandler}
>>> ;
>>>
>>> trait TestServer {
>>>   val port: Int
>>>   def start: Unit
>>>   def stop: Unit
>>>   def addServlet(servlet: Servlet, contextPath: String)
>>> }
>>>
>>> class TomcatTestServer(val port: Int) extends TestServer {
>>>   private val tomcat = new Tomcat()
>>>   tomcat.setPort(port)
>>>
>>>   override def start: Unit = tomcat.start()
>>>
>>>   override def stop: Unit = tomcat.stop()
>>>
>>>   override def addServlet(servlet: Servlet, contextPath: String): Unit
>>> = {
>>>   val base = new File(System.getProperty("java.io.tmpdir"));
>>>   val context = tomcat.addContext(contextPath, base.getAbsolutePath());
>>>
>>>   tomcat.addServlet(contextPath, contextPath, servlet)
>>>   context.addServletMapping("/", contextPath)
>>>   }
>>> }
>>>
>>> class JettyTestServer(val port: Int) extends TestServer {
>>>   private val jetty = new Server(port)
>>>   private val context = new
>>> ServletContextHandler(ServletContextHandler.SESSIONS)
>>>   context.setContextPath("/")
>>>   jetty.setHandler(context)
>>>
>>>   override def start: Unit = jetty.start()
>>>
>>>   override def stop: Unit = jetty.stop()
>>>
>>>   override def addServlet(servlet: Servlet, contextPath: String): Unit
>>> = {
>>>   context.addServlet(new ServletHolder(servlet), contextPath)
>>>   }
>>> }
>>>
>>>
>>>
>>> AsyncIoTest.scala------------------------------------------------------------------------------------------------------------------------------------
>>> package playground.webserver
>>>
>>> import java.net.ServerSocket
>>> import javax.servlet.http.{HttpServlet, HttpServletRequest,
>>> HttpServletResponse}
>>> import javax.servlet.{AsyncContext, WriteListener}
>>>
>>> import org.apache.http.client.methods.HttpGet
>>> import org.apache.http.impl.client.HttpClients
>>> import org.scalatest.{BeforeAndAfterAll, FunSpec, Matchers}
>>>
>>> import scala.concurrent.ExecutionContext.Implicits.global
>>> import scala.concurrent.duration.Duration
>>> import scala.concurrent.{Await, Future}
>>>
>>> abstract class AsyncIoTest extends FunSpec with Matchers with
>>> BeforeAndAfterAll {
>>>   private val data = new Array[Byte](1024 * 1024 * 128)
>>>
>>>   *class DataWriteListener(context: AsyncContext) extends WriteListener
>>> {*
>>> *    private[this] var pos = 0*
>>>
>>> *    override def onError(t: Throwable): Unit = {*
>>> *      context.getRequest.getServletContext.log("Async Error", t)*
>>> *      context.complete()*
>>> *    }*
>>>
>>> *    override def onWritePossible(): Unit = {*
>>> *      val out = context.getResponse.getOutputStream*
>>> *      while (out.isReady && pos < data.length) {*
>>> *        val toWrite = math.min(1024, data.length - pos)*
>>> *        out.write(data, pos, toWrite)*
>>> *        pos += toWrite*
>>> *      }*
>>>
>>> *      if (pos >= data.length) {*
>>> *        context.complete()*
>>> *      }*
>>> *    }*
>>> *  }*
>>>
>>>   *class AsyncServlet extends HttpServlet {*
>>> *    override def doGet(req: HttpServletRequest, resp:
>>> HttpServletResponse): Unit = {*
>>> *      req.setAttribute("org.apache.catalina.ASYNC_SUPPORTED", true);*
>>>
>>> *      resp.setStatus(200)*
>>> *      resp.setContentLength(data.length)*
>>>
>>> *      val async = req.startAsync()*
>>> *      val out = resp.getOutputStream*
>>>
>>> *      out.setWriteListener(new DataWriteListener(async))*
>>> *    }*
>>> *  }*
>>>
>>>   class SyncServlet extends HttpServlet {
>>>     override def doGet(req: HttpServletRequest, resp:
>>> HttpServletResponse): Unit = {
>>>       resp.setStatus(200)
>>>       resp.setContentLength(data.length)
>>>       val out = resp.getOutputStream
>>>       var pos = 0
>>>       while(pos < data.length) {
>>>         out.write(data, pos, 1024)
>>>         pos += 1024
>>>       }
>>>     }
>>>   }
>>>
>>>   protected val server: TestServer
>>>   protected lazy val port: Int = findAvailablePort()
>>>   private val requestNum = 100
>>>
>>>   private def findAvailablePort(): Int = {
>>>     val serverSocket = new ServerSocket(0)
>>>     val port = serverSocket.getLocalPort()
>>>     serverSocket.close()
>>>
>>>     port
>>>   }
>>>
>>>   describe("Sync IO") {
>>>     it("trigger some traffic") {
>>>       testWithUrl(s"http://localhost:${port}/sync";)
>>>     }
>>>   }
>>>
>>>   describe("Async IO") {
>>>     it("trigger some traffic") {
>>>       testWithUrl(s"http://localhost:${port}/async";)
>>>     }
>>>   }
>>>
>>>   override protected def beforeAll(): Unit = {
>>>     server.addServlet(new AsyncServlet, "/async")
>>>     server.addServlet(new SyncServlet, "/sync")
>>>     server.start
>>>   }
>>>
>>>
>>>   override protected def afterAll(): Unit = {
>>>     server.stop
>>>   }
>>>
>>>   private def testWithUrl(url: String): Unit = {
>>>     val start = System.currentTimeMillis
>>>
>>>     val futures = (1 to requestNum).map { i =>
>>>       Future {
>>>         val total = readData(url)
>>>         total should be(1024 * 1024 * 128)
>>>         total
>>>       }
>>>     }
>>>
>>>     Await.result(Future.sequence(futures), Duration.Inf)
>>>
>>>     println(s"Total millis used: ${System.currentTimeMillis - start}")
>>>   }
>>>
>>>   private def readData(url: String): Long = {
>>>     val httpclient = HttpClients.createDefault()
>>>     val httpGet = new HttpGet(url)
>>>     val resp = httpclient.execute(httpGet)
>>>     val is = resp.getEntity.getContent
>>>     val buf = new Array[Byte](1024 * 1024)
>>>     var read = 0
>>>     var total = 0L
>>>     while(read != -1) {
>>>       total += read
>>>       read = is.read(buf)
>>>     }
>>>
>>>     resp.close()
>>>     total
>>>   }
>>> }
>>>
>>> class AsyncIoWithJettyTest extends AsyncIoTest {
>>>   override protected val server: TestServer = new JettyTestServer(port)
>>> }
>>>
>>> class AsyncIoWithTomcatTest extends AsyncIoTest {
>>>   override protected val server: TestServer = new TomcatTestServer(port)
>>> }
>>>
>>>
>>> This is the error I got.
>>> [qtp990355670-39] WARN org.eclipse.jetty.util.thread.QueuedThreadPool -
>>> [qtp990355670-32] WARN org.eclipse.jetty.server.HttpChannel -
>>> //localhost:51809/async
>>> java.lang.IllegalStateException
>>> at
>>> org.eclipse.jetty.server.HttpOutput$AsyncICB.onCompleteSuccess(HttpOutput.java:990)
>>> at
>>> org.eclipse.jetty.server.HttpOutput$AsyncWrite.onCompleteSuccess(HttpOutput.java:1126)
>>> at
>>> org.eclipse.jetty.util.IteratingCallback.processing(IteratingCallback.java:325)
>>> at
>>> org.eclipse.jetty.util.IteratingCallback.succeeded(IteratingCallback.java:365)
>>> at
>>> org.eclipse.jetty.server.HttpConnection$SendCallback.onCompleteSuccess(HttpConnection.java:747)
>>> at
>>> org.eclipse.jetty.util.IteratingCallback.processing(IteratingCallback.java:325)
>>> at
>>> org.eclipse.jetty.util.IteratingCallback.succeeded(IteratingCallback.java:365)
>>> at
>>> org.eclipse.jetty.io.WriteFlusher$PendingState.complete(WriteFlusher.java:269)
>>> at org.eclipse.jetty.io.WriteFlusher.completeWrite(WriteFlusher.java:394)
>>> at
>>> org.eclipse.jetty.io.SelectChannelEndPoint$3.run(SelectChannelEndPoint.java:89)
>>> at
>>> org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.produceAndRun(ExecuteProduceConsume.java:213)
>>> at
>>> org.eclipse.jetty.util.thread.strategy.ExecuteProduceConsume.run(ExecuteProduceConsume.java:147)
>>> at
>>> org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:654)
>>> at
>>> org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
>>> at java.lang.Thread.run(Thread.java:745)
>>> [qtp990355670-39] WARN org.eclipse.jetty.util.thread.QueuedThreadPool -
>>> Unexpected thread death:
>>> org.eclipse.jetty.util.thread.QueuedThreadPool$3@26b0a9dd in
>>> qtp990355670{STARTED,8<=20<=200,i=9,q=0}
>>> java.lang.NullPointerException
>>> at
>>> playground.webserver.AsyncIoTest$DataWriteListener.onError(AsyncIoTest.scala:22)
>>> at org.eclipse.jetty.server.HttpOutput.close(HttpOutput.java:201)
>>> at org.eclipse.jetty.server.Response.closeOutput(Response.java:987)
>>> at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:412)
>>> at org.eclipse.jetty.server.HttpChannel.run(HttpChannel.java:262)
>>> at
>>> org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:654)
>>> at
>>> org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:572)
>>> at java.lang.Thread.run(Thread.java:745)
>>> ......
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> _______________________________________________
>>> jetty-users mailing list
>>> [email protected]
>>> To change your delivery options, retrieve your password, or unsubscribe
>>> from this list, visit
>>> https://dev.eclipse.org/mailman/listinfo/jetty-users
>>>
>>
>>
>> _______________________________________________
>> jetty-users mailing list
>> [email protected]
>> To change your delivery options, retrieve your password, or unsubscribe
>> from this list, visit
>> https://dev.eclipse.org/mailman/listinfo/jetty-users
>>
>
>
> _______________________________________________
> jetty-users mailing list
> [email protected]
> To change your delivery options, retrieve your password, or unsubscribe
> from this list, visit
> https://dev.eclipse.org/mailman/listinfo/jetty-users
>
_______________________________________________
jetty-users mailing list
[email protected]
To change your delivery options, retrieve your password, or unsubscribe from 
this list, visit
https://dev.eclipse.org/mailman/listinfo/jetty-users

Reply via email to