http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java new file mode 100644 index 0000000..d18a4ee --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.codec; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.stream.io.StreamUtils; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.remote.StandardVersionNegotiator; +import org.apache.nifi.remote.VersionNegotiator; +import org.apache.nifi.remote.exception.ProtocolException; + +public class StandardFlowFileCodec implements FlowFileCodec { + public static final int MAX_NUM_ATTRIBUTES = 25000; + + public static final String DEFAULT_FLOWFILE_PATH = "./"; + + private final VersionNegotiator versionNegotiator; + + public StandardFlowFileCodec() { + versionNegotiator = new StandardVersionNegotiator(1); + } + + @Override + public FlowFile encode(final FlowFile flowFile, final ProcessSession session, final OutputStream encodedOut) throws IOException { + final DataOutputStream out = new DataOutputStream(encodedOut); + + final Map<String, String> attributes = flowFile.getAttributes(); + out.writeInt(attributes.size()); + for ( final Map.Entry<String, String> entry : attributes.entrySet() ) { + writeString(entry.getKey(), out); + writeString(entry.getValue(), out); + } + + out.writeLong(flowFile.getSize()); + + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + final byte[] buffer = new byte[8192]; + int len; + while ( (len = in.read(buffer)) > 0 ) { + encodedOut.write(buffer, 0, len); + } + + encodedOut.flush(); + } + }); + + return flowFile; + } + + + @Override + public FlowFile decode(final InputStream stream, final ProcessSession session) throws IOException, ProtocolException { + final DataInputStream in = new DataInputStream(stream); + + final int numAttributes; + try { + numAttributes = in.readInt(); + } catch (final EOFException e) { + // we're out of data. + return null; + } + + // This is here because if the stream is not properly formed, we could get up to Integer.MAX_VALUE attributes, which will + // generally result in an OutOfMemoryError. + if ( numAttributes > MAX_NUM_ATTRIBUTES ) { + throw new ProtocolException("FlowFile exceeds maximum number of attributes with a total of " + numAttributes); + } + + try { + final Map<String, String> attributes = new HashMap<>(numAttributes); + for (int i=0; i < numAttributes; i++) { + final String attrName = readString(in); + final String attrValue = readString(in); + attributes.put(attrName, attrValue); + } + + final long numBytes = in.readLong(); + + FlowFile flowFile = session.create(); + flowFile = session.putAllAttributes(flowFile, attributes); + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream out) throws IOException { + int len; + long size = 0; + final byte[] buffer = new byte[8192]; + + while ( size < numBytes && (len = in.read(buffer, 0, (int) Math.min(buffer.length, numBytes - size))) > 0 ) { + out.write(buffer, 0, len); + size += len; + } + + if ( size != numBytes ) { + throw new EOFException("Expected " + numBytes + " bytes but received only " + size); + } + } + }); + + return flowFile; + } catch (final EOFException e) { + session.rollback(); + + // we throw the general IOException here because we did not expect to hit EOFException + throw e; + } + } + + private void writeString(final String val, final DataOutputStream out) throws IOException { + final byte[] bytes = val.getBytes("UTF-8"); + out.writeInt(bytes.length); + out.write(bytes); + } + + + private String readString(final DataInputStream in) throws IOException { + final int numBytes = in.readInt(); + final byte[] bytes = new byte[numBytes]; + StreamUtils.fillBuffer(in, bytes, true); + return new String(bytes, "UTF-8"); + } + + @Override + public List<Integer> getSupportedVersions() { + return versionNegotiator.getSupportedVersions(); + } + + @Override + public VersionNegotiator getVersionNegotiator() { + return versionNegotiator; + } + + @Override + public String toString() { + return "Standard FlowFile Codec, Version " + versionNegotiator.getVersion(); + } + + @Override + public String getResourceName() { + return "StandardFlowFileCodec"; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java new file mode 100644 index 0000000..f6c2f4f --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.exception; + +public class BadRequestException extends Exception { + + private static final long serialVersionUID = -8034602852256106560L; + + public BadRequestException(final String message) { + super(message); + } + + public BadRequestException(final Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java new file mode 100644 index 0000000..b61fc65 --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.exception; + +public class HandshakeException extends Exception { + + private static final long serialVersionUID = 178192341908726L; + + public HandshakeException(final String message) { + super(message); + } + + public HandshakeException(final Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java new file mode 100644 index 0000000..24ff3a5 --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.exception; + +public class NotAuthorizedException extends Exception { + + private static final long serialVersionUID = 2952623568114035498L; + + public NotAuthorizedException(final String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java new file mode 100644 index 0000000..af0f467 --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.exception; + +public class PortNotRunningException extends Exception { + + private static final long serialVersionUID = -2790940982005516375L; + + public PortNotRunningException(final String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java new file mode 100644 index 0000000..0f50b98 --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.exception; + +public class ProtocolException extends Exception { + + private static final long serialVersionUID = 5763900324505818495L; + + public ProtocolException(final String message, final Throwable cause) { + super(message, cause); + } + + public ProtocolException(final String message) { + super(message); + } + + public ProtocolException(final Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java new file mode 100644 index 0000000..dd675b3 --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.exception; + +/** + * Used to indicate that by the time the request was serviced, it had already + * expired + */ +public class RequestExpiredException extends Exception { + + private static final long serialVersionUID = -7037025330562827852L; +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java new file mode 100644 index 0000000..e6a0fe7 --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.exception; + +public class UnknownPortException extends Exception { + + private static final long serialVersionUID = -2790940982005516375L; + + public UnknownPortException(final String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java new file mode 100644 index 0000000..926809c --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/exception/UnsupportedCodecException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.exception; + +import org.apache.nifi.remote.codec.FlowFileCodec; + +public class UnsupportedCodecException extends RuntimeException { + private static final long serialVersionUID = 198234789237L; + + public UnsupportedCodecException(final String codecName) { + super("Codec " + codecName + " is not supported"); + } + + public UnsupportedCodecException(final FlowFileCodec codec, final int version) { + super("Codec " + codec.getClass().getName() + " does not support Version " + version); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java new file mode 100644 index 0000000..0822b6a --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.io.socket; + +import java.io.IOException; +import java.nio.channels.SocketChannel; + +import org.apache.nifi.remote.AbstractCommunicationsSession; + +public class SocketChannelCommunicationsSession extends AbstractCommunicationsSession { + private final SocketChannel channel; + private final SocketChannelInput request; + private final SocketChannelOutput response; + private int timeout = 30000; + + public SocketChannelCommunicationsSession(final SocketChannel socketChannel, final String uri) throws IOException { + super(uri); + request = new SocketChannelInput(socketChannel); + response = new SocketChannelOutput(socketChannel); + channel = socketChannel; + socketChannel.configureBlocking(false); + } + + @Override + public boolean isClosed() { + return !channel.isConnected(); + } + + @Override + public SocketChannelInput getInput() { + return request; + } + + @Override + public SocketChannelOutput getOutput() { + return response; + } + + @Override + public void setTimeout(final int millis) throws IOException { + request.setTimeout(millis); + response.setTimeout(millis); + this.timeout = millis; + } + + @Override + public int getTimeout() throws IOException { + return timeout; + } + + @Override + public void close() throws IOException { + channel.close(); + } + + @Override + public boolean isDataAvailable() { + return request.isDataAvailable(); + } + + @Override + public long getBytesWritten() { + return response.getBytesWritten(); + } + + @Override + public long getBytesRead() { + return request.getBytesRead(); + } + + @Override + public void interrupt() { + request.interrupt(); + response.interrupt(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java new file mode 100644 index 0000000..9e451fd --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.io.socket; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.channels.SocketChannel; + +import org.apache.nifi.stream.io.BufferedInputStream; +import org.apache.nifi.stream.io.ByteCountingInputStream; +import org.apache.nifi.remote.io.InterruptableInputStream; +import org.apache.nifi.remote.protocol.CommunicationsInput; + +public class SocketChannelInput implements CommunicationsInput { + private final SocketChannelInputStream socketIn; + private final ByteCountingInputStream countingIn; + private final InputStream bufferedIn; + private final InterruptableInputStream interruptableIn; + + public SocketChannelInput(final SocketChannel socketChannel) throws IOException { + this.socketIn = new SocketChannelInputStream(socketChannel); + countingIn = new ByteCountingInputStream(socketIn); + bufferedIn = new BufferedInputStream(countingIn); + interruptableIn = new InterruptableInputStream(bufferedIn); + } + + @Override + public InputStream getInputStream() throws IOException { + return interruptableIn; + } + + public void setTimeout(final int millis) { + socketIn.setTimeout(millis); + } + + public boolean isDataAvailable() { + try { + return interruptableIn.available() > 0; + } catch (final Exception e) { + return false; + } + } + + @Override + public long getBytesRead() { + return countingIn.getBytesRead(); + } + + public void interrupt() { + interruptableIn.interrupt(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java new file mode 100644 index 0000000..26c0164 --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.io.socket; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.channels.SocketChannel; + +import org.apache.nifi.stream.io.BufferedOutputStream; +import org.apache.nifi.stream.io.ByteCountingOutputStream; +import org.apache.nifi.remote.io.InterruptableOutputStream; +import org.apache.nifi.remote.protocol.CommunicationsOutput; + +public class SocketChannelOutput implements CommunicationsOutput { + private final SocketChannelOutputStream socketOutStream; + private final ByteCountingOutputStream countingOut; + private final OutputStream bufferedOut; + private final InterruptableOutputStream interruptableOut; + + public SocketChannelOutput(final SocketChannel socketChannel) throws IOException { + socketOutStream = new SocketChannelOutputStream(socketChannel); + countingOut = new ByteCountingOutputStream(socketOutStream); + bufferedOut = new BufferedOutputStream(countingOut); + interruptableOut = new InterruptableOutputStream(bufferedOut); + } + + @Override + public OutputStream getOutputStream() throws IOException { + return interruptableOut; + } + + public void setTimeout(final int timeout) { + socketOutStream.setTimeout(timeout); + } + + @Override + public long getBytesWritten() { + return countingOut.getBytesWritten(); + } + + public void interrupt() { + interruptableOut.interrupt(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java new file mode 100644 index 0000000..dca1d84 --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelCommunicationsSession.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.io.socket.ssl; + +import java.io.IOException; + +import org.apache.nifi.remote.AbstractCommunicationsSession; + +public class SSLSocketChannelCommunicationsSession extends AbstractCommunicationsSession { + private final SSLSocketChannel channel; + private final SSLSocketChannelInput request; + private final SSLSocketChannelOutput response; + + public SSLSocketChannelCommunicationsSession(final SSLSocketChannel channel, final String uri) { + super(uri); + request = new SSLSocketChannelInput(channel); + response = new SSLSocketChannelOutput(channel); + this.channel = channel; + } + + @Override + public SSLSocketChannelInput getInput() { + return request; + } + + @Override + public SSLSocketChannelOutput getOutput() { + return response; + } + + @Override + public void setTimeout(final int millis) throws IOException { + channel.setTimeout(millis); + } + + @Override + public int getTimeout() throws IOException { + return channel.getTimeout(); + } + + @Override + public void close() throws IOException { + channel.close(); + } + + @Override + public boolean isClosed() { + return channel.isClosed(); + } + + @Override + public boolean isDataAvailable() { + try { + return request.isDataAvailable(); + } catch (final Exception e) { + return false; + } + } + + @Override + public long getBytesWritten() { + return response.getBytesWritten(); + } + + @Override + public long getBytesRead() { + return request.getBytesRead(); + } + + @Override + public void interrupt() { + channel.interrupt(); + } + + @Override + public String toString() { + return super.toString() + "[SSLSocketChannel=" + channel + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java new file mode 100644 index 0000000..60ef33f --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelInput.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.io.socket.ssl; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.nifi.stream.io.BufferedInputStream; +import org.apache.nifi.stream.io.ByteCountingInputStream; +import org.apache.nifi.remote.protocol.CommunicationsInput; + +public class SSLSocketChannelInput implements CommunicationsInput { + private final SSLSocketChannelInputStream in; + private final ByteCountingInputStream countingIn; + private final InputStream bufferedIn; + + public SSLSocketChannelInput(final SSLSocketChannel socketChannel) { + in = new SSLSocketChannelInputStream(socketChannel); + countingIn = new ByteCountingInputStream(in); + this.bufferedIn = new BufferedInputStream(countingIn); + } + + @Override + public InputStream getInputStream() throws IOException { + return bufferedIn; + } + + public boolean isDataAvailable() throws IOException { + return bufferedIn.available() > 0; + } + + @Override + public long getBytesRead() { + return countingIn.getBytesRead(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java new file mode 100644 index 0000000..dc3d68f --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.io.socket.ssl; + +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.nifi.stream.io.BufferedOutputStream; +import org.apache.nifi.stream.io.ByteCountingOutputStream; +import org.apache.nifi.remote.protocol.CommunicationsOutput; + +public class SSLSocketChannelOutput implements CommunicationsOutput { + private final OutputStream out; + private final ByteCountingOutputStream countingOut; + + public SSLSocketChannelOutput(final SSLSocketChannel channel) { + countingOut = new ByteCountingOutputStream(new SSLSocketChannelOutputStream(channel)); + out = new BufferedOutputStream(countingOut); + } + + @Override + public OutputStream getOutputStream() throws IOException { + return out; + } + + @Override + public long getBytesWritten() { + return countingOut.getBytesWritten(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java new file mode 100644 index 0000000..32274eb --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol; + +import java.io.IOException; +import java.util.Set; + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.PeerStatus; +import org.apache.nifi.remote.VersionedRemoteResource; +import org.apache.nifi.remote.codec.FlowFileCodec; +import org.apache.nifi.remote.exception.HandshakeException; +import org.apache.nifi.remote.exception.PortNotRunningException; +import org.apache.nifi.remote.exception.ProtocolException; +import org.apache.nifi.remote.exception.UnknownPortException; + +public interface ClientProtocol extends VersionedRemoteResource { + + void handshake(Peer peer) throws IOException, HandshakeException, UnknownPortException, PortNotRunningException; + + Set<PeerStatus> getPeerStatuses(Peer currentPeer) throws IOException, ProtocolException; + + FlowFileCodec negotiateCodec(Peer peer) throws IOException, ProtocolException; + + void receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException; + + void transferFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException; + + void shutdown(Peer peer) throws IOException, ProtocolException; + + boolean isReadyForFileTransfer(); + + /** + * returns <code>true</code> if remote instance indicates that the port is + * invalid + * + * @return + * @throws IllegalStateException if a handshake has not successfully + * completed + */ + boolean isPortInvalid() throws IllegalStateException; + + /** + * returns <code>true</code> if remote instance indicates that the port is + * unknown + * + * @return + * @throws IllegalStateException if a handshake has not successfully + * completed + */ + boolean isPortUnknown(); + + /** + * returns <code>true</code> if remote instance indicates that the port's + * destination is full + * + * @return + * @throws IllegalStateException if a handshake has not successfully + * completed + */ + boolean isDestinationFull(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java new file mode 100644 index 0000000..d2e2946 --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol; + +import java.io.IOException; +import java.io.InputStream; + +public interface CommunicationsInput { + + InputStream getInputStream() throws IOException; + + long getBytesRead(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java new file mode 100644 index 0000000..95cab29 --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol; + +import java.io.IOException; +import java.io.OutputStream; + +public interface CommunicationsOutput { + + OutputStream getOutputStream() throws IOException; + + long getBytesWritten(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java new file mode 100644 index 0000000..d009cec --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol; + +import java.io.Closeable; +import java.io.IOException; + +public interface CommunicationsSession extends Closeable { + + public static final byte[] MAGIC_BYTES = {(byte) 'N', (byte) 'i', (byte) 'F', (byte) 'i'}; + + CommunicationsInput getInput(); + + CommunicationsOutput getOutput(); + + void setTimeout(int millis) throws IOException; + + int getTimeout() throws IOException; + + void setUri(String uri); + + String getUri(); + + String getUserDn(); + + void setUserDn(String dn); + + boolean isDataAvailable(); + + long getBytesWritten(); + + long getBytesRead(); + + /** + * Asynchronously interrupts this FlowFileCodec. Implementations must ensure + * that they stop sending and receiving data as soon as possible after this + * method has been called, even if doing so results in sending only partial + * data to the peer. This will usually result in the peer throwing a + * SocketTimeoutException. + */ + void interrupt(); + + /** + * Returns <code>true</code> if the connection is closed, <code>false</code> + * otherwise. + * + * @return + */ + boolean isClosed(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/RequestType.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/RequestType.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/RequestType.java new file mode 100644 index 0000000..41334fe --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/RequestType.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +public enum RequestType { + + NEGOTIATE_FLOWFILE_CODEC, + REQUEST_PEER_LIST, + SEND_FLOWFILES, + RECEIVE_FLOWFILES, + SHUTDOWN; + + public void writeRequestType(final DataOutputStream dos) throws IOException { + dos.writeUTF(name()); + } + + public static RequestType readRequestType(final DataInputStream dis) throws IOException { + final String requestTypeVal = dis.readUTF(); + try { + return RequestType.valueOf(requestTypeVal); + } catch (final Exception e) { + throw new IOException("Could not determine RequestType: received invalid value " + requestTypeVal); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java new file mode 100644 index 0000000..c4519cd --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol.socket; + +public enum HandshakeProperty { + GZIP, + PORT_IDENTIFIER, + REQUEST_EXPIRATION_MILLIS; +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java new file mode 100644 index 0000000..eae1940 --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol.socket; + +import java.io.DataInputStream; +import java.io.IOException; + +import org.apache.nifi.remote.exception.ProtocolException; + +public class Response { + private final ResponseCode code; + private final String message; + + private Response(final ResponseCode code, final String explanation) { + this.code = code; + this.message = explanation; + } + + public ResponseCode getCode() { + return code; + } + + public String getMessage() { + return message; + } + + public static Response read(final DataInputStream in) throws IOException, ProtocolException { + final ResponseCode code = ResponseCode.readCode(in); + final String message = code.containsMessage() ? in.readUTF() : null; + return new Response(code, message); + } + + @Override + public String toString() { + return code + ": " + message; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java new file mode 100644 index 0000000..0e588cd --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol.socket; + +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.nifi.remote.exception.ProtocolException; + + +public enum ResponseCode { + RESERVED(0, "Reserved for Future Use", false), // This will likely be used if we ever need to expand the length of + // ResponseCode, so that we can indicate a 0 followed by some other bytes + + // handshaking properties + PROPERTIES_OK(1, "Properties OK", false), + UNKNOWN_PROPERTY_NAME(230, "Unknown Property Name", true), + ILLEGAL_PROPERTY_VALUE(231, "Illegal Property Value", true), + MISSING_PROPERTY(232, "Missing Property", true), + + // transaction indicators + CONTINUE_TRANSACTION(10, "Continue Transaction", false), + FINISH_TRANSACTION(11, "Finish Transaction", false), + CONFIRM_TRANSACTION(12, "Confirm Transaction", true), // "Explanation" of this code is the checksum + TRANSACTION_FINISHED(13, "Transaction Finished", false), + TRANSACTION_FINISHED_BUT_DESTINATION_FULL(14, "Transaction Finished But Destination is Full", false), + BAD_CHECKSUM(19, "Bad Checksum", false), + + // data availability indicators + MORE_DATA(20, "More Data Exists", false), + NO_MORE_DATA(21, "No More Data Exists", false), + + // port state indicators + UNKNOWN_PORT(200, "Unknown Port", false), + PORT_NOT_IN_VALID_STATE(201, "Port Not in a Valid State", true), + PORTS_DESTINATION_FULL(202, "Port's Destination is Full", false), + + // authorization + UNAUTHORIZED(240, "User Not Authorized", true), + + // error indicators + ABORT(250, "Abort", true), + UNRECOGNIZED_RESPONSE_CODE(254, "Unrecognized Response Code", false), + END_OF_STREAM(255, "End of Stream", false); + + private static final ResponseCode[] codeArray = new ResponseCode[256]; + + static { + for ( final ResponseCode responseCode : ResponseCode.values() ) { + codeArray[responseCode.getCode()] = responseCode; + } + } + + private static final byte CODE_SEQUENCE_VALUE_1 = (byte) 'R'; + private static final byte CODE_SEQUENCE_VALUE_2 = (byte) 'C'; + private final int code; + private final byte[] codeSequence; + private final String description; + private final boolean containsMessage; + + private ResponseCode(final int code, final String description, final boolean containsMessage) { + this.codeSequence = new byte[] {CODE_SEQUENCE_VALUE_1, CODE_SEQUENCE_VALUE_2, (byte) code}; + this.code = code; + this.description = description; + this.containsMessage = containsMessage; + } + + public int getCode() { + return code; + } + + public byte[] getCodeSequence() { + return codeSequence; + } + + @Override + public String toString() { + return description; + } + + public boolean containsMessage() { + return containsMessage; + } + + public void writeResponse(final DataOutputStream out) throws IOException { + if ( containsMessage() ) { + throw new IllegalArgumentException("ResponseCode " + code + " expects an explanation"); + } + + out.write(getCodeSequence()); + out.flush(); + } + + public void writeResponse(final DataOutputStream out, final String explanation) throws IOException { + if ( !containsMessage() ) { + throw new IllegalArgumentException("ResponseCode " + code + " does not expect an explanation"); + } + + out.write(getCodeSequence()); + out.writeUTF(explanation); + out.flush(); + } + + static ResponseCode readCode(final InputStream in) throws IOException, ProtocolException { + final int byte1 = in.read(); + if ( byte1 < 0 ) { + throw new EOFException(); + } else if ( byte1 != CODE_SEQUENCE_VALUE_1 ) { + throw new ProtocolException("Expected to receive ResponseCode, but the stream did not have a ResponseCode"); + } + + final int byte2 = in.read(); + if ( byte2 < 0 ) { + throw new EOFException(); + } else if ( byte2 != CODE_SEQUENCE_VALUE_2 ) { + throw new ProtocolException("Expected to receive ResponseCode, but the stream did not have a ResponseCode"); + } + + final int byte3 = in.read(); + if ( byte3 < 0 ) { + throw new EOFException(); + } + + final ResponseCode responseCode = codeArray[byte3]; + if (responseCode == null) { + throw new ProtocolException("Received Response Code of " + byte3 + " but do not recognize this code"); + } + return responseCode; + } + + public static ResponseCode fromSequence(final byte[] value) { + final int code = value[3] & 0xFF; + final ResponseCode responseCode = codeArray[code]; + return (responseCode == null) ? UNRECOGNIZED_RESPONSE_CODE : responseCode; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java new file mode 100644 index 0000000..2f4f755 --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java @@ -0,0 +1,517 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol.socket; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.zip.CRC32; +import java.util.zip.CheckedInputStream; +import java.util.zip.CheckedOutputStream; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.PeerStatus; +import org.apache.nifi.remote.RemoteDestination; +import org.apache.nifi.remote.RemoteResourceInitiator; +import org.apache.nifi.remote.StandardVersionNegotiator; +import org.apache.nifi.remote.VersionNegotiator; +import org.apache.nifi.remote.codec.FlowFileCodec; +import org.apache.nifi.remote.codec.StandardFlowFileCodec; +import org.apache.nifi.remote.exception.HandshakeException; +import org.apache.nifi.remote.exception.ProtocolException; +import org.apache.nifi.remote.io.CompressionInputStream; +import org.apache.nifi.remote.io.CompressionOutputStream; +import org.apache.nifi.remote.protocol.ClientProtocol; +import org.apache.nifi.remote.protocol.CommunicationsSession; +import org.apache.nifi.remote.protocol.RequestType; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.StopWatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SocketClientProtocol implements ClientProtocol { + private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(4, 3, 2, 1); + + + private RemoteDestination destination; + private boolean useCompression; + + private String commsIdentifier; + private boolean handshakeComplete = false; + + private final Logger logger = LoggerFactory.getLogger(SocketClientProtocol.class); + + private Response handshakeResponse = null; + private boolean readyForFileTransfer = false; + private String transitUriPrefix = null; + + private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds + + public SocketClientProtocol() { + } + + public void setDestination(final RemoteDestination destination) { + this.destination = destination; + this.useCompression = destination.isUseCompression(); + } + + + @Override + public void handshake(final Peer peer) throws IOException, HandshakeException { + handshake(peer, destination.getIdentifier(), (int) destination.getCommunicationsTimeout(TimeUnit.MILLISECONDS)); + } + + public void handshake(final Peer peer, final String destinationId, final int timeoutMillis) throws IOException, HandshakeException { + if ( handshakeComplete ) { + throw new IllegalStateException("Handshake has already been completed"); + } + commsIdentifier = UUID.randomUUID().toString(); + logger.debug("{} handshaking with {}", this, peer); + + final Map<HandshakeProperty, String> properties = new HashMap<>(); + properties.put(HandshakeProperty.GZIP, String.valueOf(useCompression)); + + if ( destinationId != null ) { + properties.put(HandshakeProperty.PORT_IDENTIFIER, destination.getIdentifier()); + } + + properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, String.valueOf(timeoutMillis) ); + + final CommunicationsSession commsSession = peer.getCommunicationsSession(); + commsSession.setTimeout((int) destination.getCommunicationsTimeout(TimeUnit.MILLISECONDS)); + final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); + final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); + + dos.writeUTF(commsIdentifier); + + if ( versionNegotiator.getVersion() >= 3 ) { + dos.writeUTF(peer.getUrl()); + transitUriPrefix = peer.getUrl(); + + if ( !transitUriPrefix.endsWith("/") ) { + transitUriPrefix = transitUriPrefix + "/"; + } + } + + dos.writeInt(properties.size()); + for ( final Map.Entry<HandshakeProperty, String> entry : properties.entrySet() ) { + dos.writeUTF(entry.getKey().name()); + dos.writeUTF(entry.getValue()); + } + + dos.flush(); + + try { + handshakeResponse = Response.read(dis); + } catch (final ProtocolException e) { + throw new HandshakeException(e); + } + + switch (handshakeResponse.getCode()) { + case PORT_NOT_IN_VALID_STATE: + case UNKNOWN_PORT: + case PORTS_DESTINATION_FULL: + break; + case PROPERTIES_OK: + readyForFileTransfer = true; + break; + default: + logger.error("{} received unexpected response {} from {} when negotiating Codec", new Object[] { + this, handshakeResponse, peer}); + peer.close(); + throw new HandshakeException("Received unexpected response " + handshakeResponse); + } + + logger.debug("{} Finished handshake with {}", this, peer); + handshakeComplete = true; + } + + public boolean isReadyForFileTransfer() { + return readyForFileTransfer; + } + + public boolean isPortInvalid() { + if ( !handshakeComplete ) { + throw new IllegalStateException("Handshake has not completed successfully"); + } + return handshakeResponse.getCode() == ResponseCode.PORT_NOT_IN_VALID_STATE; + } + + public boolean isPortUnknown() { + if ( !handshakeComplete ) { + throw new IllegalStateException("Handshake has not completed successfully"); + } + return handshakeResponse.getCode() == ResponseCode.UNKNOWN_PORT; + } + + public boolean isDestinationFull() { + if ( !handshakeComplete ) { + throw new IllegalStateException("Handshake has not completed successfully"); + } + return handshakeResponse.getCode() == ResponseCode.PORTS_DESTINATION_FULL; + } + + @Override + public Set<PeerStatus> getPeerStatuses(final Peer peer) throws IOException { + if ( !handshakeComplete ) { + throw new IllegalStateException("Handshake has not been performed"); + } + + logger.debug("{} Get Peer Statuses from {}", this, peer); + final CommunicationsSession commsSession = peer.getCommunicationsSession(); + final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); + final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); + + RequestType.REQUEST_PEER_LIST.writeRequestType(dos); + dos.flush(); + final int numPeers = dis.readInt(); + final Set<PeerStatus> peers = new HashSet<>(numPeers); + for (int i=0; i < numPeers; i++) { + final String hostname = dis.readUTF(); + final int port = dis.readInt(); + final boolean secure = dis.readBoolean(); + final int flowFileCount = dis.readInt(); + peers.add(new PeerStatus(hostname, port, secure, flowFileCount)); + } + + logger.debug("{} Received {} Peer Statuses from {}", this, peers.size(), peer); + return peers; + } + + @Override + public FlowFileCodec negotiateCodec(final Peer peer) throws IOException, ProtocolException { + if ( !handshakeComplete ) { + throw new IllegalStateException("Handshake has not been performed"); + } + + logger.debug("{} Negotiating Codec with {}", this, peer); + final CommunicationsSession commsSession = peer.getCommunicationsSession(); + final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); + final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); + + RequestType.NEGOTIATE_FLOWFILE_CODEC.writeRequestType(dos); + + FlowFileCodec codec = new StandardFlowFileCodec(); + try { + codec = (FlowFileCodec) RemoteResourceInitiator.initiateResourceNegotiation(codec, dis, dos); + } catch (HandshakeException e) { + throw new ProtocolException(e.toString()); + } + logger.debug("{} negotiated FlowFileCodec {} with {}", new Object[] {this, codec, commsSession}); + + return codec; + } + + + @Override + public void receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { + if ( !handshakeComplete ) { + throw new IllegalStateException("Handshake has not been performed"); + } + if ( !readyForFileTransfer ) { + throw new IllegalStateException("Cannot receive files; handshake resolution was " + handshakeResponse); + } + + logger.debug("{} Receiving FlowFiles from {}", this, peer); + final CommunicationsSession commsSession = peer.getCommunicationsSession(); + final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); + final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); + String userDn = commsSession.getUserDn(); + if ( userDn == null ) { + userDn = "none"; + } + + // Indicate that we would like to have some data + RequestType.RECEIVE_FLOWFILES.writeRequestType(dos); + dos.flush(); + + // Determine if Peer will send us data or has no data to send us + final Response dataAvailableCode = Response.read(dis); + switch (dataAvailableCode.getCode()) { + case MORE_DATA: + logger.debug("{} {} Indicates that data is available", this, peer); + break; + case NO_MORE_DATA: + logger.debug("{} No data available from {}", peer); + return; + default: + throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode); + } + + final StopWatch stopWatch = new StopWatch(true); + final Set<FlowFile> flowFilesReceived = new HashSet<>(); + long bytesReceived = 0L; + final CRC32 crc = new CRC32(); + + // Peer has data. Decode the bytes into FlowFiles until peer says he's finished sending data. + boolean continueTransaction = true; + String calculatedCRC = ""; + while (continueTransaction) { + final InputStream flowFileInputStream = useCompression ? new CompressionInputStream(dis) : dis; + final CheckedInputStream checkedIn = new CheckedInputStream(flowFileInputStream, crc); + + final long startNanos = System.nanoTime(); + FlowFile flowFile = codec.decode(checkedIn, session); + final long transmissionNanos = System.nanoTime() - startNanos; + final long transmissionMillis = TimeUnit.MILLISECONDS.convert(transmissionNanos, TimeUnit.NANOSECONDS); + + final String sourceFlowFileIdentifier = flowFile.getAttribute(CoreAttributes.UUID.key()); + flowFile = session.putAttribute(flowFile, CoreAttributes.UUID.key(), UUID.randomUUID().toString()); + + final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier; + session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transmissionMillis); + + session.transfer(flowFile, Relationship.ANONYMOUS); + bytesReceived += flowFile.getSize(); + flowFilesReceived.add(flowFile); + logger.debug("{} Received {} from {}", this, flowFile, peer); + + final Response transactionCode = Response.read(dis); + switch (transactionCode.getCode()) { + case CONTINUE_TRANSACTION: + logger.trace("{} Received ContinueTransaction indicator from {}", this, peer); + break; + case FINISH_TRANSACTION: + logger.trace("{} Received FinishTransaction indicator from {}", this, peer); + continueTransaction = false; + calculatedCRC = String.valueOf(checkedIn.getChecksum().getValue()); + break; + default: + throw new ProtocolException("Received unexpected response from peer: when expecting Continue Transaction or Finish Transaction, received" + transactionCode); + } + } + + // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message + // to peer so that we can verify that the connection is still open. This is a two-phase commit, + // which helps to prevent the chances of data duplication. Without doing this, we may commit the + // session and then when we send the response back to the peer, the peer may have timed out and may not + // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the + // Critical Section involved in this transaction so that rather than the Critical Section being the + // time window involved in the entire transaction, it is reduced to a simple round-trip conversation. + logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer); + ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC); + + final Response confirmTransactionResponse = Response.read(dis); + logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer); + + switch (confirmTransactionResponse.getCode()) { + case CONFIRM_TRANSACTION: + break; + case BAD_CHECKSUM: + session.rollback(); + throw new IOException(this + " Received a BadChecksum response from peer " + peer); + default: + throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code"); + } + + // Commit the session so that we have persisted the data + session.commit(); + + if ( context.getAvailableRelationships().isEmpty() ) { + // Confirm that we received the data and the peer can now discard it but that the peer should not + // send any more data for a bit + logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer); + ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos); + } else { + // Confirm that we received the data and the peer can now discard it + logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer); + ResponseCode.TRANSACTION_FINISHED.writeResponse(dos); + } + + stopWatch.stop(); + final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles"; + final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived); + final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); + final String dataSize = FormatUtils.formatDataSize(bytesReceived); + logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] { + this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate}); + } + + @Override + public void transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { + if ( !handshakeComplete ) { + throw new IllegalStateException("Handshake has not been performed"); + } + if ( !readyForFileTransfer ) { + throw new IllegalStateException("Cannot transfer files; handshake resolution was " + handshakeResponse); + } + + FlowFile flowFile = session.get(); + if ( flowFile == null ) { + return; + } + + logger.debug("{} Sending FlowFiles to {}", this, peer); + final CommunicationsSession commsSession = peer.getCommunicationsSession(); + final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); + final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); + String userDn = commsSession.getUserDn(); + if ( userDn == null ) { + userDn = "none"; + } + + // Indicate that we would like to have some data + RequestType.SEND_FLOWFILES.writeRequestType(dos); + dos.flush(); + + final StopWatch stopWatch = new StopWatch(true); + final CRC32 crc = new CRC32(); + + long bytesSent = 0L; + final Set<FlowFile> flowFilesSent = new HashSet<>(); + boolean continueTransaction = true; + String calculatedCRC = ""; + final long startSendingNanos = System.nanoTime(); + while (continueTransaction) { + final OutputStream flowFileOutputStream = useCompression ? new CompressionOutputStream(dos) : dos; + logger.debug("{} Sending {} to {}", this, flowFile, peer); + + final CheckedOutputStream checkedOutStream = new CheckedOutputStream(flowFileOutputStream, crc); + + final long startNanos = System.nanoTime(); + flowFile = codec.encode(flowFile, session, checkedOutStream); + final long transferNanos = System.nanoTime() - startNanos; + final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); + + // need to close the CompressionOutputStream in order to force it write out any remaining bytes. + // Otherwise, do NOT close it because we don't want to close the underlying stream + // (CompressionOutputStream will not close the underlying stream when it's closed) + if ( useCompression ) { + checkedOutStream.close(); + } + + flowFilesSent.add(flowFile); + bytesSent += flowFile.getSize(); + logger.debug("{} Sent {} to {}", this, flowFile, peer); + + final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key()); + session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transferMillis, false); + session.remove(flowFile); + + final long sendingNanos = System.nanoTime() - startSendingNanos; + if ( sendingNanos < BATCH_SEND_NANOS ) { + flowFile = session.get(); + } else { + flowFile = null; + } + + continueTransaction = (flowFile != null); + if ( continueTransaction ) { + logger.debug("{} Sent CONTINUE_TRANSACTION indicator to {}", this, peer); + ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos); + } else { + logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer); + ResponseCode.FINISH_TRANSACTION.writeResponse(dos); + + calculatedCRC = String.valueOf( checkedOutStream.getChecksum().getValue() ); + } + } + + // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response + final Response transactionConfirmationResponse = Response.read(dis); + if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) { + // Confirm checksum and echo back the confirmation. + logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer); + final String receivedCRC = transactionConfirmationResponse.getMessage(); + + if ( versionNegotiator.getVersion() > 3 ) { + if ( !receivedCRC.equals(calculatedCRC) ) { + ResponseCode.BAD_CHECKSUM.writeResponse(dos); + session.rollback(); + throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session"); + } + } + + ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, ""); + } else { + throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse); + } + + final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles"; + + final Response transactionResponse; + try { + transactionResponse = Response.read(dis); + } catch (final IOException e) { + logger.error("{} Failed to receive a response from {} when expecting a TransactionFinished Indicator." + + " It is unknown whether or not the peer successfully received/processed the data." + + " Therefore, {} will be rolled back, possibly resulting in data duplication of {}", + this, peer, session, flowFileDescription); + session.rollback(); + throw e; + } + + logger.debug("{} Received {} from {}", this, transactionResponse, peer); + if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) { + peer.penalize(destination.getYieldPeriod(TimeUnit.MILLISECONDS)); + } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) { + throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse); + } + + // consume input stream entirely, ignoring its contents. If we + // don't do this, the Connection will not be returned to the pool + stopWatch.stop(); + final String uploadDataRate = stopWatch.calculateDataRate(bytesSent); + final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); + final String dataSize = FormatUtils.formatDataSize(bytesSent); + + session.commit(); + + logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] { + this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate}); + } + + @Override + public VersionNegotiator getVersionNegotiator() { + return versionNegotiator; + } + + @Override + public void shutdown(final Peer peer) throws IOException { + readyForFileTransfer = false; + final CommunicationsSession commsSession = peer.getCommunicationsSession(); + final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); + + logger.debug("{} Shutting down with {}", this, peer); + // Indicate that we would like to have some data + RequestType.SHUTDOWN.writeRequestType(dos); + dos.flush(); + } + + @Override + public String getResourceName() { + return "SocketFlowFileProtocol"; + } + + @Override + public String toString() { + return "SocketClientProtocol[CommsID=" + commsIdentifier + "]"; + } +}