Author: fhanik
Date: Wed Jun 13 15:51:56 2007
New Revision: 547055

URL: http://svn.apache.org/viewvc?view=rev&rev=547055
Log:
added simple example code snippets to comet usage

Modified:
    tomcat/trunk/webapps/docs/aio.xml

Modified: tomcat/trunk/webapps/docs/aio.xml
URL: http://svn.apache.org/viewvc/tomcat/trunk/webapps/docs/aio.xml?view=diff&rev=547055&r1=547054&r2=547055
==============================================================================
--- tomcat/trunk/webapps/docs/aio.xml (original)
+++ tomcat/trunk/webapps/docs/aio.xml Wed Jun 13 15:51:56 2007
@@ -151,6 +151,26 @@
     </source>
     </p>
   </subsection>
+  <subsection name="Comet Operations">
+    <p>
+      The previous section touched briefly on the comet connection lifecycle.
+      It is important to remember that comet events are based around IO on an actual socket.<br/>
+      To make the Comet API easier to understand, it has been designed to resemble the java.nio channel/selector APIs.
+      In the case of Comet, Tomcat is the selector, and through the CometEvent object you can
+      register and unregister your Comet event for different event type notifications.
+      We call the parameter to the <code>CometEvent.register/unregister</code> methods a comet operation.
+      This is similar to the interestOps method of a <code>SelectionKey</code> in the java.nio implementation.
+      <br/>The Comet implementation of register and unregister has been greatly simplified so that the
+      comet developer is not forced to implement complex synchronization around the register and unregister code.
+    </p>
+    <p>
+      It is important to realize that, just as in the java.nio API, once an operation has been registered it will
+      remain registered until it is unregistered. If you have registered OP_READ, then the comet connection will
+      fire READ events every time data arrives, until you unregister the OP_READ operation.<br/>
+      OP_CALLBACK and OP_WRITE work in the same way: register(OP_CALLBACK|OP_WRITE) will keep spawning
+      CALLBACK/WRITE events until you unregister the operation(s).
+    </p>
+  </subsection>
   <subsection name="CometEvent">
@@ -261,8 +281,269 @@
   </subsection>
