User: hiram   
  Date: 00/11/16 14:42:52

  Added:       src/java/org/spydermq/multiplexor StreamDemux.java
  Log:
  These classes provide a way to multiplex the use of a
  socket.
  
  Revision  Changes    Path
  1.1                  spyderMQ/src/java/org/spydermq/multiplexor/StreamDemux.java
  
  Index: StreamDemux.java
  ===================================================================
  /*
   * spyderMQ, the OpenSource JMS implementation
   *
   * Distributable under GPL license.
   * See terms of license at gnu.org.
   */
  package org.spydermq.multiplexor;
  
  import java.io.*;
  import java.util.*;
  
  /**
   * This class is used to demultiplex from
   * a single stream into multiple streams.
   *
   *    @author Hiram Chirino ([EMAIL PROTECTED])
   * 
   *    @version $Revision: 1.1 $
   */
  public class StreamDemux {
  
        short frameSize = 512;
        HashMap openStreams = new HashMap();
        InputStream in;
        DataInputStream objectIn;
  
        boolean pumpingData = false;
        byte inputBuffer[] = new byte[frameSize];
  
        /**
         * StreamMux constructor comment.
         * @param out java.io.OutputStream
         */
        public StreamDemux(InputStream in) throws IOException {
                this.in = in;
                this.objectIn = new DataInputStream(in);
        }
  
        void closeStream(short id) throws IOException {
                if (id == 0)
                        throw new IOException("Stream id 0 is reserved for internal 
use.");
  
                synchronized (openStreams) {
                        openStreams.remove(new Short(id));
                }
  
        }
        
        /**
         * Creation date: (11/15/00 5:30:55 PM)
         * @return short
         */
        public short getFrameSize() {
                synchronized (openStreams) {
                        return frameSize;
                }
        }
        
        public InputStream getStream(short id) throws IOException {
                if (id == 0)
                        throw new IOException("Stream id 0 is reserved for internal 
use.");
  
                InputStream s;
                synchronized (openStreams) {
                        s = (InputStream) openStreams.get(new Short(id));
                        ;
  
                        if (s != null)
                                return s;
  
                        s = new DemuxInputStream(this, id);
                        openStreams.put(new Short(id), s);
                }
                return s;
        }
  
        /**
         * Creation date: (11/15/00 5:30:55 PM)
         * @param newFrameSize short
         */
        public void setFrameSize(short newFrameSize) throws IOException {
                synchronized (openStreams) {
                        if (openStreams.size() > 0)
                                throw new IOException("Cannot change the frame size 
while there are open streams.");
                        frameSize = newFrameSize;
                        inputBuffer = new byte[frameSize];
                }
        }
        
        public int available(DemuxInputStream s) throws IOException {
                return objectIn.available();
        }
         
        /**
         * Pumps data to all input streams until data for the
         * dest Stream arrives.  Only on thread is allowed to pump
         * data at a time and this method returns true if it pumped
         * data into its input buffer.  It returns false if another
         *thread is allready pumping data.
         */
        public boolean pumpData(DemuxInputStream dest) throws IOException {
  
                synchronized (this) {
                        if (pumpingData)
                                return false;
                        else
                                pumpingData = true;
                }
  
                // Start pumping the data
                short nextFrameSize = frameSize;
                while (true) {
                        short streamId = objectIn.readShort();
                        // Was it a command on the admin stream?
                        if (streamId == 0) {
                                // Next byte is the command.
                                switch (objectIn.readByte()) {
                                        case StreamMux.OPEN_STREAM_COMMAND :
                                                getStream(objectIn.readShort());
                                                break;
                                        case StreamMux.CLOSE_STREAM_COMMAND :
                                                DemuxInputStream s;
                                                synchronized (openStreams) {
                                                        s = (DemuxInputStream) 
openStreams.get(new Short(streamId));
                                                }
                                                if (s != null) {
                                                        closeStream(s.streamId);
                                                        s.atEOF = true;
                                                        if (s == dest) {
                                                                break;
                                                        } else {
                                                                // Wake up the thread 
that was waiting for input (it got a EOF)
                                                                synchronized 
(s.bufferMutex) {
                                                                        
s.bufferMutex.notify();
                                                                }
                                                        }
                                                }
                                                break;
                                        case StreamMux.NEXT_FRAME_SHORT_COMMAND :
                                                nextFrameSize = objectIn.readShort();
                                                break;
                                }
  
                        } else {
                                objectIn.readFully(inputBuffer, 0, nextFrameSize);
                                DemuxInputStream s;
                                synchronized (openStreams) {
                                        s = (DemuxInputStream) openStreams.get(new 
Short(streamId));
                                }
                                if (s == null)
                                        continue;
  
                                s.loadBuffer(inputBuffer, nextFrameSize);
                                if (s == dest)
                                        break;
  
                                nextFrameSize = frameSize;
                        }
                }
  
                synchronized (this) {
                        pumpingData = false;
                }
  
                // we are done pumping but another thread may be
                // intrested in pumping.
                synchronized (openStreams) {
                        Iterator iter = openStreams.values().iterator();
                        while (iter.hasNext() && pumpingData == false) {
                                DemuxInputStream s = (DemuxInputStream) iter.next();
                                synchronized (s.bufferMutex) {
                                        s.bufferMutex.notify();
                                }
                        }
                }
                return true;
        }
  }
  
  
  

Reply via email to