User: hiram
Date: 00/11/16 14:43:37
Added: src/java/org/spydermq/multiplexor DemuxInputStream.java
MultiplexorTest.java MuxOutputStream.java
SocketMultiplexor.java StreamMux.java
Log:
These classes provide a way to multiplex the use of a socket.
Revision Changes Path
1.1 spyderMQ/src/java/org/spydermq/multiplexor/DemuxInputStream.java
Index: DemuxInputStream.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.multiplexor;
import java.io.InterruptedIOException;
import java.io.IOException;
import java.io.InputStream;
/**
* Objects of this class provide and an InputStream
* from a StreamDemux.
*
* Objects of this class are created by a StreamDemux object.
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
class DemuxInputStream extends InputStream {
StreamDemux streamDemux;
short streamId;
boolean atEOF = false;
Object bufferMutex = new Object();
byte buffer[];
short bufferEndPos;
short bufferStartPos;
DemuxInputStream(StreamDemux demux, short id) {
streamDemux = demux;
streamId = id;
buffer = new byte[1000];
bufferStartPos = 0;
bufferEndPos = 0;
}
public int available() throws IOException {
return getBufferFillSize();
}
public void close() throws IOException {
streamDemux.closeStream(streamId);
}
private int getBufferFillSize() {
return bufferStartPos <= bufferEndPos ? bufferEndPos - bufferStartPos
: buffer.length - (bufferStartPos - bufferEndPos);
}
private int getBufferFreeSize() {
return (buffer.length - 1) - getBufferFillSize();
}
public void loadBuffer(byte data[], short dataLength) throws IOException {
int freeSize = 0;
int dataPos = 0;
while (dataPos < dataLength) {
synchronized (bufferMutex) {
while ((freeSize = getBufferFreeSize()) == 0) {
try {
// Wait till the consumer notifies us
he has
// removed some data from the buffer.
bufferMutex.wait();
} catch (InterruptedException e) {
throw new
InterruptedIOException(e.getMessage());
}
}
// the buffer should have free space now.
freeSize = Math.min(freeSize, dataLength - dataPos);
for (int i = 0; i < freeSize; i++) {
buffer[bufferEndPos++] = data[dataPos + i];
bufferEndPos = bufferEndPos == buffer.length ?
0 : bufferEndPos;
}
}
dataPos += freeSize;
// the consumer might be waiting for bytes to come in
synchronized (bufferMutex) {
bufferMutex.notify();
}
}
}
public int read() throws IOException {
if (bufferStartPos == bufferEndPos && atEOF)
return -1;
synchronized (bufferMutex) {
// Wait till the buffer has data
while (!atEOF && bufferStartPos == bufferEndPos &&
!streamDemux.pumpData(this)) {
try {
// Wait till the producer notifies us he has
// put some data in the buffer.
bufferMutex.wait();
} catch (InterruptedException e) {
throw new
InterruptedIOException(e.getMessage());
}
}
}
// We might break out due to EOF
if (bufferStartPos == bufferEndPos)
return -1;
// the buffer should have data now.
byte result = buffer[bufferStartPos++];
bufferStartPos = bufferStartPos == buffer.length ? 0 : bufferStartPos;
// the producer might be waiting for free space in the
// buffer, we have to notify him.
synchronized (bufferMutex) {
bufferMutex.notify();
}
return result & 0xff;
}
public int read(byte b[], int off, int len) throws IOException {
if (b == null) {
throw new NullPointerException();
} else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len)
> b.length) || ((off + len) < 0)) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}
int c = read();
if (c == -1) {
return -1;
}
b[off] = (byte) c;
len = Math.min(available(), len);
int i = 1;
try {
for (; i < len; i++) {
c = read();
b[off + i] = (byte) c;
}
} catch (IOException ee) {
}
return i;
}
}
1.1 spyderMQ/src/java/org/spydermq/multiplexor/MultiplexorTest.java
Index: MultiplexorTest.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.multiplexor;
import java.io.ObjectInputStream;
import java.io.OutputStream;
import java.io.IOException;
import java.io.BufferedInputStream;
import java.io.PipedOutputStream;
import java.io.ObjectOutputStream;
import java.io.InterruptedIOException;
import java.io.BufferedOutputStream;
import java.io.PipedInputStream;
/**
 * This class is a unit tester of the
 * StreamMux and StreamDemux classes.
 *
 * Starts 3 concurrent readers and 3 concurrent
 * writers using 3 fully buffered object streams multiplexed over
 * a single Pipe(Input/Output)Stream. The
 * writers send a 10K message with a timestamp.
 *
 * Readers display how long the message took to arrive.
 *
 * @author Hiram Chirino ([EMAIL PROTECTED])
 *
 * @version $Revision: 1.1 $
 */
public class MultiplexorTest {

    /** Multiplexing side; created by {@link #connect()}. */
    StreamMux mux;

    /** Demultiplexing side; created by {@link #connect()}. */
    StreamDemux demux;

    /** Number of characters in the test payload. */
    public static final int PAY_LOAD_SIZE = 1024 * 10;

    /** The payload sent with every message; assigned by the constructor. */
    public static char[] PAY_LOAD;

    /**
     * Writes 1000 messages, each a timestamp followed by the payload, to
     * one multiplexed stream.
     */
    class WriterThread extends Thread {
        ObjectOutputStream os;
        short id;

        /**
         * Opens a buffered object stream over the multiplexed output stream
         * with the given id and flushes the object stream header.
         *
         * @param id the id of the multiplexed stream to write to
         */
        WriterThread(short id) throws IOException {
            super("WriterThread");
            this.os = new ObjectOutputStream(
                    new BufferedOutputStream(mux.getStream(id)));
            // Record the id, as ReaderThread does; it was left unassigned.
            this.id = id;
            this.os.flush();
        }

        public void run() {
            try {
                for (int i = 0; i < 1000; i++) {
                    os.writeLong(System.currentTimeMillis());
                    os.writeObject(PAY_LOAD);
                    os.flush();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Reads 1000 messages from one multiplexed stream and prints the time
     * elapsed since each message's timestamp.
     */
    class ReaderThread extends Thread {
        ObjectInputStream is;
        short id;

        /**
         * Opens a buffered object stream over the demultiplexed input stream
         * with the given id.
         *
         * @param id the id of the multiplexed stream to read from
         */
        ReaderThread(short id) throws IOException {
            super("ReaderThread");
            this.is = new ObjectInputStream(
                    new BufferedInputStream(demux.getStream(id)));
            this.id = id;
        }

        public void run() {
            try {
                for (int i = 0; i < 1000; i++) {
                    long t = is.readLong();
                    is.readObject();
                    t = System.currentTimeMillis() - t;
                    // Milliseconds divided by 1000: the latency in seconds.
                    System.out.println("" + id + ": Packet " + i
                            + " Latency : " + ((double) t / (double) 1000));
                    System.out.flush();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * MultiplexorTest constructor. Fills {@link #PAY_LOAD} with
     * {@link #PAY_LOAD_SIZE} characters cycling through 'A'..'Z'.
     */
    public MultiplexorTest() {
        super();
        PAY_LOAD = buildPayload(PAY_LOAD_SIZE);
    }

    /** Builds an array of the given size cycling through 'A'..'Z'. */
    private static char[] buildPayload(int size) {
        char s[] = new char[size];
        char c = 'A';
        for (int i = 0; i < size; i++) {
            s[i] = c;
            c++;
            c = c > 'Z' ? 'A' : c;
        }
        return s;
    }

    /**
     * Connects a StreamMux to a StreamDemux through a pair of piped streams.
     */
    public void connect() throws IOException {
        PipedInputStream pis = new PipedInputStream();
        PipedOutputStream pos = new PipedOutputStream(pis);
        mux = new StreamMux(pos);
        demux = new StreamDemux(pis);
    }

    /**
     * Runs the test: connects the streams, then starts a writer and a
     * reader on each of the stream ids 1, 2 and 3.
     */
    public static void main(String args[]) throws Exception {
        System.out.println("Initializing");
        MultiplexorTest tester = new MultiplexorTest();
        System.out.println("Connecting the streams");
        tester.connect();
        System.out.println("Starting stream 1");
        tester.startStream((short) 1);
        System.out.println("Starting stream 2");
        tester.startStream((short) 2);
        System.out.println("Starting stream 3");
        tester.startStream((short) 3);
    }

    /**
     * Starts one writer thread and one reader thread on the given stream.
     *
     * @param id the id of the multiplexed stream to exercise
     */
    public void startStream(short id) throws IOException {
        new WriterThread(id).start();
        new ReaderThread(id).start();
    }
}
1.1 spyderMQ/src/java/org/spydermq/multiplexor/MuxOutputStream.java
Index: MuxOutputStream.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.multiplexor;
import java.io.OutputStream;
import java.io.InterruptedIOException;
import java.io.IOException;
/**
 * Objects of this class provide an OutputStream
 * to a StreamMux.
 *
 * Objects of this class are created by a StreamMux object.
 *
 * Written bytes are collected in a buffer of one frame and handed to the
 * StreamMux when the buffer is full, on flush, or on close.
 *
 * @author Hiram Chirino ([EMAIL PROTECTED])
 *
 * @version $Revision: 1.1 $
 */
class MuxOutputStream extends OutputStream {

    /** The multiplexer that receives this stream's frames. */
    StreamMux streamMux;

    /** Id of the logical stream served by this object. */
    short streamId;

    /** Frame buffer; sized to the multiplexer's frame size at creation. */
    byte buffer[];

    /** Number of bytes currently held in {@code buffer}. */
    short bufferLength;

    /**
     * Creates the stream with an empty buffer of {@code mux.frameSize} bytes.
     *
     * @param mux the multiplexer that owns this stream
     * @param id the id of the logical stream
     */
    MuxOutputStream(StreamMux mux, short id) {
        streamMux = mux;
        streamId = id;
        buffer = new byte[streamMux.frameSize];
        bufferLength = 0;
    }

    /**
     * Flushes any buffered bytes and then closes this output stream.
     *
     * The flush comes first so that bytes written since the last full frame
     * are not lost. The multiplexer is asked to close the stream even when
     * the flush fails.
     */
    public void close() throws IOException {
        try {
            flush();
        } finally {
            streamMux.closeStream(streamId);
        }
    }

    /**
     * Flushes this output stream and forces any buffered output bytes
     * to be written out.
     */
    public void flush() throws IOException {
        if (bufferLength > 0) {
            flushBuffer();
        }
        streamMux.flush();
    }

    /**
     * Flushes the internal buffer to the multiplexor.
     */
    private void flushBuffer() throws IOException {
        streamMux.write(this, buffer, bufferLength);
        bufferLength = 0;
    }

    /**
     * Buffers the low-order byte of {@code data}, handing the buffer to the
     * multiplexor as soon as it holds a full frame.
     *
     * @param data the byte to write
     */
    public void write(int data) throws IOException {
        buffer[bufferLength] = (byte) data;
        bufferLength++;
        if (bufferLength == streamMux.frameSize) {
            flushBuffer();
        }
    }
}
1.1
spyderMQ/src/java/org/spydermq/multiplexor/SocketMultiplexor.java
Index: SocketMultiplexor.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.multiplexor;
import java.net.Socket;
import java.io.OutputStream;
import java.io.IOException;
import java.io.InputStream;
/**
 * Used to multiplex a socket's streams.
 *
 * With this class you can access the multiplexed
 * streams of the socket. The multiplexed streams are
 * identified by a stream id.
 *
 * Stream id 0 is reserved for internal use of the multiplexor.
 *
 * @author Hiram Chirino ([EMAIL PROTECTED])
 *
 * @version $Revision: 1.1 $
 */
public class SocketMultiplexor {

    private Socket socket;

    /** Multiplexer wrapped around the socket's output stream. */
    StreamMux mux;

    /** Demultiplexer wrapped around the socket's input stream. */
    StreamDemux demux;

    /**
     * Wraps the streams of the given socket in a StreamMux and a StreamDemux.
     *
     * @param s the socket whose streams are to be multiplexed
     * @throws IOException if the socket's streams cannot be obtained or
     *         wrapped
     */
    public SocketMultiplexor(Socket s) throws IOException {
        socket = s;
        mux = new StreamMux(s.getOutputStream());
        demux = new StreamDemux(s.getInputStream());
    }

    /**
     * Creation date: (11/16/00 1:15:01 PM)
     * @return the demultiplexer reading from the socket
     */
    public StreamDemux getDemux() {
        return demux;
    }

    /**
     * Returns the demultiplexed input stream with the given id.
     *
     * @param id the stream id; must fit in a {@code short}
     * @throws IOException if {@code id} does not fit in a {@code short}, or
     *         if the demultiplexer rejects the request
     */
    public InputStream getInputStream(int id) throws IOException {
        return demux.getStream(toStreamId(id));
    }

    /**
     * Creation date: (11/16/00 1:15:01 PM)
     * @return the multiplexer writing to the socket
     */
    public StreamMux getMux() {
        return mux;
    }

    /**
     * Returns the multiplexed output stream with the given id.
     *
     * @param id the stream id; must fit in a {@code short}
     * @throws IOException if {@code id} does not fit in a {@code short}, or
     *         if the multiplexer rejects the request
     */
    public OutputStream getOutputStream(int id) throws IOException {
        return mux.getStream(toStreamId(id));
    }

    /**
     * Creation date: (11/16/00 1:14:41 PM)
     * @return java.net.Socket
     */
    public java.net.Socket getSocket() {
        return socket;
    }

    /**
     * Narrows an int stream id to a short, rejecting values that a plain
     * cast would silently map onto a different stream id.
     */
    private static short toStreamId(int id) throws IOException {
        if (id < Short.MIN_VALUE || id > Short.MAX_VALUE) {
            throw new IOException("Stream id out of range: " + id);
        }
        return (short) id;
    }
}
1.1 spyderMQ/src/java/org/spydermq/multiplexor/StreamMux.java
Index: StreamMux.java
===================================================================
/*
* spyderMQ, the OpenSource JMS implementation
*
* Distributable under GPL license.
* See terms of license at gnu.org.
*/
package org.spydermq.multiplexor;
import java.io.*;
import java.util.*;
/**
* This class is used to multiplex from
* multiple streams into a single stream.
*
* @author Hiram Chirino ([EMAIL PROTECTED])
*
* @version $Revision: 1.1 $
*/
public class StreamMux {
short frameSize = 512;
HashMap openStreams = new HashMap();
OutputStream out;
DataOutputStream objectOut;
// Commands that can be sent over the admin stream.
static final byte OPEN_STREAM_COMMAND = 0;
static final byte CLOSE_STREAM_COMMAND = 1;
static final byte NEXT_FRAME_SHORT_COMMAND = 2;
/**
* StreamMux constructor comment.
* @param out java.io.OutputStream
*/
public StreamMux(OutputStream out) throws IOException {
this.out = out;
this.objectOut = new DataOutputStream(out);
}
void closeStream(short id) throws IOException {
if (id == 0)
throw new IOException("Stream id 0 is reserved for internal
use.");
MuxOutputStream s;
synchronized (openStreams) {
s = (MuxOutputStream) openStreams.remove(new Short(id));
}
synchronized (objectOut) {
objectOut.writeShort(0); // admin stream
objectOut.writeByte(CLOSE_STREAM_COMMAND); // command
objectOut.writeShort(id); // argument
}
}
public void flush() throws IOException {
synchronized (objectOut) {
objectOut.flush();
out.flush();
}
}
/**
* Insert the method's description here.
* Creation date: (11/15/00 5:30:55 PM)
* @return short
*/
public short getFrameSize() {
synchronized (openStreams) {
return frameSize;
}
}
public OutputStream getStream(short id) throws IOException {
if (id == 0)
throw new IOException("Stream id 0 is reserved for internal
use.");
OutputStream s;
synchronized (openStreams) {
s = (OutputStream) openStreams.get(new Short(id));
if (s != null) {
return s;
}
s = new MuxOutputStream(this, id);
openStreams.put(new Short(id), s);
}
synchronized (objectOut) {
objectOut.writeShort(0); // admin stream
objectOut.writeByte(OPEN_STREAM_COMMAND); // command
objectOut.writeShort(id); // argument
}
return s;
}
/**
* Insert the method's description here.
* Creation date: (11/15/00 5:30:55 PM)
* @param newFrameSize short
*/
public void setFrameSize(short newFrameSize) throws IOException {
synchronized (openStreams) {
if (openStreams.size() > 0)
throw new IOException("Cannot change the frame size
while there are open streams.");
frameSize = newFrameSize;
}
}
void write(MuxOutputStream s, byte b[], int len) throws IOException {
if (b == null) {
throw new NullPointerException();
} else if ((len < 0) || (len > b.length) || (len > frameSize)) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return;
}
synchronized (objectOut) {
if (len < frameSize) {
objectOut.writeShort(0);
objectOut.writeByte(NEXT_FRAME_SHORT_COMMAND);
objectOut.writeShort(len);
}
objectOut.writeShort(s.streamId);
objectOut.write(b, 0, len);
}
}
}