-  <subsection name="Example code">
+  <subsection name="Example code snippets">
+    <p>
+      Imagine you are writing a servlet that updates a set of
+      stock tickers. You have a background thread that receives
+      updates for tickers as they happen, and you wish to push these out
+      to all the clients that have those stocks in their list.<br/>
+      In the following example, you can achieve maximum throughput by
+      taking advantage of Tomcat's non-blocking Comet features.<br/>
+      When the StockUpdater thread is running, it receives a set of stock updates.
+      It gets all the clients that are registered for the stocks that have changed.<br/>
+      For each client there is an associated CometEvent object. The StockUpdater
+      checks whether it can write without blocking. If so, it writes directly; otherwise
+      it registers the client for a WRITE event, which will tell the
+      system when the data can be written.
+      This is a good example of how to combine the WRITE event
+      with the isWriteable method to determine when the data can be written.<br/>
+      As with any kind of non-blocking IO, you will need to synchronize your code.
+      This has not been done in the example below, since the focus is on
+      data delivery, not synchronization.
+    </p>
+    <source>
+public class ExampleCometStockStreamer implements CometProcessor {
+    ...
+    public class StockUpdater extends Thread {
+        public void run() {
+            ...
+            StockUpdates[] updates = fetchUpdates();
+            Client[] clients = getClients(updates);
+            for (Client client : clients) {
+                CometEvent event = client.getEvent();
+                StockUpdates[] clientList = getClientUpdates(client, updates);
+                client.setAndMergeNextUpdates(clientList);
+                if (event.isWriteable()) {
+                    byte[] data = getUpdateChunk(client.getNextUpdates());
+                    event.getHttpServletResponse().getOutputStream().write(data);
+                } else {
+                    event.register(OP_WRITE);
+                }
+            }
+            ...
+        }
+    }
+    ...
+    public void event(CometEvent event) throws IOException, ServletException {
+        ...
+        if ( event.getEventType() == CometEvent.EventType.BEGIN ) {
+            //configure non blocking
+            event.configureBlocking(false);
+        } else if ( event.getEventType() == CometEvent.EventType.READ ) {
+            //read client Id and stock list from client
+            //and add the event to our list
+            String clientId = readClientInfo(event,stocks);
+            clients.add(clientId, event, stocks);
+        } else if ( event.getEventType() == CometEvent.EventType.WRITE ) {
+            //unregister from the write event
+            event.unregister(OP_WRITE);
+            //we can now write
+            byte[] data = getUpdateChunk(clients.get(event).getNextUpdates());
+            event.getHttpServletResponse().getOutputStream().write(data);
+        } else if (...) {
+            ...
+        }
+        ...
+    }
+
+}
+    </source>
+
+    <p>
+      The stock ticker example above is powerful,
+      but it also creates a great deal of complexity, because
+      the registration of event interest must be synchronized
+      between the threads. <br/>
+      Another option is to simplify this by using blocking IO.
+      That implementation would look like this.<br/>
+      Notice that data is written to the client only
+      in response to an event, never asynchronously. Assuming that the data written is smaller
+      than the socket network buffer, the write will almost always
+      complete without delay. Should the write block,
+      the system is still concurrent, as the writing happens on a thread
+      from Tomcat's thread pool. <br/>
+      In this case, the only synchronization needed is between
+      <code>client.getNextUpdates()</code> and <code>client.setAndMergeNextUpdates(clientList)</code>.
+    </p>
+    <source>
+public class ExampleCometStockStreamer implements CometProcessor {
+    ...
+    public class StockUpdater extends Thread {
+        public void run() {
+            ...
+            StockUpdates[] updates = fetchUpdates();
+            Client[] clients = getClients(updates);
+            for (Client client : clients) {
+                StockUpdates[] clientList = getClientUpdates(client, updates);
+                client.setAndMergeNextUpdates(clientList);
+                client.getEvent().register(OP_WRITE);
+            }
+            ...
+        }
+    }
+    ...
+    public void event(CometEvent event) throws IOException, ServletException {
+        ...
+        if ( event.getEventType() == CometEvent.EventType.BEGIN ) {
+            //configure blocking
+            event.configureBlocking(true);
+        } else if ( event.getEventType() == CometEvent.EventType.READ ) {
+            //read client Id and stock list from client
+            //and add the event to our list
+            String clientId = readClientInfo(event,stocks);
+            clients.add(clientId, event, stocks);
+        } else if ( event.getEventType() == CometEvent.EventType.WRITE ) {
+            Client client = clients.get(event);
+            //unregister from the write event
+            event.unregister(OP_WRITE);
+            //we can now write
+            byte[] data = getUpdateChunk(client.getNextUpdates());
+            event.getHttpServletResponse().getOutputStream().write(data);
+        } else if (...) {
+            ...
+        }
+        ...
+    }
+}
+    </source>
+
+    <p>
+      Imagine that you wish to write a pseudo-transactional system
+      (please take the word transaction with a grain of salt)
+      and do your own write scheduling.<br/>
+      The next example demonstrates how to
+      use the isReadable() and isWriteable() methods as a poller,
+      doing all reads and writes asynchronously on a single thread.<br/>
+      Our goal is to implement a comet servlet that reads a client request,
+      then writes a chunk of data once the request has been received.
+      The servlet will not write the next chunk until the first request has been read from,
+      and the first chunk written to, all clients.
+      The code below is far from optimal, but it demonstrates that you
+      do not have to rely on any IO events and can decide for yourself when to read
+      or write data.
+      All operations are non-blocking, so the AllWriterThread
+      will never block on any operation. <br/>In the example below,
+      we simply busy-spin.
+    </p>
+    <source>
+public class ExampleAllReadThenWriteComet implements CometProcessor {
+    ...
+    public class AllWriterThread extends Thread {
+        byte[] dataChunks = ...;
+        public void run() {
+            ...
+            for (int i=0; i<dataChunks.length; i++ ) {
+                for (int j=0; j<clients.size(); j++) {
+                    boolean done = false;
+                    while (!done) {
+                        //first read the first request
+                        //but only if our previous write was completed
+                        if ( clients.get(j).getEvent().isWriteable() && clients.get(j).getEvent().isReadable() ) {
+                            done = readClientData(clients.get(j)); //returns true if all data has been received for a request
+                        }
+                    }
+                    done = false;
+                    while (!done) {
+                        //write the response
+                        if ( clients.get(j).getEvent().isWriteable() ) {
+                            clients.get(j).getEvent().getHttpServletResponse().getOutputStream().write(dataChunks[i]);
+                            done = true;
+                        }
+                    }
+                }
+            }
+            ...
+        }
+    }
+    ...
+    public void event(CometEvent event) throws IOException, ServletException {
+        ...
+        if ( event.getEventType() == CometEvent.EventType.BEGIN ) {
+            //configure non blocking
+            event.configureBlocking(false);
+            //disable all events
+            event.unregister(event.getRegisteredOps());
+            //add the event to our client list
+            clients.add(event);
+            //start our writer if all clients have arrived
+            if (clients.size()==5) {
+                AllWriterThread thread = new AllWriterThread();
+                thread.start();
+            }
+        } else if ( event.getEventType() == CometEvent.EventType.READ ) {
+        } else if ( event.getEventType() == CometEvent.EventType.WRITE ) {
+        } else if (...) {
+            ...
+        }
+        ...
+    }
+
+}
+    </source>
+
+    <p>
+      The previous example was somewhat contrived, but it demonstrated that
+      you can read and write on a single thread in a non-blocking fashion.
+      Now we are going to achieve the exact same functionality, but without
+      any asynchronous reads or writes. Instead, we are going to use
+      blocking IO and Tomcat's worker threads.
+    </p>
+    <source>
+public class ExampleAllReadThenWriteComet implements CometProcessor {
+    ...
+    byte[] dataChunks = ...;
+    ...
+    public void event(CometEvent event) throws IOException, ServletException {
+        ...
+        if ( event.getEventType() == CometEvent.EventType.BEGIN ) {
+            //configure blocking
+            event.configureBlocking(true);
+            //disable all events
+            event.unregister(event.getRegisteredOps());
+            //add the event to our client list
+            clients.add(event);
+            //if all our clients have arrived, register them for read.
+            if (atomicClientCounter.addAndGet(1)==5) {
+                atomicClientReadCounter.set(5);
+                for (Client c : clients) {
+                    c.getEvent().register(OP_READ);
+                }
+            }
+        } else if ( event.getEventType() == CometEvent.EventType.READ ) {
+            event.unregister(OP_READ);
+            Client client = clients.get(event);
+            readClientData(client);
+            if (atomicClientReadCounter.addAndGet(-1) == 0 ) {
+                //all clients have read
+                atomicClientWriteCounter.set(5);
+                for (Client c : clients) {
+                    c.getEvent().register(OP_WRITE);
+                }
+            }
+        } else if ( event.getEventType() == CometEvent.EventType.WRITE ) {
+            event.unregister(OP_WRITE);
+            Client client = clients.get(event);
+            writeNextChunk(client);
+            if (atomicClientWriteCounter.addAndGet(-1) == 0 ) {
+                //all clients have been written, start reading the next request
+                atomicClientReadCounter.set(5);
+                for (Client c : clients) {
+                    c.getEvent().register(OP_READ);
+                }
+            }
+        } else if (...) {
+            ...
+        }
+        ...
+    }
+}
+    </source>
+
+
+  </subsection>
+
+  <subsection name="Example code">
     <p>
     The following pseudo code servlet implements asynchronous
     chat functionality using the API described above: