/**
 * An input stream that reads from a caller-supplied byte array. It performs
 * the same function as {@code java.io.ByteArrayInputStream}, but none of its
 * methods are declared {@code synchronized}.
 */
public class ByteArrayInputStream extends InputStream {

    /**
     * The array supplied by the creator of the stream; it is used directly and
     * never copied. Only {@code buf[0]} through {@code buf[count-1]} can ever
     * be read, and {@code buf[pos]} is the next byte to be read.
     */
    protected byte buf[];

    /**
     * Index of the next byte to read from {@code buf}. Reading stops once
     * this reaches {@code count}.
     */
    protected int pos;

    /**
     * The position that {@link #reset()} returns to. It starts at the offset
     * passed to the constructor (or 0 when no offset was given) and is moved
     * by {@link #mark(int)}.
     */
    protected int mark = 0;

    /**
     * One greater than the index of the last byte of {@code buf} that can be
     * read from this stream.
     */
    protected int count;

    /**
     * Creates a stream over the whole of {@code buf}. The array is not copied;
     * reading starts at index 0 and ends at {@code buf.length}.
     *
     * @param buf the input buffer.
     * @throws NullPointerException if {@code buf} is {@code null}.
     */
    public ByteArrayInputStream(byte buf[]) {
        this.buf = buf;
        this.pos = 0;
        this.count = buf.length;
    }

    /**
     * Creates a stream over a region of {@code buf}. The array is not copied.
     * Reading starts at {@code offset}, the mark is set to {@code offset}, and
     * the end of the stream is the smaller of {@code offset + length} and
     * {@code buf.length}.
     *
     * @param buf the input buffer.
     * @param offset the offset in the buffer of the first byte to read.
     * @param length the maximum number of bytes to read from the buffer.
     * @throws NullPointerException if {@code buf} is {@code null}.
     */
    public ByteArrayInputStream(byte buf[], int offset, int length) {
        this.buf = buf;
        this.pos = offset;
        final int end = offset + length;
        // Two positive ints whose sum is negative have wrapped around. The true
        // sum is then larger than any array length, so the documented minimum
        // is buf.length rather than the wrapped (negative) value.
        final boolean overflowed = offset > 0 && length > 0 && end < 0;
        this.count = overflowed ? buf.length : Math.min(end, buf.length);
        this.mark = offset;
    }

    /**
     * Reads the next byte of data. This method never blocks.
     *
     * @return the next byte as an {@code int} in the range 0 to 255, or
     * {@code -1} if the end of the stream has been reached.
     */
    @Override
    public int read() {
        if (pos >= count) {
            return -1;
        }
        // mask so that negative byte values are returned as 128..255
        return buf[pos++] & 0xff;
    }

    /**
     * Copies up to {@code len} bytes from this stream into {@code b}, starting
     * at {@code b[off]}. The number of bytes copied is the smaller of
     * {@code len} and {@code count - pos}; {@code pos} is advanced by that
     * amount. This method never blocks.
     *
     * @param b the buffer into which the data is read.
     * @param off the start offset in the destination array {@code b}.
     * @param len the maximum number of bytes to read.
     * @return the number of bytes copied, {@code 0} if {@code len} is zero and
     * data remains, or {@code -1} if the end of the stream has been reached.
     * @throws NullPointerException if {@code b} is {@code null}.
     * @throws IndexOutOfBoundsException if {@code off} is negative,
     * {@code len} is negative, or {@code len} is greater than
     * {@code b.length - off}.
     */
    @Override
    public int read(byte b[], int off, int len) {
        if (b == null) {
            throw new NullPointerException();
        }
        if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        }

        // end-of-stream is reported before the zero-length case
        if (pos >= count) {
            return -1;
        }

        final int toCopy = Math.min(len, count - pos);
        if (toCopy <= 0) {
            return 0;
        }
        System.arraycopy(buf, pos, b, off, toCopy);
        pos += toCopy;
        return toCopy;
    }

    /**
     * Skips up to {@code n} bytes. Fewer bytes are skipped when the end of
     * the stream is reached first; a negative {@code n} skips nothing.
     *
     * @param n the number of bytes to be skipped.
     * @return the actual number of bytes skipped.
     */
    @Override
    public long skip(long n) {
        long k = count - pos;
        if (n < k) {
            k = n < 0 ? 0 : n;
        }

        pos += k;
        return k;
    }

    /**
     * Returns the number of bytes that remain to be read or skipped.
     *
     * @return {@code count - pos}.
     */
    @Override
    public int available() {
        return count - pos;
    }

    /**
     * Reports whether mark/reset is supported.
     *
     * @return always {@code true}.
     */
    @Override
    public boolean markSupported() {
        return true;
    }

    /**
     * Records the current position so that a later {@link #reset()} returns
     * to it.
     *
     * @param readAheadLimit ignored; it has no meaning for this class.
     */
    @Override
    public void mark(int readAheadLimit) {
        mark = pos;
    }

    /**
     * Moves the read position back to the marked position. That is the
     * constructor offset (or 0) unless {@link #mark(int)} has been called.
     */
    @Override
    public void reset() {
        pos = mark;
    }

    /**
     * Does nothing. The methods of this class can still be called after the
     * stream has been closed.
     */
    @Override
    public void close() {
    }

}
/**
 * An output stream that collects written bytes in a growable byte array. It
 * differs from {@code java.io.ByteArrayOutputStream} in two ways:
 * <ul>
 * <li>The write methods are not synchronized</li>
 * <li>{@link #getUnderlyingBuffer()} and {@link #getBufferLength()} expose the
 * internal array directly, avoiding the copy made by {@link #toByteArray()}</li>
 * </ul>
 */
public class ByteArrayOutputStream extends OutputStream {

    /**
     * The buffer where data is stored.
     */
    protected byte buf[];

    /**
     * The number of valid bytes in the buffer.
     */
    protected int count;

    /**
     * Creates a new byte array output stream with an initial buffer capacity
     * of 32 bytes. The buffer grows as needed.
     */
    public ByteArrayOutputStream() {
        this(32);
    }

    /**
     * Creates a new byte array output stream with the given initial buffer
     * capacity, in bytes.
     *
     * @param size the initial size.
     * @throws IllegalArgumentException if size is negative.
     */
    public ByteArrayOutputStream(int size) {
        if (size < 0) {
            throw new IllegalArgumentException("Negative initial size: "
                    + size);
        }
        buf = new byte[size];
    }

    /**
     * Grows the buffer if it cannot already hold {@code minCapacity} bytes.
     *
     * @param minCapacity the desired minimum capacity
     * @throws OutOfMemoryError if {@code minCapacity < 0}, which happens when
     * the requested size has overflowed {@code int}.
     */
    private void ensureCapacity(int minCapacity) {
        // The subtraction form (rather than minCapacity > buf.length) is kept on
        // purpose: an overflowed, negative minCapacity still reaches grow(),
        // which rejects it.
        if (minCapacity - buf.length > 0) {
            grow(minCapacity);
        }
    }

    /**
     * Replaces the buffer with a larger copy able to hold at least
     * {@code minCapacity} bytes. The new capacity is double the old one, or
     * {@code minCapacity} if doubling is not enough.
     *
     * @param minCapacity the desired minimum capacity
     */
    private void grow(int minCapacity) {
        // overflow-conscious code
        int oldCapacity = buf.length;
        int newCapacity = oldCapacity << 1;
        if (newCapacity - minCapacity < 0) {
            newCapacity = minCapacity;
        }
        if (newCapacity < 0) {
            if (minCapacity < 0) {
                // the request itself overflowed int
                throw new OutOfMemoryError();
            }
            // only the doubling overflowed; fall back to the largest int
            newCapacity = Integer.MAX_VALUE;
        }
        buf = Arrays.copyOf(buf, newCapacity);
    }

    /**
     * Appends the low eight bits of {@code b} to this stream.
     *
     * @param b the byte to be written.
     */
    @Override
    public void write(int b) {
        ensureCapacity(count + 1);
        buf[count] = (byte) b;
        count += 1;
    }

