Sebastiaan,

Yes, you're right. I forgot to add something like isClosed() to your
SynchronizedOutputStream so I can check whether it's closed before writing.

Anyway, I found that beyond a certain limit (e.g. 6000 to 8000 users) the
system starts to behave incorrectly. Some clients never get an answer from
the server, some receive only partial data, and some actually finish. I guess
I have found the limit of the box, but the source files are attached in case
someone wants to look at the sample or spots something wrong in it.
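
One thing that can be read off the attached CometRateSender: run() only catches IOException, and the "Exception in thread" prefix in the traces means the NullPointerException was never caught, so the sender thread that hit it has terminated and its clients stop receiving data. That may account for some of the clients that never get an answer, although it does not rule out a capacity limit. A minimal sketch of a send loop that survives a bad connection, assuming plain OutputStreams (ResilientBroadcast and sendToAll are hypothetical names, not part of the attached code):

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of the attached sources. */
class ResilientBroadcast {

    /**
     * Writes the payload to every stream and returns the streams that failed,
     * so the caller can drop them instead of dying on the first error.
     */
    static List<OutputStream> sendToAll(List<? extends OutputStream> streams, byte[] payload) {
        List<OutputStream> failed = new ArrayList<OutputStream>();
        for (OutputStream stream : streams) {
            try {
                stream.write(payload);
            } catch (IOException e) {
                // The usual case: the client went away.
                failed.add(stream);
            } catch (RuntimeException e) {
                // For example a NullPointerException from a response that has
                // already been recycled; keep serving the remaining streams.
                failed.add(stream);
            }
        }
        return failed;
    }
}
```

Catching RuntimeException here only keeps the thread alive; it does not fix the underlying race between writing and closing.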

Cheers,
Martin

On 5/1/07, Sebastiaan van Erk <[EMAIL PROTECTED]> wrote:

Hi,

Did you synchronize the event.close() with the same lock you use to write to
the output stream (the SynchronizedOutputStream instance)?

I saw exactly the same stack trace you are getting due to (incorrect)
concurrent access to the CoyoteOutputStream. The NullPointerException
suggests that the request has already been cleaned up/recycled and yet
you still write to it. Make sure that when you close the event, either
the write becomes a null operation, or better, does not get called. It
is a bug to call write on the response output stream after the event has
been closed.
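
As a rough illustration of the "null operation" option, here is a sketch of a wrapper whose writes do nothing once it has been closed. It assumes that event.close() is called while holding the wrapper's monitor and that the wrapper's close() is called in the same synchronized block; ClosedAwareOutputStream is a hypothetical name, not the attached class.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/**
 * Hypothetical variant of the wrapper: once close() has run, every write and
 * flush becomes a null operation instead of reaching the wrapped stream.
 */
class ClosedAwareOutputStream extends OutputStream {

    private final OutputStream out;
    private boolean closed; // guarded by the monitor of "this"

    ClosedAwareOutputStream(OutputStream out) {
        if (out == null) {
            throw new NullPointerException("out");
        }
        this.out = out;
    }

    @Override
    public synchronized void write(int b) throws IOException {
        if (!closed) {
            out.write(b);
        }
    }

    @Override
    public synchronized void write(byte[] b, int off, int len) throws IOException {
        if (!closed) {
            out.write(b, off, len);
        }
    }

    @Override
    public synchronized void flush() throws IOException {
        if (!closed) {
            out.flush();
        }
    }

    @Override
    public synchronized void close() throws IOException {
        if (!closed) {
            closed = true; // set the flag first so later writes are dropped even if close() fails
            out.close();
        }
    }

    public synchronized boolean isClosed() {
        return closed;
    }
}

class ClosedAwareDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        ClosedAwareOutputStream stream = new ClosedAwareOutputStream(sink);
        stream.write(new byte[] {1, 2, 3});
        stream.close();
        stream.write(new byte[] {4, 5}); // dropped: the stream is already closed
        System.out.println(sink.size()); // prints 3
    }
}
```

Because the closed check and the write happen under the same monitor, a caller does not need a separate isClosed() test before writing, which would leave a window in which the stream can be closed between the test and the write.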

With the SynchronizedOutputStream construction I am relying on Tomcat
not to close or call any methods on the CoyoteOutputStream outside of
the CometProcessor's event() method. I don't know if this is correct (it
has worked for me in practice so far, and I've been doing some heavy
duty testing); however I do not know what else I could synchronize on.
If anybody knows a better approach, I would like to know. :-)

Regards,
Sebastiaan

Martin Perez wrote:
> Hi,
>
> Thanks for the tip Sebastiaan, but it doesn't work for me. I added your
> class, wrapped all the output streams and synchronized the close events,
> but the result is the same.
>
> Exception in thread "MessageSender[]" java.lang.NullPointerException
>        at org.apache.coyote.http11.InternalNioOutputBuffer.addToBB(InternalNioOutputBuffer.java:607)
>        at org.apache.coyote.http11.InternalNioOutputBuffer.commit(InternalNioOutputBuffer.java:600)
>        at org.apache.coyote.http11.Http11NioProcessor.action(Http11NioProcessor.java:1032)
>        at org.apache.coyote.Response.action(Response.java:181)
>        at org.apache.coyote.http11.InternalNioOutputBuffer.doWrite(InternalNioOutputBuffer.java:571)
>        at org.apache.coyote.Response.doWrite(Response.java:560)
>        at org.apache.catalina.connector.OutputBuffer.realWriteBytes(OutputBuffer.java:353)
>        at org.apache.tomcat.util.buf.ByteChunk.flushBuffer(ByteChunk.java:434)
>        at org.apache.tomcat.util.buf.ByteChunk.append(ByteChunk.java:349)
>        at org.apache.catalina.connector.OutputBuffer.writeBytes(OutputBuffer.java:381)
>        at org.apache.catalina.connector.OutputBuffer.write(OutputBuffer.java:370)
>        at org.apache.catalina.connector.CoyoteOutputStream.write(CoyoteOutputStream.java:89)
>        at org.apache.catalina.connector.CoyoteOutputStream.write(CoyoteOutputStream.java:83)
>        at com.cognotec.streaming.SynchronizedOutputStream.write(SynchronizedOutputStream.java:25)
>        at com.cognotec.streaming.CometRateSender.run(CometRateSender.java:86)
>        at java.lang.Thread.run(Thread.java:595)
>
> (As I said, I also synchronized all the close() calls, both on the stream
> and on the Comet event.)
>
> So the question is: is this a bug, or have I simply reached the
> scalability limit of my box?
>
> Cheers,
> Martin
>
> On 4/30/07, Sebastiaan van Erk <[EMAIL PROTECTED]> wrote:
>>
>> Hi,
>>
>> I had the same problems. For me it was a synchronization issue. What I
>> did was make a synchronized output stream wrapper (see attached file)
>> which wraps all access to the output stream, and I synchronized on this
>> output stream whenever I called event.close() on the Comet event
>> (especially in the READ event). When doing several IO operations on the
>> output stream which should not be interrupted, you can simply
>> synchronize on it (since SynchronizedOutputStream locks on itself).
>>
>> Regards,
>> Sebastiaan
>>
>>
>> Martin Perez wrote:
>> > Hi all,
>> >
>> > I have been testing Tomcat's Comet support over the last few days. I've
>> > created a test application that simulates data streaming to multiple
>> > clients. Clients open a connection to the server and the connection is
>> > held open. On the server there are several "sender" threads that use
>> > the Comet support to periodically send stream updates to the subscribed
>> > clients. Once a client receives a specified amount of data, the
>> > connection is closed.
>> >
>> > I'm running the test with the Tomcat svn version from one week ago. I
>> > have configured 150 Tomcat threads, plus 25 acceptor threads. Tomcat
>> > runs on a Sun-Fire-V240 box with 2 CPU cores. I have 75 sender threads
>> > that periodically sleep (100ms) to try to reduce CPU usage. Once a
>> > sender thread wakes up, it sends all the data to its assigned clients.
>> >
>> > Basically, the box is able to manage about 3500 persistent Comet
>> > connections smoothly. But when I raise the level to 4000 (80% CPU
>> > usage, 1.2G mem.) I start to get these exceptions for some (not all)
>> > connections:
>> >
>> > Exception in thread "MessageSender[]" java.lang.NullPointerException
>> >        at org.apache.coyote.http11.InternalNioOutputBuffer.addToBB(InternalNioOutputBuffer.java:607)
>> >        at org.apache.coyote.http11.InternalNioOutputBuffer.commit(InternalNioOutputBuffer.java:600)
>> >        at org.apache.coyote.http11.Http11NioProcessor.action(Http11NioProcessor.java:1032)
>> >        at org.apache.coyote.Response.action(Response.java:181)
>> >        at org.apache.coyote.http11.InternalNioOutputBuffer.doWrite(InternalNioOutputBuffer.java:571)
>> >        at org.apache.coyote.Response.doWrite(Response.java:560)
>> >        at org.apache.catalina.connector.OutputBuffer.realWriteBytes(OutputBuffer.java:353)
>> >        at org.apache.tomcat.util.buf.ByteChunk.flushBuffer(ByteChunk.java:434)
>> >        at org.apache.tomcat.util.buf.ByteChunk.append(ByteChunk.java:349)
>> >        at org.apache.catalina.connector.OutputBuffer.writeBytes(OutputBuffer.java:381)
>> >        at org.apache.catalina.connector.OutputBuffer.write(OutputBuffer.java:370)
>> >        at org.apache.catalina.connector.CoyoteOutputStream.write(CoyoteOutputStream.java:89)
>> >        at org.apache.catalina.connector.CoyoteOutputStream.write(CoyoteOutputStream.java:83)
>> >        at com.cognotec.streaming.CometRateSender.run(CometRateSender.java:86)
>> >        at java.lang.Thread.run(Thread.java:595)
>> >
>> > So, does anybody know what the problem could be? Any suggestions? It
>> > could also be a problem on my side, so I will be happy to provide more
>> > information.
>> >
>> > Cheers,
>> > Martin
>> >
>>
>> ---------------------------------------------------------------------
>> To start a new topic, e-mail: users@tomcat.apache.org
>> To unsubscribe, e-mail: [EMAIL PROTECTED]
>> For additional commands, e-mail: [EMAIL PROTECTED]
>>
>>
>



package com.cognotec.streaming;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.SerializationUtils;
import org.apache.log4j.Logger;

public class CometRateSender implements Runnable {

	static Logger logger = Logger.getLogger(CometRateSender.class);
	
	private StreamingCometTestServlet servlet;
	protected ArrayList<Rate> rates = new ArrayList<Rate>();
	
	private HashMap<String, Integer> ratesWrited = new HashMap<String, Integer>();
	private HashMap<String, Integer> ratesPending = new HashMap<String, Integer>();
	
	// volatile: written by stop() on one thread, read by the sender thread in run()
	private volatile boolean running;
	private long statsLastDumped =-1;
	private int id;

	private long DUMP_STATS_INTERVAL = 5000;
	
	public CometRateSender(StreamingCometTestServlet servlet, int id) {
		
		this.servlet = servlet;
		this.id = id;
		ratesWrited.clear();
	}
	
	public void stop() {
		
		running = false;
	}
	
    public void send(Rate rate) {
    	
        synchronized (rates) {
            rates.add(rate);
            rateAdded(rate);
            rates.notify();
        }
    }
	
    public void run() {

    	running = true;
        while (running) {

            Rate[] pendingRates = null;
            synchronized (rates) {
                // Test and wait under the same lock so that a notify() from
                // send() cannot slip in between the emptiness test and wait().
                while (running && rates.isEmpty()) {
                    try {
                        rates.wait();
                    } catch (InterruptedException e) {
                        // Ignore
                    }
                }
                pendingRates = rates.toArray(new Rate[0]);
                rates.clear();
                ratesPending.clear();
            }
            // Send any pending message on all the open connections

            List<SynchronizedOutputStream> connections = servlet.getConnections(this);
            if (connections.size() > 0) {
                if (logger.isDebugEnabled()) {
            		logger.debug("Comet sender " + id + ". Sending message to " + connections.size() + " opened connections");
            	}
                Iterator<SynchronizedOutputStream> it = connections.iterator();
                while (it.hasNext()) {                	
                    try {
                    	SynchronizedOutputStream stream = it.next();
                    	if (!stream.isClosed()) {
	                        for (int j = 0; j < pendingRates.length; j++) {
	                        	byte[] buffer = SerializationUtils.serialize(pendingRates[j]);
	                        	stream.write(buffer);
	                        	rateSent(pendingRates[j]);
	                        }
                    	} else {
                    		servlet.removeConnection(this, stream);
                    	}
                    } catch (IOException e) {
                    	logger.error("Comet sender " + id + ". Exception sending message to client. Probably a client has closed the stream.");
                    }
                }
                dumpStats();
            }
            connections.clear();
            try {
				Thread.sleep(500);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
        }
    }

	public void dumpStats() {

		long time = System.currentTimeMillis();

		if ((statsLastDumped != -1) && (time - statsLastDumped < DUMP_STATS_INTERVAL)) {
			return;
		}

		StringBuffer stats = new StringBuffer();
		stats.append("Rate streaming comet server " + id + " statistics\n");
		stats.append("--------------------------------\n\n");
		
		Iterator<Map.Entry<String, Integer>> it = ratesWrited.entrySet().iterator();
		while (it.hasNext()) {			
			Map.Entry<String,Integer> entry = it.next();

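			// Floating-point seconds avoid truncating the interval; the first dump
			// has no previous timestamp to measure against, so report 0.
			double seconds = (time - statsLastDumped) / 1000.0;
			double throughput = statsLastDumped == -1 ? 0.0 : entry.getValue() / seconds;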
			
			Integer ipending = ratesPending.get(entry.getKey());
			int pending = ipending == null ? 0 : ipending.intValue();

			stats.append(entry.getValue());
			stats.append(" rates have been sent for the currency pair ");
			stats.append(entry.getKey());
			stats.append(". This makes a throughput of: ");
			stats.append(throughput);
			stats.append(" rates/sec. ");
			stats.append(pending);
			stats.append(" rates pending.\n");
		}
		logger.info(stats);
		
		
		statsLastDumped = time;
		ratesWrited.clear();
	}
    
	public void rateAdded(Rate rate) {
		
		// Add one to the pending rates
		String rateId = rate.getId();
		Integer rates = ratesPending.get(rateId);
		if (rates == null) {
			rates = new Integer(0);
		}
		ratesPending.put(rateId,new Integer(rates.intValue()+1));
	}
	
	public void rateSent(Rate rate) {

		// Add one to the sent rates
		String rateId = rate.getId();
		Integer rates = ratesWrited.get(rateId);
		if (rates == null) {
			rates = new Integer(0);
		}
		ratesWrited.put(rateId,new Integer(rates.intValue()+1));		
	}

	public int getId() {
		return id;
	}
	
}
package com.cognotec.streaming;

import java.io.IOException;
import java.io.OutputStream;

/**
 * This output stream simply wraps another output stream and synchronizes access
 * to the underlying output stream. The synchronization is done on the instance
 * of this class.
 * 
 * @author sebster
 */
public class SynchronizedOutputStream extends OutputStream {

	final OutputStream out;
	
	boolean closed;

	public SynchronizedOutputStream(final OutputStream out) {
		if (out == null) {
			throw new NullPointerException("out"); //$NON-NLS-1$
		}
		this.out = out;
	}

	public synchronized void write(final byte[] b) throws IOException {
		out.write(b);
	}

	public synchronized void write(final byte[] b, final int off, final int len) throws IOException {
		out.write(b, off, len);
	}

	public synchronized void write(final int b) throws IOException {
		out.write(b);
	}

	public synchronized void flush() throws IOException {
		out.flush();
	}

	public synchronized void close() throws IOException {
		
		closed=true;
		out.close();
	}

	public synchronized boolean isClosed() {
		return closed;
	}

}

package com.cognotec.streaming;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.catalina.CometEvent;
import org.apache.catalina.CometProcessor;
import org.apache.log4j.Logger;

import com.cognotec.streaming.jmssim.RateGenerator;
import com.cognotec.streaming.jmssim.RateGeneratorFactory;
import com.cognotec.streaming.jmssim.RateListener;

public class StreamingCometTestServlet extends HttpServlet 
									   implements CometProcessor, RateListener {

	
	private static final long serialVersionUID = 7520606671931567655L;
	static Logger logger = Logger.getLogger(StreamingCometTestServlet.class);
	
	//private List<HttpServletResponse> connections = 
		//Collections.synchronizedList(new ArrayList<HttpServletResponse>());
	
	private ArrayList<CometRateSender> cometRateSenders = new ArrayList<CometRateSender>(75);
	private HashMap<CometRateSender, List<SynchronizedOutputStream>> connections = 
		new HashMap<CometRateSender, List<SynchronizedOutputStream>>();
	
	private int roundrobin = 0;
	Object roundlock = new Object();
	
	@Override
	public void init() throws ServletException {
		
		super.init();

		for (int i=0;i<75;i++) {
			CometRateSender cometRateSender = new CometRateSender(this,i);
			Thread messageSenderThread = 
				new Thread(cometRateSender,"MessageSender[" + getServletInfo() + "]");
			messageSenderThread.setDaemon(true);
			messageSenderThread.start();			
			cometRateSenders.add(cometRateSender);
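			// Each list is appended to by Tomcat's event threads and copied/pruned by
			// the sender threads, so use a list that tolerates concurrent access.
			connections.put(cometRateSender,
				new java.util.concurrent.CopyOnWriteArrayList<SynchronizedOutputStream>());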
		}
		
		RateGenerator generator = RateGeneratorFactory.getRateGenerator();
		generator.addRateListener(this);
	}

	public void rateAvailable(Rate rate) {
		
		for (CometRateSender sender: cometRateSenders) {
			sender.send(rate);
		}
	}
	
	public void event(CometEvent ce) throws IOException, ServletException {
		
		HttpServletRequest request = ce.getHttpServletRequest();
		HttpServletResponse response = ce.getHttpServletResponse();
		
		if (ce.getEventType() == CometEvent.EventType.BEGIN) {
			//ce.setTimeout(30*1000);
			if (logger.isDebugEnabled()) {
				logger.debug("Beginning comet session with " + request.getSession(true).getId());
			}
			//synchronized (connections) {
				synchronized(roundlock) {
					CometRateSender sender = cometRateSenders.get(roundrobin);
					if (logger.isDebugEnabled()) {
						logger.debug("Assigning session " + request.getSession().getId() + " to Comet Rate Sender " + sender.getId());						
					}
				
					List<SynchronizedOutputStream> connections = this.connections.get(sender);
					connections.add(new SynchronizedOutputStream(response.getOutputStream()));
					roundrobin++;
					if (roundrobin == cometRateSenders.size()) {
						roundrobin = 0;
					}
				}
			//}
		} else if (ce.getEventType() == CometEvent.EventType.ERROR) {
			if (logger.isDebugEnabled()) {
				logger.debug("Error for session " + request.getSession(true).getId());
			}
			//synchronized (connections) {
				removeResponse(ce, response);
			//}
		} else if (ce.getEventType() == CometEvent.EventType.END) {
			if (logger.isDebugEnabled()) {
				logger.debug("End for session " + request.getSession(true).getId());
			}	
			removeResponse(ce, response);
		} else if (ce.getEventType() == CometEvent.EventType.READ) {
            InputStream is = request.getInputStream();
            
			if (logger.isDebugEnabled()) {
				logger.debug("Received read event. Available bytes: " + is.available());
			}
            
            byte[] buf = new byte[512];
            do {
                int n = is.read(buf);
                if (n > 0) {
                	if (logger.isDebugEnabled()) {
	                    logger.debug("Read " + n + " bytes: " + new String(buf, 0, n) 
	                            + " for session: " + request.getSession(true).getId());
                	}
                } else if (n < 0) {
                    if (logger.isDebugEnabled()) {
                    	logger.debug("Received " + n + " bytes. Closing connection.");
                    }
        			removeResponse(ce, response);
        			                   
                    return;
                }
            } while (is.available() > 0);			
		} else {
			if (logger.isDebugEnabled()) {
				logger.debug("Received unknown event:" + ce.getEventType());
			}
		}
	}

	private void removeResponse(CometEvent ce, 
								HttpServletResponse response) throws IOException {
		
		for(CometRateSender sender: cometRateSenders) {
			List<SynchronizedOutputStream> connections = this.connections.get(sender);
			SynchronizedOutputStream foundStream = null;
			for(SynchronizedOutputStream stream: connections) {
				if (stream.out.equals(response.getOutputStream())) {
					// Remember the match so it is removed from the list after the loop
					foundStream = stream;
					synchronized(stream) {
						ce.close();
						stream.close();
					}
					break;
				}
			}
			if (foundStream != null) {
				connections.remove(foundStream);
				break;
			}
		}
	}
	
	@Override
	public void destroy() {

		connections.clear();
	}

	public List<SynchronizedOutputStream> getConnections(CometRateSender sender) {
		
		List<SynchronizedOutputStream> connections = this.connections.get(sender);
		if (connections != null) {
			return new ArrayList<SynchronizedOutputStream>(connections);
		} else {
			return new ArrayList<SynchronizedOutputStream>();
		}
	}
	
	public void removeConnection(CometRateSender sender, SynchronizedOutputStream stream) {
		
		List<SynchronizedOutputStream> connections = this.connections.get(sender);
		connections.remove(stream);
	}	
}