Sebastiaan,

Yes you're right. I forgot to add something like isClosed() to your
SynchronizedOutputStream so I can check if it's closed before writing.

Anyways, I found that after a certain limit (for ex. 6000 -> 8000 users) the
system starts to behave incorrectly. Some clients never get an answer from
the server, some clients only receive partial data, and some of them really
end. I guess I found the limit in the box, but attached are the source code
files if someone wants to see the sample or simply spot something in it.

Cheers,
Martin

On 5/2/07, Sebastiaan van Erk <[EMAIL PROTECTED]> wrote:

Praveen Balaji wrote:
> The
> last few days I have been evaluating using CometProcessor to work like
an
> Async Servlet for me. I pick up the CometEvent object on BEGIN event and
> process the whole request asynchronously. When I am done, I close the
I/O stream.
>
>
> I
> would like to know what the Tomcat developers and the developers in
general
> think about this approach. Is it a misuse of the API to process requests
> asynchronously, outside of the event method? Are there any issues that I
should be cautious about? Synchronization issues?
>
>
>
See the mailing list archive of the last month and filter on "comet".
There have been quite a few posts on exactly this subject.

There are multiple issues:

1) Closing the I/O stream (on the response) does not end the
request/response. It takes another 30-60 seconds before the event method
gets called with an END event leaving a lot of request open unnecessarily.

2) Nothing is synchronized in Comet, so you have to do all
synchronization yourself. You have to make sure that you do not write to
the output stream after the event is closed (even though you may still
have the reference in your asynchronous application code). I seem to
have eliminated most issues by synchronizing access to the response
output stream and using the same lock around the event.close() method in
the event() method of the CometProcessor. However I still very
sporadically get a ClientAbortException/ClosedChannelException which
suggests that I've missed a place where synchronization is necessary. It
might not be possible to synchronize this though, because it could
happen deep inside Tomcat. The API has no information about what to
synchronize for asynchronous use.

3) If you rely on the POST method to send parameters to your
CometProcessor, then you have to parse the parameters yourself; there is
an issue that doing getParameter() on the request in the BEGIN event may
return null because the request body has not yet been received by the
server at the moment of the getParameter() call.

One of the Comet developers (Remy) suggests that it might be better to
just use normal servlets and buy some extra memory for the extra threads.

As far as the current API goes, I would say it is probably a "misuse",
though I would very much like to see the API improved so that this is no
longer the case.

Regards,
Sebastiaan

> Thanks,
> Praveen Balaji.
>
>
>
>
>
>
>
> __________________________________________________
> Do You Yahoo!?
> Tired of spam?  Yahoo! Mail has the best spam protection around
> http://mail.yahoo.com
>

---------------------------------------------------------------------
To start a new topic, e-mail: users@tomcat.apache.org
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]


package com.cognotec.streaming;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.catalina.CometEvent;
import org.apache.catalina.CometProcessor;
import org.apache.log4j.Logger;

import com.cognotec.streaming.jmssim.RateGenerator;
import com.cognotec.streaming.jmssim.RateGeneratorFactory;
import com.cognotec.streaming.jmssim.RateListener;