    /**
     * Appends {@code len} bytes of {@code b}, starting at offset {@code off},
     * to this stream.
     *
     * @param b the data.
     * @param off the start offset in the data.
     * @param len the number of bytes to write.
     * @throws IndexOutOfBoundsException if {@code off} or {@code len} is
     * negative, or the range {@code off..off+len} does not fit in {@code b}.
     */
    @Override
    public void write(byte b[], int off, int len) {
        if ((off < 0) || (off > b.length) || (len < 0)
                || ((off + len) - b.length > 0)) {
            throw new IndexOutOfBoundsException();
        }
        ensureCapacity(count + len);
        System.arraycopy(b, off, buf, count, len);
        count += len;
    }

    /**
     * Writes the accumulated bytes to {@code out} with a single call to
     * {@code out.write(buf, 0, count)}.
     *
     * @param out the output stream to which to write the data.
     * @throws IOException if an I/O error occurs.
     */
    public void writeTo(OutputStream out) throws IOException {
        out.write(buf, 0, count);
    }

    /**
     * Discards all accumulated output by setting {@code count} back to zero.
     * The already allocated buffer is kept and reused by later writes.
     */
    public void reset() {
        count = 0;
    }

    /**
     * Returns a newly allocated copy of the bytes written so far.
     *
     * @return the current contents of this output stream, as a byte array of
     * length {@link #size()}.
     */
    public byte[] toByteArray() {
        return Arrays.copyOf(buf, count);
    }

    /**
     * Returns the number of valid bytes in this stream.
     *
     * @return the value of the {@code count} field.
     */
    public int size() {
        return count;
    }

    /**
     * Decodes the accumulated bytes into a string using the platform's default
     * character set.
     *
     * @return String decoded from the buffer's contents.
     */
    @Override
    public String toString() {
        return new String(buf, 0, count);
    }

    /**
     * Decodes the accumulated bytes into a string using the named charset.
     *
     * @param charsetName the name of a supported
     * {@linkplain java.nio.charset.Charset charset}
     * @return String decoded from the buffer's contents.
     * @throws UnsupportedEncodingException If the named charset is not
     * supported
     */
    public String toString(String charsetName) throws UnsupportedEncodingException {
        return new String(buf, 0, count, charsetName);
    }

    /**
     * Does nothing. The methods of this class can still be called after the
     * stream has been closed.
     */
    @Override
    public void close() {
    }

    /**
     * Returns the internal buffer itself, not a copy. The array may be longer
     * than the data written: only the first {@link #getBufferLength()} bytes
     * are valid. A later write that grows the buffer replaces this array, so
     * a previously returned reference then no longer tracks the stream.
     *
     * @return the array currently backing this stream.
     */
    public byte[] getUnderlyingBuffer() {
        return buf;
    }

    /**
     * Returns how many bytes of {@link #getUnderlyingBuffer()} are valid.
     *
     * @return the value of the {@code count} field (same as {@link #size()}).
     */
    public int getBufferLength() {
        return count;
    }
}
/**
 * Wraps another {@link InputStream} and keeps running totals of how many bytes
 * have been read from it and how many have been skipped. When the stream is
 * rewound with {@link #reset()}, the totals are rolled back by the amounts
 * counted since the last {@link #mark(int)}.
 */
public class ByteCountingInputStream extends InputStream {

    private final InputStream in;
    private long bytesRead = 0L;
    private long bytesSkipped = 0L;

    // Read and skipped bytes are tracked separately since the last mark so
    // that reset() can roll each total back by exactly its own share.
    private long bytesReadSinceMark = 0L;
    private long bytesSkippedSinceMark = 0L;

    /**
     * @param in the stream to read from and count.
     */
    public ByteCountingInputStream(final InputStream in) {
        this.in = in;
    }

    /**
     * Reads one byte from the wrapped stream, counting it if one was returned.
     *
     * @return the value returned by the wrapped stream's {@code read()}.
     * @throws IOException if the wrapped stream throws it.
     */
    @Override
    public int read() throws IOException {
        final int fromSuper = in.read();
        if (fromSuper >= 0) {
            bytesRead++;
            bytesReadSinceMark++;
        }
        return fromSuper;
    }

    /**
     * Reads into {@code b} from the wrapped stream and adds the number of
     * bytes it reports to the read total. A negative result is not counted.
     *
     * @return the value returned by the wrapped stream's
     * {@code read(b, off, len)}.
     * @throws IOException if the wrapped stream throws it.
     */
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        final int fromSuper = in.read(b, off, len);
        if (fromSuper >= 0) {
            bytesRead += fromSuper;
            bytesReadSinceMark += fromSuper;
        }

        return fromSuper;
    }

    /**
     * Equivalent to {@code read(b, 0, b.length)}.
     */
    @Override
    public int read(byte[] b) throws IOException {
        return read(b, 0, b.length);
    }

    /**
     * Skips bytes on the wrapped stream and adds the number it reports to the
     * skipped total.
     *
     * @return the value returned by the wrapped stream's {@code skip(n)}.
     * @throws IOException if the wrapped stream throws it.
     */
    @Override
    public long skip(final long n) throws IOException {
        final long skipped = in.skip(n);
        if (skipped >= 0) {
            bytesSkipped += skipped;
            bytesSkippedSinceMark += skipped;
        }
        return skipped;
    }

    /**
     * @return the number of bytes read so far, excluding skipped bytes.
     */
    public long getBytesRead() {
        return bytesRead;
    }

    /**
     * @return the number of bytes skipped so far.
     */
    public long getBytesSkipped() {
        return bytesSkipped;
    }

    /**
     * @return the sum of {@link #getBytesRead()} and
     * {@link #getBytesSkipped()}.
     */
    public long getBytesConsumed() {
        return getBytesRead() + getBytesSkipped();
    }

    /**
     * Marks the wrapped stream and restarts the since-mark counters.
     */
    @Override
    public void mark(final int readlimit) {
        in.mark(readlimit);

        bytesReadSinceMark = 0L;
        bytesSkippedSinceMark = 0L;
    }

    /**
     * @return whatever the wrapped stream reports.
     */
    @Override
    public boolean markSupported() {
        return in.markSupported();
    }

    /**
     * Resets the wrapped stream and rolls both totals back by the amounts
     * counted since the last mark. If the wrapped stream's {@code reset()}
     * throws, the totals are left unchanged.
     *
     * @throws IOException if the wrapped stream throws it.
     */
    @Override
    public void reset() throws IOException {
        in.reset();
        bytesRead -= bytesReadSinceMark;
        bytesSkipped -= bytesSkippedSinceMark;
        // The totals now describe the marked position again, so clear the
        // since-mark counters; otherwise a second reset() would subtract the
        // same bytes twice.
        bytesReadSinceMark = 0L;
        bytesSkippedSinceMark = 0L;
    }

    /**
     * Closes the wrapped stream.
     *
     * @throws IOException if the wrapped stream throws it.
     */
    @Override
    public void close() throws IOException {
        in.close();
    }
}
/**
 * Wraps another {@link OutputStream}, forwarding every call to it and keeping
 * a running total of the bytes passed to the write methods.
 */
public class ByteCountingOutputStream extends OutputStream {

    private final OutputStream delegate;
    private long total = 0L;

    /**
     * @param out the stream that receives everything written here.
     */
    public ByteCountingOutputStream(final OutputStream out) {
        this.delegate = out;
    }

    /**
     * Forwards a single byte and, if the wrapped stream did not throw, counts
     * it.
     */
    @Override
    public void write(int b) throws IOException {
        this.delegate.write(b);
        this.total += 1;
    }

    /**
     * Equivalent to {@code write(b, 0, b.length)}.
     */
    @Override
    public void write(byte[] b) throws IOException {
        this.write(b, 0, b.length);
    }

