Sebastiaan, Yes you're right. I forgot to add something like isClosed() to your SynchronizedOutputStream so I can check if it's closed before writing.
Anyway, I found that beyond a certain load (for example, somewhere between 6000 and 8000 users) the system starts to behave incorrectly. Some clients never get an answer from the server, some clients receive only partial data, and some of them actually finish. I guess I have found the limit of the box, but the source files are attached in case someone wants to look at the sample or spots something in it.

Cheers,
Martin

On 5/2/07, Sebastiaan van Erk <[EMAIL PROTECTED]> wrote:
Praveen Balaji wrote:
> The last few days I have been evaluating using CometProcessor to work like an
> Async Servlet for me. I pick up the CometEvent object on BEGIN event and
> process the whole request asynchronously. When I am done, I close the I/O stream.
>
> I would like to know what the Tomcat developers and the developers in general
> think about this approach. Is it a misuse of the API to process requests
> asynchronously, outside of the event method? Are there any issues that I should
> be cautious about? Synchronization issues?

See the mailing list archive of the last month and filter on "comet". There have been quite a few posts on exactly this subject. There are multiple issues:

1) Closing the I/O stream (on the response) does not end the request/response. It takes another 30-60 seconds before the event method gets called with an END event, leaving a lot of requests open unnecessarily.

2) Nothing is synchronized in Comet, so you have to do all synchronization yourself. You have to make sure that you do not write to the output stream after the event is closed (even though you may still hold the reference in your asynchronous application code). I seem to have eliminated most issues by synchronizing access to the response output stream and using the same lock around the event.close() call in the event() method of the CometProcessor. However, I still very sporadically get a ClientAbortException/ClosedChannelException, which suggests that I have missed a place where synchronization is necessary. It might not be possible to synchronize this, though, because it could happen deep inside Tomcat. The API gives no information about what to synchronize for asynchronous use.

3) If you rely on the POST method to send parameters to your CometProcessor, then you have to parse the parameters yourself. Calling getParameter() on the request in the BEGIN event may return null, because the request body may not yet have been received by the server at the moment of the getParameter() call.

One of the Comet developers (Remy) suggests that it might be better to just use normal servlets and buy some extra memory for the extra threads. As far as the current API goes, I would say it is probably a "misuse", though I would very much like to see the API improved so that this is no longer the case.

Regards,
Sebastiaan

> Thanks,
> Praveen Balaji.

---------------------------------------------------------------------
To start a new topic, e-mail: users@tomcat.apache.org
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]
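Regarding point 3 (parsing POST parameters yourself), here is a minimal sketch of what that could look like. It assumes the body is application/x-www-form-urlencoded in UTF-8 and has already been read completely from the request input stream into a String (for example over one or more READ events). The class name FormBodyParser is made up for illustration; it is not part of Tomcat or of the attached sources.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Tomcat or of the attached sources.
class FormBodyParser {

    // Parses a body such as "a=1&b=hello+world" into a name -> value map.
    // If a name occurs more than once, the last value wins in this sketch.
    static Map<String, String> parse(String body) {
        Map<String, String> params = new LinkedHashMap<String, String>();
        if (body == null || body.length() == 0) {
            return params;
        }
        try {
            for (String pair : body.split("&")) {
                if (pair.length() == 0) {
                    continue; // skip empty segments such as "a=1&&b=2"
                }
                int eq = pair.indexOf('=');
                String name = eq >= 0 ? pair.substring(0, eq) : pair;
                String value = eq >= 0 ? pair.substring(eq + 1) : "";
                params.put(URLDecoder.decode(name, "UTF-8"),
                           URLDecoder.decode(value, "UTF-8"));
            }
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is always available on a Java platform.
            throw new IllegalStateException(e);
        }
        return params;
    }
}
```

Calling FormBodyParser.parse("a=1&b=hello+world") gives a map where "b" maps to "hello world". Multi-valued parameters and other charsets are left out on purpose to keep the sketch short.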
package com.cognotec.streaming;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.catalina.CometEvent;
import org.apache.catalina.CometProcessor;
import org.apache.log4j.Logger;

import com.cognotec.streaming.jmssim.RateGenerator;
import com.cognotec.streaming.jmssim.RateGeneratorFactory;
import com.cognotec.streaming.jmssim.RateListener;

public class StreamingCometTestServlet extends HttpServlet implements CometProcessor, RateListener {

    private static final long serialVersionUID = 7520606671931567655L;

    static Logger logger = Logger.getLogger(StreamingCometTestServlet.class);

    //private List<HttpServletResponse> connections =
    //    Collections.synchronizedList(new ArrayList<HttpServletResponse>());

    private ArrayList<CometRateSender> cometRateSenders = new ArrayList<CometRateSender>(75);

    // One list of client streams per sender thread.
    // Note: these are plain ArrayLists that are touched both by the container
    // threads (event()) and by the sender threads, without a common lock.
    private HashMap<CometRateSender, List<SynchronizedOutputStream>> connections =
            new HashMap<CometRateSender, List<SynchronizedOutputStream>>();

    private int roundrobin = 0;
    Object roundlock = new Object();

    @Override
    public void init() throws ServletException {
        super.init();
        // Start 75 daemon sender threads, each with its own connection list.
        for (int i = 0; i < 75; i++) {
            CometRateSender cometRateSender = new CometRateSender(this, i);
            Thread messageSenderThread =
                    new Thread(cometRateSender, "MessageSender[" + getServletInfo() + "]");
            messageSenderThread.setDaemon(true);
            messageSenderThread.start();
            cometRateSenders.add(cometRateSender);
            connections.put(cometRateSender, new ArrayList<SynchronizedOutputStream>());
        }
        RateGenerator generator = RateGeneratorFactory.getRateGenerator();
        generator.addRateListener(this);
    }

    // Called by the rate generator; hands the rate to every sender thread.
    public void rateAvailable(Rate rate) {
        for (CometRateSender sender : cometRateSenders) {
            sender.send(rate);
        }
    }

    public void event(CometEvent ce) throws IOException, ServletException {
        HttpServletRequest request = ce.getHttpServletRequest();
        HttpServletResponse response = ce.getHttpServletResponse();

        if (ce.getEventType() == CometEvent.EventType.BEGIN) {
            //ce.setTimeout(30*1000);
            if (logger.isDebugEnabled()) {
                logger.debug("Beginning comet session with " + request.getSession(true).getId());
            }
            //synchronized (connections) {
            // Assign the new connection to the next sender, round-robin.
            synchronized (roundlock) {
                CometRateSender sender = cometRateSenders.get(roundrobin);
                if (logger.isDebugEnabled()) {
                    logger.debug("Assigning session " + request.getSession().getId()
                            + " to Comet Rate Sender " + sender.getId());
                }
                List<SynchronizedOutputStream> connections = this.connections.get(sender);
                connections.add(new SynchronizedOutputStream(response.getOutputStream()));
                roundrobin++;
                if (roundrobin == cometRateSenders.size()) {
                    roundrobin = 0;
                }
            }
            //}
        } else if (ce.getEventType() == CometEvent.EventType.ERROR) {
            if (logger.isDebugEnabled()) {
                logger.debug("Error for session " + request.getSession(true).getId());
            }
            //synchronized (connections) {
            removeResponse(ce, response);
            //}
        } else if (ce.getEventType() == CometEvent.EventType.END) {
            if (logger.isDebugEnabled()) {
                logger.debug("End for session " + request.getSession(true).getId());
            }
            removeResponse(ce, response);
        } else if (ce.getEventType() == CometEvent.EventType.READ) {
            InputStream is = request.getInputStream();
            if (logger.isDebugEnabled()) {
                logger.debug("Received read event. Available bytes: " + is.available());
            }
            byte[] buf = new byte[512];
            do {
                int n = is.read(buf);
                if (n > 0) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Read " + n + " bytes: " + new String(buf, 0, n)
                                + " for session: " + request.getSession(true).getId());
                    }
                } else if (n < 0) {
                    // End of stream: the client has closed its side.
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received " + n + " bytes. Closing connection.");
                    }
                    removeResponse(ce, response);
                    return;
                }
            } while (is.available() > 0);
        } else {
            if (logger.isDebugEnabled()) {
                logger.debug("Received unknown event:" + ce.getEventType());
            }
        }
    }

    // Closes the event and the wrapped stream under the stream's lock, then
    // drops the stream from the list of the sender it was assigned to.
    private void removeResponse(CometEvent ce, HttpServletResponse response) throws IOException {
        for (CometRateSender sender : cometRateSenders) {
            List<SynchronizedOutputStream> connections = this.connections.get(sender);
            SynchronizedOutputStream foundStream = null;
            for (SynchronizedOutputStream stream : connections) {
                if (stream.out.equals(response.getOutputStream())) {
                    // Remember the match so it can be removed after the loop.
                    foundStream = stream;
                    synchronized (stream) {
                        ce.close();
                        stream.close();
                    }
                    break;
                }
            }
            if (foundStream != null) {
                connections.remove(foundStream);
                break;
            }
        }
    }

    @Override
    public void destroy() {
        connections.clear();
    }

    // Returns a copy, so the caller can iterate without holding on to the live list.
    public List<SynchronizedOutputStream> getConnections(CometRateSender sender) {
        List<SynchronizedOutputStream> connections = this.connections.get(sender);
        if (connections != null) {
            return new ArrayList<SynchronizedOutputStream>(connections);
        } else {
            return new ArrayList<SynchronizedOutputStream>();
        }
    }

    public void removeConnection(CometRateSender sender, SynchronizedOutputStream stream) {
        List<SynchronizedOutputStream> connections = this.connections.get(sender);
        connections.remove(stream);
    }
}
package com.cognotec.streaming;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.SerializationUtils;
import org.apache.log4j.Logger;

public class CometRateSender implements Runnable {

    static Logger logger = Logger.getLogger(CometRateSender.class);

    private static final long DUMP_STATS_INTERVAL = 5000;

    private StreamingCometTestServlet servlet;

    // Queue of rates waiting to be sent; also used as the lock for ratesPending.
    protected ArrayList<Rate> rates = new ArrayList<Rate>();

    // Only touched by the sender thread.
    private HashMap<String, Integer> ratesWrited = new HashMap<String, Integer>();

    // Touched by producer threads and the sender thread, guarded by "rates".
    private HashMap<String, Integer> ratesPending = new HashMap<String, Integer>();

    // volatile so that stop() called from another thread is seen by run().
    private volatile boolean running;

    private long statsLastDumped = -1;
    private int id;

    public CometRateSender(StreamingCometTestServlet servlet, int id) {
        this.servlet = servlet;
        this.id = id;
    }

    public void stop() {
        running = false;
    }

    public void send(Rate rate) {
        synchronized (rates) {
            rates.add(rate);
            rateAdded(rate);
            rates.notify();
        }
    }

    public void run() {
        running = true;
        while (running) {
            Rate[] pendingRates = null;
            synchronized (rates) {
                // Test the condition while holding the lock, so that a
                // notify() issued between the test and the wait() is not lost.
                try {
                    while (rates.isEmpty()) {
                        rates.wait();
                    }
                } catch (InterruptedException e) {
                    // Ignore
                }
                pendingRates = rates.toArray(new Rate[0]);
                rates.clear();
                ratesPending.clear();
            }

            // Send any pending message on all the open connections
            List<SynchronizedOutputStream> connections = servlet.getConnections(this);
            if (connections.size() > 0) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Comet sender " + id + ". Sending message to "
                            + connections.size() + " opened connections");
                }
                Iterator<SynchronizedOutputStream> it = connections.iterator();
                while (it.hasNext()) {
                    try {
                        SynchronizedOutputStream stream = it.next();
                        if (!stream.isClosed()) {
                            for (int j = 0; j < pendingRates.length; j++) {
                                byte[] buffer = SerializationUtils.serialize(pendingRates[j]);
                                stream.write(buffer);
                                rateSent(pendingRates[j]);
                            }
                        } else {
                            servlet.removeConnection(this, stream);
                        }
                    } catch (IOException e) {
                        logger.error("Comet sender " + id + ". Exception sending message to client. "
                                + "Probably a client has closed the stream.");
                    }
                }
                dumpStats();
            }
            connections.clear();

            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void dumpStats() {
        long time = System.currentTimeMillis();
        if ((statsLastDumped != -1) && (time - statsLastDumped < DUMP_STATS_INTERVAL)) {
            return;
        }
        StringBuffer stats = new StringBuffer();
        stats.append("Rate streaming comet server " + id + " statistics\n");
        stats.append("--------------------------------\n\n");
        // Elapsed seconds as a floating-point value, so the fraction is kept.
        // On the first dump statsLastDumped is -1, so the first throughput
        // figure is not meaningful.
        double elapsedSeconds = (time - statsLastDumped) / 1000.0;
        Iterator<Map.Entry<String, Integer>> it = ratesWrited.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Integer> entry = it.next();
            double throughput = entry.getValue() / elapsedSeconds;
            Integer ipending;
            synchronized (rates) {
                ipending = ratesPending.get(entry.getKey());
            }
            int pending = ipending == null ? 0 : ipending.intValue();
            stats.append(entry.getValue());
            stats.append(" rates have been sent for the currency pair ");
            stats.append(entry.getKey());
            stats.append(". This makes a throughput of: ");
            stats.append(throughput);
            stats.append(" rates/sec. ");
            stats.append(pending);
            stats.append(" rates pending.\n");
        }
        logger.info(stats);
        statsLastDumped = time;
        ratesWrited.clear();
    }

    // Callers must hold the lock on "rates" (send() does).
    public void rateAdded(Rate rate) {
        // Add one to the pending rates
        String rateId = rate.getId();
        Integer rates = ratesPending.get(rateId);
        if (rates == null) {
            rates = Integer.valueOf(0);
        }
        ratesPending.put(rateId, Integer.valueOf(rates.intValue() + 1));
    }

    public void rateSent(Rate rate) {
        // Add one to the sent rates
        String rateId = rate.getId();
        Integer rates = ratesWrited.get(rateId);
        if (rates == null) {
            rates = Integer.valueOf(0);
        }
        ratesWrited.put(rateId, Integer.valueOf(rates.intValue() + 1));
    }

    public int getId() {
        return id;
    }
}
package com.cognotec.streaming;

import java.io.IOException;
import java.io.OutputStream;

/**
 * This output stream simply wraps another output stream and synchronizes access
 * to the underlying output stream. The synchronization is done on the instance
 * of this class.
 *
 * @author sebster
 */
public class SynchronizedOutputStream extends OutputStream {

    final OutputStream out;

    boolean closed;

    public SynchronizedOutputStream(final OutputStream out) {
        if (out == null) {
            throw new NullPointerException("out"); //$NON-NLS-1$
        }
        this.out = out;
    }

    public synchronized void write(final byte[] b) throws IOException {
        out.write(b);
    }

    public synchronized void write(final byte[] b, final int off, final int len) throws IOException {
        out.write(b, off, len);
    }

    public synchronized void write(final int b) throws IOException {
        out.write(b);
    }

    public synchronized void flush() throws IOException {
        out.flush();
    }

    public synchronized void close() throws IOException {
        closed = true;
        out.close();
    }

    public synchronized boolean isClosed() {
        return closed;
    }
}
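One thing to keep in mind with the isClosed() check: calling isClosed() and then write() are two separate lock acquisitions, so the stream can still be closed in between. A possible way around that is to do the check and the write under the same lock. The sketch below is only an illustration of that idea; the class GuardedOutputStream and the methods writeIfOpen() and demo() are made-up names and are not part of the attached sources or of Tomcat.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical variant of the wrapper; all names here are illustrative only.
class GuardedOutputStream {

    private final OutputStream out;
    private boolean closed;

    GuardedOutputStream(OutputStream out) {
        this.out = out;
    }

    // Checks the closed flag and writes while holding the same lock.
    // Returns false (and writes nothing) if the stream is already closed.
    synchronized boolean writeIfOpen(byte[] data) throws IOException {
        if (closed) {
            return false;
        }
        out.write(data);
        out.flush();
        return true;
    }

    // Closing takes the same lock, so it cannot interleave with a write.
    synchronized void close() throws IOException {
        if (closed) {
            return;
        }
        closed = true;
        out.close();
    }

    // Small self-check: one write before close(), one after.
    static String demo() {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        GuardedOutputStream stream = new GuardedOutputStream(sink);
        try {
            boolean first = stream.writeIfOpen("hi".getBytes("US-ASCII"));
            stream.close();
            boolean second = stream.writeIfOpen("lost".getBytes("US-ASCII"));
            return first + "," + second + "," + sink.toString("US-ASCII");
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With something like this the sender loop could call writeIfOpen() and remove the connection when it returns false, instead of testing isClosed() first. This only closes the gap inside the application code; it does not help if the channel is closed deep inside Tomcat.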