public class StreamingCometTestServlet extends HttpServlet 
									   implements CometProcessor, RateListener {

	
	private static final long serialVersionUID = 7520606671931567655L;
	static Logger logger = Logger.getLogger(StreamingCometTestServlet.class);
	
	//private List<HttpServletResponse> connections = 
		//Collections.synchronizedList(new ArrayList<HttpServletResponse>());
	
	private ArrayList<CometRateSender> cometRateSenders = new ArrayList<CometRateSender>(75);
	private HashMap<CometRateSender, List<SynchronizedOutputStream>> connections = 
		new HashMap<CometRateSender, List<SynchronizedOutputStream>>();
	
	private int roundrobin = 0;
	Object roundlock = new Object();
	
	@Override
	public void init() throws ServletException {
		
		super.init();

		for (int i=0;i<75;i++) {
			CometRateSender cometRateSender = new CometRateSender(this,i);
			Thread messageSenderThread = 
				new Thread(cometRateSender,"MessageSender[" + getServletInfo() + "]");
			messageSenderThread.setDaemon(true);
			messageSenderThread.start();			
			cometRateSenders.add(cometRateSender);
			connections.put(cometRateSender, new ArrayList<SynchronizedOutputStream>());
		}
		
		RateGenerator generator = RateGeneratorFactory.getRateGenerator();
		generator.addRateListener(this);
	}

	public void rateAvailable(Rate rate) {
		
		for (CometRateSender sender: cometRateSenders) {
			sender.send(rate);
		}
	}
	
	public void event(CometEvent ce) throws IOException, ServletException {
		
		HttpServletRequest request = ce.getHttpServletRequest();
		HttpServletResponse response = ce.getHttpServletResponse();
		
		if (ce.getEventType() == CometEvent.EventType.BEGIN) {
			//ce.setTimeout(30*1000);
			if (logger.isDebugEnabled()) {
				logger.debug("Beginning comet session with " + request.getSession(true).getId());
			}
			//synchronized (connections) {
				synchronized(roundlock) {
					CometRateSender sender = cometRateSenders.get(roundrobin);
					if (logger.isDebugEnabled()) {
						logger.debug("Assigning session " + request.getSession().getId() + " to Comet Rate Sender " + sender.getId());						
					}
				
					List<SynchronizedOutputStream> connections = this.connections.get(sender);
					connections.add(new SynchronizedOutputStream(response.getOutputStream()));
					roundrobin++;
					if (roundrobin == cometRateSenders.size()) {
						roundrobin = 0;
					}
				}
			//}
		} else if (ce.getEventType() == CometEvent.EventType.ERROR) {
			if (logger.isDebugEnabled()) {
				logger.debug("Error for session " + request.getSession(true).getId());
			}
			//synchronized (connections) {
				removeResponse(ce, response);
			//}
		} else if (ce.getEventType() == CometEvent.EventType.END) {
			if (logger.isDebugEnabled()) {
				logger.debug("End for session " + request.getSession(true).getId());
			}	
			removeResponse(ce, response);
		} else if (ce.getEventType() == CometEvent.EventType.READ) {
            InputStream is = request.getInputStream();
            
			if (logger.isDebugEnabled()) {
				logger.debug("Received read event. Available bytes: " + is.available());
			}
            
            byte[] buf = new byte[512];
            do {
                int n = is.read(buf);
                if (n > 0) {
                	if (logger.isDebugEnabled()) {
	                    logger.debug("Read " + n + " bytes: " + new String(buf, 0, n) 
	                            + " for session: " + request.getSession(true).getId());
                	}
                } else if (n < 0) {
                    if (logger.isDebugEnabled()) {
                    	logger.debug("Received " + n + " bytes. Closing connection.");
                    }
        			removeResponse(ce, response);
        			                   
                    return;
                }
            } while (is.available() > 0);			
		} else {
			if (logger.isDebugEnabled()) {
				logger.debug("Received unknown event:" + ce.getEventType());
			}
		}
	}

	private void removeResponse(CometEvent ce, 
								HttpServletResponse response) throws IOException {
		
		for(CometRateSender sender: cometRateSenders) {
			List<SynchronizedOutputStream> connections = this.connections.get(sender);
			SynchronizedOutputStream foundStream = null;
			for(SynchronizedOutputStream stream: connections) {
				if (stream.out.equals(response.getOutputStream())) {
					foundStream = null;
					synchronized(stream) {
						ce.close();
						stream.close();
					}
					break;
				}
			}
			if (foundStream != null) {
				connections.remove(foundStream);
				break;
			}
		}
	}
	
	@Override
	public void destroy() {

		connections.clear();
	}

	public List<SynchronizedOutputStream> getConnections(CometRateSender sender) {
		
		List<SynchronizedOutputStream> connections = this.connections.get(sender);
		if (connections != null) {
			return new ArrayList<SynchronizedOutputStream>(connections);
		} else {
			return new ArrayList<SynchronizedOutputStream>();
		}
	}
	
	public void removeConnection(CometRateSender sender, SynchronizedOutputStream stream) {
		
		List<SynchronizedOutputStream> connections = this.connections.get(sender);
		connections.remove(stream);
	}	
}
package com.cognotec.streaming;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.SerializationUtils;
import org.apache.log4j.Logger;

public class CometRateSender implements Runnable {

	static Logger logger = Logger.getLogger(CometRateSender.class);
	
	private StreamingCometTestServlet servlet;
	protected ArrayList<Rate> rates = new ArrayList<Rate>();
	
	private HashMap<String, Integer> ratesWrited = new HashMap<String, Integer>();
	private HashMap<String, Integer> ratesPending = new HashMap<String, Integer>();
	
	private boolean running;
	private long statsLastDumped =-1;
	private int id;

	private long DUMP_STATS_INTERVAL = 5000;
	
	public CometRateSender(StreamingCometTestServlet servlet, int id) {
		
		this.servlet = servlet;
		this.id = id;
		ratesWrited.clear();
	}
	
	public void stop() {
		
		running = false;
	}
	
