User: hiram
Date: 00/11/16 14:42:52
Added: src/java/org/spydermq/multiplexor StreamDemux.java
Log:
These classes provide a way to multiplex the use of a
socket.
Revision Changes Path
1.1 spyderMQ/src/java/org/spydermq/multiplexor/StreamDemux.java
Index: StreamDemux.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.multiplexor;
import java.io.*;
import java.util.*;
/**
* This class is used to demultiplex from
* a single stream into multiple streams.
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class StreamDemux {
short frameSize = 512;
HashMap openStreams = new HashMap();
InputStream in;
DataInputStream objectIn;
boolean pumpingData = false;
byte inputBuffer[] = new byte[frameSize];
/**
* StreamMux constructor comment.
* @param out java.io.OutputStream
*/
public StreamDemux(InputStream in) throws IOException {
this.in = in;
this.objectIn = new DataInputStream(in);
}
void closeStream(short id) throws IOException {
if (id == 0)
throw new IOException("Stream id 0 is reserved for internal
use.");
synchronized (openStreams) {
openStreams.remove(new Short(id));
}
}
/**
* Creation date: (11/15/00 5:30:55 PM)
* @return short
*/
public short getFrameSize() {
synchronized (openStreams) {
return frameSize;
}
}
public InputStream getStream(short id) throws IOException {
if (id == 0)
throw new IOException("Stream id 0 is reserved for internal
use.");
InputStream s;
synchronized (openStreams) {
s = (InputStream) openStreams.get(new Short(id));
;
if (s != null)
return s;
s = new DemuxInputStream(this, id);
openStreams.put(new Short(id), s);
}
return s;
}
/**
* Creation date: (11/15/00 5:30:55 PM)
* @param newFrameSize short
*/
public void setFrameSize(short newFrameSize) throws IOException {
synchronized (openStreams) {
if (openStreams.size() > 0)
throw new IOException("Cannot change the frame size
while there are open streams.");
frameSize = newFrameSize;
inputBuffer = new byte[frameSize];
}
}
public int available(DemuxInputStream s) throws IOException {
return objectIn.available();
}
/**
* Pumps data to all input streams until data for the
* dest Stream arrives. Only on thread is allowed to pump
* data at a time and this method returns true if it pumped
* data into its input buffer. It returns false if another
*thread is allready pumping data.
*/
public boolean pumpData(DemuxInputStream dest) throws IOException {
synchronized (this) {
if (pumpingData)
return false;
else
pumpingData = true;
}
// Start pumping the data
short nextFrameSize = frameSize;
while (true) {
short streamId = objectIn.readShort();
// Was it a command on the admin stream?
if (streamId == 0) {
// Next byte is the command.
switch (objectIn.readByte()) {
case StreamMux.OPEN_STREAM_COMMAND :
getStream(objectIn.readShort());
break;
case StreamMux.CLOSE_STREAM_COMMAND :
DemuxInputStream s;
synchronized (openStreams) {
s = (DemuxInputStream)
openStreams.get(new Short(streamId));
}
if (s != null) {
closeStream(s.streamId);
s.atEOF = true;
if (s == dest) {
break;
} else {
// Wake up the thread
that was waiting for input (it got a EOF)
synchronized
(s.bufferMutex) {
s.bufferMutex.notify();
}
}
}
break;
case StreamMux.NEXT_FRAME_SHORT_COMMAND :
nextFrameSize = objectIn.readShort();
break;
}
} else {
objectIn.readFully(inputBuffer, 0, nextFrameSize);
DemuxInputStream s;
synchronized (openStreams) {
s = (DemuxInputStream) openStreams.get(new
Short(streamId));
}
if (s == null)
continue;
s.loadBuffer(inputBuffer, nextFrameSize);
if (s == dest)
break;
nextFrameSize = frameSize;
}
}
synchronized (this) {
pumpingData = false;
}
// we are done pumping but another thread may be
// intrested in pumping.
synchronized (openStreams) {
Iterator iter = openStreams.values().iterator();
while (iter.hasNext() && pumpingData == false) {
DemuxInputStream s = (DemuxInputStream) iter.next();
synchronized (s.bufferMutex) {
s.bufferMutex.notify();
}
}
}
return true;
}
}