Added: incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/io/PipedInputStream.java URL: http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/io/PipedInputStream.java?view=auto&rev=467075 ============================================================================== --- incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/io/PipedInputStream.java (added) +++ incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/io/PipedInputStream.java Mon Oct 23 11:38:50 2006 @@ -0,0 +1,373 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.axis2.transport.niohttp.impl.io; + +import java.io.InputStream; +import java.io.IOException; + +/** + * Copied from JDK 1.4 and slightly modified + * + * A piped input stream should be connected + * to a piped output stream; the piped input + * stream then provides whatever data bytes + * are written to the piped output stream. + * Typically, data is read from a <code>PipedInputStream</code> + * object by one thread and data is written + * to the corresponding <code>PipedOutputStream</code> + * by some other thread. Attempting to use + * both objects from a single thread is not + * recommended, as it may deadlock the thread. + * The piped input stream contains a buffer, + * decoupling read operations from write operations, + * within limits. + * + * @author James Gosling + * @version 1.32, 01/23/03 + * @see java.io.PipedOutputStream + * @since JDK1.0 + */ +public class PipedInputStream extends InputStream { + boolean closedByWriter = false; + boolean closedByReader = false; + boolean connected = false; + + /* REMIND: identification of the read and write sides needs to be + more sophisticated. Either using thread groups (but what about + pipes within a thread?) or using finalization (but it may be a + long time until the next GC). */ + Thread readSide; + Thread writeSide; + + /** + * The size of the pipe's circular input buffer. + * + * @since JDK1.1 + */ + protected static final int PIPE_SIZE = 1024; + + /** + * The circular buffer into which incoming data is placed. + * + * @since JDK1.1 + */ + protected byte buffer[] = new byte[PIPE_SIZE]; + + /** + * The index of the position in the circular buffer at which the + * next byte of data will be stored when received from the connected + * piped output stream. <code>in<0</code> implies the buffer is empty, + * <code>in==out</code> implies the buffer is full + * + * @since JDK1.1 + */ + protected int in = -1; + + /** + * The index of the position in the circular buffer at which the next + * byte of data will be read by this piped input stream. + * + * @since JDK1.1 + */ + protected int out = 0; + + /** + * Creates a <code>PipedInputStream</code> so + * that it is connected to the piped output + * stream <code>src</code>. Data bytes written + * to <code>src</code> will then be available + * as input from this stream. + * + * @param src the stream to connect to. + * @throws java.io.IOException if an I/O error occurs. + */ + public PipedInputStream(PipedOutputStream src) throws IOException { + connect(src); + } + + /** + * Creates a <code>PipedInputStream</code> so + * that it is not yet connected. It must be + * connected to a <code>PipedOutputStream</code> + * before being used. + * + * @see java.io.PipedInputStream#connect(java.io.PipedOutputStream) + * @see java.io.PipedOutputStream#connect(java.io.PipedInputStream) + */ + public PipedInputStream() { + } + + /** + * Causes this piped input stream to be connected + * to the piped output stream <code>src</code>. + * If this object is already connected to some + * other piped output stream, an <code>IOException</code> + * is thrown. + * <p/> + * If <code>src</code> is an + * unconnected piped output stream and <code>snk</code> + * is an unconnected piped input stream, they + * may be connected by either the call: + * <p/> + * <pre><code>snk.connect(src)</code> </pre> + * <p/> + * or the call: + * <p/> + * <pre><code>src.connect(snk)</code> </pre> + * <p/> + * The two + * calls have the same effect. + * + * @param src The piped output stream to connect to. + * @throws IOException if an I/O error occurs. + */ + public void connect(PipedOutputStream src) throws IOException { + src.connect(this); + } + + /** + * Receives a byte of data. + * + * @param b the byte being received + * @return bytes received (i.e. 1) or -1 if buffer is full + * @throws IOException If the pipe is broken. + * @since JDK1.1 + */ + protected synchronized int receive(int b) throws IOException { + if (!connected) { + throw new IOException("Pipe not connected"); + } else if (closedByWriter || closedByReader) { + throw new IOException("Pipe closed"); + } else if (readSide != null && !readSide.isAlive()) { + throw new IOException("Read end dead"); + } + + if (in == out) { + // buffer full + return -1; + } + + writeSide = Thread.currentThread(); + while (in == out) { + if ((readSide != null) && !readSide.isAlive()) { + throw new IOException("Pipe broken"); + } + /* full: kick any waiting readers */ + notifyAll(); + try { + wait(100); + } catch (InterruptedException ex) { + throw new java.io.InterruptedIOException(); + } + } + if (in < 0) { + in = 0; + out = 0; + } + buffer[in++] = (byte) (b & 0xFF); + if (in >= buffer.length) { + in = 0; + } + return 1; + } + + /** + * Receives data into an array of bytes. This method will + * block until some input is available. + * + * @param b the buffer into which the data is received + * @param off the start offset of the data + * @param len the maximum number of bytes received + * @return the actual number of bytes received, -1 is + * returned when the buffer is full + * @throws IOException If an I/O error has occurred. + */ + synchronized int receive(byte b[], int off, int len) throws IOException { + int read = 0; + while (--len >= 0) { + if (receive(b[off++]) == -1) { + return read; + } else { + read++; + } + } + return -1; // should never happen + } + + /** + * Notifies all waiting threads that the last byte of data has been + * received. + */ + synchronized void receivedLast() { + closedByWriter = true; + notifyAll(); + } + + /** + * Reads the next byte of data from this piped input stream. The + * value byte is returned as an <code>int</code> in the range + * <code>0</code> to <code>255</code>. If no byte is available + * because the end of the stream has been reached, the value + * <code>-1</code> is returned. This method blocks until input data + * is available, the end of the stream is detected, or an exception + * is thrown. + * If a thread was providing data bytes + * to the connected piped output stream, but + * the thread is no longer alive, then an + * <code>IOException</code> is thrown. + * + * @return the next byte of data, or <code>-1</code> if the end of the + * stream is reached. + * @throws IOException if the pipe is broken. + */ + public synchronized int read() throws IOException { + if (!connected) { + throw new IOException("Pipe not connected"); + } else if (closedByReader) { + throw new IOException("Pipe closed"); + } else if (writeSide != null && !writeSide.isAlive() + && !closedByWriter && (in < 0)) { + throw new IOException("Write end dead"); + } + + readSide = Thread.currentThread(); + int trials = 2; + while (in < 0) { + if (closedByWriter) { + /* closed by writer, return EOF */ + return -1; + } + if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) { + throw new IOException("Pipe broken"); + } + /* might be a writer waiting */ + notifyAll(); + try { + wait(100); + } catch (InterruptedException ex) { + throw new java.io.InterruptedIOException(); + } + } + int ret = buffer[out++] & 0xFF; + if (out >= buffer.length) { + out = 0; + } + if (in == out) { + /* now empty */ + in = -1; + } + return ret; + } + + /** + * Reads up to <code>len</code> bytes of data from this piped input + * stream into an array of bytes. Less than <code>len</code> bytes + * will be read if the end of the data stream is reached. This method + * blocks until at least one byte of input is available. + * If a thread was providing data bytes + * to the connected piped output stream, but + * the thread is no longer alive, then an + * <code>IOException</code> is thrown. + * + * @param b the buffer into which the data is read. + * @param off the start offset of the data. + * @param len the maximum number of bytes read. + * @return the total number of bytes read into the buffer, or + * <code>-1</code> if there is no more data because the end of + * the stream has been reached. + * @throws IOException if an I/O error occurs. + */ + public synchronized int read(byte b[], int off, int len) throws IOException { + if (b == null) { + throw new NullPointerException(); + } else if ((off < 0) || (off > b.length) || (len < 0) || + ((off + len) > b.length) || ((off + len) < 0)) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { + return 0; + } + + /* possibly wait on the first character */ + int c = read(); + if (c < 0) { + return -1; + } + b[off] = (byte) c; + int rlen = 1; + while ((in >= 0) && (--len > 0)) { + b[off + rlen] = buffer[out++]; + rlen++; + if (out >= buffer.length) { + out = 0; + } + if (in == out) { + /* now empty */ + in = -1; + } + } + return rlen; + } + + /** + * Returns the number of bytes that can be read from this input + * stream without blocking. This method overrides the <code>available</code> + * method of the parent class. + * + * @return the number of bytes that can be read from this input stream + * without blocking. + * @throws IOException if an I/O error occurs. + * @since JDK1.0.2 + */ + public synchronized int available() throws IOException { + if (in < 0) + return 0; + else if (in == out) + return buffer.length; + else if (in > out) + return in - out; + else + return in + buffer.length - out; + } + + /** + * Return the number of bytes that can be written into the internal buffer + * of this input stream + * @return the number of bytes which can be written + */ + public synchronized int availableForWrite() { + if (in < 0) { + // buffer emptry + return buffer.length; + } else if (in == out) { + // buffer full + return 0; + } else { + // out < in since if as soon as out == in, in is resetted + return buffer.length - in; + } + } + + /** + * Closes this piped input stream and releases any system resources + * associated with the stream. + * + * @throws IOException if an I/O error occurs. + */ + public void close() throws IOException { + in = -1; + closedByReader = true; + } +}
Added: incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/io/PipedOutputStream.java URL: http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/io/PipedOutputStream.java?view=auto&rev=467075 ============================================================================== --- incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/io/PipedOutputStream.java (added) +++ incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/io/PipedOutputStream.java Mon Oct 23 11:38:50 2006 @@ -0,0 +1,181 @@ +/* +* Copyright 2004,2005 The Apache Software Foundation. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +package org.apache.axis2.transport.niohttp.impl.io; + +import java.io.*; + +/** + * Copied from JDK 1.4 and slightly modified + * + * A piped output stream can be connected to a piped input stream + * to create a communications pipe. The piped output stream is the + * sending end of the pipe. Typically, data is written to a + * <code>PipedOutputStream</code> object by one thread and data is + * read from the connected <code>PipedInputStream</code> by some + * other thread. Attempting to use both objects from a single thread + * is not recommended as it may deadlock the thread. + * + * @author James Gosling + * @version 1.25, 01/23/03 + * @see java.io.PipedInputStream + * @since JDK1.0 + */ +public class PipedOutputStream extends OutputStream { + + /* REMIND: identification of the read and write sides needs to be + more sophisticated. Either using thread groups (but what about + pipes within a thread?) or using finalization (but it may be a + long time until the next GC). */ + private PipedInputStream sink; + + /** + * Creates a piped output stream connected to the specified piped + * input stream. Data bytes written to this stream will then be + * available as input from <code>snk</code>. + * + * @param snk The piped input stream to connect to. + * @throws IOException if an I/O error occurs. + */ + public PipedOutputStream(PipedInputStream snk) throws IOException { + connect(snk); + } + + /** + * Creates a piped output stream that is not yet connected to a + * piped input stream. It must be connected to a piped input stream, + * either by the receiver or the sender, before being used. + * + * @see java.io.PipedInputStream#connect(java.io.PipedOutputStream) + * @see java.io.PipedOutputStream#connect(java.io.PipedInputStream) + */ + public PipedOutputStream() { + } + + /** + * Connects this piped output stream to a receiver. If this object + * is already connected to some other piped input stream, an + * <code>IOException</code> is thrown. + * <p/> + * If <code>snk</code> is an unconnected piped input stream and + * <code>src</code> is an unconnected piped output stream, they may + * be connected by either the call: + * <blockquote><pre> + * src.connect(snk)</pre></blockquote> + * or the call: + * <blockquote><pre> + * snk.connect(src)</pre></blockquote> + * The two calls have the same effect. + * + * @param snk the piped input stream to connect to. + * @throws IOException if an I/O error occurs. + */ + public synchronized void connect(PipedInputStream snk) throws IOException { + if (snk == null) { + throw new NullPointerException(); + } else if (sink != null || snk.connected) { + throw new IOException("Already connected"); + } + sink = snk; + snk.in = -1; + snk.out = 0; + snk.connected = true; + } + + /** + * Writes the specified <code>byte</code> to the piped output stream. + * If a thread was reading data bytes from the connected piped input + * stream, but the thread is no longer alive, then an + * <code>IOException</code> is thrown. + * <p/> + * Implements the <code>write</code> method of <code>OutputStream</code>. + * + * @param b the <code>byte</code> to be written. + * @throws IOException if an I/O error occurs or if + * the internal buffer of the connected Piped input stream is full + */ + public void write(int b) throws IOException { + if (sink == null) { + throw new IOException("Pipe not connected"); + } + if (sink.availableForWrite() == 0) { + throw new IOException("Internal buffer of the Piped input stream is full"); + } + sink.receive(b); + } + + /** + * Writes <code>len</code> bytes from the specified byte array + * starting at offset <code>off</code> to this piped output stream. + * If a thread was reading data bytes from the connected piped input + * stream, but the thread is no longer alive, then an + * <code>IOException</code> is thrown. + * + * @param b the data. + * @param off the start offset in the data. + * @param len the number of bytes to write. + * @throws IOException if an I/O error occurs. + */ + public void write(byte b[], int off, int len) throws IOException { + if (sink == null) { + throw new IOException("Pipe not connected"); + } else if (b == null) { + throw new NullPointerException(); + } else if ((off < 0) || (off > b.length) || (len < 0) || + ((off + len) > b.length) || ((off + len) < 0)) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { + return; + } + sink.receive(b, off, len); + } + + /** + * Flushes this output stream and forces any buffered output bytes + * to be written out. + * This will notify any readers that bytes are waiting in the pipe. + * + * @throws IOException if an I/O error occurs. + */ + public synchronized void flush() throws IOException { + if (sink != null) { + synchronized (sink) { + sink.notifyAll(); + } + } + } + + /** + * Closes this piped output stream and releases any system resources + * associated with this stream. This stream may no longer be used for + * writing bytes. + * + * @throws IOException if an I/O error occurs. + */ + public void close() throws IOException { + if (sink != null) { + sink.receivedLast(); + } + } + + /** + * Returns the number of bytes that can be written to this Piped output stream + * without blocking. + * @return number of bytes that can be written without blocking + */ + public int availableForWrite() { + return sink.availableForWrite(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]
