http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java ---------------------------------------------------------------------- diff --git a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java new file mode 100644 index 0000000..bc46b0f --- /dev/null +++ b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.io; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.zip.Deflater; + +public class CompressionOutputStream extends OutputStream { + + public static final byte[] SYNC_BYTES = new byte[]{'S', 'Y', 'N', 'C'}; + + public static final int DEFAULT_COMPRESSION_LEVEL = 1; + public static final int DEFAULT_BUFFER_SIZE = 64 << 10; + public static final int MIN_BUFFER_SIZE = 8 << 10; + + private final OutputStream out; + private final Deflater deflater; + + private final byte[] buffer; + private final byte[] compressed; + + private int bufferIndex = 0; + private boolean dataWritten = false; + + public CompressionOutputStream(final OutputStream outStream) { + this(outStream, DEFAULT_BUFFER_SIZE); + } + + public CompressionOutputStream(final OutputStream outStream, final int bufferSize) { + this(outStream, bufferSize, DEFAULT_COMPRESSION_LEVEL, Deflater.DEFAULT_STRATEGY); + } + + public CompressionOutputStream(final OutputStream outStream, final int bufferSize, final int level, final int strategy) { + if (bufferSize < MIN_BUFFER_SIZE) { + throw new IllegalArgumentException("Buffer size must be at least " + MIN_BUFFER_SIZE); + } + + this.out = outStream; + this.deflater = new Deflater(level); + this.deflater.setStrategy(strategy); + buffer = new byte[bufferSize]; + compressed = new byte[bufferSize + 64]; + } + + /** + * Compresses the currently buffered chunk of data and sends it to the + * output stream + * + * @throws IOException + */ + protected void compressAndWrite() throws IOException { + if (bufferIndex <= 0) { + return; + } + + deflater.setInput(buffer, 0, bufferIndex); + deflater.finish(); + final int compressedBytes = deflater.deflate(compressed); + + writeChunkHeader(compressedBytes); + out.write(compressed, 0, compressedBytes); + + bufferIndex = 0; + deflater.reset(); + } + + private void writeChunkHeader(final int compressedBytes) throws IOException { + // If we have already written data, write out a '1' to indicate that we have more data; when we close + // the stream, we instead write a '0' to indicate that we are finished sending data. + if (dataWritten) { + out.write(1); + } + out.write(SYNC_BYTES); + dataWritten = true; + + writeInt(out, bufferIndex); + writeInt(out, compressedBytes); + } + + private void writeInt(final OutputStream out, final int val) throws IOException { + out.write(val >>> 24); + out.write(val >>> 16); + out.write(val >>> 8); + out.write(val); + } + + protected boolean bufferFull() { + return bufferIndex >= buffer.length; + } + + @Override + public void write(final int b) throws IOException { + buffer[bufferIndex++] = (byte) (b & 0xFF); + if (bufferFull()) { + compressAndWrite(); + } + } + + @Override + public void write(final byte[] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(final byte[] b, final int off, final int len) throws IOException { + int bytesLeft = len; + while (bytesLeft > 0) { + final int free = buffer.length - bufferIndex; + final int bytesThisIteration = Math.min(bytesLeft, free); + System.arraycopy(b, off + len - bytesLeft, buffer, bufferIndex, bytesThisIteration); + bufferIndex += bytesThisIteration; + + bytesLeft -= bytesThisIteration; + if (bufferFull()) { + compressAndWrite(); + } + } + } + + @Override + public void flush() throws IOException { + compressAndWrite(); + super.flush(); + } + + @Override + public void close() throws IOException { + compressAndWrite(); + out.write(0); // indicate that the stream is finished. + out.flush(); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java ---------------------------------------------------------------------- diff --git a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java new file mode 100644 index 0000000..e03dfbf --- /dev/null +++ b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/InterruptableInputStream.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.io; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.nifi.remote.exception.TransmissionDisabledException; + +public class InterruptableInputStream extends InputStream { + + private volatile boolean interrupted = false; + private final InputStream in; + + public InterruptableInputStream(final InputStream in) { + this.in = in; + } + + @Override + public int read() throws IOException { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + return in.read(); + } + + @Override + public int read(byte[] b) throws IOException { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + return in.read(b); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + return in.read(b, off, len); + } + + @Override + public int available() throws IOException { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + return in.available(); + } + + @Override + public void close() throws IOException { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + in.close(); + } + + @Override + public synchronized void mark(int readlimit) { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + in.mark(readlimit); + } + + @Override + public boolean markSupported() { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + return in.markSupported(); + } + + @Override + public synchronized void reset() throws IOException { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + in.reset(); + } + + @Override + public long skip(long n) throws IOException { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + return in.skip(n); + } + + public void interrupt() { + interrupted = true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java ---------------------------------------------------------------------- diff --git a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java new file mode 100644 index 0000000..cba5be6 --- /dev/null +++ b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/InterruptableOutputStream.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.io; + +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.nifi.remote.exception.TransmissionDisabledException; + +public class InterruptableOutputStream extends OutputStream { + + private final OutputStream out; + private volatile boolean interrupted = false; + + public InterruptableOutputStream(final OutputStream out) { + this.out = out; + } + + @Override + public void write(int b) throws IOException { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + out.write(b); + } + + @Override + public void write(byte[] b) throws IOException { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + out.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + out.write(b, off, len); + } + + @Override + public void close() throws IOException { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + out.close(); + } + + @Override + public void flush() throws IOException { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + out.flush(); + } + + public void interrupt() { + this.interrupted = true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java ---------------------------------------------------------------------- diff --git a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java new file mode 100644 index 0000000..68913bd --- /dev/null +++ b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/BufferStateManager.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.io.socket; + +import java.nio.ByteBuffer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BufferStateManager { + + private static final Logger logger = LoggerFactory.getLogger(BufferStateManager.class); + + private ByteBuffer buffer; + private Direction direction = Direction.WRITE; + + public BufferStateManager(final ByteBuffer buffer) { + this.buffer = buffer; + } + + public BufferStateManager(final ByteBuffer buffer, final Direction direction) { + this.buffer = buffer; + this.direction = direction; + } + + /** + * Ensures that the buffer is at least as big as the size specified, + * resizing the buffer if necessary. This operation MAY change the direction + * of the buffer. + * + * @param requiredSize + */ + public void ensureSize(final int requiredSize) { + if (buffer.capacity() < requiredSize) { + final ByteBuffer newBuffer = ByteBuffer.allocate(requiredSize); + + // we have to read buffer so make sure the direction is correct. + if (direction == Direction.WRITE) { + buffer.flip(); + } + + // Copy from buffer to newBuffer + newBuffer.put(buffer); + + // Swap the buffers + buffer = newBuffer; + + // the new buffer is ready to be written to + direction = Direction.WRITE; + } + } + + public ByteBuffer prepareForWrite(final int requiredSize) { + ensureSize(requiredSize); + + if (direction == Direction.READ) { + direction = Direction.WRITE; + buffer.position(buffer.limit()); + } + + buffer.limit(buffer.capacity()); + return buffer; + } + + public ByteBuffer prepareForRead(final int requiredSize) { + ensureSize(requiredSize); + + if (direction == Direction.WRITE) { + direction = Direction.READ; + buffer.flip(); + } + + return buffer; + } + + /** + * Clears the contents of the buffer and sets direction to WRITE + */ + public void clear() { + logger.debug("Clearing {}", buffer); + buffer.clear(); + direction = Direction.WRITE; + } + + public void compact() { + final String before = buffer.toString(); + buffer.compact(); + logger.debug("Before compact: {}, after: {}", before, buffer); + direction = Direction.WRITE; + } + + public static enum Direction { + + READ, WRITE; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java ---------------------------------------------------------------------- diff --git a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java new file mode 100644 index 0000000..32a3f26 --- /dev/null +++ b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInputStream.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.io.socket; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.net.SocketTimeoutException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; +import java.nio.channels.SocketChannel; +import java.util.concurrent.TimeUnit; + +public class SocketChannelInputStream extends InputStream { + + private static final long CHANNEL_EMPTY_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS); + private final SocketChannel channel; + private volatile int timeoutMillis = 30000; + + private final ByteBuffer oneByteBuffer = ByteBuffer.allocate(1); + private Byte bufferedByte = null; + + public SocketChannelInputStream(final SocketChannel socketChannel) throws IOException { + // this class expects a non-blocking channel + socketChannel.configureBlocking(false); + this.channel = socketChannel; + } + + public void setTimeout(final int timeoutMillis) { + this.timeoutMillis = timeoutMillis; + } + + @Override + public int read() throws IOException { + if (bufferedByte != null) { + final int retVal = bufferedByte & 0xFF; + bufferedByte = null; + return retVal; + } + + oneByteBuffer.flip(); + oneByteBuffer.clear(); + + final long maxTime = System.currentTimeMillis() + timeoutMillis; + int bytesRead; + do { + bytesRead = channel.read(oneByteBuffer); + if (bytesRead == 0) { + if (System.currentTimeMillis() > maxTime) { + throw new SocketTimeoutException("Timed out reading from socket"); + } + try { + TimeUnit.NANOSECONDS.sleep(CHANNEL_EMPTY_WAIT_NANOS); + } catch (InterruptedException e) { + close(); + Thread.currentThread().interrupt(); // set the interrupt status + throw new ClosedByInterruptException(); // simulate an interrupted blocked read operation + } + } + } while (bytesRead == 0); + + if (bytesRead == -1) { + return -1; + } + oneByteBuffer.flip(); + return oneByteBuffer.get() & 0xFF; + } + + @Override + public int read(final byte[] b) throws IOException { + return read(b, 0, b.length); + } + + @Override + public int read(final byte[] b, final int off, final int len) throws IOException { + if (bufferedByte != null) { + final byte retVal = bufferedByte; + bufferedByte = null; + b[off] = retVal; + return 1; + } + + final ByteBuffer buffer = ByteBuffer.wrap(b, off, len); + + final long maxTime = System.currentTimeMillis() + timeoutMillis; + int bytesRead; + do { + bytesRead = channel.read(buffer); + if (bytesRead == 0) { + if (System.currentTimeMillis() > maxTime) { + throw new SocketTimeoutException("Timed out reading from socket"); + } + try { + TimeUnit.NANOSECONDS.sleep(CHANNEL_EMPTY_WAIT_NANOS); + } catch (InterruptedException e) { + close(); + Thread.currentThread().interrupt(); // set the interrupt status + throw new ClosedByInterruptException(); // simulate an interrupted blocked read operation + } + } + } while (bytesRead == 0); + + return bytesRead; + } + + @Override + public int available() throws IOException { + if (bufferedByte != null) { + return 1; + } + + isDataAvailable(); // attempt to read from socket + return (bufferedByte == null) ? 0 : 1; + } + + public boolean isDataAvailable() throws IOException { + if (bufferedByte != null) { + return true; + } + + oneByteBuffer.flip(); + oneByteBuffer.clear(); + final int bytesRead = channel.read(oneByteBuffer); + if (bytesRead == -1) { + throw new EOFException("Peer has closed the stream"); + } + if (bytesRead > 0) { + oneByteBuffer.flip(); + bufferedByte = oneByteBuffer.get(); + return true; + } + return false; + } + + /** + * Closes the underlying socket channel. + * @throws java.io.IOException + */ + @Override + public void close() throws IOException { + channel.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java ---------------------------------------------------------------------- diff --git a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java new file mode 100644 index 0000000..77049ad --- /dev/null +++ b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutputStream.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.io.socket; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.SocketTimeoutException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; +import java.nio.channels.SocketChannel; +import java.util.concurrent.TimeUnit; + +public class SocketChannelOutputStream extends OutputStream { + + private static final long CHANNEL_FULL_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS); + private final SocketChannel channel; + private volatile int timeout = 30000; + + private final ByteBuffer oneByteBuffer = ByteBuffer.allocate(1); + + public SocketChannelOutputStream(final SocketChannel socketChannel) throws IOException { + // this class expects a non-blocking channel + socketChannel.configureBlocking(false); + this.channel = socketChannel; + } + + public void setTimeout(final int timeoutMillis) { + this.timeout = timeoutMillis; + } + + @Override + public void write(final int b) throws IOException { + oneByteBuffer.flip(); + oneByteBuffer.clear(); + oneByteBuffer.put((byte) b); + oneByteBuffer.flip(); + + final int timeoutMillis = this.timeout; + long maxTime = System.currentTimeMillis() + timeoutMillis; + int bytesWritten; + while (oneByteBuffer.hasRemaining()) { + bytesWritten = channel.write(oneByteBuffer); + if (bytesWritten == 0) { + if (System.currentTimeMillis() > maxTime) { + throw new SocketTimeoutException("Timed out writing to socket"); + } + try { + TimeUnit.NANOSECONDS.sleep(CHANNEL_FULL_WAIT_NANOS); + } catch (InterruptedException e) { + close(); + Thread.currentThread().interrupt(); // set the interrupt status + throw new ClosedByInterruptException(); // simulate an interrupted blocked write operation + } + } else { + return; + } + } + } + + @Override + public void write(final byte[] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void write(final byte[] b, final int off, final int len) throws IOException { + final ByteBuffer buffer = ByteBuffer.wrap(b, off, len); + + final int timeoutMillis = this.timeout; + long maxTime = System.currentTimeMillis() + timeoutMillis; + int bytesWritten; + while (buffer.hasRemaining()) { + bytesWritten = channel.write(buffer); + if (bytesWritten == 0) { + if (System.currentTimeMillis() > maxTime) { + throw new SocketTimeoutException("Timed out writing to socket"); + } + try { + TimeUnit.NANOSECONDS.sleep(CHANNEL_FULL_WAIT_NANOS); + } catch (InterruptedException e) { + close(); + Thread.currentThread().interrupt(); // set the interrupt status + throw new ClosedByInterruptException(); // simulate an interrupted blocked write operation + } + } else { + maxTime = System.currentTimeMillis() + timeoutMillis; + } + } + } + + /** + * Closes the underlying SocketChannel + * @throws java.io.IOException + */ + @Override + public void close() throws IOException { + channel.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java ---------------------------------------------------------------------- diff --git a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java new file mode 100644 index 0000000..5810488 --- /dev/null +++ b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannel.java @@ -0,0 +1,602 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.io.socket.ssl; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketTimeoutException; +import java.nio.ByteBuffer; +import java.nio.channels.ClosedByInterruptException; +import java.nio.channels.SocketChannel; +import java.util.concurrent.TimeUnit; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLEngineResult; +import javax.net.ssl.SSLEngineResult.Status; +import javax.net.ssl.SSLHandshakeException; +import javax.net.ssl.SSLPeerUnverifiedException; +import javax.security.cert.CertificateExpiredException; +import javax.security.cert.CertificateNotYetValidException; +import javax.security.cert.X509Certificate; + +import org.apache.nifi.remote.exception.TransmissionDisabledException; +import org.apache.nifi.remote.io.socket.BufferStateManager; +import org.apache.nifi.remote.io.socket.BufferStateManager.Direction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SSLSocketChannel implements Closeable { + + public static final int MAX_WRITE_SIZE = 65536; + + private static final Logger logger = LoggerFactory.getLogger(SSLSocketChannel.class); + private static final long BUFFER_FULL_EMPTY_WAIT_NANOS = TimeUnit.NANOSECONDS.convert(10, TimeUnit.MILLISECONDS); + + private final String hostname; + private final int port; + private final SSLEngine engine; + private final SocketAddress socketAddress; + + private BufferStateManager streamInManager; + private BufferStateManager streamOutManager; + private BufferStateManager appDataManager; + + private SocketChannel channel; + + private final byte[] oneByteBuffer = new byte[1]; + + private int timeoutMillis = 30000; + private volatile boolean connected = false; + private boolean handshaking = false; + private boolean closed = false; + private volatile boolean interrupted = false; + + public SSLSocketChannel(final SSLContext sslContext, final String hostname, final int port, final boolean client) throws IOException { + this.socketAddress = new InetSocketAddress(hostname, port); + this.channel = SocketChannel.open(); + this.hostname = hostname; + this.port = port; + this.engine = sslContext.createSSLEngine(); + this.engine.setUseClientMode(client); + engine.setNeedClientAuth(true); + + streamInManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize())); + streamOutManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize())); + appDataManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getApplicationBufferSize())); + } + + public SSLSocketChannel(final SSLContext sslContext, final SocketChannel socketChannel, final boolean client) throws IOException { + if (!socketChannel.isConnected()) { + throw new IllegalArgumentException("Cannot pass an un-connected SocketChannel"); + } + + this.channel = socketChannel; + + this.socketAddress = socketChannel.getRemoteAddress(); + final Socket socket = socketChannel.socket(); + this.hostname = socket.getInetAddress().getHostName(); + this.port = socket.getPort(); + + this.engine = sslContext.createSSLEngine(); + this.engine.setUseClientMode(client); + engine.setNeedClientAuth(true); + + streamInManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize())); + streamOutManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getPacketBufferSize())); + appDataManager = new BufferStateManager(ByteBuffer.allocate(engine.getSession().getApplicationBufferSize())); + } + + public void setTimeout(final int millis) { + this.timeoutMillis = millis; + } + + public int getTimeout() { + return timeoutMillis; + } + + public void connect() throws SSLHandshakeException, IOException { + try { + channel.configureBlocking(false); + if (!channel.isConnected()) { + final long startTime = System.currentTimeMillis(); + + if (!channel.connect(socketAddress)) { + while (!channel.finishConnect()) { + if (interrupted) { + throw new TransmissionDisabledException(); + } + if (System.currentTimeMillis() > startTime + timeoutMillis) { + throw new SocketTimeoutException("Timed out connecting to " + hostname + ":" + port); + } + + try { + Thread.sleep(50L); + } catch (final InterruptedException e) { + } + } + } + } + engine.beginHandshake(); + + performHandshake(); + logger.debug("{} Successfully completed SSL handshake", this); + + streamInManager.clear(); + streamOutManager.clear(); + appDataManager.clear(); + + connected = true; + } catch (final Exception e) { + logger.error("{} Failed to connect due to {}", this, e); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + closeQuietly(channel); + engine.closeInbound(); + engine.closeOutbound(); + throw e; + } + } + + public String getDn() throws CertificateExpiredException, CertificateNotYetValidException, SSLPeerUnverifiedException { + final X509Certificate[] certs = engine.getSession().getPeerCertificateChain(); + if (certs == null || certs.length == 0) { + throw new SSLPeerUnverifiedException("No certificates found"); + } + + final X509Certificate cert = certs[0]; + cert.checkValidity(); + return cert.getSubjectDN().getName().trim(); + } + + private void performHandshake() throws IOException { + // Generate handshake message + final byte[] emptyMessage = new byte[0]; + handshaking = true; + logger.debug("{} Performing Handshake", this); + + try { + while (true) { + switch (engine.getHandshakeStatus()) { + case FINISHED: + return; + case NEED_WRAP: { + final ByteBuffer appDataOut = ByteBuffer.wrap(emptyMessage); + + final ByteBuffer outboundBuffer = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize()); + + final SSLEngineResult wrapHelloResult = engine.wrap(appDataOut, outboundBuffer); + if (wrapHelloResult.getStatus() == Status.BUFFER_OVERFLOW) { + streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize()); + continue; + } + + if (wrapHelloResult.getStatus() != Status.OK) { + throw new SSLHandshakeException("Could not generate SSL Handshake information: SSLEngineResult: " + + wrapHelloResult.toString()); + } + + logger.trace("{} Handshake response after wrapping: {}", this, wrapHelloResult); + + final ByteBuffer readableStreamOut = streamOutManager.prepareForRead(1); + final int bytesToSend = readableStreamOut.remaining(); + writeFully(readableStreamOut); + logger.trace("{} Sent {} bytes of wrapped data for handshake", this, bytesToSend); + + streamOutManager.clear(); + } + continue; + case NEED_UNWRAP: { + final ByteBuffer readableDataIn = streamInManager.prepareForRead(0); + final ByteBuffer appData = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize()); + + // Read handshake response from other side + logger.trace("{} Unwrapping: {} to {}", new Object[]{this, readableDataIn, appData}); + SSLEngineResult handshakeResponseResult = engine.unwrap(readableDataIn, appData); + logger.trace("{} Handshake response after unwrapping: {}", this, handshakeResponseResult); + + if (handshakeResponseResult.getStatus() == Status.BUFFER_UNDERFLOW) { + final ByteBuffer writableDataIn = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize()); + final int bytesRead = readData(writableDataIn); + if (bytesRead > 0) { + logger.trace("{} Read {} bytes for handshake", this, bytesRead); + } + + if (bytesRead < 0) { + throw new SSLHandshakeException("Reached End-of-File marker while performing handshake"); + } + } else if (handshakeResponseResult.getStatus() == Status.CLOSED) { + throw new IOException("Channel was closed by peer during handshake"); + } else { + streamInManager.compact(); + appDataManager.clear(); + } + } + break; + case NEED_TASK: + performTasks(); + continue; + case NOT_HANDSHAKING: + return; + } + } + } finally { + handshaking = false; + } + } + + private void performTasks() { + Runnable runnable; + while ((runnable = engine.getDelegatedTask()) != null) { + runnable.run(); + } + } + + private void closeQuietly(final Closeable closeable) { + try { + closeable.close(); + } catch (final Exception e) { + } + } + + private int readData(final ByteBuffer dest) throws IOException { + final long startTime = System.currentTimeMillis(); + + while (true) { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + if (dest.remaining() == 0) { + return 0; + } + + final int readCount = channel.read(dest); + + if (readCount == 0) { + if (System.currentTimeMillis() > startTime + timeoutMillis) { + throw new SocketTimeoutException("Timed out reading from socket connected to " + hostname + ":" + port); + } + try { + TimeUnit.NANOSECONDS.sleep(BUFFER_FULL_EMPTY_WAIT_NANOS); + } catch (InterruptedException e) { + close(); + Thread.currentThread().interrupt(); // set the interrupt status + throw new ClosedByInterruptException(); + } + + continue; + } + + logger.trace("{} Read {} bytes", this, readCount); + return readCount; + } + } + + private Status encryptAndWriteFully(final BufferStateManager src) throws IOException { + SSLEngineResult result = null; + + final ByteBuffer buff = src.prepareForRead(0); + final ByteBuffer outBuff = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize()); + + logger.trace("{} Encrypting {} bytes", this, buff.remaining()); + while (buff.remaining() > 0) { + result = engine.wrap(buff, outBuff); + if (result.getStatus() == Status.OK) { + final ByteBuffer readableOutBuff = streamOutManager.prepareForRead(0); + writeFully(readableOutBuff); + streamOutManager.clear(); + } else { + return result.getStatus(); + } + } + + return result.getStatus(); + } + + private void writeFully(final ByteBuffer src) throws IOException { + long lastByteWrittenTime = System.currentTimeMillis(); + + int bytesWritten = 0; + while (src.hasRemaining()) { + if (interrupted) { + throw new TransmissionDisabledException(); + } + + final int written = channel.write(src); + bytesWritten += written; + final long now = System.currentTimeMillis(); + if (written > 0) { + lastByteWrittenTime = now; + } else { + if (now > lastByteWrittenTime + timeoutMillis) { + throw new SocketTimeoutException("Timed out writing to socket connected to " + hostname + ":" + port); + } + try { + TimeUnit.NANOSECONDS.sleep(BUFFER_FULL_EMPTY_WAIT_NANOS); + } catch (final InterruptedException e) { + close(); + Thread.currentThread().interrupt(); // set the interrupt status + throw new ClosedByInterruptException(); + } + } + } + + logger.trace("{} Wrote {} bytes", this, bytesWritten); + } + + public boolean isClosed() { + if (closed) { + return true; + } + // need to detect if peer has sent closure handshake...if so the answer is true + final ByteBuffer writableInBuffer = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize()); + int readCount = 0; + try { + readCount = channel.read(writableInBuffer); + } catch (IOException e) { + logger.error("{} Failed to readData due to {}", new Object[]{this, e}); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + readCount = -1; // treat the condition same as if End of Stream + } + if (readCount == 0) { + return false; + } + if (readCount > 0) { + logger.trace("{} Read {} bytes", this, readCount); + + final ByteBuffer streamInBuffer = streamInManager.prepareForRead(1); + final ByteBuffer appDataBuffer = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize()); + try { + SSLEngineResult unwrapResponse = engine.unwrap(streamInBuffer, appDataBuffer); + logger.trace("{} When checking if closed, (handshake={}) Unwrap response: {}", new Object[]{this, handshaking, unwrapResponse}); + if (unwrapResponse.getStatus().equals(Status.CLOSED)) { + // Drain the incoming TCP buffer + final ByteBuffer discardBuffer = ByteBuffer.allocate(8192); + int bytesDiscarded = channel.read(discardBuffer); + while (bytesDiscarded > 0) { + discardBuffer.clear(); + bytesDiscarded = channel.read(discardBuffer); + } + engine.closeInbound(); + } else { + streamInManager.compact(); + return false; + } + } catch (IOException e) { + logger.error("{} Failed to check if closed due to {}. Closing channel.", new Object[]{this, e}); + if (logger.isDebugEnabled()) { + logger.error("", e); + } + } + } + // either readCount is -1, indicating an end of stream, or the peer sent a closure handshake + // so go ahead and close down the channel + closeQuietly(channel.socket()); + closeQuietly(channel); + closed = true; + return true; + } + + @Override + public void close() throws IOException { + logger.debug("{} Closing Connection", this); + if (channel == null) { + return; + } + + if (closed) { + return; + } + + try { + engine.closeOutbound(); + + final byte[] emptyMessage = new byte[0]; + + final ByteBuffer appDataOut = ByteBuffer.wrap(emptyMessage); + final ByteBuffer outboundBuffer = streamOutManager.prepareForWrite(engine.getSession().getApplicationBufferSize()); + final SSLEngineResult handshakeResult = engine.wrap(appDataOut, outboundBuffer); + + if (handshakeResult.getStatus() != Status.CLOSED) { + throw new IOException("Invalid close state - will not send network data"); + } + + final ByteBuffer readableStreamOut = streamOutManager.prepareForRead(1); + writeFully(readableStreamOut); + } finally { + // Drain the incoming TCP buffer + final ByteBuffer discardBuffer = ByteBuffer.allocate(8192); + try { + int bytesDiscarded = channel.read(discardBuffer); + while (bytesDiscarded > 0) { + discardBuffer.clear(); + bytesDiscarded = channel.read(discardBuffer); + } + } catch (Exception e) { + } + + closeQuietly(channel.socket()); + closeQuietly(channel); + closed = true; + } + } + + private int copyFromAppDataBuffer(final byte[] buffer, final int offset, final int len) { + // If any data already exists in the application data buffer, copy it to the buffer. + final ByteBuffer appDataBuffer = appDataManager.prepareForRead(1); + + final int appDataRemaining = appDataBuffer.remaining(); + if (appDataRemaining > 0) { + final int bytesToCopy = Math.min(len, appDataBuffer.remaining()); + appDataBuffer.get(buffer, offset, bytesToCopy); + + final int bytesCopied = appDataRemaining - appDataBuffer.remaining(); + logger.trace("{} Copied {} ({}) bytes from unencrypted application buffer to user space", + new Object[]{this, bytesToCopy, bytesCopied}); + return bytesCopied; + } + return 0; + } + + public int available() throws IOException { + ByteBuffer appDataBuffer = appDataManager.prepareForRead(1); + ByteBuffer streamDataBuffer = streamInManager.prepareForRead(1); + final int buffered = appDataBuffer.remaining() + streamDataBuffer.remaining(); + if (buffered > 0) { + return buffered; + } + + final boolean wasAbleToRead = isDataAvailable(); + if (!wasAbleToRead) { + return 0; + } + + appDataBuffer = appDataManager.prepareForRead(1); + streamDataBuffer = streamInManager.prepareForRead(1); + return appDataBuffer.remaining() + streamDataBuffer.remaining(); + } + + public boolean isDataAvailable() throws IOException { + final ByteBuffer appDataBuffer = appDataManager.prepareForRead(1); + final ByteBuffer streamDataBuffer = streamInManager.prepareForRead(1); + + if (appDataBuffer.remaining() > 0 || streamDataBuffer.remaining() > 0) { + return true; + } + + final ByteBuffer writableBuffer = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize()); + final int bytesRead = channel.read(writableBuffer); + return (bytesRead > 0); + } + + public int read() throws IOException { + final int bytesRead = read(oneByteBuffer); + if (bytesRead == -1) { + return -1; + } + return oneByteBuffer[0] & 0xFF; + } + + public int read(final byte[] buffer) throws IOException { + return read(buffer, 0, buffer.length); + } + + public int read(final byte[] buffer, final int offset, final int len) throws IOException { + logger.debug("{} Reading up to {} bytes of data", this, len); + + if (!connected) { + connect(); + } + + int copied = copyFromAppDataBuffer(buffer, offset, len); + if (copied > 0) { + return copied; + } + + appDataManager.clear(); + + while (true) { + // prepare buffers and call unwrap + final ByteBuffer streamInBuffer = streamInManager.prepareForRead(1); + SSLEngineResult unwrapResponse = null; + final ByteBuffer appDataBuffer = appDataManager.prepareForWrite(engine.getSession().getApplicationBufferSize()); + unwrapResponse = engine.unwrap(streamInBuffer, appDataBuffer); + logger.trace("{} When reading data, (handshake={}) Unwrap response: {}", new Object[]{this, handshaking, unwrapResponse}); + + switch (unwrapResponse.getStatus()) { + case BUFFER_OVERFLOW: + throw new SSLHandshakeException("Buffer Overflow, which is not allowed to happen from an unwrap"); + case BUFFER_UNDERFLOW: { +// appDataManager.prepareForRead(engine.getSession().getApplicationBufferSize()); + + final ByteBuffer writableInBuffer = streamInManager.prepareForWrite(engine.getSession().getPacketBufferSize()); + final int bytesRead = readData(writableInBuffer); + if (bytesRead < 0) { + return -1; + } + + continue; + } + case CLOSED: + throw new IOException("Channel is closed"); + case OK: { + copied = copyFromAppDataBuffer(buffer, offset, len); + if (copied == 0) { + throw new IOException("Failed to decrypt data"); + } + streamInManager.compact(); + return copied; + } + } + } + } + + public void write(final int data) throws IOException { + write(new byte[]{(byte) data}, 0, 1); + } + + public void write(final byte[] data) throws IOException { + write(data, 0, data.length); + } + + public void write(final byte[] data, final int offset, final int len) throws IOException { + logger.debug("{} Writing {} bytes of data", this, len); + + if (!connected) { + connect(); + } + + int iterations = len / MAX_WRITE_SIZE; + if (len % MAX_WRITE_SIZE > 0) { + iterations++; + } + + for (int i = 0; i < iterations; i++) { + streamOutManager.clear(); + final int itrOffset = offset + i * MAX_WRITE_SIZE; + final int itrLen = Math.min(len - itrOffset, MAX_WRITE_SIZE); + final ByteBuffer byteBuffer = ByteBuffer.wrap(data, itrOffset, itrLen); + + final BufferStateManager buffMan = new BufferStateManager(byteBuffer, Direction.READ); + final Status status = encryptAndWriteFully(buffMan); + switch (status) { + case BUFFER_OVERFLOW: + streamOutManager.ensureSize(engine.getSession().getPacketBufferSize()); + appDataManager.ensureSize(engine.getSession().getApplicationBufferSize()); + continue; + case OK: + continue; + case CLOSED: + throw new IOException("Channel is closed"); + case BUFFER_UNDERFLOW: + throw new AssertionError("Got Buffer Underflow but should not have..."); + } + } + } + + public void interrupt() { + this.interrupted = true; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java ---------------------------------------------------------------------- diff --git a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java new file mode 100644 index 0000000..154bd08 --- /dev/null +++ b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInputStream.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.io.socket.ssl; + +import java.io.IOException; +import java.io.InputStream; + +public class SSLSocketChannelInputStream extends InputStream { + + private final SSLSocketChannel channel; + + public SSLSocketChannelInputStream(final SSLSocketChannel channel) { + this.channel = channel; + } + + @Override + public int read() throws IOException { + return channel.read(); + } + + @Override + public int read(final byte[] b) throws IOException { + return channel.read(b); + } + + @Override + public int read(final byte[] b, final int off, final int len) throws IOException { + return channel.read(b, off, len); + } + + /** + * Closes the underlying SSLSocketChannel, which will also close the + * OutputStream and connection + */ + @Override + public void close() throws IOException { + channel.close(); + } + + @Override + public int available() throws IOException { + return channel.available(); + } + + public boolean isDataAvailable() throws IOException { + return available() > 0; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java ---------------------------------------------------------------------- diff --git a/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java new file mode 100644 index 0000000..ce4e420 --- /dev/null +++ b/commons/remote-communications-utils/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutputStream.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.io.socket.ssl; + +import java.io.IOException; +import java.io.OutputStream; + +public class SSLSocketChannelOutputStream extends OutputStream { + + private final SSLSocketChannel channel; + + public SSLSocketChannelOutputStream(final SSLSocketChannel channel) { + this.channel = channel; + } + + @Override + public void write(final int b) throws IOException { + channel.write(b); + } + + @Override + public void write(byte[] b) throws IOException { + channel.write(b); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + channel.write(b, off, len); + } + + /** + * Closes the underlying SSLSocketChannel, which also will close the + * InputStream and the connection + */ + @Override + public void close() throws IOException { + channel.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/remote-communications-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java ---------------------------------------------------------------------- diff --git a/commons/remote-communications-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java b/commons/remote-communications-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java new file mode 100644 index 0000000..bd30a96 --- /dev/null +++ b/commons/remote-communications-utils/src/test/java/org/apache/nifi/remote/io/TestCompressionInputOutputStreams.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.io; + +import static org.junit.Assert.assertTrue; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; + +import org.apache.nifi.remote.io.CompressionInputStream; +import org.apache.nifi.remote.io.CompressionOutputStream; + +import org.junit.Test; + +public class TestCompressionInputOutputStreams { + + @Test + public void testSimple() throws IOException { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + final byte[] data = "Hello, World!".getBytes("UTF-8"); + + final CompressionOutputStream cos = new CompressionOutputStream(baos); + cos.write(data); + cos.flush(); + cos.close(); + + final byte[] compressedBytes = baos.toByteArray(); + final CompressionInputStream cis = new CompressionInputStream(new ByteArrayInputStream(compressedBytes)); + final byte[] decompressed = readFully(cis); + + assertTrue(Arrays.equals(data, decompressed)); + } + + @Test + public void testDataLargerThanBuffer() throws IOException { + final String str = "The quick brown fox jumps over the lazy dog\r\n\n\n\r"; + + final StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 100; i++) { + sb.append(str); + } + final byte[] data = sb.toString().getBytes("UTF-8"); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + final CompressionOutputStream cos = new CompressionOutputStream(baos, 8192); + cos.write(data); + cos.flush(); + cos.close(); + + final byte[] compressedBytes = baos.toByteArray(); + final CompressionInputStream cis = new CompressionInputStream(new ByteArrayInputStream(compressedBytes)); + final byte[] decompressed = readFully(cis); + + assertTrue(Arrays.equals(data, decompressed)); + } + + @Test + public void testDataLargerThanBufferWhileFlushing() throws IOException { + final String str = "The quick brown fox jumps over the lazy dog\r\n\n\n\r"; + final byte[] data = str.getBytes("UTF-8"); + + final StringBuilder sb = new StringBuilder(); + final byte[] data1024; + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + final CompressionOutputStream cos = new CompressionOutputStream(baos, 8192); + for (int i = 0; i < 1024; i++) { + cos.write(data); + cos.flush(); + sb.append(str); + } + cos.close(); + data1024 = sb.toString().getBytes("UTF-8"); + + final byte[] compressedBytes = baos.toByteArray(); + final CompressionInputStream cis = new CompressionInputStream(new ByteArrayInputStream(compressedBytes)); + final byte[] decompressed = readFully(cis); + + assertTrue(Arrays.equals(data1024, decompressed)); + } + + @Test + public void testSendingMultipleFilesBackToBackOnSameStream() throws IOException { + final String str = "The quick brown fox jumps over the lazy dog\r\n\n\n\r"; + final byte[] data = str.getBytes("UTF-8"); + + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + final CompressionOutputStream cos = new CompressionOutputStream(baos, 8192); + for (int i = 0; i < 512; i++) { + cos.write(data); + cos.flush(); + } + cos.close(); + + final CompressionOutputStream cos2 = new CompressionOutputStream(baos, 8192); + for (int i = 0; i < 512; i++) { + cos2.write(data); + cos2.flush(); + } + cos2.close(); + + final byte[] data512; + final StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 512; i++) { + sb.append(str); + } + data512 = sb.toString().getBytes("UTF-8"); + + final byte[] compressedBytes = baos.toByteArray(); + final ByteArrayInputStream bais = new ByteArrayInputStream(compressedBytes); + + final CompressionInputStream cis = new CompressionInputStream(bais); + final byte[] decompressed = readFully(cis); + assertTrue(Arrays.equals(data512, decompressed)); + + final CompressionInputStream cis2 = new CompressionInputStream(bais); + final byte[] decompressed2 = readFully(cis2); + assertTrue(Arrays.equals(data512, decompressed2)); + } + + private byte[] readFully(final InputStream in) throws IOException { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + final byte[] buffer = new byte[65536]; + int len; + while ((len = in.read(buffer)) >= 0) { + baos.write(buffer, 0, len); + } + + return baos.toByteArray(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/search-utils/pom.xml ---------------------------------------------------------------------- diff --git a/commons/search-utils/pom.xml b/commons/search-utils/pom.xml new file mode 100644 index 0000000..569958f --- /dev/null +++ b/commons/search-utils/pom.xml @@ -0,0 +1,32 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-parent</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + + <artifactId>nifi-search-utils</artifactId> + <version>0.0.1-SNAPSHOT</version> + <packaging>jar</packaging> + + <name>search-utils</name> + + <dependencies> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/search-utils/src/main/java/org/apache/nifi/util/search/Search.java ---------------------------------------------------------------------- diff --git a/commons/search-utils/src/main/java/org/apache/nifi/util/search/Search.java b/commons/search-utils/src/main/java/org/apache/nifi/util/search/Search.java new file mode 100644 index 0000000..59b444a --- /dev/null +++ b/commons/search-utils/src/main/java/org/apache/nifi/util/search/Search.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util.search; + +import java.io.IOException; +import java.io.InputStream; +import java.util.Set; + +import org.apache.nifi.util.search.ahocorasick.SearchState; + +/** + * Defines an interface to search for content given a set of search terms. Any + * implementation of search must be thread safe. + * + * @author + * @param <T> + */ +public interface Search<T> { + + /** + * Establishes the dictionary of terms which will be searched in subsequent + * search calls. This can be called only once + * + * @param terms + */ + void initializeDictionary(Set<SearchTerm<T>> terms); + + /** + * Searches the given input stream for matches between the already specified + * dictionary and the contents scanned. + * + * @param haystack + * @param findAll if true will find all matches if false will find only the + * first match + * @return SearchState containing results Map might be empty which indicates + * no matches found but will not be null + * @throws IOException Thrown for any exceptions occurring while searching. + * @throws IllegalStateException if the dictionary has not yet been + * initialized + */ + SearchState<T> search(InputStream haystack, boolean findAll) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/search-utils/src/main/java/org/apache/nifi/util/search/SearchTerm.java ---------------------------------------------------------------------- diff --git a/commons/search-utils/src/main/java/org/apache/nifi/util/search/SearchTerm.java b/commons/search-utils/src/main/java/org/apache/nifi/util/search/SearchTerm.java new file mode 100644 index 0000000..62de964 --- /dev/null +++ b/commons/search-utils/src/main/java/org/apache/nifi/util/search/SearchTerm.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util.search; + +import java.nio.charset.Charset; +import java.util.Arrays; + +/** + * This is an immutable thread safe object representing a search term + * + * @author + * @param <T> + */ +public class SearchTerm<T> { + + private final byte[] bytes; + private final int hashCode; + private final T reference; + + /** + * Constructs a SearchTerm. Defensively copies the given byte array + * + * @param bytes + * @throws IllegalArgument exception if given bytes are null or 0 length + */ + public SearchTerm(final byte[] bytes) { + this(bytes, true, null); + } + + /** + * Constructs a search term. Optionally performs a defensive copy of the + * given byte array. If the caller indicates a defensive copy is not + * necessary then they must not change the given arrays state any longer + * + * @param bytes + * @param defensiveCopy + * @param reference + */ + public SearchTerm(final byte[] bytes, final boolean defensiveCopy, final T reference) { + if (bytes == null || bytes.length == 0) { + throw new IllegalArgumentException(); + } + if (defensiveCopy) { + this.bytes = Arrays.copyOf(bytes, bytes.length); + } else { + this.bytes = bytes; + } + this.hashCode = Arrays.hashCode(this.bytes); + this.reference = reference; + } + + public int get(final int index) { + return bytes[index] & 0xff; + } + + /** + * @return size in of search term in bytes + */ + public int size() { + return bytes.length; + } + + /** + * @return reference object for this given search term + */ + public T getReference() { + return reference; + } + + /** + * Determines if the given window starts with the same bytes as this term + * + * @param window Current window of bytes from the haystack being evaluated. + * @param windowLength The length of the window to consider + * @return true if this term starts with the same bytes of the given window + */ + public boolean startsWith(byte[] window, int windowLength) { + if (windowLength > window.length) { + throw new IndexOutOfBoundsException(); + } + if (bytes.length < windowLength) { + return false; + } + for (int i = 0; i < bytes.length && i < windowLength; i++) { + if (bytes[i] != window[i]) { + return false; + } + } + return true; + } + + /** + * @return a defensive copy of the internal byte structure + */ + public byte[] getBytes() { + return Arrays.copyOf(bytes, bytes.length); + } + + @Override + public int hashCode() { + return hashCode; + } + + @Override + public boolean equals(final Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final SearchTerm other = (SearchTerm) obj; + if (this.hashCode != other.hashCode) { + return false; + } + return Arrays.equals(this.bytes, other.bytes); + } + + @Override + public String toString() { + return new String(bytes); + } + + public String toString(final Charset charset) { + return new String(bytes, charset); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/AhoCorasick.java ---------------------------------------------------------------------- diff --git a/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/AhoCorasick.java b/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/AhoCorasick.java new file mode 100644 index 0000000..3b8afaf --- /dev/null +++ b/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/AhoCorasick.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util.search.ahocorasick; + +import java.io.IOException; +import java.io.InputStream; +import java.util.LinkedList; +import java.util.Queue; +import java.util.Set; + +import org.apache.nifi.util.search.Search; +import org.apache.nifi.util.search.SearchTerm; + +public class AhoCorasick<T> implements Search<T> { + + private Node root = null; + + /** + * Constructs a new search object. + * + * @throws IllegalArgumentException if given terms are null or empty + */ + public AhoCorasick() { + } + + @Override + public void initializeDictionary(final Set<SearchTerm<T>> terms) { + if (root != null) { + throw new IllegalStateException(); + } + root = new Node(); + if (terms == null || terms.isEmpty()) { + throw new IllegalArgumentException(); + } + for (final SearchTerm<T> term : terms) { + int i = 0; + Node nextNode = root; + while (true) { + nextNode = addMatch(term, i, nextNode); + if (nextNode == null) { + break; //we're done + } + i++; + } + } + initialize(); + } + + private Node addMatch(final SearchTerm<T> term, final int offset, final Node current) { + final int index = term.get(offset); + boolean atEnd = (offset == (term.size() - 1)); + if (current.getNeighbor(index) == null) { + if (atEnd) { + current.setNeighbor(new Node(term), index); + return null; + } + current.setNeighbor(new Node(), index); + } else if (atEnd) { + current.getNeighbor(index).setMatchingTerm(term); + return null; + } + return current.getNeighbor(index); + } + + private void initialize() { + //perform bgs to build failure links + final Queue<Node> queue = new LinkedList<>(); + queue.add(root); + root.setFailureNode(null); + while (!queue.isEmpty()) { + final Node current = queue.poll(); + for (int i = 0; i < 256; i++) { + final Node next = current.getNeighbor(i); + if (next != null) { + //traverse failure to get state + Node fail = current.getFailureNode(); + while ((fail != null) && fail.getNeighbor(i) == null) { + fail = fail.getFailureNode(); + } + if (fail != null) { + next.setFailureNode(fail.getNeighbor(i)); + } else { + next.setFailureNode(root); + } + queue.add(next); + } + } + } + } + + @Override + public SearchState search(final InputStream stream, final boolean findAll) throws IOException { + return search(stream, findAll, null); + } + + private SearchState search(final InputStream stream, final boolean findAll, final SearchState state) throws IOException { + if (root == null) { + throw new IllegalStateException(); + } + final SearchState<T> currentState = (state == null) ? new SearchState(root) : state; + if (!findAll && currentState.foundMatch()) { + throw new IllegalStateException("A match has already been found yet we're being asked to keep searching"); + } + Node current = currentState.getCurrentNode(); + int currentChar; + while ((currentChar = stream.read()) >= 0) { + currentState.incrementBytesRead(1L); + Node next = current.getNeighbor(currentChar); + if (next == null) { + next = current.getFailureNode(); + while ((next != null) && next.getNeighbor(currentChar) == null) { + next = next.getFailureNode(); + } + if (next != null) { + next = next.getNeighbor(currentChar); + } else { + next = root; + } + } + if (next == null) { + throw new IllegalStateException("tree out of sync"); + } + //Accept condition + if (next.hasMatch()) { + currentState.addResult(next.getMatchingTerm()); + } + for (Node failNode = next.getFailureNode(); failNode != null; failNode = failNode.getFailureNode()) { + if (failNode.hasMatch()) { + currentState.addResult(failNode.getMatchingTerm()); + } + } + current = next; + if (currentState.foundMatch() && !findAll) { + break;//give up as soon as we have at least one match + } + } + currentState.setCurrentNode(current); + return currentState; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java ---------------------------------------------------------------------- diff --git a/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java b/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java new file mode 100644 index 0000000..0ac325c --- /dev/null +++ b/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/Node.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util.search.ahocorasick; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.nifi.util.search.SearchTerm; + +/** + * + * @author + */ +public class Node { + + private final Map<Integer, Node> neighborMap; + private Node failureNode; + private SearchTerm<?> term; + + Node(final SearchTerm<?> term) { + this(); + this.term = term; + } + + Node() { + neighborMap = new HashMap<>(); + term = null; + } + + void setFailureNode(final Node fail) { + failureNode = fail; + } + + public Node getFailureNode() { + return failureNode; + } + + public boolean hasMatch() { + return term != null; + } + + void setMatchingTerm(final SearchTerm<?> term) { + this.term = term; + } + + public SearchTerm<?> getMatchingTerm() { + return term; + } + + public Node getNeighbor(final int index) { + return neighborMap.get(index); + } + + void setNeighbor(final Node neighbor, final int index) { + neighborMap.put(index, neighbor); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java ---------------------------------------------------------------------- diff --git a/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java b/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java new file mode 100644 index 0000000..6d36ad0 --- /dev/null +++ b/commons/search-utils/src/main/java/org/apache/nifi/util/search/ahocorasick/SearchState.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.util.search.ahocorasick; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.util.search.SearchTerm; + +public class SearchState<T> { + + private Node currentNode; + private final Map<SearchTerm<T>, List<Long>> resultMap; + private long bytesRead; + + SearchState(final Node rootNode) { + resultMap = new HashMap<>(5); + currentNode = rootNode; + bytesRead = 0L; + } + + void incrementBytesRead(final long increment) { + bytesRead += increment; + } + + void setCurrentNode(final Node curr) { + currentNode = curr; + } + + public Node getCurrentNode() { + return currentNode; + } + + public Map<SearchTerm<T>, List<Long>> getResults() { + return new HashMap<>(resultMap); + } + + void addResult(final SearchTerm matchingTerm) { + final List<Long> indexes = (resultMap.containsKey(matchingTerm)) ? resultMap.get(matchingTerm) : new ArrayList<Long>(5); + indexes.add(bytesRead); + resultMap.put(matchingTerm, indexes); + } + + public boolean foundMatch() { + return !resultMap.isEmpty(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/wali/.gitignore ---------------------------------------------------------------------- diff --git a/commons/wali/.gitignore b/commons/wali/.gitignore new file mode 100755 index 0000000..19f2e00 --- /dev/null +++ b/commons/wali/.gitignore @@ -0,0 +1,2 @@ +/target +/target http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4d998c12/commons/wali/pom.xml ---------------------------------------------------------------------- diff --git a/commons/wali/pom.xml b/commons/wali/pom.xml new file mode 100644 index 0000000..ce04973 --- /dev/null +++ b/commons/wali/pom.xml @@ -0,0 +1,42 @@ +<?xml version="1.0"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-parent</artifactId> + <version>0.0.1-SNAPSHOT</version> + </parent> + + <groupId>wali</groupId> + <artifactId>wali</artifactId> + + <version>3.0.0-SNAPSHOT</version> + <packaging>jar</packaging> + <name>WALI : Write-Ahead Log Implementation</name> + + <dependencies> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-stream-utils</artifactId> + <version>0.0.1-SNAPSHOT</version> + </dependency> + </dependencies> +</project>