    /**
     * Forwards {@code len} bytes of {@code b} starting at {@code off} and, if
     * the wrapped stream did not throw, adds {@code len} to the total.
     */
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        this.delegate.write(b, off, len);
        this.total += len;
    }

    /**
     * @return the number of bytes counted so far.
     */
    public long getBytesWritten() {
        return this.total;
    }

    /**
     * Flushes the wrapped stream.
     */
    @Override
    public void flush() throws IOException {
        this.delegate.flush();
    }

    /**
     * Closes the wrapped stream.
     */
    @Override
    public void close() throws IOException {
        this.delegate.close();
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.io; + +import java.io.DataOutput; +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.UTFDataFormatException; + +/** + * This class is different from java.io.DataOutputStream in that it does + * not synchronize on its methods. + */ +public class DataOutputStream extends FilterOutputStream implements DataOutput { + + /** + * The number of bytes written to the data output stream so far. If this + * counter overflows, it will be wrapped to Integer.MAX_VALUE. + */ + protected int written; + + /** + * bytearr is initialized on demand by writeUTF + */ + private byte[] bytearr = null; + + /** + * Creates a new data output stream to write data to the specified + * underlying output stream. The counter <code>written</code> is set to + * zero. + * + * @param out the underlying output stream, to be saved for later use. + * @see java.io.FilterOutputStream#out + */ + public DataOutputStream(OutputStream out) { + super(out); + } + + /** + * Increases the written counter by the specified value until it reaches + * Integer.MAX_VALUE. + */ + private void incCount(int value) { + int temp = written + value; + if (temp < 0) { + temp = Integer.MAX_VALUE; + } + written = temp; + } + + /** + * Writes the specified byte (the low eight bits of the argument + * <code>b</code>) to the underlying output stream. If no exception is + * thrown, the counter <code>written</code> is incremented by + * <code>1</code>. + * <p> + * Implements the <code>write</code> method of <code>OutputStream</code>. 
+ * + * @param b the <code>byte</code> to be written. + * @exception IOException if an I/O error occurs. + * @see java.io.FilterOutputStream#out + */ + @Override + public void write(int b) throws IOException { + out.write(b); + incCount(1); + } + + /** + * Writes <code>len</code> bytes from the specified byte array starting at + * offset <code>off</code> to the underlying output stream. If no exception + * is thrown, the counter <code>written</code> is incremented by + * <code>len</code>. + * + * @param b the data. + * @param off the start offset in the data. + * @param len the number of bytes to write. + * @exception IOException if an I/O error occurs. + * @see java.io.FilterOutputStream#out + */ + @Override + public void write(byte b[], int off, int len) throws IOException { + out.write(b, off, len); + incCount(len); + } + + /** + * Flushes this data output stream. This forces any buffered output bytes to + * be written out to the stream. + * <p> + * The <code>flush</code> method of <code>DataOutputStream</code> calls the + * <code>flush</code> method of its underlying output stream. + * + * @exception IOException if an I/O error occurs. + * @see java.io.FilterOutputStream#out + * @see java.io.OutputStream#flush() + */ + @Override + public void flush() throws IOException { + out.flush(); + } + + /** + * Writes a <code>boolean</code> to the underlying output stream as a 1-byte + * value. The value <code>true</code> is written out as the value + * <code>(byte)1</code>; the value <code>false</code> is written out as the + * value <code>(byte)0</code>. If no exception is thrown, the counter + * <code>written</code> is incremented by <code>1</code>. + * + * @param v a <code>boolean</code> value to be written. + * @exception IOException if an I/O error occurs. + * @see java.io.FilterOutputStream#out + */ + @Override + public final void writeBoolean(boolean v) throws IOException { + out.write(v ? 
1 : 0); + incCount(1); + } + + /** + * Writes out a <code>byte</code> to the underlying output stream as a + * 1-byte value. If no exception is thrown, the counter <code>written</code> + * is incremented by <code>1</code>. + * + * @param v a <code>byte</code> value to be written. + * @exception IOException if an I/O error occurs. + * @see java.io.FilterOutputStream#out + */ + @Override + public final void writeByte(int v) throws IOException { + out.write(v); + incCount(1); + } + + /** + * Writes a <code>short</code> to the underlying output stream as two bytes, + * high byte first. If no exception is thrown, the counter + * <code>written</code> is incremented by <code>2</code>. + * + * @param v a <code>short</code> to be written. + * @exception IOException if an I/O error occurs. + * @see java.io.FilterOutputStream#out + */ + @Override + public final void writeShort(int v) throws IOException { + out.write((v >>> 8) & 0xFF); + out.write((v) & 0xFF); + incCount(2); + } + + /** + * Writes a <code>char</code> to the underlying output stream as a 2-byte + * value, high byte first. If no exception is thrown, the counter + * <code>written</code> is incremented by <code>2</code>. + * + * @param v a <code>char</code> value to be written. + * @exception IOException if an I/O error occurs. + * @see java.io.FilterOutputStream#out + */ + @Override + public final void writeChar(int v) throws IOException { + out.write((v >>> 8) & 0xFF); + out.write((v) & 0xFF); + incCount(2); + } + + /** + * Writes an <code>int</code> to the underlying output stream as four bytes, + * high byte first. If no exception is thrown, the counter + * <code>written</code> is incremented by <code>4</code>. + * + * @param v an <code>int</code> to be written. + * @exception IOException if an I/O error occurs. 
+ * @see java.io.FilterOutputStream#out + */ + @Override + public final void writeInt(int v) throws IOException { + out.write((v >>> 24) & 0xFF); + out.write((v >>> 16) & 0xFF); + out.write((v >>> 8) & 0xFF); + out.write((v) & 0xFF); + incCount(4); + } + + private final byte writeBuffer[] = new byte[8]; + + /** + * Writes a <code>long</code> to the underlying output stream as eight + * bytes, high byte first. If no exception is thrown, the counter + * <code>written</code> is incremented by <code>8</code>. + * + * @param v a <code>long</code> to be written. + * @exception IOException if an I/O error occurs. + * @see java.io.FilterOutputStream#out + */ + @Override + public final void writeLong(long v) throws IOException { + writeBuffer[0] = (byte) (v >>> 56); + writeBuffer[1] = (byte) (v >>> 48); + writeBuffer[2] = (byte) (v >>> 40); + writeBuffer[3] = (byte) (v >>> 32); + writeBuffer[4] = (byte) (v >>> 24); + writeBuffer[5] = (byte) (v >>> 16); + writeBuffer[6] = (byte) (v >>> 8); + writeBuffer[7] = (byte) (v); + out.write(writeBuffer, 0, 8); + incCount(8); + } + + /** + * Converts the float argument to an <code>int</code> using the + * <code>floatToIntBits</code> method in class <code>Float</code>, and then + * writes that <code>int</code> value to the underlying output stream as a + * 4-byte quantity, high byte first. If no exception is thrown, the counter + * <code>written</code> is incremented by <code>4</code>. + * + * @param v a <code>float</code> value to be written. + * @exception IOException if an I/O error occurs. 
+ * @see java.io.FilterOutputStream#out + * @see java.lang.Float#floatToIntBits(float) + */ + @Override + public final void writeFloat(float v) throws IOException { + writeInt(Float.floatToIntBits(v)); + } + + /** + * Converts the double argument to a <code>long</code> using the + * <code>doubleToLongBits</code> method in class <code>Double</code>, and + * then writes that <code>long</code> value to the underlying output stream + * as an 8-byte quantity, high byte first. If no exception is thrown, the + * counter <code>written</code> is incremented by <code>8</code>. + * + * @param v a <code>double</code> value to be written. + * @exception IOException if an I/O error occurs. + * @see java.io.FilterOutputStream#out + * @see java.lang.Double#doubleToLongBits(double) + */ + @Override + public final void writeDouble(double v) throws IOException { + writeLong(Double.doubleToLongBits(v)); + } + + /** + * Writes out the string to the underlying output stream as a sequence of + * bytes. Each character in the string is written out, in sequence, by + * discarding its high eight bits. If no exception is thrown, the counter + * <code>written</code> is incremented by the length of <code>s</code>. + * + * @param s a string of bytes to be written. + * @exception IOException if an I/O error occurs. + * @see java.io.FilterOutputStream#out + */ + @Override + public final void writeBytes(String s) throws IOException { + int len = s.length(); + for (int i = 0; i < len; i++) { + out.write((byte) s.charAt(i)); + } + incCount(len); + } + + /** + * Writes a string to the underlying output stream as a sequence of + * characters. Each character is written to the data output stream as if by + * the <code>writeChar</code> method. If no exception is thrown, the counter + * <code>written</code> is incremented by twice the length of + * <code>s</code>. + * + * @param s a <code>String</code> value to be written. + * @exception IOException if an I/O error occurs. 
+ * @see java.io.DataOutputStream#writeChar(int) + * @see java.io.FilterOutputStream#out + */ + @Override + public final void writeChars(String s) throws IOException { + int len = s.length(); + for (int i = 0; i < len; i++) { + /* each char goes out as two bytes: high byte first, then low byte */ + int v = s.charAt(i); + out.write((v >>> 8) & 0xFF); + out.write((v) & 0xFF); + } + incCount(len * 2); + } + + /** + * Writes a string to the underlying output stream using + * <a href="DataInput.html#modified-utf-8">modified UTF-8</a> + * encoding in a machine-independent manner. + * <p> + * First, two bytes are written to the output stream as if by the + * <code>writeShort</code> method giving the number of bytes to follow. This + * value is the number of bytes actually written out, not the length of the + * string. Following the length, each character of the string is output, in + * sequence, using the modified UTF-8 encoding for the character. If no + * exception is thrown, the counter <code>written</code> is incremented by + * the total number of bytes written to the output stream. This will be at + * least two plus the length of <code>str</code>, and at most two plus + * thrice the length of <code>str</code>. + * + * @param str a string to be written. + * @exception IOException if an I/O error occurs. + */ + @Override + public final void writeUTF(String str) throws IOException { + /* delegate to the static encoder with this stream as the destination */ + writeUTF(str, this); + } + + /** + * Writes a string to the specified DataOutput using + * <a href="DataInput.html#modified-utf-8">modified UTF-8</a> + * encoding in a machine-independent manner. + * <p> + * First, two bytes are written to out as if by the <code>writeShort</code> + * method giving the number of bytes to follow. This value is the number of + * bytes actually written out, not the length of the string. Following the + * length, each character of the string is output, in sequence, using the + * modified UTF-8 encoding for the character. 
If no exception is thrown, the + * counter <code>written</code> is incremented by the total number of bytes + * written to the output stream. This will be at least two plus the length + * of <code>str</code>, and at most two plus thrice the length of + * <code>str</code>. + * + * @param str a string to be written. + * @param out destination to write to + * @return The number of bytes written out. + * @exception IOException if an I/O error occurs. + */ + static int writeUTF(String str, DataOutput out) throws IOException { + int strlen = str.length(); + int utflen = 0; + int c, count = 0; + + /* first pass: compute the encoded length so it can be checked and written as the prefix */ + /* use charAt instead of copying String to char array */ + for (int i = 0; i < strlen; i++) { + c = str.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) { + utflen++; + } else if (c > 0x07FF) { + utflen += 3; + } else { + utflen += 2; + } + } + + /* the length prefix is two bytes wide, so the payload may not exceed 65535 bytes */ + if (utflen > 65535) { + throw new UTFDataFormatException("encoded string too long: " + utflen + " bytes"); + } + + /* a DataOutputStream destination reuses (and grows) its cached bytearr; any other DataOutput gets a fresh array */ + byte[] bytearr = null; + if (out instanceof DataOutputStream) { + DataOutputStream dos = (DataOutputStream) out; + if (dos.bytearr == null || (dos.bytearr.length < (utflen + 2))) { + dos.bytearr = new byte[(utflen * 2) + 2]; + } + bytearr = dos.bytearr; + } else { + bytearr = new byte[utflen + 2]; + } + + /* two-byte length prefix, high byte first */ + bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF); + bytearr[count++] = (byte) ((utflen) & 0xFF); + + /* copy the leading run of single-byte characters, stopping at the first one that needs more */ + int i = 0; + for (i = 0; i < strlen; i++) { + c = str.charAt(i); + if (!((c >= 0x0001) && (c <= 0x007F))) { + break; + } + bytearr[count++] = (byte) c; + } + + /* encode the remainder as one-, two- or three-byte sequences */ + for (; i < strlen; i++) { + c = str.charAt(i); + if ((c >= 0x0001) && (c <= 0x007F)) { + bytearr[count++] = (byte) c; + + } else if (c > 0x07FF) { + bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); + bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F)); + bytearr[count++] = (byte) (0x80 | ((c) & 0x3F)); + } else { + bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); + bytearr[count++] = (byte) (0x80 | ((c) & 0x3F)); + } + } + out.write(bytearr, 0, utflen + 
2); + return utflen + 2; + } + + /** + * Returns the current value of the counter <code>written</code>, the number + * of bytes written to this data output stream so far. If the counter + * overflows, it will be wrapped to Integer.MAX_VALUE. + * + * @return the value of the <code>written</code> field. + * @see java.io.DataOutputStream#written + */ + public final int size() { + return written; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/GZIPOutputStream.java ---------------------------------------------------------------------- diff --git a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/GZIPOutputStream.java b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/GZIPOutputStream.java new file mode 100644 index 0000000..875b838 --- /dev/null +++ b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/GZIPOutputStream.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.io; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * <p> + * This class extends the {@link java.util.zip.GZIPOutputStream} by allowing the + * constructor to provide a compression level, and uses a default value of 1 + * ({@link #DEFAULT_COMPRESSION_LEVEL}) instead of the level the JDK deflater + * starts with ({@link java.util.zip.Deflater#DEFAULT_COMPRESSION}). + * </p> + */ +public class GZIPOutputStream extends java.util.zip.GZIPOutputStream { + + /** Compression level applied by the single-argument constructor. */ + public static final int DEFAULT_COMPRESSION_LEVEL = 1; + + /** + * Creates a GZIP stream over <code>out</code> using + * {@link #DEFAULT_COMPRESSION_LEVEL}. + * + * @param out the stream that receives the compressed bytes + * @throws IOException if the superclass constructor fails + */ + public GZIPOutputStream(final OutputStream out) throws IOException { + this(out, DEFAULT_COMPRESSION_LEVEL); + } + + /** + * Creates a GZIP stream over <code>out</code> and then sets the level on + * the inherited deflater <code>def</code>. + * + * @param out the stream that receives the compressed bytes + * @param compressionLevel value handed to <code>def.setLevel</code>; + * presumably one of the levels accepted by + * {@link java.util.zip.Deflater#setLevel(int)} (not validated here) + * @throws IOException if the superclass constructor fails + */ + public GZIPOutputStream(final OutputStream out, final int compressionLevel) throws IOException { + super(out); + def.setLevel(compressionLevel); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/LeakyBucketStreamThrottler.java ---------------------------------------------------------------------- diff --git a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/LeakyBucketStreamThrottler.java b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/LeakyBucketStreamThrottler.java new file mode 100644 index 0000000..0ebe16d --- /dev/null +++ b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/LeakyBucketStreamThrottler.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.io; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * A {@link StreamThrottler} that funnels every copy through one scheduled task. + * Callers enqueue a {@link Request} and wait for its {@link Response}; the + * {@link Drain} task is started once per second and moves data in chunks until + * roughly <code>maxBytesPerSecond</code> bytes have been transferred or the + * second is used up. + */ +public class LeakyBucketStreamThrottler implements StreamThrottler { + + private final int maxBytesPerSecond; + private final BlockingQueue<Request> requestQueue = new LinkedBlockingQueue<Request>(); + private final ScheduledExecutorService executorService; + private final AtomicBoolean shutdown = new AtomicBoolean(false); + + /** + * Starts the single drain thread, scheduled at a fixed rate of once per + * second. + * + * @param maxBytesPerSecond byte budget for each one-second run of the + * drain task. NOTE(review): the value is not validated; it also sizes the + * drain buffer, so a non-positive value looks unusable - confirm callers. + */ + public LeakyBucketStreamThrottler(final int maxBytesPerSecond) { + this.maxBytesPerSecond = maxBytesPerSecond; + + executorService = Executors.newSingleThreadScheduledExecutor(); + final Runnable task = new Drain(); + executorService.scheduleAtFixedRate(task, 0, 1000, TimeUnit.MILLISECONDS); + } + + /** + * Marks the throttler as shut down (pending and future copy calls fail with + * an IOException) and stops the drain schedule, waiting up to two seconds + * for a run that is in progress. + */ + @Override + public void close() { + this.shutdown.set(true); + + executorService.shutdown(); + try { + // Should not take more than 2 seconds because we run every second. 
If it takes more than + // 2 seconds, it is because the Runnable thread is blocking on a write; in this case, + // we will just ignore it and return + executorService.awaitTermination(2, TimeUnit.SECONDS); + } catch (final InterruptedException e) { + /* stop waiting, but keep the interrupt visible to the caller instead of swallowing it */ + Thread.currentThread().interrupt(); + } + } + + /** + * Wraps <code>toWrap</code> so that every write is routed through + * {@link #copy(InputStream, OutputStream)}. + * + * @param toWrap the stream that finally receives the bytes + * @return a stream whose writes are throttled; close and flush are passed + * straight to <code>toWrap</code> + */ + @Override + public OutputStream newThrottledOutputStream(final OutputStream toWrap) { + return new OutputStream() { + @Override + public void write(final int b) throws IOException { + write(new byte[]{(byte) b}, 0, 1); + } + + @Override + public void write(byte[] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + /* expose the slice as a stream so the shared copy path can drain it */ + final InputStream in = new ByteArrayInputStream(b, off, len); + LeakyBucketStreamThrottler.this.copy(in, toWrap); + } + + @Override + public void close() throws IOException { + toWrap.close(); + } + + @Override + public void flush() throws IOException { + toWrap.flush(); + } + }; + } + + /** + * Wraps <code>toWrap</code> so that every read is routed through + * {@link #copy(InputStream, OutputStream, long)} into an in-memory buffer. + * + * @param toWrap the stream the bytes are read from + * @return a stream whose reads are throttled; close and available are + * passed straight to <code>toWrap</code> + */ + @Override + public InputStream newThrottledInputStream(final InputStream toWrap) { + return new InputStream() { + /* staging buffer reused by the array reads */ + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + @Override + public int read() throws IOException { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(1); + LeakyBucketStreamThrottler.this.copy(toWrap, baos, 1L); + if (baos.getBufferLength() < 1) { + return -1; + } + + return baos.getUnderlyingBuffer()[0] & 0xFF; + } + + @Override + public int read(final byte[] b) throws IOException { + if(b.length == 0){ + return 0; + } + return read(b, 0, b.length); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + /* a zero-length read transfers nothing and must not be reported as end of stream */ + if (len == 0) { + return 0; + } + baos.reset(); + final int copied = (int) LeakyBucketStreamThrottler.this.copy(toWrap, baos, len); + if (copied == 0) { + return -1; + } + System.arraycopy(baos.getUnderlyingBuffer(), 0, b, off, copied); + return copied; + } + + @Override + public void close() throws IOException { + toWrap.close(); + } + + @Override + public int available() throws IOException { + 
return toWrap.available(); + } + }; + } + + /** + * Copies from <code>in</code> to <code>out</code> with no byte limit, until + * a drain pass reports that it copied nothing. + * + * @return the total number of bytes copied + * @throws IOException if a read or write fails, the calling thread is + * interrupted, or the throttler has been closed + */ + @Override + public long copy(final InputStream in, final OutputStream out) throws IOException { + return copy(in, out, -1); + } + + /** + * Copies up to <code>maxBytes</code> bytes from <code>in</code> to + * <code>out</code> by repeatedly queueing a request for the drain task and + * waiting for its response. + * + * @param in source stream + * @param out destination stream + * @param maxBytes maximum number of bytes to copy; a negative value means + * no limit and zero copies nothing + * @return the total number of bytes copied + * @throws IOException if a read or write fails, the calling thread is + * interrupted, or the throttler has been closed + */ + @Override + public long copy(final InputStream in, final OutputStream out, final long maxBytes) throws IOException { + /* nothing was asked for; without this guard the drain task's one-byte minimum would copy the whole stream */ + if (maxBytes == 0) { + return 0L; + } + + long totalBytesCopied = 0; + boolean finished = false; + while (!finished) { + final long requestMax = (maxBytes < 0) ? Long.MAX_VALUE : maxBytes - totalBytesCopied; + final Request request = new Request(in, out, requestMax); + boolean transferred = false; + while (!transferred) { + if (shutdown.get()) { + throw new IOException("Throttler shutdown"); + } + + try { + transferred = requestQueue.offer(request, 1000, TimeUnit.MILLISECONDS); + } catch (final InterruptedException e) { + /* restore the flag so callers further up can still observe the interrupt */ + Thread.currentThread().interrupt(); + throw new IOException("Interrupted", e); + } + } + + /* poll in one-second slices so that a shutdown is noticed while waiting */ + final BlockingQueue<Response> responseQueue = request.getResponseQueue(); + Response response = null; + while (response == null) { + try { + if (shutdown.get()) { + throw new IOException("Throttler shutdown"); + } + response = responseQueue.poll(1000L, TimeUnit.MILLISECONDS); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Interrupted", e); + } + } + + if (!response.isSuccess()) { + throw response.getError(); + } + + totalBytesCopied += response.getBytesCopied(); + finished = (response.getBytesCopied() == 0) || (totalBytesCopied >= maxBytes && maxBytes > 0); + } + + return totalBytesCopied; + } + + /** + * This class is responsible for draining water from the leaky bucket. 
I.e., + * it actually moves the data + */ + private class Drain implements Runnable { + + private final byte[] buffer; + + public Drain() { + final int bufferSize = Math.min(4096, maxBytesPerSecond); + buffer = new byte[bufferSize]; + } + + /** + * Serves queued requests until the byte budget for this run is spent, + * the second has elapsed, or no request arrives in the time left. + */ + @Override + public void run() { + final long start = System.currentTimeMillis(); + + int bytesTransferred = 0; + while (bytesTransferred < maxBytesPerSecond) { + final long maxMillisToWait = 1000 - (System.currentTimeMillis() - start); + if (maxMillisToWait < 1) { + return; + } + + try { + final Request request = requestQueue.poll(maxMillisToWait, TimeUnit.MILLISECONDS); + if (request == null) { + return; + } + + final BlockingQueue<Response> responseQueue = request.getResponseQueue(); + + final OutputStream out = request.getOutputStream(); + final InputStream in = request.getInputStream(); + + try { + final long requestMax = request.getMaxBytesToCopy(); + long maxBytesToTransfer; + if (requestMax < 0) { + maxBytesToTransfer = Math.min(buffer.length, maxBytesPerSecond - bytesTransferred); + } else { + maxBytesToTransfer = Math.min(requestMax, + Math.min(buffer.length, maxBytesPerSecond - bytesTransferred)); + } + /* always attempt at least one byte so that end of stream can be detected */ + maxBytesToTransfer = Math.max(1L, maxBytesToTransfer); + + final int bytesCopied = fillBuffer(in, maxBytesToTransfer); + out.write(buffer, 0, bytesCopied); + + final Response response = new Response(true, bytesCopied); + responseQueue.put(response); + bytesTransferred += bytesCopied; + } catch (final IOException e) { + final Response response = new Response(e); + responseQueue.put(response); + } catch (final RuntimeException e) { + /* report unexpected failures to the waiting caller; letting them escape would cancel the fixed-rate schedule */ + final Response response = new Response(new IOException("Unexpected failure while draining", e)); + responseQueue.put(response); + } + } catch (final InterruptedException e) { + /* give up this run and leave the interrupt flag set for the executor */ + Thread.currentThread().interrupt(); + return; + } + } + } + + /** + * Reads from <code>in</code> into the shared buffer until + * <code>maxBytes</code> bytes are buffered or a read returns no data. + * + * @return the number of bytes now held at the start of the buffer + */ + private int fillBuffer(final InputStream in, final long maxBytes) throws IOException { + int bytesRead = 0; + int len; + while (bytesRead < maxBytes && (len = in.read(buffer, bytesRead, (int) Math.min(maxBytes - bytesRead, buffer.length - bytesRead))) > 0) { + bytesRead += len; + } + + return bytesRead; + } + } + + /** + * Outcome of one drain pass: either a byte count or the IOException that + * ended the pass. + */ + private static class Response { + + private final boolean 
success; + private final IOException error; + private final int bytesCopied; + + public Response(final boolean success, final int bytesCopied) { + this.success = success; + this.bytesCopied = bytesCopied; + this.error = null; + } + + public Response(final IOException error) { + this.success = false; + this.error = error; + this.bytesCopied = -1; + } + + public boolean isSuccess() { + return success; + } + + public IOException getError() { + return error; + } + + public int getBytesCopied() { + return bytesCopied; + } + } + + /** + * One unit of work for the drain task: the two streams, the byte limit, + * and a single-slot queue on which the response is delivered. + */ + private static class Request { + + private final OutputStream out; + private final InputStream in; + private final long maxBytesToCopy; + private final BlockingQueue<Response> responseQueue; + + public Request(final InputStream in, final OutputStream out, final long maxBytesToCopy) { + this.out = out; + this.in = in; + this.maxBytesToCopy = maxBytesToCopy; + this.responseQueue = new LinkedBlockingQueue<Response>(1); + } + + public BlockingQueue<Response> getResponseQueue() { + return this.responseQueue; + } + + public OutputStream getOutputStream() { + return out; + } + + public InputStream getInputStream() { + return in; + } + + public long getMaxBytesToCopy() { + return maxBytesToCopy; + } + + @Override + public String toString() { + return "Request[maxBytes=" + maxBytesToCopy + "]"; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/NonCloseableInputStream.java ---------------------------------------------------------------------- diff --git a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/NonCloseableInputStream.java b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/NonCloseableInputStream.java new file mode 100644 index 0000000..1fbb093 --- /dev/null +++ b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/NonCloseableInputStream.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or 
more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.io; + +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; + +/** + * Wraps an InputStream so that the underlying InputStream cannot be closed + * through this wrapper. This allows the InputStream to be wrapped with yet + * another InputStream while preventing that outer layer from closing the inner + * InputStream. The three read methods delegate directly to the wrapped stream; + * {@link #close()} is a no-op. + */ +public class NonCloseableInputStream extends FilterInputStream { + + private final InputStream toWrap; + + /** + * @param toWrap the stream to read from; it is never closed by this class + */ + public NonCloseableInputStream(final InputStream toWrap) { + super(toWrap); + this.toWrap = toWrap; + } + + @Override + public int read() throws IOException { + return toWrap.read(); + } + + @Override + public int read(byte[] b) throws IOException { + return toWrap.read(b); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return toWrap.read(b, off, len); + } + + /** + * Deliberately leaves the wrapped stream open. + */ + @Override + public void close() throws IOException { + // do nothing + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/NonCloseableOutputStream.java ---------------------------------------------------------------------- diff --git 
a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/NonCloseableOutputStream.java b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/NonCloseableOutputStream.java new file mode 100644 index 0000000..731e409 --- /dev/null +++ b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/NonCloseableOutputStream.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.io; + +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; + +/** + * Wraps an OutputStream so that closing this wrapper leaves the wrapped stream + * open: {@link #close()} only flushes it. All three write methods delegate + * directly to the wrapped stream. + */ +public class NonCloseableOutputStream extends FilterOutputStream { + + /* same stream that is passed to the superclass; this field hides the inherited protected field of the same name */ + private final OutputStream out; + + /** + * @param out the stream to write to; it is never closed by this class + */ + public NonCloseableOutputStream(final OutputStream out) { + super(out); + this.out = out; + } + + @Override + public void write(byte[] b) throws IOException { + out.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + out.write(b, off, len); + } + + @Override + public void write(int b) throws IOException { + out.write(b); + } + + /** + * Flushes the wrapped stream instead of closing it. + * + * @throws IOException if the flush fails + */ + @Override + public void close() throws IOException { + out.flush(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/NullOutputStream.java ---------------------------------------------------------------------- diff --git a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/NullOutputStream.java b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/NullOutputStream.java new file mode 100644 index 0000000..60475d4 --- /dev/null +++ b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/NullOutputStream.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.io; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * OutputStream that throws away all data, as if writing to /dev/null. Every + * method, including close and flush, has an empty body. + */ +public class NullOutputStream extends OutputStream { + + @Override + public void write(final int b) throws IOException { + } + + @Override + public void write(final byte[] b) throws IOException { + } + + @Override + public void write(final byte[] b, int off, int len) throws IOException { + } + + @Override + public void close() throws IOException { + } + + @Override + public void flush() throws IOException { + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/StreamThrottler.java ---------------------------------------------------------------------- diff --git a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/StreamThrottler.java b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/StreamThrottler.java new file mode 100644 index 0000000..8c2aa80 --- /dev/null +++ b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/StreamThrottler.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.io; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +++import java.io.OutputStream; + +/** + * Contract for copying between streams, or wrapping streams, through a + * closeable throttling component. The interface itself fixes only the + * signatures; the notes below describe what LeakyBucketStreamThrottler does and + * should be confirmed for any other implementation. + */ +public interface StreamThrottler extends Closeable { + + /** + * Copies from <code>in</code> to <code>out</code> without a byte limit. + * + * @return a byte count; LeakyBucketStreamThrottler returns the total number + * of bytes copied + */ + long copy(InputStream in, OutputStream out) throws IOException; + + /** + * Copies from <code>in</code> to <code>out</code>, bounded by + * <code>maxBytes</code>. + * + * @param maxBytes upper bound on the bytes to copy; LeakyBucketStreamThrottler + * treats a negative value as "no limit" + * @return a byte count; LeakyBucketStreamThrottler returns the total number + * of bytes copied + */ + long copy(InputStream in, OutputStream out, long maxBytes) throws IOException; + + /** + * @param toWrap the stream to read from + * @return an InputStream view of <code>toWrap</code>; presumably its reads + * are subject to the throttle + */ + InputStream newThrottledInputStream(final InputStream toWrap); + + /** + * @param toWrap the stream to write to + * @return an OutputStream view of <code>toWrap</code>; presumably its writes + * are subject to the throttle + */ + OutputStream newThrottledOutputStream(final OutputStream toWrap); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/StreamUtils.java ---------------------------------------------------------------------- diff --git a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/StreamUtils.java b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/StreamUtils.java new file mode 100644 index 0000000..1596014 --- /dev/null +++ b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/StreamUtils.java @@ -0,0 +1,257 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.io; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; + +import org.apache.nifi.io.exception.BytePatternNotFoundException; +import org.apache.nifi.io.util.NonThreadSafeCircularBuffer; + +/** + * Static helpers for copying, filling, pattern-bounded copying and skipping on + * plain java.io streams. None of the methods closes the streams it is given. + */ +public class StreamUtils { + + /** + * Copies <code>source</code> to <code>destination</code> through an 8192-byte + * buffer until a read returns zero or a negative value. + * + * @param source stream to read from + * @param destination stream to write to + * @return the total number of bytes copied + * @throws IOException if reading or writing fails + */ + public static long copy(final InputStream source, final OutputStream destination) throws IOException { + final byte[] buffer = new byte[8192]; + int len; + long totalCount = 0L; + while ((len = source.read(buffer)) > 0) { + destination.write(buffer, 0, len); + totalCount += len; + } + return totalCount; + } + + /** + * Copies <code>numBytes</code> from <code>source</code> to + * <code>destination</code>. If <code>numBytes</code> are not available from + * <code>source</code>, throws EOFException + * + * @param source stream to read from + * @param destination stream to write to + * @param numBytes exact number of bytes that must be copied + * @throws IOException if reading or writing fails; an EOFException if the + * source ends before <code>numBytes</code> bytes were copied + */ + public static void copy(final InputStream source, final OutputStream destination, final long numBytes) throws IOException { + final byte[] buffer = new byte[8192]; + int len; + long bytesLeft = numBytes; + /* each read asks for no more than what is still owed, so the loop ends once bytesLeft reaches zero */ + while ((len = source.read(buffer, 0, (int) Math.min(bytesLeft, buffer.length))) > 0) { + destination.write(buffer, 0, len); + bytesLeft -= len; + } + + if (bytesLeft > 0) { + throw new EOFException("Attempted to copy " + numBytes + " bytes but only " + (numBytes - bytesLeft) + " bytes were available"); + } + } + + /** + * Reads data from the given input stream, copying it to the destination + * byte array. If the InputStream has less data than the given byte array, + * throws an EOFException + * + * @param source stream to read from + * @param destination array to fill completely + * @throws IOException if reading fails; an EOFException if the stream ends + * before the array is full + */ + public static void fillBuffer(final InputStream source, final byte[] destination) throws IOException { + fillBuffer(source, destination, true); + } + + /** + * Reads data from the given input stream, copying it to the destination + * byte array. 
If the InputStream has less data than the given byte array, + * throws an EOFException if <code>ensureCapacity</code> is true and + * otherwise returns the number of bytes copied + * + * @param source stream to read from + * @param destination array to fill + * @param ensureCapacity whether or not to enforce that the InputStream have + * at least as much data as the capacity of the destination byte array + * @return the number of bytes placed into <code>destination</code> + * @throws IOException if reading fails; an EOFException if the stream ends + * early and <code>ensureCapacity</code> is true + */ + public static int fillBuffer(final InputStream source, final byte[] destination, final boolean ensureCapacity) throws IOException { + int bytesRead = 0; + int len; + while (bytesRead < destination.length) { + len = source.read(destination, bytesRead, destination.length - bytesRead); + if (len < 0) { + if (ensureCapacity) { + throw new EOFException(); + } else { + break; + } + } + + bytesRead += len; + } + + return bytesRead; + } + + /** + * Copies data from in to out until either we are out of data (returns null) + * or we hit one of the byte patterns identified by the + * <code>stoppers</code> parameter (returns the byte pattern matched). The + * bytes in the stopper will be copied. Returns null immediately, without + * reading, when no stoppers are given. + * <p> + * NOTE(review): the limit test is <code>++bytesRead >= maxBytes</code> and + * runs before the byte is written or compared, so the exception is thrown + * as soon as the <code>maxBytes</code>-th byte has been read. copyExclusive + * uses <code>bytesRead++ > maxBytes</code>; confirm which bound is intended. + * + * @param in stream to read from, one byte at a time + * @param out stream that receives every byte read, stopper included + * @param maxBytes read limit; only enforced when greater than zero + * @param stoppers byte patterns that end the copy + * @return the byte array matched, or null if end of stream was reached + * @throws IOException if reading or writing fails; a + * BytePatternNotFoundException if the limit is hit first + */ + public static byte[] copyInclusive(final InputStream in, final OutputStream out, final int maxBytes, final byte[]... 
stoppers) throws IOException { + if (stoppers.length == 0) { + return null; + } + + final List<NonThreadSafeCircularBuffer> circularBuffers = new ArrayList<NonThreadSafeCircularBuffer>(); + for (final byte[] stopper : stoppers) { + circularBuffers.add(new NonThreadSafeCircularBuffer(stopper)); + } + + long bytesRead = 0; + while (true) { + final int next = in.read(); + if (next == -1) { + return null; + } else if (maxBytes > 0 && ++bytesRead >= maxBytes) { + throw new BytePatternNotFoundException("Did not encounter any byte pattern that was expected; data does not appear to be in the expected format"); + } + + out.write(next); + + for (final NonThreadSafeCircularBuffer circ : circularBuffers) { + if (circ.addAndCompare((byte) next)) { + return circ.getByteArray(); + } + } + } + } + + /** + * Copies data from in to out until either we are out of data (returns null) + * or we hit one of the byte patterns identified by the + * <code>stoppers</code> parameter (returns the byte pattern matched). The + * byte pattern matched will NOT be copied to the output. Returns null + * immediately, without reading, when no stoppers are given. + * <p> + * NOTE(review): earlier documentation said the matched bytes are un-read + * from the input, but nothing in this method pushes data back into + * <code>in</code>; the stopper bytes have been consumed when it returns. + * Bytes are written only once they fall out of the buffer for the longest + * stopper, so bytes still held there when end of stream is reached are not + * written. The limit test here is <code>bytesRead++ > maxBytes</code>, which + * differs from copyInclusive; confirm which bound is intended. + * + * @param in stream to read from, one byte at a time + * @param out stream that receives the bytes preceding the matched stopper + * @param maxBytes read limit; only enforced when greater than zero + * @param stoppers byte patterns that end the copy + * @return the byte array matched, or null if end of stream was reached + * @throws IOException if reading or writing fails; a + * BytePatternNotFoundException if the limit is hit first + */ + public static byte[] copyExclusive(final InputStream in, final OutputStream out, final int maxBytes, final byte[]... 
stoppers) throws IOException { + if (stoppers.length == 0) { + return null; + } + + /* track the buffer of the longest stopper; it is placed first in the list and doubles as the delay line for output */ + int longest = 0; + NonThreadSafeCircularBuffer longestBuffer = null; + final List<NonThreadSafeCircularBuffer> circularBuffers = new ArrayList<NonThreadSafeCircularBuffer>(); + for (final byte[] stopper : stoppers) { + final NonThreadSafeCircularBuffer circularBuffer = new NonThreadSafeCircularBuffer(stopper); + if (stopper.length > longest) { + longest = stopper.length; + longestBuffer = circularBuffer; + circularBuffers.add(0, circularBuffer); + } else { + circularBuffers.add(circularBuffer); + } + } + + long bytesRead = 0; + while (true) { + final int next = in.read(); + if (next == -1) { + return null; + } else if (maxBytes > 0 && bytesRead++ > maxBytes) { + throw new BytePatternNotFoundException("Did not encounter any byte pattern that was expected; data does not appear to be in the expected format"); + } + + for (final NonThreadSafeCircularBuffer circ : circularBuffers) { + if (circ.addAndCompare((byte) next)) { + // The longest buffer has some data that may not have been written out yet; we need to make sure + // that we copy out those bytes. 
+ final int bytesToCopy = longest - circ.getByteArray().length; + for (int i = 0; i < bytesToCopy; i++) { + final int oldestByte = longestBuffer.getOldestByte(); + if (oldestByte != -1) { + out.write(oldestByte); + /* presumably pushing a filler byte advances the buffer to its next-oldest byte - confirm against NonThreadSafeCircularBuffer */ + longestBuffer.addAndCompare((byte) 0); + } + } + + return circ.getByteArray(); + } + } + + if (longestBuffer.isFilled()) { + out.write(longestBuffer.getOldestByte()); + } + } + } + + /** + * Skips the specified number of bytes from the InputStream + * + * If unable to skip that number of bytes, throws EOFException + * + * @param stream stream to consume bytes from + * @param bytesToSkip number of bytes to consume; values of zero or less + * return immediately + * @throws IOException if skipping or reading fails; an EOFException if the + * stream ends first + */ + public static void skip(final InputStream stream, final long bytesToSkip) throws IOException { + if (bytesToSkip <= 0) { + return; + } + long totalSkipped = 0L; + + // If we have a FileInputStream, calling skip(1000000) will return 1000000 even if the file is only + // 3 bytes. As a result, we will skip 1 less than the number requested, and then read the last + // byte in order to make sure that we've consumed the number of bytes requested. We then check that + // the final byte, which we read, is not -1. 
+ final long actualBytesToSkip = bytesToSkip - 1; + while (totalSkipped < actualBytesToSkip) { + final long skippedThisIteration = stream.skip(actualBytesToSkip - totalSkipped); + if (skippedThisIteration == 0) { + /* skip made no progress: fall back to reading one byte, which also detects end of stream */ + final int nextByte = stream.read(); + if (nextByte == -1) { + throw new EOFException(); + } else { + totalSkipped++; + } + } + + totalSkipped += skippedThisIteration; + } + + final int lastByte = stream.read(); + if (lastByte == -1) { + throw new EOFException(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/ZipOutputStream.java ---------------------------------------------------------------------- diff --git a/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/ZipOutputStream.java b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/ZipOutputStream.java new file mode 100644 index 0000000..f285720 --- /dev/null +++ b/commons/nifi-stream-utils/src/main/java/org/apache/nifi/io/ZipOutputStream.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * A {@link java.util.zip.ZipOutputStream} whose compression level can be chosen
 * at construction time. When no level is supplied,
 * {@link #DEFAULT_COMPRESSION_LEVEL} (1) is applied instead of the JDK's own
 * default setting ({@link java.util.zip.Deflater#DEFAULT_COMPRESSION}).
 */
public class ZipOutputStream extends java.util.zip.ZipOutputStream {

    /** Compression level applied when the caller does not specify one. */
    public static final int DEFAULT_COMPRESSION_LEVEL = 1;

    /**
     * Creates a zip stream that writes to {@code out} using the given level.
     *
     * @param out the stream that receives the zip data
     * @param compressionLevel the level handed to the underlying deflater
     */
    public ZipOutputStream(final OutputStream out, final int compressionLevel) {
        super(out);
        // 'def' is the deflater inherited from DeflaterOutputStream.
        this.def.setLevel(compressionLevel);
    }

    /**
     * Creates a zip stream that writes to {@code out} using
     * {@link #DEFAULT_COMPRESSION_LEVEL}.
     *
     * @param out the stream that receives the zip data
     */
    public ZipOutputStream(final OutputStream out) {
        this(out, DEFAULT_COMPRESSION_LEVEL);
    }
}
/**
 * An {@link IOException} raised when data was read without encountering any of
 * the byte patterns that the reader expected to find.
 */
public class BytePatternNotFoundException extends IOException {

    private static final long serialVersionUID = -4128911284318513973L;

    /**
     * @param explanation human-readable description of the failure; becomes the
     *            exception message
     */
    public BytePatternNotFoundException(final String explanation) {
        super(explanation);
    }
}
/**
 * A ring buffer that remembers the most recent {@code N} bytes added to it,
 * where {@code N} is the length of the byte pattern being searched for, and
 * reports whether those bytes currently equal the pattern.
 * <p>
 * No synchronization is performed; instances must not be shared between
 * threads without external locking. A zero-length pattern is not supported:
 * {@link #addAndCompare(byte)} and {@link #getOldestByte()} index into a buffer
 * of the pattern's length.
 */
public class NonThreadSafeCircularBuffer {

    private final byte[] lookingFor;
    // Slots hold an unsigned byte value (0-255), or -1 while the slot has never been written.
    private final int[] buffer;
    private int insertionPointer = 0;
    // Number of bytes added so far, saturating at the pattern length.
    private int bufferSize = 0;

    /**
     * @param lookingFor the byte pattern to search for; the array is kept by
     *            reference, not copied
     */
    public NonThreadSafeCircularBuffer(final byte[] lookingFor) {
        this.lookingFor = lookingFor;
        buffer = new int[lookingFor.length];
        Arrays.fill(buffer, -1);
    }

    /**
     * Returns the pattern this buffer is searching for (not the buffered data).
     *
     * @return the same array instance that was passed to the constructor
     */
    public byte[] getByteArray() {
        return lookingFor;
    }

    /**
     * Returns the oldest byte in the buffer, i.e. the one that the next call to
     * {@link #addAndCompare(byte)} will overwrite.
     *
     * @return the byte as an unsigned value in the range 0-255, or -1 if that
     *         slot has never been written
     */
    public int getOldestByte() {
        return buffer[insertionPointer];
    }

    /**
     * @return {@code true} once at least as many bytes as the pattern is long
     *         have been added
     */
    public boolean isFilled() {
        return bufferSize >= buffer.length;
    }

    /**
     * Adds a byte, overwriting the oldest one, and compares the buffered bytes
     * (oldest to newest) against the pattern.
     *
     * @param data the byte to add
     * @return {@code true} if the buffer is filled and its contents now equal
     *         the pattern
     */
    public boolean addAndCompare(final byte data) {
        // Store the unsigned value: a raw (signed) 0xFF would be -1 and therefore
        // indistinguishable from the "never written" sentinel in getOldestByte().
        buffer[insertionPointer] = data & 0xFF;
        insertionPointer = (insertionPointer + 1) % lookingFor.length;

        // Saturate the counter at the pattern length so it can never overflow to a
        // negative value on streams longer than Integer.MAX_VALUE bytes.
        if (bufferSize < lookingFor.length) {
            bufferSize++;
            if (bufferSize < lookingFor.length) {
                return false;
            }
        }

        // insertionPointer now addresses the oldest byte, so walking forward from it
        // visits the buffered bytes in arrival order.
        for (int i = 0; i < lookingFor.length; i++) {
            final byte compare = (byte) buffer[(insertionPointer + i) % lookingFor.length];
            if (compare != lookingFor[i]) {
                return false;
            }
        }

        return true;
    }
}
index 0000000..12e1801 --- /dev/null +++ b/commons/nifi-stream-utils/src/test/java/org/apache/nifi/io/TestLeakyBucketThrottler.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.io; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; + +import org.junit.Ignore; +import org.junit.Test; + +@Ignore("Tests are time-based") +public class TestLeakyBucketThrottler { + + @Test(timeout = 10000) + public void testOutputStreamInterface() throws IOException { + // throttle rate at 1 MB/sec + final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024); + + final byte[] data = new byte[1024 * 1024 * 4]; + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final OutputStream throttledOut = throttler.newThrottledOutputStream(baos); + + final long start = System.currentTimeMillis(); + throttledOut.write(data); + throttler.close(); + final long millis = System.currentTimeMillis() - start; + // should take 4 sec give or take + assertTrue(millis > 3000); + 
assertTrue(millis < 6000); + } + + @Test(timeout = 10000) + public void testInputStreamInterface() throws IOException { + // throttle rate at 1 MB/sec + final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024); + + final byte[] data = new byte[1024 * 1024 * 4]; + final ByteArrayInputStream bais = new ByteArrayInputStream(data); + final InputStream throttledIn = throttler.newThrottledInputStream(bais); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + final byte[] buffer = new byte[4096]; + final long start = System.currentTimeMillis(); + int len; + while ((len = throttledIn.read(buffer)) > 0) { + baos.write(buffer, 0, len); + } + throttler.close(); + final long millis = System.currentTimeMillis() - start; + // should take 4 sec give or take + assertTrue(millis > 3000); + assertTrue(millis < 6000); + baos.close(); + } + + @Test(timeout = 10000) + public void testDirectInterface() throws IOException, InterruptedException { + // throttle rate at 1 MB/sec + final LeakyBucketStreamThrottler throttler = new LeakyBucketStreamThrottler(1024 * 1024); + + // create 3 threads, each sending ~2 MB + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final List<Thread> threads = new ArrayList<Thread>(); + for (int i = 0; i < 3; i++) { + final Thread t = new WriterThread(i, throttler, baos); + threads.add(t); + } + + final long start = System.currentTimeMillis(); + for (final Thread t : threads) { + t.start(); + } + + for (final Thread t : threads) { + t.join(); + } + final long elapsed = System.currentTimeMillis() - start; + + throttler.close(); + + // To send 15 MB, it should have taken at least 5 seconds and no more than 7 seconds, to + // allow for busy-ness and the fact that we could write a tiny bit more than the limit. 
+ assertTrue(elapsed > 5000); + assertTrue(elapsed < 7000); + + // ensure bytes were copied out appropriately + assertEquals(3 * (2 * 1024 * 1024 + 1), baos.getBufferLength()); + assertEquals((byte) 'A', baos.getUnderlyingBuffer()[baos.getBufferLength() - 1]); + } + + private static class WriterThread extends Thread { + + private final int idx; + private final byte[] data = new byte[1024 * 1024 * 2 + 1]; + private final LeakyBucketStreamThrottler throttler; + private final OutputStream out; + + public WriterThread(final int idx, final LeakyBucketStreamThrottler throttler, final OutputStream out) { + this.idx = idx; + this.throttler = throttler; + this.out = out; + this.data[this.data.length - 1] = (byte) 'A'; + } + + @Override + public void run() { + long startMillis = System.currentTimeMillis(); + long bytesWritten = 0L; + try { + throttler.copy(new ByteArrayInputStream(data), out); + } catch (IOException e) { + e.printStackTrace(); + return; + } + long now = System.currentTimeMillis(); + long millisElapsed = now - startMillis; + bytesWritten += data.length; + float bytesPerSec = (float) bytesWritten / (float) millisElapsed * 1000F; + System.out.println(idx + " : copied data at a rate of " + bytesPerSec + " bytes/sec"); + } + } + +}