    public void send(Rate rate) {
    	
        synchronized (rates) {
            rates.add(rate);
            rateAdded(rate);
            rates.notify();
        }
    }
	
    public void run() {

    	running = true;
        while (running) {

            if (rates.size() == 0) {
                try {
                    synchronized (rates) {
                        rates.wait();
                    }
                } catch (InterruptedException e) {
                    // Ignore
                }
            }

            Rate[] pendingRates = null;
            synchronized (rates) {
                pendingRates = rates.toArray(new Rate[0]);
                rates.clear();
                ratesPending.clear();
            }
            // Send any pending message on all the open connections

            List<SynchronizedOutputStream> connections = servlet.getConnections(this);
            if (connections.size() > 0) {
                if (logger.isDebugEnabled()) {
            		logger.debug("Comet sender " + id + ". Sending message to " + connections.size() + " opened connections");
            	}
                Iterator<SynchronizedOutputStream> it = connections.iterator();
                while (it.hasNext()) {                	
                    try {
                    	SynchronizedOutputStream stream = it.next();
                    	if (!stream.isClosed()) {
	                        for (int j = 0; j < pendingRates.length; j++) {
	                        	byte[] buffer = SerializationUtils.serialize(pendingRates[j]);
	                        	stream.write(buffer);
	                        	rateSent(pendingRates[j]);
	                        }
                    	} else {
                    		servlet.removeConnection(this, stream);
                    	}
                    } catch (IOException e) {
                    	logger.error("Comet sender " + id + ".Exception sending message to client. Probably a client has closed the stream.");
                    }
                }
                dumpStats();
            }
            connections.clear();
            try {
				Thread.sleep(500);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
        }
    }

	public void dumpStats() {

		long time = System.currentTimeMillis();

		if ((statsLastDumped != -1) && (time - statsLastDumped < DUMP_STATS_INTERVAL)) {
			return;
		}

		StringBuffer stats = new StringBuffer();
		stats.append("Rate streaming comet server " + id + " statistics\n");
		stats.append("--------------------------------\n\n");
		
		Iterator<Map.Entry<String, Integer>> it = ratesWrited.entrySet().iterator();
		while (it.hasNext()) {			
			Map.Entry<String,Integer> entry = it.next();

			double throughput = 1.0D * entry.getValue() / ((time - statsLastDumped)/1000); 
			
			Integer ipending = ratesPending.get(entry.getKey());
			int pending = ipending == null ? 0 : ipending.intValue();

			stats.append(entry.getValue());
			stats.append(" rates has been sent for the currency pair ");
			stats.append(entry.getKey());
			stats.append(". This makes a throughput of: ");
			stats.append(throughput);
			stats.append(" rates/sec.");
			stats.append(pending);
			stats.append(" rates pending.\n");
		}
		logger.info(stats);
		
		
		statsLastDumped = time;
		ratesWrited.clear();
	}
    
	public void rateAdded(Rate rate) {
		
		// Add one to the sent rates
		String rateId = rate.getId();
		Integer rates = ratesPending.get(rateId);
		if (rates == null) {
			rates = new Integer(0);
		}
		ratesPending.put(rateId,new Integer(rates.intValue()+1));
	}
	
	public void rateSent(Rate rate) {

		// Add one to the sent rates
		String rateId = rate.getId();
		Integer rates = ratesWrited.get(rateId);
		if (rates == null) {
			rates = new Integer(0);
		}
		ratesWrited.put(rateId,new Integer(rates.intValue()+1));		
	}

	public int getId() {
		return id;
	}
	
}
package com.cognotec.streaming;

import java.io.IOException;
import java.io.OutputStream;

/**
 * This output stream simply wraps another output stream and synchronizes access
 * to the underlying output stream. The synchronization is done on the instance
 * of this class.
 * 
 * @author sebster
 */
public class SynchronizedOutputStream extends OutputStream {

	final OutputStream out;
	
	boolean closed;

	public SynchronizedOutputStream(final OutputStream out) {
		if (out == null) {
			throw new NullPointerException("out"); //$NON-NLS-1$
		}
		this.out = out;
	}

	public synchronized void write(final byte[] b) throws IOException {
		out.write(b);
	}

	public synchronized void write(final byte[] b, final int off, final int len) throws IOException {
		out.write(b, off, len);
	}

	public synchronized void write(final int b) throws IOException {
		out.write(b);
	}

	public synchronized void flush() throws IOException {
		out.flush();
	}

	public synchronized void close() throws IOException {
		
		closed=true;
		out.close();
	}

	public synchronized boolean isClosed() {
		return closed;
	}

}

---------------------------------------------------------------------
To start a new topic, e-mail: users@tomcat.apache.org
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to