Sebastiaan,

Yes, you're right. I had forgotten to add something like isClosed() to your SynchronizedOutputStream so that I can check whether the stream is closed before writing.

Anyway, I found that beyond a certain load (for example, somewhere between 6000 and 8000 users) the system starts to behave incorrectly. Some clients never get an answer from the server, some receive only partial data, and some of them actually finish. I guess I have found the limit of the box, but I have attached the source files in case someone wants to look at the sample or spots something wrong in it.

Cheers,
Martin

On 5/1/07, Sebastiaan van Erk <[EMAIL PROTECTED]> wrote:
Hi,

Did you synchronize the event.close() with the same lock you use to write to the output stream (the SynchronizedOutputStream instance)? I saw exactly the same stack trace you are getting due to (incorrect) concurrent access to the CoyoteOutputStream. The NullPointerException suggests that the request has already been cleaned up/recycled and yet you still write to it. Make sure that when you close the event, either the write becomes a null operation or, better, does not get called. It is a bug to call write on the response output stream after the event has been closed.

With the SynchronizedOutputStream construction I am relying on Tomcat not to close or call any methods on the CoyoteOutputStream outside of the CometProcessor's event() method. I don't know if this is correct (it has worked for me in practice so far, and I've been doing some heavy-duty testing); however, I do not know what else I could synchronize on. If anybody knows a better approach, I would like to know. :-)

Regards,
Sebastiaan

Martin Perez wrote:
> Hi,
>
> Thanks for the tip Sebastiaan, but it doesn't work for me. I added your
> class, wrapped all the output streams and synchronized close events, but
> the result is the same.
>
> Exception in thread "MessageSender[]" java.lang.NullPointerException
>     at org.apache.coyote.http11.InternalNioOutputBuffer.addToBB(InternalNioOutputBuffer.java:607)
>     at org.apache.coyote.http11.InternalNioOutputBuffer.commit(InternalNioOutputBuffer.java:600)
>     at org.apache.coyote.http11.Http11NioProcessor.action(Http11NioProcessor.java:1032)
>     at org.apache.coyote.Response.action(Response.java:181)
>     at org.apache.coyote.http11.InternalNioOutputBuffer.doWrite(InternalNioOutputBuffer.java:571)
>     at org.apache.coyote.Response.doWrite(Response.java:560)
>     at org.apache.catalina.connector.OutputBuffer.realWriteBytes(OutputBuffer.java:353)
>     at org.apache.tomcat.util.buf.ByteChunk.flushBuffer(ByteChunk.java:434)
>     at org.apache.tomcat.util.buf.ByteChunk.append(ByteChunk.java:349)
>     at org.apache.catalina.connector.OutputBuffer.writeBytes(OutputBuffer.java:381)
>     at org.apache.catalina.connector.OutputBuffer.write(OutputBuffer.java:370)
>     at org.apache.catalina.connector.CoyoteOutputStream.write(CoyoteOutputStream.java:89)
>     at org.apache.catalina.connector.CoyoteOutputStream.write(CoyoteOutputStream.java:83)
>     at com.cognotec.streaming.SynchronizedOutputStream.write(SynchronizedOutputStream.java:25)
>     at com.cognotec.streaming.CometRateSender.run(CometRateSender.java:86)
>     at java.lang.Thread.run(Thread.java:595)
>
> (As I said, I also synchronized all the close method calls, both on the
> stream and on the comet event.)
>
> So, the question is: is this a bug, or have I simply reached the
> scalability limit of my box?
>
> Cheers,
> Martin
>
> On 4/30/07, Sebastiaan van Erk <[EMAIL PROTECTED]> wrote:
>>
>> Hi,
>>
>> I had the same problems. It was a synchronization issue with me.
>> What I did was make a synchronized output stream wrapper (see attached
>> file) which wraps all access to the output stream, and I synchronized on
>> this output stream whenever I called event.close() on the Comet event
>> (especially in the READ event). When doing several IO operations on the
>> output stream which should not be interrupted, you can simply synchronize
>> on it (since SynchronizedOutputStream locks on itself).
>>
>> Regards,
>> Sebastiaan
>>
>>
>> Martin Perez wrote:
>> > Hi all,
>> >
>> > I have been testing Tomcat comet support over the last few days. I've
>> > created a test application that simulates data streaming to multiple
>> > clients. Clients open a connection to the server and the connection is
>> > held open. On the server there are several "sender" threads that
>> > leverage comet support to periodically send stream updates to the
>> > subscribed clients. Once a client receives a specified amount of data,
>> > the communication is closed.
>> >
>> > So, I'm running the test with the Tomcat svn version from 1 week ago. I
>> > have configured 150 Tomcat threads, plus 25 acceptor threads. Tomcat
>> > runs on a Sun-Fire-V240 box with 2 CPU cores. I have 75 sender threads
>> > that periodically sleep (100ms) to try to reduce the CPU usage. Once a
>> > sender thread wakes up, it sends all the data to its assigned clients.
>> >
>> > So basically the story is that the box is able to manage about 3500
>> > persistent comet connections smoothly. But when I raise the level to
>> > 4000 (80% CPU usage, 1.2G mem.) I start to get these exceptions for
>> > some (not all) connections:
>> >
>> > Exception in thread "MessageSender[]" java.lang.NullPointerException
>> >     at org.apache.coyote.http11.InternalNioOutputBuffer.addToBB(InternalNioOutputBuffer.java:607)
>> >     at org.apache.coyote.http11.InternalNioOutputBuffer.commit(InternalNioOutputBuffer.java:600)
>> >     at org.apache.coyote.http11.Http11NioProcessor.action(Http11NioProcessor.java:1032)
>> >     at org.apache.coyote.Response.action(Response.java:181)
>> >     at org.apache.coyote.http11.InternalNioOutputBuffer.doWrite(InternalNioOutputBuffer.java:571)
>> >     at org.apache.coyote.Response.doWrite(Response.java:560)
>> >     at org.apache.catalina.connector.OutputBuffer.realWriteBytes(OutputBuffer.java:353)
>> >     at org.apache.tomcat.util.buf.ByteChunk.flushBuffer(ByteChunk.java:434)
>> >     at org.apache.tomcat.util.buf.ByteChunk.append(ByteChunk.java:349)
>> >     at org.apache.catalina.connector.OutputBuffer.writeBytes(OutputBuffer.java:381)
>> >     at org.apache.catalina.connector.OutputBuffer.write(OutputBuffer.java:370)
>> >     at org.apache.catalina.connector.CoyoteOutputStream.write(CoyoteOutputStream.java:89)
>> >     at org.apache.catalina.connector.CoyoteOutputStream.write(CoyoteOutputStream.java:83)
>> >     at com.cognotec.streaming.CometRateSender.run(CometRateSender.java:86)
>> >     at java.lang.Thread.run(Thread.java:595)
>> >
>> > So, does anybody know what the problem could be? Any suggestions? It
>> > could also be my own mistake, so I will be happy to provide more
>> > information.
>> >
>> > Cheers,
>> > Martin
>> >
>>
>> ---------------------------------------------------------------------
>> To start a new topic, e-mail: users@tomcat.apache.org
>> To unsubscribe, e-mail: [EMAIL PROTECTED]
>> For additional commands, e-mail: [EMAIL PROTECTED]
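Sebastiaan's advice above (make the write a null operation once the event has been closed) could be sketched roughly as follows. This is only an illustration under assumptions: GuardedOutputStream and GuardedWriteDemo are hypothetical names, not part of Tomcat or of the attached files. The idea is that the closed check and the write happen under the same lock, so there is no window between a separate isClosed() call and the write() in which close() can slip in.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical wrapper: any write or flush after close() is silently dropped. */
class GuardedOutputStream extends OutputStream {
    private final OutputStream out;
    private boolean closed; // guarded by "this"

    GuardedOutputStream(OutputStream out) {
        this.out = out;
    }

    @Override
    public synchronized void write(int b) throws IOException {
        if (closed) return; // check and write share one lock
        out.write(b);
    }

    @Override
    public synchronized void write(byte[] b, int off, int len) throws IOException {
        if (closed) return;
        out.write(b, off, len);
    }

    @Override
    public synchronized void flush() throws IOException {
        if (closed) return;
        out.flush();
    }

    @Override
    public synchronized void close() throws IOException {
        if (closed) return; // closing twice is harmless
        closed = true;
        out.close();
    }

    public synchronized boolean isClosed() {
        return closed;
    }
}

class GuardedWriteDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        GuardedOutputStream stream = new GuardedOutputStream(sink);
        stream.write(new byte[] {1, 2, 3});
        stream.close();
        stream.write(new byte[] {4, 5}); // dropped: the stream is already closed
        System.out.println(sink.size()); // prints 3
    }
}
```

In the Comet case the event.close() call would still have to happen while holding the same lock (synchronized on the wrapper), as described in the thread; the sketch only covers the stream side.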
package com.cognotec.streaming;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.SerializationUtils;
import org.apache.log4j.Logger;

public class CometRateSender implements Runnable {

    static Logger logger = Logger.getLogger(CometRateSender.class);

    private StreamingCometTestServlet servlet;
    protected ArrayList<Rate> rates = new ArrayList<Rate>();
    private HashMap<String, Integer> ratesWrited = new HashMap<String, Integer>();
    private HashMap<String, Integer> ratesPending = new HashMap<String, Integer>();
    // volatile so that a stop() call from another thread is seen by run()
    private volatile boolean running;
    private long statsLastDumped = -1;
    private int id;
    private long DUMP_STATS_INTERVAL = 5000;

    public CometRateSender(StreamingCometTestServlet servlet, int id) {
        this.servlet = servlet;
        this.id = id;
        ratesWrited.clear();
    }

    public void stop() {
        running = false;
    }

    public void send(Rate rate) {
        synchronized (rates) {
            rates.add(rate);
            rateAdded(rate);
            rates.notify();
        }
    }

    public void run() {
        running = true;
        while (running) {
            Rate[] pendingRates = null;
            synchronized (rates) {
                // Test the condition while holding the lock; otherwise a
                // notify() arriving between the test and wait() is lost.
                if (rates.size() == 0) {
                    try {
                        rates.wait();
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }
                pendingRates = rates.toArray(new Rate[0]);
                rates.clear();
                ratesPending.clear();
            }

            // Send any pending message on all the open connections
            List<SynchronizedOutputStream> connections = servlet.getConnections(this);
            if (connections.size() > 0) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Comet sender " + id + ". Sending message to "
                            + connections.size() + " opened connections");
                }
                Iterator<SynchronizedOutputStream> it = connections.iterator();
                while (it.hasNext()) {
                    try {
                        SynchronizedOutputStream stream = it.next();
                        if (!stream.isClosed()) {
                            for (int j = 0; j < pendingRates.length; j++) {
                                byte[] buffer = SerializationUtils.serialize(pendingRates[j]);
                                stream.write(buffer);
                                rateSent(pendingRates[j]);
                            }
                        } else {
                            servlet.removeConnection(this, stream);
                        }
                    } catch (IOException e) {
                        logger.error("Comet sender " + id
                                + ". Exception sending message to client. Probably a client has closed the stream.");
                    }
                }
                dumpStats();
            }
            connections.clear();

            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void dumpStats() {
        long time = System.currentTimeMillis();
        if ((statsLastDumped != -1) && (time - statsLastDumped < DUMP_STATS_INTERVAL)) {
            return;
        }
        // Divide by 1000.0 so the elapsed time is not truncated to whole seconds
        double elapsedSeconds = (time - statsLastDumped) / 1000.0;
        StringBuffer stats = new StringBuffer();
        stats.append("Rate streaming comet server " + id + " statistics\n");
        stats.append("--------------------------------\n\n");
        Iterator<Map.Entry<String, Integer>> it = ratesWrited.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Integer> entry = it.next();
            double throughput = entry.getValue() / elapsedSeconds;
            Integer ipending = ratesPending.get(entry.getKey());
            int pending = ipending == null ? 0 : ipending.intValue();
            stats.append(entry.getValue());
            stats.append(" rates have been sent for the currency pair ");
            stats.append(entry.getKey());
            stats.append(". This makes a throughput of: ");
            stats.append(throughput);
            stats.append(" rates/sec. ");
            stats.append(pending);
            stats.append(" rates pending.\n");
        }
        logger.info(stats);
        statsLastDumped = time;
        ratesWrited.clear();
    }

    public void rateAdded(Rate rate) {
        // Add one to the pending rates
        String rateId = rate.getId();
        Integer rates = ratesPending.get(rateId);
        if (rates == null) {
            rates = Integer.valueOf(0);
        }
        ratesPending.put(rateId, Integer.valueOf(rates.intValue() + 1));
    }

    public void rateSent(Rate rate) {
        // Add one to the sent rates
        String rateId = rate.getId();
        Integer rates = ratesWrited.get(rateId);
        if (rates == null) {
            rates = Integer.valueOf(0);
        }
        ratesWrited.put(rateId, Integer.valueOf(rates.intValue() + 1));
    }

    public int getId() {
        return id;
    }
}
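The hand-off between send() and run() in CometRateSender is a producer/consumer queue built on wait()/notify(). One possible alternative, shown here only as a sketch and not as the code from the attachment, is to let java.util.concurrent.LinkedBlockingQueue do the waiting and to drain a whole batch at once. PendingRates and PendingRatesDemo are hypothetical names, and plain strings stand in for the Rate class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the "rates" list in CometRateSender. */
class PendingRates<T> {
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<T>();

    /** Producer side: does not block, because the queue is unbounded. */
    void send(T rate) {
        queue.add(rate);
    }

    /**
     * Consumer side: waits up to timeoutMillis for the first element, then
     * drains whatever else is already queued. Returns an empty list on timeout.
     */
    List<T> takeBatch(long timeoutMillis) throws InterruptedException {
        List<T> batch = new ArrayList<T>();
        T first = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        if (first != null) {
            batch.add(first);
            queue.drainTo(batch); // appends the remaining queued elements
        }
        return batch;
    }
}

class PendingRatesDemo {
    public static void main(String[] args) throws InterruptedException {
        PendingRates<String> pending = new PendingRates<String>();
        pending.send("EUR/USD");
        pending.send("GBP/USD");
        System.out.println(pending.takeBatch(100)); // prints [EUR/USD, GBP/USD]
        System.out.println(pending.takeBatch(10));  // prints []
    }
}
```

Because takeBatch() returns after the timeout even when nothing arrives, a sender loop built on it would get a regular chance to re-check its running flag, which the plain wait() in the attachment does not give it.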
package com.cognotec.streaming;

import java.io.IOException;
import java.io.OutputStream;

/**
 * This output stream simply wraps another output stream and synchronizes access
 * to the underlying output stream. The synchronization is done on the instance
 * of this class.
 *
 * @author sebster
 */
public class SynchronizedOutputStream extends OutputStream {

    final OutputStream out;
    boolean closed;

    public SynchronizedOutputStream(final OutputStream out) {
        if (out == null) {
            throw new NullPointerException("out"); //$NON-NLS-1$
        }
        this.out = out;
    }

    @Override
    public synchronized void write(final byte[] b) throws IOException {
        out.write(b);
    }

    @Override
    public synchronized void write(final byte[] b, final int off, final int len) throws IOException {
        out.write(b, off, len);
    }

    @Override
    public synchronized void write(final int b) throws IOException {
        out.write(b);
    }

    @Override
    public synchronized void flush() throws IOException {
        out.flush();
    }

    @Override
    public synchronized void close() throws IOException {
        closed = true;
        out.close();
    }

    public synchronized boolean isClosed() {
        return closed;
    }
}
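Since the wrapper locks on itself, several writes that must not be interleaved with another thread's output can be grouped by synchronizing on the stream, as mentioned in the thread. A minimal sketch of that idea follows. SelfLockingStream is a cut-down stand-in for SynchronizedOutputStream so the example compiles on its own; FrameWriter, FrameDemo, and the one-byte length prefix are assumptions made purely for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/** Cut-down stand-in for SynchronizedOutputStream: every call locks on "this". */
class SelfLockingStream extends OutputStream {
    private final OutputStream out;

    SelfLockingStream(OutputStream out) {
        this.out = out;
    }

    @Override
    public synchronized void write(int b) throws IOException {
        out.write(b);
    }

    @Override
    public synchronized void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
    }

    @Override
    public synchronized void flush() throws IOException {
        out.flush();
    }
}

class FrameWriter {
    /**
     * Writes a one-byte length prefix followed by the payload as a single
     * indivisible unit. Assumes the payload is shorter than 256 bytes.
     */
    static void writeFrame(SelfLockingStream stream, byte[] payload) throws IOException {
        // Java monitors are reentrant, so the synchronized methods called below
        // simply re-acquire the lock that this block already holds.
        synchronized (stream) {
            stream.write(payload.length);
            stream.write(payload);
            stream.flush();
        }
    }
}

class FrameDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        SelfLockingStream stream = new SelfLockingStream(sink);
        FrameWriter.writeFrame(stream, new byte[] {'h', 'i'});
        FrameWriter.writeFrame(stream, new byte[] {'a', 'b', 'c'});
        System.out.println(sink.size()); // prints 7 (two prefixes + five payload bytes)
    }
}
```

Without the outer synchronized block each individual write would still be thread-safe, but another thread could insert its own bytes between the prefix and the payload.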
package com.cognotec.streaming;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.catalina.CometEvent;
import org.apache.catalina.CometProcessor;
import org.apache.log4j.Logger;

import com.cognotec.streaming.jmssim.RateGenerator;
import com.cognotec.streaming.jmssim.RateGeneratorFactory;
import com.cognotec.streaming.jmssim.RateListener;

public class StreamingCometTestServlet extends HttpServlet implements CometProcessor, RateListener {

    private static final long serialVersionUID = 7520606671931567655L;

    static Logger logger = Logger.getLogger(StreamingCometTestServlet.class);

    //private List<HttpServletResponse> connections =
    //    Collections.synchronizedList(new ArrayList<HttpServletResponse>());
    private ArrayList<CometRateSender> cometRateSenders = new ArrayList<CometRateSender>(75);
    private HashMap<CometRateSender, List<SynchronizedOutputStream>> connections =
            new HashMap<CometRateSender, List<SynchronizedOutputStream>>();
    private int roundrobin = 0;
    Object roundlock = new Object();

    @Override
    public void init() throws ServletException {
        super.init();
        for (int i = 0; i < 75; i++) {
            CometRateSender cometRateSender = new CometRateSender(this, i);
            Thread messageSenderThread = new Thread(cometRateSender,
                    "MessageSender[" + getServletInfo() + "]");
            messageSenderThread.setDaemon(true);
            messageSenderThread.start();
            cometRateSenders.add(cometRateSender);
            // Each list is shared between Tomcat's event threads and one sender
            // thread, so a plain ArrayList is not safe here.
            connections.put(cometRateSender,
                    Collections.synchronizedList(new ArrayList<SynchronizedOutputStream>()));
        }
        RateGenerator generator = RateGeneratorFactory.getRateGenerator();
        generator.addRateListener(this);
    }

    public void rateAvailable(Rate rate) {
        for (CometRateSender sender : cometRateSenders) {
            sender.send(rate);
        }
    }

    public void event(CometEvent ce) throws IOException, ServletException {
        HttpServletRequest request = ce.getHttpServletRequest();
        HttpServletResponse response = ce.getHttpServletResponse();

        if (ce.getEventType() == CometEvent.EventType.BEGIN) {
            //ce.setTimeout(30*1000);
            if (logger.isDebugEnabled()) {
                logger.debug("Beginning comet session with " + request.getSession(true).getId());
            }
            //synchronized (connections) {
            synchronized (roundlock) {
                CometRateSender sender = cometRateSenders.get(roundrobin);
                if (logger.isDebugEnabled()) {
                    logger.debug("Assigning session " + request.getSession().getId()
                            + " to Comet Rate Sender " + sender.getId());
                }
                List<SynchronizedOutputStream> connections = this.connections.get(sender);
                connections.add(new SynchronizedOutputStream(response.getOutputStream()));
                roundrobin++;
                if (roundrobin == cometRateSenders.size()) {
                    roundrobin = 0;
                }
            }
            //}
        } else if (ce.getEventType() == CometEvent.EventType.ERROR) {
            if (logger.isDebugEnabled()) {
                logger.debug("Error for session " + request.getSession(true).getId());
            }
            //synchronized (connections) {
            removeResponse(ce, response);
            //}
        } else if (ce.getEventType() == CometEvent.EventType.END) {
            if (logger.isDebugEnabled()) {
                logger.debug("End for session " + request.getSession(true).getId());
            }
            removeResponse(ce, response);
        } else if (ce.getEventType() == CometEvent.EventType.READ) {
            InputStream is = request.getInputStream();
            if (logger.isDebugEnabled()) {
                logger.debug("Received read event. Available bytes: " + is.available());
            }
            byte[] buf = new byte[512];
            do {
                int n = is.read(buf);
                if (n > 0) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Read " + n + " bytes: " + new String(buf, 0, n)
                                + " for session: " + request.getSession(true).getId());
                    }
                } else if (n < 0) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received " + n + " bytes. Closing connection.");
                    }
                    removeResponse(ce, response);
                    return;
                }
            } while (is.available() > 0);
        } else {
            if (logger.isDebugEnabled()) {
                logger.debug("Received unknown event:" + ce.getEventType());
            }
        }
    }

    private void removeResponse(CometEvent ce, HttpServletResponse response) throws IOException {
        OutputStream target = response.getOutputStream();
        for (CometRateSender sender : cometRateSenders) {
            List<SynchronizedOutputStream> connections = this.connections.get(sender);
            SynchronizedOutputStream foundStream = null;
            // Iterating over a synchronized list requires holding its lock.
            synchronized (connections) {
                for (SynchronizedOutputStream stream : connections) {
                    if (stream.out.equals(target)) {
                        // The original assigned null here, so the stream was
                        // never removed from the list.
                        foundStream = stream;
                        break;
                    }
                }
                if (foundStream != null) {
                    connections.remove(foundStream);
                }
            }
            if (foundStream != null) {
                // Close the event and the stream under the stream's own lock so
                // that no sender thread is in the middle of a write.
                synchronized (foundStream) {
                    ce.close();
                    foundStream.close();
                }
                break;
            }
        }
    }

    @Override
    public void destroy() {
        connections.clear();
    }

    public List<SynchronizedOutputStream> getConnections(CometRateSender sender) {
        List<SynchronizedOutputStream> connections = this.connections.get(sender);
        if (connections != null) {
            // The copy is taken via the synchronized list's toArray(), so it is atomic
            return new ArrayList<SynchronizedOutputStream>(connections);
        } else {
            return new ArrayList<SynchronizedOutputStream>();
        }
    }

    public void removeConnection(CometRateSender sender, SynchronizedOutputStream stream) {
        List<SynchronizedOutputStream> connections = this.connections.get(sender);
        connections.remove(stream);
    }
}
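The per-sender connection lists in the servlet are touched both by Tomcat's event threads (add and remove) and by the sender threads (iterate). One way this bookkeeping might be arranged with java.util.concurrent classes is sketched below. It is not the attachment's code: ConnectionRegistry and RegistryDemo are hypothetical names, and strings stand in for the output streams.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical registry: assigns connections to sender slots round-robin. */
class ConnectionRegistry<C> {
    private final List<List<C>> slots = new ArrayList<List<C>>();
    private final AtomicInteger next = new AtomicInteger();

    ConnectionRegistry(int senderCount) {
        for (int i = 0; i < senderCount; i++) {
            slots.add(new CopyOnWriteArrayList<C>());
        }
    }

    /** Adds a connection to the next slot and returns that slot's index. */
    int register(C connection) {
        // floorMod keeps the index non-negative even after the counter overflows
        int slot = Math.floorMod(next.getAndIncrement(), slots.size());
        slots.get(slot).add(connection);
        return slot;
    }

    /** Removes the connection from whichever slot holds it. */
    boolean unregister(C connection) {
        for (List<C> slot : slots) {
            if (slot.remove(connection)) {
                return true;
            }
        }
        return false;
    }

    /** Iterating the returned list never throws ConcurrentModificationException. */
    List<C> connectionsFor(int slot) {
        return slots.get(slot);
    }
}

class RegistryDemo {
    public static void main(String[] args) {
        ConnectionRegistry<String> registry = new ConnectionRegistry<String>(3);
        for (int i = 0; i < 4; i++) {
            registry.register("client-" + i);
        }
        System.out.println(registry.connectionsFor(0)); // prints [client-0, client-3]
        registry.unregister("client-0");
        System.out.println(registry.connectionsFor(0)); // prints [client-3]
    }
}
```

CopyOnWriteArrayList copies its backing array on every add and remove, so it suits lists that are read far more often than they change. With thousands of short-lived connections that cost may matter, in which case a synchronized list with explicit locking around iteration is the more conservative choice.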