http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java deleted file mode 100644 index 5810488..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java +++ /dev/null @@ -1,602 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.io.socket.ssl; - -import java.io.Closeable; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.net.SocketAddress; -import java.net.SocketTimeoutException; -import java.nio.ByteBuffer; -import java.nio.channels.ClosedByInterruptException; -import java.nio.channels.SocketChannel; -import java.util.concurrent.TimeUnit; - -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLEngineResult; -import javax.net.ssl.SSLEngineResult.Status; -import javax.net.ssl.SSLHandshakeException; -import javax.net.ssl.SSLPeerUnverifiedException; -import javax.security.cert.CertificateExpiredException; -import javax.security.cert.CertificateNotYetValidException; -import javax.security.cert.X509Certificate; - -import org.apache.nifi.remote.exception.TransmissionDisabledException; -import org.apache.nifi.remote.io.socket.BufferStateManager; -import org.apache.nifi.remote.io.socket.BufferStateManager.Direction; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SSLSocketChannel implements Closeable { - - public static final int MAX_WRITE_SIZE = 65536; - - private static final Logger logger = LoggerFactory.getLogger(SSLSocketChannel.class); - private static final long BUFFER_FULL_EMPTY_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS); - - private final String hostname; - private final int port; - private final SSLEngine engine; - private final SocketAddress socketAddress; - - private BufferStateManager streamInManager; - private BufferStateManager streamOutManager; - private BufferStateManager appDataManager; - - private SocketChannel channel; - - private final byte[] oneByteBuffer = new byte[1]; - - private int timeoutMillis = 30000; - private volatile boolean connected = false; - private boolean handshaking = false; - private boolean closed = false; - private volatile boolean interrupted = false; - - public SSLSocketChannel(final SSLContext sslContext, final String hostname, final int port, final boolean client) throws IOException { - this.socketAddress = new InetSocketAddress(hostname, port); - this.channel = SocketChannel.open(); - this.hostname = hostname; - this.port = port; - this.engine = sslContext.createSSLEngine(); - this.engine.setUseClientMode(client); - engine.setNeedClientAuth(true); - - streamInManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize())); - streamOutManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize())); - appDataManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getApplicationBufferSize())); - } - - public SSLSocketChannel(final SSLContext sslContext, final SocketChannel socketChannel, final boolean client) throws IOException { - if (!socketChannel.isConnected()) { - throw new IllegalArgumentException("Cannot pass an un-connected SocketChannel"); - } - - this.channel = socketChannel; - - this.socketAddress = socketChannel.getRemoteAddress(); - final Socket socket = socketChannel.socket(); - this.hostname = socket.getInetAddress().getHostName(); - this.port = socket.getPort(); - - this.engine = sslContext.createSSLEngine(); - this.engine.setUseClientMode(client); - engine.setNeedClientAuth(true); - - streamInManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize())); - streamOutManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize())); - appDataManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getApplicationBufferSize())); - } - - public void setTimeout(final int millis) { - this.timeoutMillis = millis; - } - - public int getTimeout() { - return timeoutMillis; - } - - public void connect() throws SSLHandshakeException, IOException { - try { - channel.configureBlocking(false); - if (!channel.isConnected()) { - final long startTime = System.currentTimeMillis(); - - if (!channel.connect(socketAddress)) { - while (!channel.finishConnect()) { - if (interrupted) { - throw new TransmissionDisabledException(); - } - if (System.currentTimeMillis() > startTime + timeoutMillis) { - throw new SocketTimeoutException("Timed out connecting to " + hostname + ":" + port); - } - - try { - Thread.sleep(50L); - } catch (final InterruptedException e) { - } - } - } - } - engine.beginHandshake(); - - performHandshake(); - logger.debug("{} Successfully completed SSL handshake", this); - - streamInManager.clear(); - streamOutManager.clear(); - appDataManager.clear(); - - connected = true; - } catch (final Exception e) { - logger.error("{} Failed to connect due to {}", this, e); - if (logger.isDebugEnabled()) { - logger.error("", e); - } - closeQuietly(channel); - engine.closeInbound(); - engine.closeOutbound(); - throw e; - } - } - - public String getDn() throws CertificateExpiredException, CertificateNotYetValidException, SSLPeerUnverifiedException { - final X509Certificate[] certs = engine.getSession().getPeerCertificateChain(); - if (certs == null || certs.length == 0) { - throw new SSLPeerUnverifiedException("No certificates found"); - } - - final X509Certificate cert = certs[0]; - cert.checkValidity(); - return cert.getSubjectDN().getName().trim(); - } - - private void performHandshake() throws IOException { - // Generate handshake message - final byte[] emptyMessage = new byte[0]; - handshaking = true; - logger.debug("{} Performing Handshake", this); - - try { - while (true) { - switch (engine.getHandshakeStatus()) { - case FINISHED: - return; - case NEED_WRAP: { - final ByteBuffer appDataOut = ByteBuffer.wrap(emptyMessage); - - final ByteBuffer outboundBuffer = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize()); - - final SSLEngineResult wrapHelloResult = engine.wrap(appDataOut, outboundBuffer); - if (wrapHelloResult.getStatus() == Status.BUFFER_OVERFLOW) { - streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize()); - continue; - } - - if (wrapHelloResult.getStatus() != Status.OK) { - throw new SSLHandshakeException("Could not generate SSL Handshake information: SSLEngineResult: " - + wrapHelloResult.toString()); - } - - logger.trace("{} Handshake response after wrapping: {}", this, wrapHelloResult); - - final ByteBuffer readableStreamOut = streamOutManager.prepareForRead(1); - final int bytesToSend = readableStreamOut.remaining(); - writeFully(readableStreamOut); - logger.trace("{} Sent {} bytes of wrapped data for handshake", this, bytesToSend); - - streamOutManager.clear(); - } - continue; - case NEED_UNWRAP: { - final ByteBuffer readableDataIn = streamInManager.prepareForRead(0); - final ByteBuffer appData = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize()); - - // Read handshake response from other side - logger.trace("{} Unwrapping: {} to {}", new Object[]{this, readableDataIn, appData}); - SSLEngineResult handshakeResponseResult = engine.unwrap(readableDataIn, appData); - logger.trace("{} Handshake response after unwrapping: {}", this, handshakeResponseResult); - - if (handshakeResponseResult.getStatus() == Status.BUFFER_UNDERFLOW) { - final ByteBuffer writableDataIn = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize()); - final int bytesRead = readData(writableDataIn); - if (bytesRead > 0) { - logger.trace("{} Read {} bytes for handshake", this, bytesRead); - } - - if (bytesRead < 0) { - throw new SSLHandshakeException("Reached End-of-File marker while performing handshake"); - } - } else if (handshakeResponseResult.getStatus() == Status.CLOSED) { - throw new IOException("Channel was closed by peer during handshake"); - } else { - streamInManager.compact(); - appDataManager.clear(); - } - } - break; - case NEED_TASK: - performTasks(); - continue; - case NOT_HANDSHAKING: - return; - } - } - } finally { - handshaking = false; - } - } - - private void performTasks() { - Runnable runnable; - while ((runnable = engine.getDelegatedTask()) != null) { - runnable.run(); - } - } - - private void closeQuietly(final Closeable closeable) { - try { - closeable.close(); - } catch (final Exception e) { - } - } - - private int readData(final ByteBuffer dest) throws IOException { - final long startTime = System.currentTimeMillis(); - - while (true) { - if (interrupted) { - throw new TransmissionDisabledException(); - } - - if (dest.remaining() == 0) { - return 0; - } - - final int readCount = channel.read(dest); - - if (readCount == 0) { - if (System.currentTimeMillis() > startTime + timeoutMillis) { - throw new SocketTimeoutException("Timed out reading from socket connected to " + hostname + ":" + port); - } - try { - TimeUnit.NANOSECONDS.sleep(BUFFER_FULL_EMPTY_WAIT_NANOS); - } catch (InterruptedException e) { - close(); - Thread.currentThread().interrupt(); // set the interrupt status - throw new ClosedByInterruptException(); - } - - continue; - } - - logger.trace("{} Read {} bytes", this, readCount); - return readCount; - } - } - - private Status encryptAndWriteFully(final BufferStateManager src) throws IOException { - SSLEngineResult result = null; - - final ByteBuffer buff = src.prepareForRead(0); - final ByteBuffer outBuff = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize()); - - logger.trace("{} Encrypting {} bytes", this, buff.remaining()); - while (buff.remaining() > 0) { - result = engine.wrap(buff, outBuff); - if (result.getStatus() == Status.OK) { - final ByteBuffer readableOutBuff = streamOutManager.prepareForRead(0); - writeFully(readableOutBuff); - streamOutManager.clear(); - } else { - return result.getStatus(); - } - } - - return result.getStatus(); - } - - private void writeFully(final ByteBuffer src) throws IOException { - long lastByteWrittenTime = System.currentTimeMillis(); - - int bytesWritten = 0; - while (src.hasRemaining()) { - if (interrupted) { - throw new TransmissionDisabledException(); - } - - final int written = channel.write(src); - bytesWritten += written; - final long now = System.currentTimeMillis(); - if (written > 0) { - lastByteWrittenTime = now; - } else { - if (now > lastByteWrittenTime + timeoutMillis) { - throw new SocketTimeoutException("Timed out writing to socket connected to " + hostname + ":" + port); - } - try { - TimeUnit.NANOSECONDS.sleep(BUFFER_FULL_EMPTY_WAIT_NANOS); - } catch (final InterruptedException e) { - close(); - Thread.currentThread().interrupt(); // set the interrupt status - throw new ClosedByInterruptException(); - } - } - } - - logger.trace("{} Wrote {} bytes", this, bytesWritten); - } - - public boolean isClosed() { - if (closed) { - return true; - } - // need to detect if peer has sent closure handshake...if so the answer is true - final ByteBuffer writableInBuffer = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize()); - int readCount = 0; - try { - readCount = channel.read(writableInBuffer); - } catch (IOException e) { - logger.error("{} Failed to readData due to {}", new Object[]{this, e}); - if (logger.isDebugEnabled()) { - logger.error("", e); - } - readCount = -1; // treat the condition same as if End of Stream - } - if (readCount == 0) { - return false; - } - if (readCount > 0) { - logger.trace("{} Read {} bytes", this, readCount); - - final ByteBuffer streamInBuffer = streamInManager.prepareForRead(1); - final ByteBuffer appDataBuffer = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize()); - try { - SSLEngineResult unwrapResponse = engine.unwrap(streamInBuffer, appDataBuffer); - logger.trace("{} When checking if closed, (handshake={}) Unwrap response: {}", new Object[]{this, handshaking, unwrapResponse}); - if (unwrapResponse.getStatus().equals(Status.CLOSED)) { - // Drain the incoming TCP buffer - final ByteBuffer discardBuffer = ByteBuffer.allocate(8192); - int bytesDiscarded = channel.read(discardBuffer); - while (bytesDiscarded > 0) { - discardBuffer.clear(); - bytesDiscarded = channel.read(discardBuffer); - } - engine.closeInbound(); - } else { - streamInManager.compact(); - return false; - } - } catch (IOException e) { - logger.error("{} Failed to check if closed due to {}. Closing channel.", new Object[]{this, e}); - if (logger.isDebugEnabled()) { - logger.error("", e); - } - } - } - // either readCount is -1, indicating an end of stream, or the peer sent a closure handshake - // so go ahead and close down the channel - closeQuietly(channel.socket()); - closeQuietly(channel); - closed = true; - return true; - } - - @Override - public void close() throws IOException { - logger.debug("{} Closing Connection", this); - if (channel == null) { - return; - } - - if (closed) { - return; - } - - try { - engine.closeOutbound(); - - final byte[] emptyMessage = new byte[0]; - - final ByteBuffer appDataOut = ByteBuffer.wrap(emptyMessage); - final ByteBuffer outboundBuffer = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize()); - final SSLEngineResult handshakeResult = engine.wrap(appDataOut, outboundBuffer); - - if (handshakeResult.getStatus() != Status.CLOSED) { - throw new IOException("Invalid close state - will not send network data"); - } - - final ByteBuffer readableStreamOut = streamOutManager.prepareForRead(1); - writeFully(readableStreamOut); - } finally { - // Drain the incoming TCP buffer - final ByteBuffer discardBuffer = ByteBuffer.allocate(8192); - try { - int bytesDiscarded = channel.read(discardBuffer); - while (bytesDiscarded > 0) { - discardBuffer.clear(); - bytesDiscarded = channel.read(discardBuffer); - } - } catch (Exception e) { - } - - closeQuietly(channel.socket()); - closeQuietly(channel); - closed = true; - } - } - - private int copyFromAppDataBuffer(final byte[] buffer, final int offset, final int len) { - // If any data already exists in the application data buffer, copy it to the buffer. - final ByteBuffer appDataBuffer = appDataManager.prepareForRead(1); - - final int appDataRemaining = appDataBuffer.remaining(); - if (appDataRemaining > 0) { - final int bytesToCopy = Math.min(len, appDataBuffer.remaining()); - appDataBuffer.get(buffer, offset, bytesToCopy); - - final int bytesCopied = appDataRemaining - appDataBuffer.remaining(); - logger.trace("{} Copied {} ({}) bytes from unencrypted application buffer to user space", - new Object[]{this, bytesToCopy, bytesCopied}); - return bytesCopied; - } - return 0; - } - - public int available() throws IOException { - ByteBuffer appDataBuffer = appDataManager.prepareForRead(1); - ByteBuffer streamDataBuffer = streamInManager.prepareForRead(1); - final int buffered = appDataBuffer.remaining() + streamDataBuffer.remaining(); - if (buffered > 0) { - return buffered; - } - - final boolean wasAbleToRead = isDataAvailable(); - if (!wasAbleToRead) { - return 0; - } - - appDataBuffer = appDataManager.prepareForRead(1); - streamDataBuffer = streamInManager.prepareForRead(1); - return appDataBuffer.remaining() + streamDataBuffer.remaining(); - } - - public boolean isDataAvailable() throws IOException { - final ByteBuffer appDataBuffer = appDataManager.prepareForRead(1); - final ByteBuffer streamDataBuffer = streamInManager.prepareForRead(1); - - if (appDataBuffer.remaining() > 0 || streamDataBuffer.remaining() > 0) { - return true; - } - - final ByteBuffer writableBuffer = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize()); - final int bytesRead = channel.read(writableBuffer); - return (bytesRead > 0); - } - - public int read() throws IOException { - final int bytesRead = read(oneByteBuffer); - if (bytesRead == -1) { - return -1; - } - return oneByteBuffer[0] & 0xFF; - } - - public int read(final byte[] buffer) throws IOException { - return read(buffer, 0, buffer.length); - } - - public int read(final byte[] buffer, final int offset, final int len) throws IOException { - logger.debug("{} Reading up to {} bytes of data", this, len); - - if (!connected) { - connect(); - } - - int copied = copyFromAppDataBuffer(buffer, offset, len); - if (copied > 0) { - return copied; - } - - appDataManager.clear(); - - while (true) { - // prepare buffers and call unwrap - final ByteBuffer streamInBuffer = streamInManager.prepareForRead(1); - SSLEngineResult unwrapResponse = null; - final ByteBuffer appDataBuffer = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize()); - unwrapResponse = engine.unwrap(streamInBuffer, appDataBuffer); - logger.trace("{} When reading data, (handshake={}) Unwrap response: {}", new Object[]{this, handshaking, unwrapResponse}); - - switch (unwrapResponse.getStatus()) { - case BUFFER_OVERFLOW: - throw new SSLHandshakeException("Buffer Overflow, which is not allowed to happen from an unwrap"); - case BUFFER_UNDERFLOW: { -// appDataManager.prepareForRead(engine.getSession().getApplicationBufferSize()); - - final ByteBuffer writableInBuffer = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize()); - final int bytesRead = readData(writableInBuffer); - if (bytesRead < 0) { - return -1; - } - - continue; - } - case CLOSED: - throw new IOException("Channel is closed"); - case OK: { - copied = copyFromAppDataBuffer(buffer, offset, len); - if (copied == 0) { - throw new IOException("Failed to decrypt data"); - } - streamInManager.compact(); - return copied; - } - } - } - } - - public void write(final int data) throws IOException { - write(new byte[]{(byte) data}, 0, 1); - } - - public void write(final byte[] data) throws IOException { - write(data, 0, data.length); - } - - public void write(final byte[] data, final int offset, final int len) throws IOException { - logger.debug("{} Writing {} bytes of data", this, len); - - if (!connected) { - connect(); - } - - int iterations = len / MAX_WRITE_SIZE; - if (len % MAX_WRITE_SIZE > 0) { - iterations++; - } - - for (int i = 0; i < iterations; i++) { - streamOutManager.clear(); - final int itrOffset = offset + i * MAX_WRITE_SIZE; - final int itrLen = Math.min(len - itrOffset, MAX_WRITE_SIZE); - final ByteBuffer byteBuffer = ByteBuffer.wrap(data, itrOffset, itrLen); - - final BufferStateManager buffMan = new BufferStateManager(byteBuffer, Direction.READ); - final Status status = encryptAndWriteFully(buffMan); - switch (status) { - case BUFFER_OVERFLOW: - streamOutManager.ensureSize(engine.getSession().getPacketBufferSize()); - appDataManager.ensureSize(engine.getSession().getApplicationBufferSize()); - continue; - case OK: - continue; - case CLOSED: - throw new IOException("Channel is closed"); - case BUFFER_UNDERFLOW: - throw new AssertionError("Got Buffer Underflow but should not have..."); - } - } - } - - public void interrupt() { - this.interrupted = true; - } -}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java deleted file mode 100644 index 154bd08..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.io.socket.ssl; - -import java.io.IOException; -import java.io.InputStream; - -public class SSLSocketChannelInputStream extends InputStream { - - private final SSLSocketChannel channel; - - public SSLSocketChannelInputStream(final SSLSocketChannel channel) { - this.channel = channel; - } - - @Override - public int read() throws IOException { - return channel.read(); - } - - @Override - public int read(final byte[] b) throws IOException { - return channel.read(b); - } - - @Override - public int read(final byte[] b, final int off, final int len) throws IOException { - return channel.read(b, off, len); - } - - /** - * Closes the underlying SSLSocketChannel, which will also close the - * OutputStream and connection - */ - @Override - public void close() throws IOException { - channel.close(); - } - - @Override - public int available() throws IOException { - return channel.available(); - } - - public boolean isDataAvailable() throws IOException { - return available() > 0; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java deleted file mode 100644 index ce4e420..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.io.socket.ssl; - -import java.io.IOException; -import java.io.OutputStream; - -public class SSLSocketChannelOutputStream extends OutputStream { - - private final SSLSocketChannel channel; - - public SSLSocketChannelOutputStream(final SSLSocketChannel channel) { - this.channel = channel; - } - - @Override - public void write(final int b) throws IOException { - channel.write(b); - } - - @Override - public void write(byte[] b) throws IOException { - channel.write(b); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - channel.write(b, off, len); - } - - /** - * Closes the underlying SSLSocketChannel, which also will close the - * InputStream and the connection - */ - @Override - public void close() throws IOException { - channel.close(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java deleted file mode 100644 index aaf37ea..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedInputStream.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.stream.io; - -import java.io.InputStream; - -/** - * This class is a slight modification of the BufferedInputStream in the java.io - * package. The modification is that this implementation does not provide - * synchronization on method calls, which means that this class is not suitable - * for use by multiple threads. However, the absence of these synchronized - * blocks results in potentially much better performance. - */ -public class BufferedInputStream extends java.io.BufferedInputStream { - - public BufferedInputStream(final InputStream in) { - super(in); - } - - public BufferedInputStream(final InputStream in, final int size) { - super(in, size); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java deleted file mode 100644 index eadfcab..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/BufferedOutputStream.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.stream.io; - -import java.io.FilterOutputStream; -import java.io.IOException; -import java.io.OutputStream; - -/** - * This class is a slight modification of the - * {@link java.io.BufferedOutputStream} class. This implementation differs in - * that it does not mark methods as synchronized. This means that this class is - * not suitable for writing by multiple concurrent threads. However, the removal - * of the synchronized keyword results in potentially much better performance. - */ -public class BufferedOutputStream extends FilterOutputStream { - - /** - * The internal buffer where data is stored. - */ - protected byte buf[]; - - /** - * The number of valid bytes in the buffer. This value is always in the - * range <tt>0</tt> through <tt>buf.length</tt>; elements - * <tt>buf[0]</tt> through <tt>buf[count-1]</tt> contain valid byte data. - */ - protected int count; - - /** - * Creates a new buffered output stream to write data to the specified - * underlying output stream. - * - * @param out the underlying output stream. - */ - public BufferedOutputStream(OutputStream out) { - this(out, 8192); - } - - /** - * Creates a new buffered output stream to write data to the specified - * underlying output stream with the specified buffer size. - * - * @param out the underlying output stream. - * @param size the buffer size. - * @exception IllegalArgumentException if size <= 0. - */ - public BufferedOutputStream(OutputStream out, int size) { - super(out); - if (size <= 0) { - throw new IllegalArgumentException("Buffer size <= 0"); - } - buf = new byte[size]; - } - - /** - * Flush the internal buffer - */ - private void flushBuffer() throws IOException { - if (count > 0) { - out.write(buf, 0, count); - count = 0; - } - } - - /** - * Writes the specified byte to this buffered output stream. - * - * @param b the byte to be written. - * @exception IOException if an I/O error occurs. - */ - @Override - public void write(int b) throws IOException { - if (count >= buf.length) { - flushBuffer(); - } - buf[count++] = (byte) b; - } - - /** - * Writes <code>len</code> bytes from the specified byte array starting at - * offset <code>off</code> to this buffered output stream. - * - * <p> - * Ordinarily this method stores bytes from the given array into this - * stream's buffer, flushing the buffer to the underlying output stream as - * needed. If the requested length is at least as large as this stream's - * buffer, however, then this method will flush the buffer and write the - * bytes directly to the underlying output stream. Thus redundant - * <code>BufferedOutputStream</code>s will not copy data unnecessarily. - * - * @param b the data. - * @param off the start offset in the data. - * @param len the number of bytes to write. - * @exception IOException if an I/O error occurs. - */ - @Override - public void write(byte b[], int off, int len) throws IOException { - if (len >= buf.length) { - /* If the request length exceeds the size of the output buffer, - flush the output buffer and then write the data directly. - In this way buffered streams will cascade harmlessly. */ - flushBuffer(); - out.write(b, off, len); - return; - } - if (len >= buf.length - count) { - flushBuffer(); - } - System.arraycopy(b, off, buf, count, len); - count += len; - } - - /** - * Flushes this buffered output stream. This forces any buffered output - * bytes to be written out to the underlying output stream. - * - * @exception IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - */ - @Override - public void flush() throws IOException { - flushBuffer(); - out.flush(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java deleted file mode 100644 index 284cd54..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayInputStream.java +++ /dev/null @@ -1,250 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.stream.io; - -import java.io.InputStream; - -/** - * This class performs the same function as java.io.ByteArrayInputStream but - * does not mark its methods as synchronized - */ -public class ByteArrayInputStream extends InputStream { - - /** - * An array of bytes that was provided by the creator of the stream. - * Elements <code>buf[0]</code> through <code>buf[count-1]</code> are the - * only bytes that can ever be read from the stream; element - * <code>buf[pos]</code> is the next byte to be read. - */ - protected byte buf[]; - - /** - * The index of the next character to read from the input stream buffer. - * This value should always be nonnegative and not larger than the value of - * <code>count</code>. The next byte to be read from the input stream buffer - * will be <code>buf[pos]</code>. - */ - protected int pos; - - /** - * The currently marked position in the stream. ByteArrayInputStream objects - * are marked at position zero by default when constructed. They may be - * marked at another position within the buffer by the <code>mark()</code> - * method. The current buffer position is set to this point by the - * <code>reset()</code> method. - * <p> - * If no mark has been set, then the value of mark is the offset passed to - * the constructor (or 0 if the offset was not supplied). - * - * @since JDK1.1 - */ - protected int mark = 0; - - /** - * The index one greater than the last valid character in the input stream - * buffer. This value should always be nonnegative and not larger than the - * length of <code>buf</code>. It is one greater than the position of the - * last byte within <code>buf</code> that can ever be read from the input - * stream buffer. - */ - protected int count; - - /** - * Creates a <code>ByteArrayInputStream</code> so that it uses - * <code>buf</code> as its buffer array. The buffer array is not copied. The - * initial value of <code>pos</code> is <code>0</code> and the initial value - * of <code>count</code> is the length of <code>buf</code>. - * - * @param buf the input buffer. - */ - public ByteArrayInputStream(byte buf[]) { - this.buf = buf; - this.pos = 0; - this.count = buf.length; - } - - /** - * Creates <code>ByteArrayInputStream</code> that uses <code>buf</code> as - * its buffer array. The initial value of <code>pos</code> is - * <code>offset</code> and the initial value of <code>count</code> is the - * minimum of <code>offset+length</code> and <code>buf.length</code>. The - * buffer array is not copied. The buffer's mark is set to the specified - * offset. - * - * @param buf the input buffer. - * @param offset the offset in the buffer of the first byte to read. - * @param length the maximum number of bytes to read from the buffer. - */ - public ByteArrayInputStream(byte buf[], int offset, int length) { - this.buf = buf; - this.pos = offset; - this.count = Math.min(offset + length, buf.length); - this.mark = offset; - } - - /** - * Reads the next byte of data from this input stream. The value byte is - * returned as an <code>int</code> in the range <code>0</code> to - * <code>255</code>. If no byte is available because the end of the stream - * has been reached, the value <code>-1</code> is returned. - * <p> - * This <code>read</code> method cannot block. - * - * @return the next byte of data, or <code>-1</code> if the end of the - * stream has been reached. - */ - @Override - public int read() { - return (pos < count) ? (buf[pos++] & 0xff) : -1; - } - - /** - * Reads up to <code>len</code> bytes of data into an array of bytes from - * this input stream. If <code>pos</code> equals <code>count</code>, then - * <code>-1</code> is returned to indicate end of file. Otherwise, the - * number <code>k</code> of bytes read is equal to the smaller of - * <code>len</code> and <code>count-pos</code>. If <code>k</code> is - * positive, then bytes <code>buf[pos]</code> through - * <code>buf[pos+k-1]</code> are copied into <code>b[off]</code> through - * <code>b[off+k-1]</code> in the manner performed by - * <code>System.arraycopy</code>. The value <code>k</code> is added into - * <code>pos</code> and <code>k</code> is returned. - * <p> - * This <code>read</code> method cannot block. - * - * @param b the buffer into which the data is read. - * @param off the start offset in the destination array <code>b</code> - * @param len the maximum number of bytes read. - * @return the total number of bytes read into the buffer, or - * <code>-1</code> if there is no more data because the end of the stream - * has been reached. - * @exception NullPointerException If <code>b</code> is <code>null</code>. - * @exception IndexOutOfBoundsException If <code>off</code> is negative, - * <code>len</code> is negative, or <code>len</code> is greater than - * <code>b.length - off</code> - */ - @Override - public int read(byte b[], int off, int len) { - if (b == null) { - throw new NullPointerException(); - } else if (off < 0 || len < 0 || len > b.length - off) { - throw new IndexOutOfBoundsException(); - } - - if (pos >= count) { - return -1; - } - - int avail = count - pos; - if (len > avail) { - len = avail; - } - if (len <= 0) { - return 0; - } - System.arraycopy(buf, pos, b, off, len); - pos += len; - return len; - } - - /** - * Skips <code>n</code> bytes of input from this input stream. Fewer bytes - * might be skipped if the end of the input stream is reached. The actual - * number <code>k</code> of bytes to be skipped is equal to the smaller of - * <code>n</code> and <code>count-pos</code>. The value <code>k</code> is - * added into <code>pos</code> and <code>k</code> is returned. - * - * @param n the number of bytes to be skipped. - * @return the actual number of bytes skipped. - */ - @Override - public long skip(long n) { - long k = count - pos; - if (n < k) { - k = n < 0 ? 0 : n; - } - - pos += k; - return k; - } - - /** - * Returns the number of remaining bytes that can be read (or skipped over) - * from this input stream. - * <p> - * The value returned is <code>count - pos</code>, which is the number - * of bytes remaining to be read from the input buffer. - * - * @return the number of remaining bytes that can be read (or skipped over) - * from this input stream without blocking. - */ - @Override - public int available() { - return count - pos; - } - - /** - * Tests if this <code>InputStream</code> supports mark/reset. The - * <code>markSupported</code> method of <code>ByteArrayInputStream</code> - * always returns <code>true</code>. - * - * @since JDK1.1 - */ - @Override - public boolean markSupported() { - return true; - } - - /** - * Set the current marked position in the stream. ByteArrayInputStream - * objects are marked at position zero by default when constructed. They may - * be marked at another position within the buffer by this method. - * <p> - * If no mark has been set, then the value of the mark is the offset passed - * to the constructor (or 0 if the offset was not supplied). - * - * <p> - * Note: The <code>readAheadLimit</code> for this class has no meaning. - * - * @since JDK1.1 - */ - @Override - public void mark(int readAheadLimit) { - mark = pos; - } - - /** - * Resets the buffer to the marked position. The marked position is 0 unless - * another position was marked or an offset was specified in the - * constructor. - */ - @Override - public void reset() { - pos = mark; - } - - /** - * Closing a <tt>ByteArrayInputStream</tt> has no effect. The methods in - * this class can be called after the stream has been closed without - * generating an <tt>IOException</tt>. - * <p> - */ - @Override - public void close() { - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java deleted file mode 100644 index 459563b..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteArrayOutputStream.java +++ /dev/null @@ -1,250 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.stream.io; - -import java.io.IOException; -import java.io.OutputStream; -import java.io.UnsupportedEncodingException; -import java.util.Arrays; - -/** - * This class provides a more efficient implementation of the - * java.io.ByteArrayOutputStream. The efficiency is gained in two ways: - * <ul> - * <li>The write methods are not synchronized</li> - * <li>The class provides {@link #getUnderlyingBuffer()} and - * {@link #getBufferLength()}, which can be used to access the underlying byte - * array directly, rather than the System.arraycopy that {@link #toByteArray()} - * uses - * </ul> - * - */ -public class ByteArrayOutputStream extends OutputStream { - - /** - * The buffer where data is stored. - */ - protected byte buf[]; - - /** - * The number of valid bytes in the buffer. - */ - protected int count; - - /** - * Creates a new byte array output stream. The buffer capacity is initially - * 32 bytes, though its size increases if necessary. - */ - public ByteArrayOutputStream() { - this(32); - } - - /** - * Creates a new byte array output stream, with a buffer capacity of the - * specified size, in bytes. - * - * @param size the initial size. - * @exception IllegalArgumentException if size is negative. - */ - public ByteArrayOutputStream(int size) { - if (size < 0) { - throw new IllegalArgumentException("Negative initial size: " - + size); - } - buf = new byte[size]; - } - - /** - * Increases the capacity if necessary to ensure that it can hold at least - * the number of elements specified by the minimum capacity argument. - * - * @param minCapacity the desired minimum capacity - * @throws OutOfMemoryError if {@code minCapacity < 0}. This is interpreted - * as a request for the unsatisfiably large capacity - * {@code (long) Integer.MAX_VALUE + (minCapacity - Integer.MAX_VALUE)}. - */ - private void ensureCapacity(int minCapacity) { - // overflow-conscious code - if (minCapacity - buf.length > 0) { - grow(minCapacity); - } - } - - /** - * Increases the capacity to ensure that it can hold at least the number of - * elements specified by the minimum capacity argument. - * - * @param minCapacity the desired minimum capacity - */ - private void grow(int minCapacity) { - // overflow-conscious code - int oldCapacity = buf.length; - int newCapacity = oldCapacity << 1; - if (newCapacity - minCapacity < 0) { - newCapacity = minCapacity; - } - if (newCapacity < 0) { - if (minCapacity < 0) // overflow - { - throw new OutOfMemoryError(); - } - newCapacity = Integer.MAX_VALUE; - } - buf = Arrays.copyOf(buf, newCapacity); - } - - /** - * Writes the specified byte to this byte array output stream. - * - * @param b the byte to be written. - */ - @Override - public void write(int b) { - ensureCapacity(count + 1); - buf[count] = (byte) b; - count += 1; - } - - /** - * Writes <code>len</code> bytes from the specified byte array starting at - * offset <code>off</code> to this byte array output stream. - * - * @param b the data. - * @param off the start offset in the data. - * @param len the number of bytes to write. - */ - @Override - public void write(byte b[], int off, int len) { - if ((off < 0) || (off > b.length) || (len < 0) - || ((off + len) - b.length > 0)) { - throw new IndexOutOfBoundsException(); - } - ensureCapacity(count + len); - System.arraycopy(b, off, buf, count, len); - count += len; - } - - /** - * Writes the complete contents of this byte array output stream to the - * specified output stream argument, as if by calling the output stream's - * write method using <code>out.write(buf, 0, count)</code>. - * - * @param out the output stream to which to write the data. - * @exception IOException if an I/O error occurs. - */ - public void writeTo(OutputStream out) throws IOException { - out.write(buf, 0, count); - } - - /** - * Resets the <code>count</code> field of this byte array output stream to - * zero, so that all currently accumulated output in the output stream is - * discarded. The output stream can be used again, reusing the already - * allocated buffer space. - * - * @see java.io.ByteArrayInputStream#count - */ - public void reset() { - count = 0; - } - - /** - * Creates a newly allocated byte array. Its size is the current size of - * this output stream and the valid contents of the buffer have been copied - * into it. - * - * @return the current contents of this output stream, as a byte array. - * @see java.io.ByteArrayOutputStream#size() - */ - public byte toByteArray () - [] { - return Arrays.copyOf(buf, count); - } - - /** - * Returns the current size of the buffer. - * - * @return the value of the <code>count</code> field, which is the number of - * valid bytes in this output stream. - * @see java.io.ByteArrayOutputStream#count - */ - public int size() { - return count; - } - - /** - * Converts the buffer's contents into a string decoding bytes using the - * platform's default character set. The length of the new <tt>String</tt> - * is a function of the character set, and hence may not be equal to the - * size of the buffer. - * - * <p> - * This method always replaces malformed-input and unmappable-character - * sequences with the default replacement string for the platform's default - * character set. The {@linkplain java.nio.charset.CharsetDecoder} class - * should be used when more control over the decoding process is required. - * - * @return String decoded from the buffer's contents. - * @since JDK1.1 - */ - @Override - public String toString() { - return new String(buf, 0, count); - } - - /** - * Converts the buffer's contents into a string by decoding the bytes using - * the specified {@link java.nio.charset.Charset charsetName}. The length of - * the new <tt>String</tt> is a function of the charset, and hence may not - * be equal to the length of the byte array. - * - * <p> - * This method always replaces malformed-input and unmappable-character - * sequences with this charset's default replacement string. The {@link - * java.nio.charset.CharsetDecoder} class should be used when more control - * over the decoding process is required. - * - * @param charsetName the name of a supported - * {@linkplain java.nio.charset.Charset <code>charset</code>} - * @return String decoded from the buffer's contents. - * @exception UnsupportedEncodingException If the named charset is not - * supported - * @since JDK1.1 - */ - public String toString(String charsetName) throws UnsupportedEncodingException { - return new String(buf, 0, count, charsetName); - } - - /** - * Closing a <tt>ByteArrayOutputStream</tt> has no effect. The methods in - * this class can be called after the stream has been closed without - * generating an <tt>IOException</tt>. - * <p> - * - */ - @Override - public void close() { - } - - public byte[] getUnderlyingBuffer() { - return buf; - } - - public int getBufferLength() { - return count; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java deleted file mode 100644 index 8294af3..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingInputStream.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.stream.io; - -import java.io.IOException; -import java.io.InputStream; - -public class ByteCountingInputStream extends InputStream { - - private final InputStream in; - private long bytesRead = 0L; - private long bytesSkipped = 0L; - - private long bytesSinceMark = 0L; - - public ByteCountingInputStream(final InputStream in) { - this.in = in; - } - - @Override - public int read() throws IOException { - final int fromSuper = in.read(); - if (fromSuper >= 0) { - bytesRead++; - bytesSinceMark++; - } - return fromSuper; - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - final int fromSuper = in.read(b, off, len); - if (fromSuper >= 0) { - bytesRead += fromSuper; - bytesSinceMark += fromSuper; - } - - return fromSuper; - } - - @Override - public int read(byte[] b) throws IOException { - return read(b, 0, b.length); - } - - @Override - public long skip(final long n) throws IOException { - final long skipped = in.skip(n); - if (skipped >= 0) { - bytesSkipped += skipped; - bytesSinceMark += skipped; - } - return skipped; - } - - public long getBytesRead() { - return bytesRead; - } - - public long getBytesSkipped() { - return bytesSkipped; - } - - public long getBytesConsumed() { - return getBytesRead() + getBytesSkipped(); - } - - @Override - public void mark(final int readlimit) { - in.mark(readlimit); - - bytesSinceMark = 0L; - } - - @Override - public boolean markSupported() { - return in.markSupported(); - } - - @Override - public void reset() throws IOException { - in.reset(); - bytesRead -= bytesSinceMark; - } - - @Override - public void close() throws IOException { - in.close(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java deleted file mode 100644 index d8e1a42..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/ByteCountingOutputStream.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.stream.io; - -import java.io.IOException; -import java.io.OutputStream; - -public class ByteCountingOutputStream extends OutputStream { - - private final OutputStream out; - private long bytesWritten = 0L; - - public ByteCountingOutputStream(final OutputStream out) { - this.out = out; - } - - @Override - public void write(int b) throws IOException { - out.write(b); - bytesWritten++; - } - - @Override - public void write(byte[] b) throws IOException { - write(b, 0, b.length); - } - - ; - - @Override - public void write(byte[] b, int off, int len) throws IOException { - out.write(b, off, len); - bytesWritten += len; - } - - public long getBytesWritten() { - return bytesWritten; - } - - @Override - public void flush() throws IOException { - out.flush(); - } - - @Override - public void close() throws IOException { - out.close(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java deleted file mode 100644 index 1dd90f5..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/DataOutputStream.java +++ /dev/null @@ -1,417 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.stream.io; - -import java.io.DataOutput; -import java.io.FilterOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.io.UTFDataFormatException; - -/** - * This class is different from java.io.DataOutputStream in that it does - * synchronize on its methods. - */ -public class DataOutputStream extends FilterOutputStream implements DataOutput { - - /** - * The number of bytes written to the data output stream so far. If this - * counter overflows, it will be wrapped to Integer.MAX_VALUE. - */ - protected int written; - - /** - * bytearr is initialized on demand by writeUTF - */ - private byte[] bytearr = null; - - /** - * Creates a new data output stream to write data to the specified - * underlying output stream. The counter <code>written</code> is set to - * zero. - * - * @param out the underlying output stream, to be saved for later use. - * @see java.io.FilterOutputStream#out - */ - public DataOutputStream(OutputStream out) { - super(out); - } - - /** - * Increases the written counter by the specified value until it reaches - * Integer.MAX_VALUE. - */ - private void incCount(int value) { - int temp = written + value; - if (temp < 0) { - temp = Integer.MAX_VALUE; - } - written = temp; - } - - /** - * Writes the specified byte (the low eight bits of the argument - * <code>b</code>) to the underlying output stream. If no exception is - * thrown, the counter <code>written</code> is incremented by - * <code>1</code>. - * <p> - * Implements the <code>write</code> method of <code>OutputStream</code>. - * - * @param b the <code>byte</code> to be written. - * @exception IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - */ - @Override - public void write(int b) throws IOException { - out.write(b); - incCount(1); - } - - /** - * Writes <code>len</code> bytes from the specified byte array starting at - * offset <code>off</code> to the underlying output stream. If no exception - * is thrown, the counter <code>written</code> is incremented by - * <code>len</code>. - * - * @param b the data. - * @param off the start offset in the data. - * @param len the number of bytes to write. - * @exception IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - */ - @Override - public void write(byte b[], int off, int len) throws IOException { - out.write(b, off, len); - incCount(len); - } - - /** - * Flushes this data output stream. This forces any buffered output bytes to - * be written out to the stream. - * <p> - * The <code>flush</code> method of <code>DataOutputStream</code> calls the - * <code>flush</code> method of its underlying output stream. - * - * @exception IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - * @see java.io.OutputStream#flush() - */ - @Override - public void flush() throws IOException { - out.flush(); - } - - /** - * Writes a <code>boolean</code> to the underlying output stream as a 1-byte - * value. The value <code>true</code> is written out as the value - * <code>(byte)1</code>; the value <code>false</code> is written out as the - * value <code>(byte)0</code>. If no exception is thrown, the counter - * <code>written</code> is incremented by <code>1</code>. - * - * @param v a <code>boolean</code> value to be written. - * @exception IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - */ - @Override - public final void writeBoolean(boolean v) throws IOException { - out.write(v ? 1 : 0); - incCount(1); - } - - /** - * Writes out a <code>byte</code> to the underlying output stream as a - * 1-byte value. If no exception is thrown, the counter <code>written</code> - * is incremented by <code>1</code>. - * - * @param v a <code>byte</code> value to be written. - * @exception IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - */ - @Override - public final void writeByte(int v) throws IOException { - out.write(v); - incCount(1); - } - - /** - * Writes a <code>short</code> to the underlying output stream as two bytes, - * high byte first. If no exception is thrown, the counter - * <code>written</code> is incremented by <code>2</code>. - * - * @param v a <code>short</code> to be written. - * @exception IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - */ - @Override - public final void writeShort(int v) throws IOException { - out.write((v >>> 8) & 0xFF); - out.write((v) & 0xFF); - incCount(2); - } - - /** - * Writes a <code>char</code> to the underlying output stream as a 2-byte - * value, high byte first. If no exception is thrown, the counter - * <code>written</code> is incremented by <code>2</code>. - * - * @param v a <code>char</code> value to be written. - * @exception IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - */ - @Override - public final void writeChar(int v) throws IOException { - out.write((v >>> 8) & 0xFF); - out.write((v) & 0xFF); - incCount(2); - } - - /** - * Writes an <code>int</code> to the underlying output stream as four bytes, - * high byte first. If no exception is thrown, the counter - * <code>written</code> is incremented by <code>4</code>. - * - * @param v an <code>int</code> to be written. - * @exception IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - */ - @Override - public final void writeInt(int v) throws IOException { - out.write((v >>> 24) & 0xFF); - out.write((v >>> 16) & 0xFF); - out.write((v >>> 8) & 0xFF); - out.write((v) & 0xFF); - incCount(4); - } - - private final byte writeBuffer[] = new byte[8]; - - /** - * Writes a <code>long</code> to the underlying output stream as eight - * bytes, high byte first. In no exception is thrown, the counter - * <code>written</code> is incremented by <code>8</code>. - * - * @param v a <code>long</code> to be written. - * @exception IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - */ - @Override - public final void writeLong(long v) throws IOException { - writeBuffer[0] = (byte) (v >>> 56); - writeBuffer[1] = (byte) (v >>> 48); - writeBuffer[2] = (byte) (v >>> 40); - writeBuffer[3] = (byte) (v >>> 32); - writeBuffer[4] = (byte) (v >>> 24); - writeBuffer[5] = (byte) (v >>> 16); - writeBuffer[6] = (byte) (v >>> 8); - writeBuffer[7] = (byte) (v); - out.write(writeBuffer, 0, 8); - incCount(8); - } - - /** - * Converts the float argument to an <code>int</code> using the - * <code>floatToIntBits</code> method in class <code>Float</code>, and then - * writes that <code>int</code> value to the underlying output stream as a - * 4-byte quantity, high byte first. If no exception is thrown, the counter - * <code>written</code> is incremented by <code>4</code>. - * - * @param v a <code>float</code> value to be written. - * @exception IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - * @see java.lang.Float#floatToIntBits(float) - */ - @Override - public final void writeFloat(float v) throws IOException { - writeInt(Float.floatToIntBits(v)); - } - - /** - * Converts the double argument to a <code>long</code> using the - * <code>doubleToLongBits</code> method in class <code>Double</code>, and - * then writes that <code>long</code> value to the underlying output stream - * as an 8-byte quantity, high byte first. If no exception is thrown, the - * counter <code>written</code> is incremented by <code>8</code>. - * - * @param v a <code>double</code> value to be written. - * @exception IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - * @see java.lang.Double#doubleToLongBits(double) - */ - @Override - public final void writeDouble(double v) throws IOException { - writeLong(Double.doubleToLongBits(v)); - } - - /** - * Writes out the string to the underlying output stream as a sequence of - * bytes. Each character in the string is written out, in sequence, by - * discarding its high eight bits. If no exception is thrown, the counter - * <code>written</code> is incremented by the length of <code>s</code>. - * - * @param s a string of bytes to be written. - * @exception IOException if an I/O error occurs. - * @see java.io.FilterOutputStream#out - */ - @Override - public final void writeBytes(String s) throws IOException { - int len = s.length(); - for (int i = 0; i < len; i++) { - out.write((byte) s.charAt(i)); - } - incCount(len); - } - - /** - * Writes a string to the underlying output stream as a sequence of - * characters. Each character is written to the data output stream as if by - * the <code>writeChar</code> method. If no exception is thrown, the counter - * <code>written</code> is incremented by twice the length of - * <code>s</code>. - * - * @param s a <code>String</code> value to be written. - * @exception IOException if an I/O error occurs. - * @see java.io.DataOutputStream#writeChar(int) - * @see java.io.FilterOutputStream#out - */ - @Override - public final void writeChars(String s) throws IOException { - int len = s.length(); - for (int i = 0; i < len; i++) { - int v = s.charAt(i); - out.write((v >>> 8) & 0xFF); - out.write((v) & 0xFF); - } - incCount(len * 2); - } - - /** - * Writes a string to the underlying output stream using - * <a href="DataInput.html#modified-utf-8">modified UTF-8</a> - * encoding in a machine-independent manner. - * <p> - * First, two bytes are written to the output stream as if by the - * <code>writeShort</code> method giving the number of bytes to follow. This - * value is the number of bytes actually written out, not the length of the - * string. Following the length, each character of the string is output, in - * sequence, using the modified UTF-8 encoding for the character. If no - * exception is thrown, the counter <code>written</code> is incremented by - * the total number of bytes written to the output stream. This will be at - * least two plus the length of <code>str</code>, and at most two plus - * thrice the length of <code>str</code>. - * - * @param str a string to be written. - * @exception IOException if an I/O error occurs. - */ - @Override - public final void writeUTF(String str) throws IOException { - writeUTF(str, this); - } - - /** - * Writes a string to the specified DataOutput using - * <a href="DataInput.html#modified-utf-8">modified UTF-8</a> - * encoding in a machine-independent manner. - * <p> - * First, two bytes are written to out as if by the <code>writeShort</code> - * method giving the number of bytes to follow. This value is the number of - * bytes actually written out, not the length of the string. Following the - * length, each character of the string is output, in sequence, using the - * modified UTF-8 encoding for the character. If no exception is thrown, the - * counter <code>written</code> is incremented by the total number of bytes - * written to the output stream. This will be at least two plus the length - * of <code>str</code>, and at most two plus thrice the length of - * <code>str</code>. - * - * @param str a string to be written. - * @param out destination to write to - * @return The number of bytes written out. - * @exception IOException if an I/O error occurs. - */ - static int writeUTF(String str, DataOutput out) throws IOException { - int strlen = str.length(); - int utflen = 0; - int c, count = 0; - - /* use charAt instead of copying String to char array */ - for (int i = 0; i < strlen; i++) { - c = str.charAt(i); - if ((c >= 0x0001) && (c <= 0x007F)) { - utflen++; - } else if (c > 0x07FF) { - utflen += 3; - } else { - utflen += 2; - } - } - - if (utflen > 65535) { - throw new UTFDataFormatException("encoded string too long: " + utflen + " bytes"); - } - - byte[] bytearr = null; - if (out instanceof DataOutputStream) { - DataOutputStream dos = (DataOutputStream) out; - if (dos.bytearr == null || (dos.bytearr.length < (utflen + 2))) { - dos.bytearr = new byte[(utflen * 2) + 2]; - } - bytearr = dos.bytearr; - } else { - bytearr = new byte[utflen + 2]; - } - - bytearr[count++] = (byte) ((utflen >>> 8) & 0xFF); - bytearr[count++] = (byte) ((utflen) & 0xFF); - - int i = 0; - for (i = 0; i < strlen; i++) { - c = str.charAt(i); - if (!((c >= 0x0001) && (c <= 0x007F))) { - break; - } - bytearr[count++] = (byte) c; - } - - for (; i < strlen; i++) { - c = str.charAt(i); - if ((c >= 0x0001) && (c <= 0x007F)) { - bytearr[count++] = (byte) c; - - } else if (c > 0x07FF) { - bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 0x0F)); - bytearr[count++] = (byte) (0x80 | ((c >> 6) & 0x3F)); - bytearr[count++] = (byte) (0x80 | ((c) & 0x3F)); - } else { - bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 0x1F)); - bytearr[count++] = (byte) (0x80 | ((c) & 0x3F)); - } - } - out.write(bytearr, 0, utflen + 2); - return utflen + 2; - } - - /** - * Returns the current value of the counter <code>written</code>, the number - * of bytes written to this data output stream so far. If the counter - * overflows, it will be wrapped to Integer.MAX_VALUE. - * - * @return the value of the <code>written</code> field. - * @see java.io.DataOutputStream#written - */ - public final int size() { - return written; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/300952a9/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/GZIPOutputStream.java ---------------------------------------------------------------------- diff --git a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/GZIPOutputStream.java b/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/GZIPOutputStream.java deleted file mode 100644 index 2864bbb..0000000 --- a/commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/GZIPOutputStream.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.stream.io; - -import java.io.IOException; -import java.io.OutputStream; - -/** - * <p> - * This class extends the {@link java.util.zip.GZIPOutputStream} by allowing the - * constructor to provide a compression level, and uses a default value of 1, - * rather than 5. - * </p> - */ -public class GZIPOutputStream extends java.util.zip.GZIPOutputStream { - - public static final int DEFAULT_COMPRESSION_LEVEL = 1; - - public GZIPOutputStream(final OutputStream out) throws IOException { - this(out, DEFAULT_COMPRESSION_LEVEL); - } - - public GZIPOutputStream(final OutputStream out, final int compressionLevel) throws IOException { - super(out); - def.setLevel(compressionLevel); - } -}