Sebastiaan,
Yes, you're right. I forgot to add something like isClosed() to your
SynchronizedOutputStream so that I can check whether it is closed before
writing. Anyway, I found that beyond a certain limit (for example, between
6000 and 8000 users) the system starts to behave incorrectly. Some clients
never get an answer from the server, some receive only partial data, and
only some of them actually finish. I guess I have found the limit of the
box, but the source files are attached in case someone wants to look at the
sample or spots something wrong in it.
Cheers,
Martin
On 5/1/07, Sebastiaan van Erk <[EMAIL PROTECTED]> wrote:
Hi,
Did you synchronize the event.close() call on the same lock you use to write
to the output stream (the SynchronizedOutputStream instance)?
I saw exactly the same stack trace you are getting due to (incorrect)
concurrent access to the CoyoteOutputStream. The NullPointerException
suggests that the request has already been cleaned up/recycled and yet
you still write to it. Make sure that when you close the event, either
the write becomes a null operation, or better, does not get called. It
is a bug to call write on the response output stream after the event has
been closed.
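As a rough illustration of the "write becomes a null operation" option, here
is a minimal sketch. GuardedOutputStream and GuardedOutputStreamDemo are
hypothetical names (they are not part of Tomcat or of the attached code), and
a ByteArrayOutputStream stands in for the servlet response stream:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/** Hypothetical wrapper: once closed, writes and flushes are silently dropped. */
class GuardedOutputStream extends OutputStream {
    private final OutputStream out;
    private boolean closed; // guarded by the lock on "this"

    GuardedOutputStream(OutputStream out) {
        if (out == null) {
            throw new NullPointerException("out");
        }
        this.out = out;
    }

    @Override
    public synchronized void write(int b) throws IOException {
        if (closed) {
            return; // null operation after close
        }
        out.write(b);
    }

    @Override
    public synchronized void write(byte[] b, int off, int len) throws IOException {
        if (closed) {
            return; // null operation after close
        }
        out.write(b, off, len);
    }

    @Override
    public synchronized void flush() throws IOException {
        if (closed) {
            return;
        }
        out.flush();
    }

    @Override
    public synchronized void close() throws IOException {
        if (closed) {
            return; // closing twice is harmless
        }
        closed = true;
        out.close();
    }

    public synchronized boolean isClosed() {
        return closed;
    }
}

class GuardedOutputStreamDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        GuardedOutputStream stream = new GuardedOutputStream(sink);

        stream.write(new byte[] {1, 2, 3});

        // Close while holding the same lock the writers use.
        synchronized (stream) {
            stream.close();
        }

        stream.write(new byte[] {4, 5}); // dropped: the stream is closed
        System.out.println(sink.size()); // prints 3
    }
}
```

In the servlet, the event.close() call would go inside the same
synchronized (stream) block as stream.close(), so that a sender thread has
either finished its write or sees the closed flag.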
With the SynchronizedOutputStream construction I am relying on Tomcat
not to close or call any methods on the CoyoteOutputStream outside of
the CometProcessor's event() method. I don't know if this is correct (it
has worked for me in practice so far, and I've been doing some heavy
duty testing); however I do not know what else I could synchronize on.
If anybody knows a better approach, I would like to know. :-)
Regards,
Sebastiaan
Martin Perez wrote:
> Hi,
>
> Thanks for the tip, Sebastiaan, but it doesn't work for me. I added your
> class, wrapped all the output streams and synchronized the close events,
> but the result is the same.
>
> Exception in thread "MessageSender[]" java.lang.NullPointerException
>     at org.apache.coyote.http11.InternalNioOutputBuffer.addToBB(InternalNioOutputBuffer.java:607)
>     at org.apache.coyote.http11.InternalNioOutputBuffer.commit(InternalNioOutputBuffer.java:600)
>     at org.apache.coyote.http11.Http11NioProcessor.action(Http11NioProcessor.java:1032)
>     at org.apache.coyote.Response.action(Response.java:181)
>     at org.apache.coyote.http11.InternalNioOutputBuffer.doWrite(InternalNioOutputBuffer.java:571)
>     at org.apache.coyote.Response.doWrite(Response.java:560)
>     at org.apache.catalina.connector.OutputBuffer.realWriteBytes(OutputBuffer.java:353)
>     at org.apache.tomcat.util.buf.ByteChunk.flushBuffer(ByteChunk.java:434)
>     at org.apache.tomcat.util.buf.ByteChunk.append(ByteChunk.java:349)
>     at org.apache.catalina.connector.OutputBuffer.writeBytes(OutputBuffer.java:381)
>     at org.apache.catalina.connector.OutputBuffer.write(OutputBuffer.java:370)
>     at org.apache.catalina.connector.CoyoteOutputStream.write(CoyoteOutputStream.java:89)
>     at org.apache.catalina.connector.CoyoteOutputStream.write(CoyoteOutputStream.java:83)
>     at com.cognotec.streaming.SynchronizedOutputStream.write(SynchronizedOutputStream.java:25)
>     at com.cognotec.streaming.CometRateSender.run(CometRateSender.java:86)
>     at java.lang.Thread.run(Thread.java:595)
>
> (As I said, I also synchronized all the close method calls, both on the
> stream and on the comet event.)
>
> So the question is: is this a bug, or have I simply reached the
> scalability limit of my box?
>
> Cheers,
> Martin
>
> On 4/30/07, Sebastiaan van Erk <[EMAIL PROTECTED]> wrote:
>>
>> Hi,
>>
>> I had the same problems. In my case it was a synchronization issue. What I
>> did was make a synchronized output stream wrapper (see attached file)
>> which wraps all access to the output stream, and I synchronized on this
>> output stream whenever I called event.close() on the Comet event
>> (especially in the READ event). When doing several IO operations on the
>> output stream which should not be interrupted, you can simply
>> synchronize on it (since SynchronizedOutputStream locks on itself).
>>
>> Regards,
>> Sebastiaan
>>
>>
>> Martin Perez wrote:
>> > Hi all,
>> >
>> > I have been testing Tomcat's Comet support over the last few days. I've
>> > created a test application that simulates data streaming to multiple
>> > clients. Clients open a connection to the server and the connection is
>> > held open. On the server there are several "sender" threads that use the
>> > Comet support to periodically send stream updates to the subscribed
>> > clients. Once a client has received a specified amount of data, the
>> > connection is closed.
>> >
>> > I'm running the test with the Tomcat svn version from one week ago. I
>> > have configured 150 Tomcat threads plus 25 acceptor threads. Tomcat runs
>> > on a Sun-Fire-V240 box with 2 CPU cores. I have 75 sender threads that
>> > periodically sleep (100ms) to try to reduce the CPU usage. Once a sender
>> > thread wakes up, it sends all the data to its assigned clients.
>> >
>> > Basically, the box is able to manage about 3500 persistent Comet
>> > connections smoothly. But when I raise the level to 4000 (80% CPU usage,
>> > 1.2G mem.) I start to get these exceptions for some (not all)
>> > connections:
>> >
>> > Exception in thread "MessageSender[]" java.lang.NullPointerException
>> >     at org.apache.coyote.http11.InternalNioOutputBuffer.addToBB(InternalNioOutputBuffer.java:607)
>> >     at org.apache.coyote.http11.InternalNioOutputBuffer.commit(InternalNioOutputBuffer.java:600)
>> >     at org.apache.coyote.http11.Http11NioProcessor.action(Http11NioProcessor.java:1032)
>> >     at org.apache.coyote.Response.action(Response.java:181)
>> >     at org.apache.coyote.http11.InternalNioOutputBuffer.doWrite(InternalNioOutputBuffer.java:571)
>> >     at org.apache.coyote.Response.doWrite(Response.java:560)
>> >     at org.apache.catalina.connector.OutputBuffer.realWriteBytes(OutputBuffer.java:353)
>> >     at org.apache.tomcat.util.buf.ByteChunk.flushBuffer(ByteChunk.java:434)
>> >     at org.apache.tomcat.util.buf.ByteChunk.append(ByteChunk.java:349)
>> >     at org.apache.catalina.connector.OutputBuffer.writeBytes(OutputBuffer.java:381)
>> >     at org.apache.catalina.connector.OutputBuffer.write(OutputBuffer.java:370)
>> >     at org.apache.catalina.connector.CoyoteOutputStream.write(CoyoteOutputStream.java:89)
>> >     at org.apache.catalina.connector.CoyoteOutputStream.write(CoyoteOutputStream.java:83)
>> >     at com.cognotec.streaming.CometRateSender.run(CometRateSender.java:86)
>> >     at java.lang.Thread.run(Thread.java:595)
>> >
>> > Does anybody know what the problem could be? Any suggestions?
>> > It may also be a problem on my side, so I will be happy to provide more
>> > information.
>> >
>> > Cheers,
>> > Martin
>> >
>>
>> ---------------------------------------------------------------------
>> To start a new topic, e-mail: [email protected]
>> To unsubscribe, e-mail: [EMAIL PROTECTED]
>> For additional commands, e-mail: [EMAIL PROTECTED]
>>
>>
>
package com.cognotec.streaming;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.SerializationUtils;
import org.apache.log4j.Logger;

public class CometRateSender implements Runnable {

    static Logger logger = Logger.getLogger(CometRateSender.class);

    private StreamingCometTestServlet servlet;
    protected ArrayList<Rate> rates = new ArrayList<Rate>();
    private HashMap<String, Integer> ratesWrited = new HashMap<String, Integer>();
    private HashMap<String, Integer> ratesPending = new HashMap<String, Integer>();
    // volatile: stop() is called from a different thread than run()
    private volatile boolean running;
    private long statsLastDumped = -1;
    private int id;
    private long DUMP_STATS_INTERVAL = 5000;

    public CometRateSender(StreamingCometTestServlet servlet, int id) {
        this.servlet = servlet;
        this.id = id;
        ratesWrited.clear();
    }

    public void stop() {
        running = false;
    }

    public void send(Rate rate) {
        synchronized (rates) {
            rates.add(rate);
            rateAdded(rate);
            rates.notify();
        }
    }

    public void run() {
        running = true;
        while (running) {
            Rate[] pendingRates = null;
            synchronized (rates) {
                // Test and wait under the same lock; otherwise a notify() from
                // send() can arrive between the emptiness check and the wait().
                if (rates.isEmpty()) {
                    try {
                        rates.wait();
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }
                pendingRates = rates.toArray(new Rate[0]);
                rates.clear();
                ratesPending.clear();
            }

            // Send any pending message on all the open connections
            List<SynchronizedOutputStream> connections = servlet.getConnections(this);
            if (connections.size() > 0) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Comet sender " + id + ". Sending message to "
                            + connections.size() + " opened connections");
                }
                Iterator<SynchronizedOutputStream> it = connections.iterator();
                while (it.hasNext()) {
                    SynchronizedOutputStream stream = it.next();
                    boolean closed = false;
                    try {
                        // Hold the stream's lock across the isClosed() check and the
                        // writes: the servlet closes the event under this same lock,
                        // so the stream cannot be closed in between.
                        synchronized (stream) {
                            closed = stream.isClosed();
                            if (!closed) {
                                for (int j = 0; j < pendingRates.length; j++) {
                                    byte[] buffer = SerializationUtils.serialize(pendingRates[j]);
                                    stream.write(buffer);
                                    rateSent(pendingRates[j]);
                                }
                            }
                        }
                    } catch (IOException e) {
                        logger.error("Comet sender " + id + ". Exception sending message to client. "
                                + "Probably a client has closed the stream.");
                    }
                    // Done outside the stream lock so that no thread ever holds
                    // a stream lock and a connection-list lock at the same time.
                    if (closed) {
                        servlet.removeConnection(this, stream);
                    }
                }
                dumpStats();
            }
            connections.clear();

            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public void dumpStats() {
        long time = System.currentTimeMillis();
        if (statsLastDumped == -1) {
            // First call: only start the measurement interval, since there is
            // no previous dump to compute a throughput against.
            statsLastDumped = time;
            return;
        }
        if (time - statsLastDumped < DUMP_STATS_INTERVAL) {
            return;
        }
        // Floating-point division; the integer division used before truncated
        // the elapsed time to whole seconds.
        double elapsedSeconds = (time - statsLastDumped) / 1000.0;

        StringBuffer stats = new StringBuffer();
        stats.append("Rate streaming comet server " + id + " statistics\n");
        stats.append("--------------------------------\n\n");
        Iterator<Map.Entry<String, Integer>> it = ratesWrited.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Integer> entry = it.next();
            double throughput = entry.getValue() / elapsedSeconds;
            Integer ipending;
            // ratesPending is updated by send() under the lock on rates
            synchronized (rates) {
                ipending = ratesPending.get(entry.getKey());
            }
            int pending = ipending == null ? 0 : ipending.intValue();
            stats.append(entry.getValue());
            stats.append(" rates have been sent for the currency pair ");
            stats.append(entry.getKey());
            stats.append(". This makes a throughput of: ");
            stats.append(throughput);
            stats.append(" rates/sec. ");
            stats.append(pending);
            stats.append(" rates pending.\n");
        }
        logger.info(stats);
        statsLastDumped = time;
        ratesWrited.clear();
    }

    public void rateAdded(Rate rate) {
        // Add one to the pending rates
        String rateId = rate.getId();
        Integer rates = ratesPending.get(rateId);
        if (rates == null) {
            rates = Integer.valueOf(0);
        }
        ratesPending.put(rateId, Integer.valueOf(rates.intValue() + 1));
    }

    public void rateSent(Rate rate) {
        // Add one to the sent rates
        String rateId = rate.getId();
        Integer rates = ratesWrited.get(rateId);
        if (rates == null) {
            rates = Integer.valueOf(0);
        }
        ratesWrited.put(rateId, Integer.valueOf(rates.intValue() + 1));
    }

    public int getId() {
        return id;
    }
}
package com.cognotec.streaming;

import java.io.IOException;
import java.io.OutputStream;

/**
 * This output stream simply wraps another output stream and synchronizes access
 * to the underlying output stream. The synchronization is done on the instance
 * of this class.
 *
 * @author sebster
 */
public class SynchronizedOutputStream extends OutputStream {

    final OutputStream out;
    boolean closed;

    public SynchronizedOutputStream(final OutputStream out) {
        if (out == null) {
            throw new NullPointerException("out"); //$NON-NLS-1$
        }
        this.out = out;
    }

    public synchronized void write(final byte[] b) throws IOException {
        out.write(b);
    }

    public synchronized void write(final byte[] b, final int off, final int len) throws IOException {
        out.write(b, off, len);
    }

    public synchronized void write(final int b) throws IOException {
        out.write(b);
    }

    public synchronized void flush() throws IOException {
        out.flush();
    }

    public synchronized void close() throws IOException {
        closed = true;
        out.close();
    }

    public synchronized boolean isClosed() {
        return closed;
    }
}
package com.cognotec.streaming;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.catalina.CometEvent;
import org.apache.catalina.CometProcessor;
import org.apache.log4j.Logger;

import com.cognotec.streaming.jmssim.RateGenerator;
import com.cognotec.streaming.jmssim.RateGeneratorFactory;
import com.cognotec.streaming.jmssim.RateListener;

public class StreamingCometTestServlet extends HttpServlet
        implements CometProcessor, RateListener {

    private static final long serialVersionUID = 7520606671931567655L;

    static Logger logger = Logger.getLogger(StreamingCometTestServlet.class);

    //private List<HttpServletResponse> connections =
    //Collections.synchronizedList(new ArrayList<HttpServletResponse>());
    private ArrayList<CometRateSender> cometRateSenders = new ArrayList<CometRateSender>(75);
    // Each per-sender list is shared between Tomcat's request threads and the
    // sender threads, so every access to a list is synchronized on that list.
    private HashMap<CometRateSender, List<SynchronizedOutputStream>> connections =
            new HashMap<CometRateSender, List<SynchronizedOutputStream>>();
    private int roundrobin = 0;
    Object roundlock = new Object();

    @Override
    public void init() throws ServletException {
        super.init();
        for (int i = 0; i < 75; i++) {
            CometRateSender cometRateSender = new CometRateSender(this, i);
            // Register the sender before its thread starts, so the thread
            // never looks itself up in a map that is still being filled.
            cometRateSenders.add(cometRateSender);
            connections.put(cometRateSender, new ArrayList<SynchronizedOutputStream>());
            Thread messageSenderThread =
                    new Thread(cometRateSender, "MessageSender[" + getServletInfo() + "]");
            messageSenderThread.setDaemon(true);
            messageSenderThread.start();
        }
        RateGenerator generator = RateGeneratorFactory.getRateGenerator();
        generator.addRateListener(this);
    }

    public void rateAvailable(Rate rate) {
        for (CometRateSender sender : cometRateSenders) {
            sender.send(rate);
        }
    }

    public void event(CometEvent ce) throws IOException, ServletException {
        HttpServletRequest request = ce.getHttpServletRequest();
        HttpServletResponse response = ce.getHttpServletResponse();
        if (ce.getEventType() == CometEvent.EventType.BEGIN) {
            //ce.setTimeout(30*1000);
            if (logger.isDebugEnabled()) {
                logger.debug("Beginning comet session with " + request.getSession(true).getId());
            }
            synchronized (roundlock) {
                CometRateSender sender = cometRateSenders.get(roundrobin);
                if (logger.isDebugEnabled()) {
                    logger.debug("Assigning session " + request.getSession().getId()
                            + " to Comet Rate Sender " + sender.getId());
                }
                List<SynchronizedOutputStream> connections = this.connections.get(sender);
                synchronized (connections) {
                    connections.add(new SynchronizedOutputStream(response.getOutputStream()));
                }
                roundrobin++;
                if (roundrobin == cometRateSenders.size()) {
                    roundrobin = 0;
                }
            }
        } else if (ce.getEventType() == CometEvent.EventType.ERROR) {
            if (logger.isDebugEnabled()) {
                logger.debug("Error for session " + request.getSession(true).getId());
            }
            removeResponse(ce, response);
        } else if (ce.getEventType() == CometEvent.EventType.END) {
            if (logger.isDebugEnabled()) {
                logger.debug("End for session " + request.getSession(true).getId());
            }
            removeResponse(ce, response);
        } else if (ce.getEventType() == CometEvent.EventType.READ) {
            InputStream is = request.getInputStream();
            if (logger.isDebugEnabled()) {
                logger.debug("Received read event. Available bytes: " + is.available());
            }
            byte[] buf = new byte[512];
            do {
                int n = is.read(buf);
                if (n > 0) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Read " + n + " bytes: " + new String(buf, 0, n)
                                + " for session: " + request.getSession(true).getId());
                    }
                } else if (n < 0) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Received " + n + " bytes. Closing connection.");
                    }
                    removeResponse(ce, response);
                    return;
                }
            } while (is.available() > 0);
        } else {
            if (logger.isDebugEnabled()) {
                logger.debug("Received unknown event:" + ce.getEventType());
            }
        }
    }

    private void removeResponse(CometEvent ce,
            HttpServletResponse response) throws IOException {
        OutputStream target = response.getOutputStream();
        SynchronizedOutputStream foundStream = null;
        for (CometRateSender sender : cometRateSenders) {
            List<SynchronizedOutputStream> connections = this.connections.get(sender);
            synchronized (connections) {
                for (SynchronizedOutputStream stream : connections) {
                    if (stream.out.equals(target)) {
                        // The original assigned null here, so the stream was
                        // never removed from the list.
                        foundStream = stream;
                        break;
                    }
                }
                if (foundStream != null) {
                    connections.remove(foundStream);
                }
            }
            if (foundStream != null) {
                break;
            }
        }
        if (foundStream != null) {
            // Close the event and the stream under the stream's own lock, the
            // same lock the sender threads hold while writing. The list lock
            // has already been released, so the two locks are never nested.
            synchronized (foundStream) {
                ce.close();
                foundStream.close();
            }
        }
    }

    @Override
    public void destroy() {
        connections.clear();
    }

    public List<SynchronizedOutputStream> getConnections(CometRateSender sender) {
        List<SynchronizedOutputStream> connections = this.connections.get(sender);
        if (connections != null) {
            // Copy under the list lock so the sender iterates a stable snapshot
            synchronized (connections) {
                return new ArrayList<SynchronizedOutputStream>(connections);
            }
        } else {
            return new ArrayList<SynchronizedOutputStream>();
        }
    }

    public void removeConnection(CometRateSender sender, SynchronizedOutputStream stream) {
        List<SynchronizedOutputStream> connections = this.connections.get(sender);
        if (connections != null) {
            synchronized (connections) {
                connections.remove(stream);
            }
        }
    }
}
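
The per-sender connection lists in the servlet are touched both by Tomcat's
request threads and by the sender threads. One possible way to make that
sharing safe without explicit locks is to keep the lists in concurrent
collections. The sketch below is only an illustration under that assumption:
ConnectionRegistry and its methods are hypothetical names, it is keyed by the
sender id rather than by the CometRateSender object, and it uses Java 8
features that the attached code does not.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical registry mapping a sender id to the connections it serves. */
class ConnectionRegistry<C> {
    private final Map<Integer, List<C>> bySender = new ConcurrentHashMap<>();

    /** Registers a connection for the given sender, creating its list on demand. */
    void add(int senderId, C connection) {
        bySender.computeIfAbsent(senderId, id -> new CopyOnWriteArrayList<>())
                .add(connection);
    }

    /** Removes a connection; returns false if it was not registered. */
    boolean remove(int senderId, C connection) {
        List<C> list = bySender.get(senderId);
        return list != null && list.remove(connection);
    }

    /** Returns a private copy that the caller may iterate or modify freely. */
    List<C> snapshot(int senderId) {
        List<C> list = bySender.get(senderId);
        if (list == null) {
            return Collections.emptyList();
        }
        return new ArrayList<>(list);
    }
}

class ConnectionRegistryDemo {
    public static void main(String[] args) {
        ConnectionRegistry<String> registry = new ConnectionRegistry<>();
        registry.add(0, "a");
        registry.add(0, "b");
        registry.remove(0, "a");
        System.out.println(registry.snapshot(0)); // prints [b]
    }
}
```

CopyOnWriteArrayList copies its backing array on every add or remove, so this
trades write cost for lock-free reads. Whether that is a good fit for
thousands of connections joining and leaving would need to be measured.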