Repository: incubator-nifi Updated Branches: refs/heads/develop 42f69196c -> 705ee852b
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java deleted file mode 100644 index d18a4ee..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/codec/StandardFlowFileCodec.java +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.remote.codec; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.stream.io.StreamUtils; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.io.InputStreamCallback; -import org.apache.nifi.processor.io.OutputStreamCallback; -import org.apache.nifi.remote.StandardVersionNegotiator; -import org.apache.nifi.remote.VersionNegotiator; -import org.apache.nifi.remote.exception.ProtocolException; - -public class StandardFlowFileCodec implements FlowFileCodec { - public static final int MAX_NUM_ATTRIBUTES = 25000; - - public static final String DEFAULT_FLOWFILE_PATH = "./"; - - private final VersionNegotiator versionNegotiator; - - public StandardFlowFileCodec() { - versionNegotiator = new StandardVersionNegotiator(1); - } - - @Override - public FlowFile encode(final FlowFile flowFile, final ProcessSession session, final OutputStream encodedOut) throws IOException { - final DataOutputStream out = new DataOutputStream(encodedOut); - - final Map<String, String> attributes = flowFile.getAttributes(); - out.writeInt(attributes.size()); - for ( final Map.Entry<String, String> entry : attributes.entrySet() ) { - writeString(entry.getKey(), out); - writeString(entry.getValue(), out); - } - - out.writeLong(flowFile.getSize()); - - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - final byte[] buffer = new byte[8192]; - int len; - while ( (len = in.read(buffer)) > 0 ) { - encodedOut.write(buffer, 0, len); - } - - encodedOut.flush(); - } - }); - - return flowFile; - } - - - @Override - public FlowFile decode(final InputStream stream, final ProcessSession session) throws 
IOException, ProtocolException { - final DataInputStream in = new DataInputStream(stream); - - final int numAttributes; - try { - numAttributes = in.readInt(); - } catch (final EOFException e) { - // we're out of data. - return null; - } - - // This is here because if the stream is not properly formed, we could get up to Integer.MAX_VALUE attributes, which will - // generally result in an OutOfMemoryError. - if ( numAttributes > MAX_NUM_ATTRIBUTES ) { - throw new ProtocolException("FlowFile exceeds maximum number of attributes with a total of " + numAttributes); - } - - try { - final Map<String, String> attributes = new HashMap<>(numAttributes); - for (int i=0; i < numAttributes; i++) { - final String attrName = readString(in); - final String attrValue = readString(in); - attributes.put(attrName, attrValue); - } - - final long numBytes = in.readLong(); - - FlowFile flowFile = session.create(); - flowFile = session.putAllAttributes(flowFile, attributes); - flowFile = session.write(flowFile, new OutputStreamCallback() { - @Override - public void process(final OutputStream out) throws IOException { - int len; - long size = 0; - final byte[] buffer = new byte[8192]; - - while ( size < numBytes && (len = in.read(buffer, 0, (int) Math.min(buffer.length, numBytes - size))) > 0 ) { - out.write(buffer, 0, len); - size += len; - } - - if ( size != numBytes ) { - throw new EOFException("Expected " + numBytes + " bytes but received only " + size); - } - } - }); - - return flowFile; - } catch (final EOFException e) { - session.rollback(); - - // we throw the general IOException here because we did not expect to hit EOFException - throw e; - } - } - - private void writeString(final String val, final DataOutputStream out) throws IOException { - final byte[] bytes = val.getBytes("UTF-8"); - out.writeInt(bytes.length); - out.write(bytes); - } - - - private String readString(final DataInputStream in) throws IOException { - final int numBytes = in.readInt(); - final byte[] bytes = 
new byte[numBytes]; - StreamUtils.fillBuffer(in, bytes, true); - return new String(bytes, "UTF-8"); - } - - @Override - public List<Integer> getSupportedVersions() { - return versionNegotiator.getSupportedVersions(); - } - - @Override - public VersionNegotiator getVersionNegotiator() { - return versionNegotiator; - } - - @Override - public String toString() { - return "Standard FlowFile Codec, Version " + versionNegotiator.getVersion(); - } - - @Override - public String getResourceName() { - return "StandardFlowFileCodec"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java deleted file mode 100644 index 0822b6a..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelCommunicationsSession.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.io.socket; - -import java.io.IOException; -import java.nio.channels.SocketChannel; - -import org.apache.nifi.remote.AbstractCommunicationsSession; - -public class SocketChannelCommunicationsSession extends AbstractCommunicationsSession { - private final SocketChannel channel; - private final SocketChannelInput request; - private final SocketChannelOutput response; - private int timeout = 30000; - - public SocketChannelCommunicationsSession(final SocketChannel socketChannel, final String uri) throws IOException { - super(uri); - request = new SocketChannelInput(socketChannel); - response = new SocketChannelOutput(socketChannel); - channel = socketChannel; - socketChannel.configureBlocking(false); - } - - @Override - public boolean isClosed() { - return !channel.isConnected(); - } - - @Override - public SocketChannelInput getInput() { - return request; - } - - @Override - public SocketChannelOutput getOutput() { - return response; - } - - @Override - public void setTimeout(final int millis) throws IOException { - request.setTimeout(millis); - response.setTimeout(millis); - this.timeout = millis; - } - - @Override - public int getTimeout() throws IOException { - return timeout; - } - - @Override - public void close() throws IOException { - channel.close(); - } - - @Override - public boolean isDataAvailable() { - return request.isDataAvailable(); - } - - @Override - public long getBytesWritten() { - return response.getBytesWritten(); - } - - @Override - public long getBytesRead() { - return request.getBytesRead(); - } - 
- @Override - public void interrupt() { - request.interrupt(); - response.interrupt(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java deleted file mode 100644 index 9e451fd..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelInput.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.remote.io.socket; - -import java.io.IOException; -import java.io.InputStream; -import java.nio.channels.SocketChannel; - -import org.apache.nifi.stream.io.BufferedInputStream; -import org.apache.nifi.stream.io.ByteCountingInputStream; -import org.apache.nifi.remote.io.InterruptableInputStream; -import org.apache.nifi.remote.protocol.CommunicationsInput; - -public class SocketChannelInput implements CommunicationsInput { - private final SocketChannelInputStream socketIn; - private final ByteCountingInputStream countingIn; - private final InputStream bufferedIn; - private final InterruptableInputStream interruptableIn; - - public SocketChannelInput(final SocketChannel socketChannel) throws IOException { - this.socketIn = new SocketChannelInputStream(socketChannel); - countingIn = new ByteCountingInputStream(socketIn); - bufferedIn = new BufferedInputStream(countingIn); - interruptableIn = new InterruptableInputStream(bufferedIn); - } - - @Override - public InputStream getInputStream() throws IOException { - return interruptableIn; - } - - public void setTimeout(final int millis) { - socketIn.setTimeout(millis); - } - - public boolean isDataAvailable() { - try { - return interruptableIn.available() > 0; - } catch (final Exception e) { - return false; - } - } - - @Override - public long getBytesRead() { - return countingIn.getBytesRead(); - } - - public void interrupt() { - interruptableIn.interrupt(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java deleted file mode 100644 index 26c0164..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/io/socket/SocketChannelOutput.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.remote.io.socket; - -import java.io.IOException; -import java.io.OutputStream; -import java.nio.channels.SocketChannel; - -import org.apache.nifi.stream.io.BufferedOutputStream; -import org.apache.nifi.stream.io.ByteCountingOutputStream; -import org.apache.nifi.remote.io.InterruptableOutputStream; -import org.apache.nifi.remote.protocol.CommunicationsOutput; - -public class SocketChannelOutput implements CommunicationsOutput { - private final SocketChannelOutputStream socketOutStream; - private final ByteCountingOutputStream countingOut; - private final OutputStream bufferedOut; - private final InterruptableOutputStream interruptableOut; - - public SocketChannelOutput(final SocketChannel socketChannel) throws IOException { - socketOutStream = new SocketChannelOutputStream(socketChannel); - countingOut = new ByteCountingOutputStream(socketOutStream); - bufferedOut = new BufferedOutputStream(countingOut); - interruptableOut = new InterruptableOutputStream(bufferedOut); - } - - @Override - public OutputStream getOutputStream() throws IOException { - return interruptableOut; - } - - public void setTimeout(final int timeout) { - socketOutStream.setTimeout(timeout); - } - - @Override - public long getBytesWritten() { - return countingOut.getBytesWritten(); - } - - public void interrupt() { - interruptableOut.interrupt(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java index a526f4c..391d52b 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ClusterManagerServerProtocol.java @@ -21,9 +21,6 @@ import java.io.DataOutputStream; import java.io.IOException; import java.util.Collection; -import org.apache.nifi.cluster.ClusterNodeInformation; -import org.apache.nifi.cluster.NodeInformant; -import org.apache.nifi.cluster.NodeInformation; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -31,12 +28,14 @@ import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.remote.StandardVersionNegotiator; import org.apache.nifi.remote.VersionNegotiator; +import org.apache.nifi.remote.cluster.ClusterNodeInformation; +import org.apache.nifi.remote.cluster.NodeInformant; +import org.apache.nifi.remote.cluster.NodeInformation; import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.exception.HandshakeException; import org.apache.nifi.remote.protocol.CommunicationsSession; import org.apache.nifi.remote.protocol.RequestType; import org.apache.nifi.remote.protocol.ServerProtocol; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java ---------------------------------------------------------------------- diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java deleted file mode 100644 index c4519cd..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
/**
 * Names of the properties exchanged during the socket protocol handshake.
 * The constant names themselves are what gets written to the peer, so they must not be renamed.
 */
public enum HandshakeProperty {

    /** Whether compression is to be used; sent as the string form of a boolean. */
    GZIP,

    /** Identifier of the port being communicated with. */
    PORT_IDENTIFIER,

    /** Request expiration, expressed in milliseconds. */
    REQUEST_EXPIRATION_MILLIS
}
- */ -package org.apache.nifi.remote.protocol.socket; - -import java.io.DataInputStream; -import java.io.IOException; - -import org.apache.nifi.remote.exception.ProtocolException; - -public class Response { - private final ResponseCode code; - private final String message; - - private Response(final ResponseCode code, final String explanation) { - this.code = code; - this.message = explanation; - } - - public ResponseCode getCode() { - return code; - } - - public String getMessage() { - return message; - } - - public static Response read(final DataInputStream in) throws IOException, ProtocolException { - final ResponseCode code = ResponseCode.readCode(in); - final String message = code.containsMessage() ? in.readUTF() : null; - return new Response(code, message); - } - - @Override - public String toString() { - return code + ": " + message; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java deleted file mode 100644 index 0e588cd..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.protocol.socket; - -import java.io.DataOutputStream; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; - -import org.apache.nifi.remote.exception.ProtocolException; - - -public enum ResponseCode { - RESERVED(0, "Reserved for Future Use", false), // This will likely be used if we ever need to expand the length of - // ResponseCode, so that we can indicate a 0 followed by some other bytes - - // handshaking properties - PROPERTIES_OK(1, "Properties OK", false), - UNKNOWN_PROPERTY_NAME(230, "Unknown Property Name", true), - ILLEGAL_PROPERTY_VALUE(231, "Illegal Property Value", true), - MISSING_PROPERTY(232, "Missing Property", true), - - // transaction indicators - CONTINUE_TRANSACTION(10, "Continue Transaction", false), - FINISH_TRANSACTION(11, "Finish Transaction", false), - CONFIRM_TRANSACTION(12, "Confirm Transaction", true), // "Explanation" of this code is the checksum - TRANSACTION_FINISHED(13, "Transaction Finished", false), - TRANSACTION_FINISHED_BUT_DESTINATION_FULL(14, "Transaction Finished But Destination is Full", false), - BAD_CHECKSUM(19, "Bad Checksum", false), - - // data availability indicators - MORE_DATA(20, "More Data Exists", false), - NO_MORE_DATA(21, "No More Data Exists", false), - - // port state indicators - UNKNOWN_PORT(200, "Unknown Port", false), - PORT_NOT_IN_VALID_STATE(201, "Port Not in 
a Valid State", true), - PORTS_DESTINATION_FULL(202, "Port's Destination is Full", false), - - // authorization - UNAUTHORIZED(240, "User Not Authorized", true), - - // error indicators - ABORT(250, "Abort", true), - UNRECOGNIZED_RESPONSE_CODE(254, "Unrecognized Response Code", false), - END_OF_STREAM(255, "End of Stream", false); - - private static final ResponseCode[] codeArray = new ResponseCode[256]; - - static { - for ( final ResponseCode responseCode : ResponseCode.values() ) { - codeArray[responseCode.getCode()] = responseCode; - } - } - - private static final byte CODE_SEQUENCE_VALUE_1 = (byte) 'R'; - private static final byte CODE_SEQUENCE_VALUE_2 = (byte) 'C'; - private final int code; - private final byte[] codeSequence; - private final String description; - private final boolean containsMessage; - - private ResponseCode(final int code, final String description, final boolean containsMessage) { - this.codeSequence = new byte[] {CODE_SEQUENCE_VALUE_1, CODE_SEQUENCE_VALUE_2, (byte) code}; - this.code = code; - this.description = description; - this.containsMessage = containsMessage; - } - - public int getCode() { - return code; - } - - public byte[] getCodeSequence() { - return codeSequence; - } - - @Override - public String toString() { - return description; - } - - public boolean containsMessage() { - return containsMessage; - } - - public void writeResponse(final DataOutputStream out) throws IOException { - if ( containsMessage() ) { - throw new IllegalArgumentException("ResponseCode " + code + " expects an explanation"); - } - - out.write(getCodeSequence()); - out.flush(); - } - - public void writeResponse(final DataOutputStream out, final String explanation) throws IOException { - if ( !containsMessage() ) { - throw new IllegalArgumentException("ResponseCode " + code + " does not expect an explanation"); - } - - out.write(getCodeSequence()); - out.writeUTF(explanation); - out.flush(); - } - - static ResponseCode readCode(final InputStream in) throws 
IOException, ProtocolException { - final int byte1 = in.read(); - if ( byte1 < 0 ) { - throw new EOFException(); - } else if ( byte1 != CODE_SEQUENCE_VALUE_1 ) { - throw new ProtocolException("Expected to receive ResponseCode, but the stream did not have a ResponseCode"); - } - - final int byte2 = in.read(); - if ( byte2 < 0 ) { - throw new EOFException(); - } else if ( byte2 != CODE_SEQUENCE_VALUE_2 ) { - throw new ProtocolException("Expected to receive ResponseCode, but the stream did not have a ResponseCode"); - } - - final int byte3 = in.read(); - if ( byte3 < 0 ) { - throw new EOFException(); - } - - final ResponseCode responseCode = codeArray[byte3]; - if (responseCode == null) { - throw new ProtocolException("Received Response Code of " + byte3 + " but do not recognize this code"); - } - return responseCode; - } - - public static ResponseCode fromSequence(final byte[] value) { - final int code = value[3] & 0xFF; - final ResponseCode responseCode = codeArray[code]; - return (responseCode == null) ? 
UNRECOGNIZED_RESPONSE_CODE : responseCode; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java deleted file mode 100644 index d4b4f61..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java +++ /dev/null @@ -1,510 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.remote.protocol.socket; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.zip.CRC32; -import java.util.zip.CheckedInputStream; -import java.util.zip.CheckedOutputStream; - -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.remote.Peer; -import org.apache.nifi.remote.PeerStatus; -import org.apache.nifi.remote.RemoteGroupPort; -import org.apache.nifi.remote.RemoteResourceFactory; -import org.apache.nifi.remote.StandardVersionNegotiator; -import org.apache.nifi.remote.VersionNegotiator; -import org.apache.nifi.remote.codec.FlowFileCodec; -import org.apache.nifi.remote.codec.StandardFlowFileCodec; -import org.apache.nifi.remote.exception.HandshakeException; -import org.apache.nifi.remote.exception.ProtocolException; -import org.apache.nifi.remote.io.CompressionInputStream; -import org.apache.nifi.remote.io.CompressionOutputStream; -import org.apache.nifi.remote.protocol.ClientProtocol; -import org.apache.nifi.remote.protocol.CommunicationsSession; -import org.apache.nifi.remote.protocol.RequestType; -import org.apache.nifi.util.FormatUtils; -import org.apache.nifi.util.StopWatch; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class SocketClientProtocol implements ClientProtocol { - private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(4, 3, 2, 1); - - - private RemoteGroupPort port; - private boolean useCompression; - - private String commsIdentifier; - private boolean handshakeComplete 
= false; - - private final Logger logger = LoggerFactory.getLogger(SocketClientProtocol.class); - - private Response handshakeResponse = null; - private boolean readyForFileTransfer = false; - private String transitUriPrefix = null; - - private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds - - public SocketClientProtocol() { - } - - public void setPort(final RemoteGroupPort port) { - this.port = port; - this.useCompression = port.isUseCompression(); - } - - @Override - public void handshake(final Peer peer) throws IOException, HandshakeException { - if ( handshakeComplete ) { - throw new IllegalStateException("Handshake has already been completed"); - } - commsIdentifier = UUID.randomUUID().toString(); - logger.debug("{} handshaking with {}", this, peer); - - final Map<HandshakeProperty, String> properties = new HashMap<>(); - properties.put(HandshakeProperty.GZIP, String.valueOf(useCompression)); - properties.put(HandshakeProperty.PORT_IDENTIFIER, port.getIdentifier()); - properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, String.valueOf( - port.getRemoteProcessGroup().getCommunicationsTimeout(TimeUnit.MILLISECONDS)) ); - - final CommunicationsSession commsSession = peer.getCommunicationsSession(); - commsSession.setTimeout(port.getRemoteProcessGroup().getCommunicationsTimeout(TimeUnit.MILLISECONDS)); - final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); - final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - - dos.writeUTF(commsIdentifier); - - if ( versionNegotiator.getVersion() >= 3 ) { - dos.writeUTF(peer.getUrl()); - transitUriPrefix = peer.getUrl(); - - if ( !transitUriPrefix.endsWith("/") ) { - transitUriPrefix = transitUriPrefix + "/"; - } - } - - dos.writeInt(properties.size()); - for ( final Map.Entry<HandshakeProperty, String> entry : properties.entrySet() ) { - dos.writeUTF(entry.getKey().name()); - 
dos.writeUTF(entry.getValue()); - } - - dos.flush(); - - try { - handshakeResponse = Response.read(dis); - } catch (final ProtocolException e) { - throw new HandshakeException(e); - } - - switch (handshakeResponse.getCode()) { - case PORT_NOT_IN_VALID_STATE: - case UNKNOWN_PORT: - case PORTS_DESTINATION_FULL: - break; - case PROPERTIES_OK: - readyForFileTransfer = true; - break; - default: - logger.error("{} received unexpected response {} from {} when negotiating Codec", new Object[] { - this, handshakeResponse, peer}); - peer.close(); - throw new HandshakeException("Received unexpected response " + handshakeResponse); - } - - logger.debug("{} Finished handshake with {}", this, peer); - handshakeComplete = true; - } - - public boolean isReadyForFileTransfer() { - return readyForFileTransfer; - } - - public boolean isPortInvalid() { - if ( !handshakeComplete ) { - throw new IllegalStateException("Handshake has not completed successfully"); - } - return handshakeResponse.getCode() == ResponseCode.PORT_NOT_IN_VALID_STATE; - } - - public boolean isPortUnknown() { - if ( !handshakeComplete ) { - throw new IllegalStateException("Handshake has not completed successfully"); - } - return handshakeResponse.getCode() == ResponseCode.UNKNOWN_PORT; - } - - public boolean isDestinationFull() { - if ( !handshakeComplete ) { - throw new IllegalStateException("Handshake has not completed successfully"); - } - return handshakeResponse.getCode() == ResponseCode.PORTS_DESTINATION_FULL; - } - - @Override - public Set<PeerStatus> getPeerStatuses(final Peer peer) throws IOException { - if ( !handshakeComplete ) { - throw new IllegalStateException("Handshake has not been performed"); - } - - logger.debug("{} Get Peer Statuses from {}", this, peer); - final CommunicationsSession commsSession = peer.getCommunicationsSession(); - final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); - final DataOutputStream dos = new 
DataOutputStream(commsSession.getOutput().getOutputStream()); - - RequestType.REQUEST_PEER_LIST.writeRequestType(dos); - dos.flush(); - final int numPeers = dis.readInt(); - final Set<PeerStatus> peers = new HashSet<>(numPeers); - for (int i=0; i < numPeers; i++) { - final String hostname = dis.readUTF(); - final int port = dis.readInt(); - final boolean secure = dis.readBoolean(); - final int flowFileCount = dis.readInt(); - peers.add(new PeerStatus(hostname, port, secure, flowFileCount)); - } - - logger.debug("{} Received {} Peer Statuses from {}", this, peers.size(), peer); - return peers; - } - - @Override - public FlowFileCodec negotiateCodec(final Peer peer) throws IOException, ProtocolException { - if ( !handshakeComplete ) { - throw new IllegalStateException("Handshake has not been performed"); - } - - logger.debug("{} Negotiating Codec with {}", this, peer); - final CommunicationsSession commsSession = peer.getCommunicationsSession(); - final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); - final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - - RequestType.NEGOTIATE_FLOWFILE_CODEC.writeRequestType(dos); - - FlowFileCodec codec = new StandardFlowFileCodec(); - try { - codec = (FlowFileCodec) RemoteResourceFactory.initiateResourceNegotiation(codec, dis, dos); - } catch (HandshakeException e) { - throw new ProtocolException(e.toString()); - } - logger.debug("{} negotiated FlowFileCodec {} with {}", new Object[] {this, codec, commsSession}); - - return codec; - } - - - @Override - public void receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { - if ( !handshakeComplete ) { - throw new IllegalStateException("Handshake has not been performed"); - } - if ( !readyForFileTransfer ) { - throw new IllegalStateException("Cannot receive files; handshake resolution was " + 
handshakeResponse); - } - - logger.debug("{} Receiving FlowFiles from {}", this, peer); - final CommunicationsSession commsSession = peer.getCommunicationsSession(); - final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); - final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - String userDn = commsSession.getUserDn(); - if ( userDn == null ) { - userDn = "none"; - } - - // Indicate that we would like to have some data - RequestType.RECEIVE_FLOWFILES.writeRequestType(dos); - dos.flush(); - - // Determine if Peer will send us data or has no data to send us - final Response dataAvailableCode = Response.read(dis); - switch (dataAvailableCode.getCode()) { - case MORE_DATA: - logger.debug("{} {} Indicates that data is available", this, peer); - break; - case NO_MORE_DATA: - logger.debug("{} No data available from {}", peer); - return; - default: - throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode); - } - - final StopWatch stopWatch = new StopWatch(true); - final Set<FlowFile> flowFilesReceived = new HashSet<>(); - long bytesReceived = 0L; - final CRC32 crc = new CRC32(); - - // Peer has data. Decode the bytes into FlowFiles until peer says he's finished sending data. - boolean continueTransaction = true; - String calculatedCRC = ""; - while (continueTransaction) { - final InputStream flowFileInputStream = useCompression ? 
new CompressionInputStream(dis) : dis; - final CheckedInputStream checkedIn = new CheckedInputStream(flowFileInputStream, crc); - - final long startNanos = System.nanoTime(); - FlowFile flowFile = codec.decode(checkedIn, session); - final long transmissionNanos = System.nanoTime() - startNanos; - final long transmissionMillis = TimeUnit.MILLISECONDS.convert(transmissionNanos, TimeUnit.NANOSECONDS); - - final String sourceFlowFileIdentifier = flowFile.getAttribute(CoreAttributes.UUID.key()); - flowFile = session.putAttribute(flowFile, CoreAttributes.UUID.key(), UUID.randomUUID().toString()); - - final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier; - session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transmissionMillis); - - session.transfer(flowFile, Relationship.ANONYMOUS); - bytesReceived += flowFile.getSize(); - flowFilesReceived.add(flowFile); - logger.debug("{} Received {} from {}", this, flowFile, peer); - - final Response transactionCode = Response.read(dis); - switch (transactionCode.getCode()) { - case CONTINUE_TRANSACTION: - logger.trace("{} Received ContinueTransaction indicator from {}", this, peer); - break; - case FINISH_TRANSACTION: - logger.trace("{} Received FinishTransaction indicator from {}", this, peer); - continueTransaction = false; - calculatedCRC = String.valueOf(checkedIn.getChecksum().getValue()); - break; - default: - throw new ProtocolException("Received unexpected response from peer: when expecting Continue Transaction or Finish Transaction, received" + transactionCode); - } - } - - // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message - // to peer so that we can verify that the connection is still open. This is a two-phase commit, - // which helps to prevent the chances of data duplication. 
Without doing this, we may commit the - // session and then when we send the response back to the peer, the peer may have timed out and may not - // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the - // Critical Section involved in this transaction so that rather than the Critical Section being the - // time window involved in the entire transaction, it is reduced to a simple round-trip conversation. - logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer); - ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC); - - final Response confirmTransactionResponse = Response.read(dis); - logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer); - - switch (confirmTransactionResponse.getCode()) { - case CONFIRM_TRANSACTION: - break; - case BAD_CHECKSUM: - session.rollback(); - throw new IOException(this + " Received a BadChecksum response from peer " + peer); - default: - throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code"); - } - - // Commit the session so that we have persisted the data - session.commit(); - - if ( context.getAvailableRelationships().isEmpty() ) { - // Confirm that we received the data and the peer can now discard it but that the peer should not - // send any more data for a bit - logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer); - ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos); - } else { - // Confirm that we received the data and the peer can now discard it - logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer); - ResponseCode.TRANSACTION_FINISHED.writeResponse(dos); - } - - stopWatch.stop(); - final String flowFileDescription = flowFilesReceived.size() < 20 ? 
flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles"; - final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived); - final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); - final String dataSize = FormatUtils.formatDataSize(bytesReceived); - logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] { - this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate}); - } - - @Override - public void transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { - if ( !handshakeComplete ) { - throw new IllegalStateException("Handshake has not been performed"); - } - if ( !readyForFileTransfer ) { - throw new IllegalStateException("Cannot transfer files; handshake resolution was " + handshakeResponse); - } - - FlowFile flowFile = session.get(); - if ( flowFile == null ) { - return; - } - - logger.debug("{} Sending FlowFiles to {}", this, peer); - final CommunicationsSession commsSession = peer.getCommunicationsSession(); - final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); - final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - String userDn = commsSession.getUserDn(); - if ( userDn == null ) { - userDn = "none"; - } - - // Indicate that we would like to have some data - RequestType.SEND_FLOWFILES.writeRequestType(dos); - dos.flush(); - - final StopWatch stopWatch = new StopWatch(true); - final CRC32 crc = new CRC32(); - - long bytesSent = 0L; - final Set<FlowFile> flowFilesSent = new HashSet<>(); - boolean continueTransaction = true; - String calculatedCRC = ""; - final long startSendingNanos = System.nanoTime(); - while (continueTransaction) { - final OutputStream flowFileOutputStream = useCompression ? 
new CompressionOutputStream(dos) : dos; - logger.debug("{} Sending {} to {}", this, flowFile, peer); - - final CheckedOutputStream checkedOutStream = new CheckedOutputStream(flowFileOutputStream, crc); - - final long startNanos = System.nanoTime(); - flowFile = codec.encode(flowFile, session, checkedOutStream); - final long transferNanos = System.nanoTime() - startNanos; - final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); - - // need to close the CompressionOutputStream in order to force it write out any remaining bytes. - // Otherwise, do NOT close it because we don't want to close the underlying stream - // (CompressionOutputStream will not close the underlying stream when it's closed) - if ( useCompression ) { - checkedOutStream.close(); - } - - flowFilesSent.add(flowFile); - bytesSent += flowFile.getSize(); - logger.debug("{} Sent {} to {}", this, flowFile, peer); - - final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key()); - session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transferMillis, false); - session.remove(flowFile); - - final long sendingNanos = System.nanoTime() - startSendingNanos; - if ( sendingNanos < BATCH_SEND_NANOS ) { - flowFile = session.get(); - } else { - flowFile = null; - } - - continueTransaction = (flowFile != null); - if ( continueTransaction ) { - logger.debug("{} Sent CONTINUE_TRANSACTION indicator to {}", this, peer); - ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos); - } else { - logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer); - ResponseCode.FINISH_TRANSACTION.writeResponse(dos); - - calculatedCRC = String.valueOf( checkedOutStream.getChecksum().getValue() ); - } - } - - // we've sent a FINISH_TRANSACTION. 
Now we'll wait for the peer to send a 'Confirm Transaction' response - final Response transactionConfirmationResponse = Response.read(dis); - if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) { - // Confirm checksum and echo back the confirmation. - logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer); - final String receivedCRC = transactionConfirmationResponse.getMessage(); - - if ( versionNegotiator.getVersion() > 3 ) { - if ( !receivedCRC.equals(calculatedCRC) ) { - ResponseCode.BAD_CHECKSUM.writeResponse(dos); - session.rollback(); - throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session"); - } - } - - ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, ""); - } else { - throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse); - } - - final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles"; - - final Response transactionResponse; - try { - transactionResponse = Response.read(dis); - } catch (final IOException e) { - logger.error("{} Failed to receive a response from {} when expecting a TransactionFinished Indicator." + - " It is unknown whether or not the peer successfully received/processed the data." 
+ - " Therefore, {} will be rolled back, possibly resulting in data duplication of {}", - this, peer, session, flowFileDescription); - session.rollback(); - throw e; - } - - logger.debug("{} Received {} from {}", this, transactionResponse, peer); - if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) { - peer.penalize(port.getYieldPeriod(TimeUnit.MILLISECONDS)); - } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) { - throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse); - } - - // consume input stream entirely, ignoring its contents. If we - // don't do this, the Connection will not be returned to the pool - stopWatch.stop(); - final String uploadDataRate = stopWatch.calculateDataRate(bytesSent); - final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); - final String dataSize = FormatUtils.formatDataSize(bytesSent); - - session.commit(); - - logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] { - this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate}); - } - - @Override - public VersionNegotiator getVersionNegotiator() { - return versionNegotiator; - } - - @Override - public void shutdown(final Peer peer) throws IOException { - readyForFileTransfer = false; - final CommunicationsSession commsSession = peer.getCommunicationsSession(); - final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - - logger.debug("{} Shutting down with {}", this, peer); - // Indicate that we would like to have some data - RequestType.SHUTDOWN.writeRequestType(dos); - dos.flush(); - } - - @Override - public String getResourceName() { - return "SocketFlowFileProtocol"; - } - - @Override - public String toString() { - return "SocketClientProtocol[CommsID=" + commsIdentifier + "]"; - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java index 5edd4f9..eb22b0e 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.java @@ -32,7 +32,6 @@ import java.util.zip.CRC32; import java.util.zip.CheckedInputStream; import java.util.zip.CheckedOutputStream; -import org.apache.nifi.cluster.NodeInformant; import org.apache.nifi.connectable.Connection; import org.apache.nifi.connectable.Port; import org.apache.nifi.flowfile.FlowFile; @@ -41,24 +40,27 @@ import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.PortAuthorizationResult; import org.apache.nifi.remote.RemoteResourceFactory; import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.remote.StandardVersionNegotiator; import org.apache.nifi.remote.VersionNegotiator; +import org.apache.nifi.remote.cluster.NodeInformant; import org.apache.nifi.remote.codec.FlowFileCodec; import 
org.apache.nifi.remote.exception.HandshakeException; import org.apache.nifi.remote.exception.ProtocolException; import org.apache.nifi.remote.io.CompressionInputStream; import org.apache.nifi.remote.io.CompressionOutputStream; import org.apache.nifi.remote.protocol.CommunicationsSession; +import org.apache.nifi.remote.protocol.DataPacket; import org.apache.nifi.remote.protocol.RequestType; import org.apache.nifi.remote.protocol.ServerProtocol; +import org.apache.nifi.remote.util.StandardDataPacket; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.StopWatch; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,10 +78,14 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { private FlowFileCodec negotiatedFlowFileCodec = null; private String transitUriPrefix = null; - private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(4, 3, 2, 1); + private int requestedBatchCount = 0; + private long requestedBatchBytes = 0L; + private long requestedBatchNanos = 0L; + private static final long DEFAULT_BATCH_NANOS = TimeUnit.SECONDS.toNanos(5L); + + private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1); private final Logger logger = LoggerFactory.getLogger(SocketFlowFileServerProtocol.class); - private static final long BATCH_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds @Override @@ -135,68 +141,90 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { throw new HandshakeException("Received unknown property: " + propertyName); } - switch (property) { - case GZIP: { - useGzip = Boolean.parseBoolean(value); - break; - } - case REQUEST_EXPIRATION_MILLIS: - requestExpirationMillis = Long.parseLong(value); - break; - case PORT_IDENTIFIER: { - Port receivedPort = rootGroup.getInputPort(value); - if ( receivedPort == null ) { - receivedPort = rootGroup.getOutputPort(value); - } - 
if ( receivedPort == null ) { - logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", value); - ResponseCode.UNKNOWN_PORT.writeResponse(dos); - throw new HandshakeException("Received unknown port identifier: " + value); - } - if ( !(receivedPort instanceof RootGroupPort) ) { - logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", value); - ResponseCode.UNKNOWN_PORT.writeResponse(dos); - throw new HandshakeException("Received port identifier " + value + ", but this Port is not a RootGroupPort"); - } - - this.port = (RootGroupPort) receivedPort; - final PortAuthorizationResult portAuthResult = this.port.checkUserAuthorization(peer.getCommunicationsSession().getUserDn()); - if ( !portAuthResult.isAuthorized() ) { - logger.debug("Responding with ResponseCode UNAUTHORIZED: ", portAuthResult.getExplanation()); - ResponseCode.UNAUTHORIZED.writeResponse(dos, portAuthResult.getExplanation()); - responseWritten = true; + try { + switch (property) { + case GZIP: { + useGzip = Boolean.parseBoolean(value); break; } - - if ( !receivedPort.isValid() ) { - logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort); - ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port is not valid"); - responseWritten = true; + case REQUEST_EXPIRATION_MILLIS: + requestExpirationMillis = Long.parseLong(value); break; - } - - if ( !receivedPort.isRunning() ) { - logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort); - ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port not running"); - responseWritten = true; + case BATCH_COUNT: + requestedBatchCount = Integer.parseInt(value); + if ( requestedBatchCount < 0 ) { + throw new HandshakeException("Cannot request Batch Count less than 1; requested value: " + value); + } break; - } - - // PORTS_DESTINATION_FULL was introduced in version 2. 
If version 1, just ignore this - // we we will simply not service the request but the sender will timeout - if ( getVersionNegotiator().getVersion() > 1 ) { - for ( final Connection connection : port.getConnections() ) { - if ( connection.getFlowFileQueue().isFull() ) { - logger.debug("Responding with ResponseCode PORTS_DESTINATION_FULL for {}", receivedPort); - ResponseCode.PORTS_DESTINATION_FULL.writeResponse(dos); - responseWritten = true; - break; + case BATCH_SIZE: + requestedBatchBytes = Long.parseLong(value); + if ( requestedBatchBytes < 0 ) { + throw new HandshakeException("Cannot request Batch Size less than 1; requested value: " + value); + } + break; + case BATCH_DURATION: + requestedBatchNanos = TimeUnit.MILLISECONDS.toNanos(Long.parseLong(value)); + if ( requestedBatchNanos < 0 ) { + throw new HandshakeException("Cannot request Batch Duration less than 1; requested value: " + value); + } + break; + case PORT_IDENTIFIER: { + Port receivedPort = rootGroup.getInputPort(value); + if ( receivedPort == null ) { + receivedPort = rootGroup.getOutputPort(value); + } + if ( receivedPort == null ) { + logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", value); + ResponseCode.UNKNOWN_PORT.writeResponse(dos); + throw new HandshakeException("Received unknown port identifier: " + value); + } + if ( !(receivedPort instanceof RootGroupPort) ) { + logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", value); + ResponseCode.UNKNOWN_PORT.writeResponse(dos); + throw new HandshakeException("Received port identifier " + value + ", but this Port is not a RootGroupPort"); + } + + this.port = (RootGroupPort) receivedPort; + final PortAuthorizationResult portAuthResult = this.port.checkUserAuthorization(peer.getCommunicationsSession().getUserDn()); + if ( !portAuthResult.isAuthorized() ) { + logger.debug("Responding with ResponseCode UNAUTHORIZED: ", portAuthResult.getExplanation()); + ResponseCode.UNAUTHORIZED.writeResponse(dos, 
portAuthResult.getExplanation()); + responseWritten = true; + break; + } + + if ( !receivedPort.isValid() ) { + logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort); + ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port is not valid"); + responseWritten = true; + break; + } + + if ( !receivedPort.isRunning() ) { + logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort); + ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port not running"); + responseWritten = true; + break; + } + + // PORTS_DESTINATION_FULL was introduced in version 2. If version 1, just ignore this + // we we will simply not service the request but the sender will timeout + if ( getVersionNegotiator().getVersion() > 1 ) { + for ( final Connection connection : port.getConnections() ) { + if ( connection.getFlowFileQueue().isFull() ) { + logger.debug("Responding with ResponseCode PORTS_DESTINATION_FULL for {}", receivedPort); + ResponseCode.PORTS_DESTINATION_FULL.writeResponse(dos); + responseWritten = true; + break; + } } } + + break; } - - break; } + } catch (final NumberFormatException nfe) { + throw new HandshakeException("Received invalid value for property '" + property + "'; invalid value: " + value); } } @@ -205,11 +233,6 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { ResponseCode.MISSING_PROPERTY.writeResponse(dos, HandshakeProperty.GZIP.name()); throw new HandshakeException("Missing Property " + HandshakeProperty.GZIP.name()); } - if ( port == null ) { - logger.debug("Responding with ResponseCode MISSING_PROPERTY because Port Identifier property is missing"); - ResponseCode.MISSING_PROPERTY.writeResponse(dos, HandshakeProperty.PORT_IDENTIFIER.name()); - throw new HandshakeException("Missing Property " + HandshakeProperty.PORT_IDENTIFIER.name()); - } // send "OK" response if ( !responseWritten ) { @@ -244,6 +267,10 @@ public class SocketFlowFileServerProtocol implements 
ServerProtocol { final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); + if ( port == null ) { + RemoteResourceFactory.rejectCodecNegotiation(dis, dos, "Cannot transfer FlowFiles because no port was specified"); + } + // Negotiate the FlowFileCodec to use. try { negotiatedFlowFileCodec = RemoteResourceFactory.receiveCodecNegotiation(dis, dos); @@ -306,7 +333,16 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { final CheckedOutputStream checkedOutputStream = new CheckedOutputStream(flowFileOutputStream, crc); final StopWatch transferWatch = new StopWatch(true); - flowFile = codec.encode(flowFile, session, checkedOutputStream); + + final FlowFile toSend = flowFile; + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + final DataPacket dataPacket = new StandardDataPacket(toSend.getAttributes(), in, toSend.getSize()); + codec.encode(dataPacket, checkedOutputStream); + } + }); + final long transmissionMillis = transferWatch.getElapsed(TimeUnit.MILLISECONDS); // need to close the CompressionOutputStream in order to force it write out any remaining bytes. @@ -323,8 +359,25 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transmissionMillis, false); session.remove(flowFile); + // determine if we should check for more data on queue. 
final long sendingNanos = System.nanoTime() - startNanos; - if ( sendingNanos < BATCH_NANOS ) { + boolean poll = true; + if ( sendingNanos >= requestedBatchNanos && requestedBatchNanos > 0L ) { + poll = false; + } + if ( bytesSent >= requestedBatchBytes && requestedBatchBytes > 0L ) { + poll = false; + } + if ( flowFilesSent.size() >= requestedBatchCount && requestedBatchCount > 0 ) { + poll = false; + } + + if ( requestedBatchNanos == 0 && requestedBatchBytes == 0 && requestedBatchCount == 0 ) { + poll = (sendingNanos < DEFAULT_BATCH_NANOS); + } + + if ( poll ) { + // we've not elapsed the requested sending duration, so get more data. flowFile = session.get(); } else { flowFile = null; @@ -429,7 +482,11 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { final InputStream flowFileInputStream = useGzip ? new CompressionInputStream(dis) : dis; final CheckedInputStream checkedInputStream = new CheckedInputStream(flowFileInputStream, crc); - FlowFile flowFile = codec.decode(checkedInputStream, session); + final DataPacket dataPacket = codec.decode(checkedInputStream); + FlowFile flowFile = session.create(); + flowFile = session.importFrom(dataPacket.getData(), flowFile); + flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes()); + final long transferNanos = System.nanoTime() - startNanos; final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); final String sourceSystemFlowFileUuid = flowFile.getAttribute(CoreAttributes.UUID.key()); @@ -451,6 +508,10 @@ public class SocketFlowFileServerProtocol implements ServerProtocol { continueTransaction = false; calculatedCRC = String.valueOf(checkedInputStream.getChecksum().getValue()); break; + case CANCEL_TRANSACTION: + logger.info("{} Received CancelTransaction indicator from {} with explanation {}", this, peer, transactionResponse.getMessage()); + session.rollback(); + return 0; default: throw new ProtocolException("Received unexpected response 
from peer: when expecting Continue Transaction or Finish Transaction, received" + transactionResponse); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java deleted file mode 100644 index e074010..0000000 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/test/java/org/apache/nifi/remote/TestStandardRemoteGroupPort.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.remote; - -import org.apache.nifi.remote.StandardRemoteGroupPort; -import org.apache.nifi.remote.PeerStatus; -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.nifi.cluster.ClusterNodeInformation; -import org.apache.nifi.cluster.NodeInformation; -import org.apache.nifi.connectable.ConnectableType; - -import org.junit.Test; - -public class TestStandardRemoteGroupPort { - - @Test - public void testFormulateDestinationListForOutput() throws IOException { - final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation(); - final List<NodeInformation> collection = new ArrayList<>(); - collection.add(new NodeInformation("ShouldGetMedium", 1, 1111, true, 4096)); - collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 10240)); - collection.add(new NodeInformation("ShouldGetLittle", 3, 3333, true, 1024)); - collection.add(new NodeInformation("ShouldGetMedium", 4, 4444, true, 4096)); - collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096)); - - clusterNodeInfo.setNodeInformation(collection); - final List<PeerStatus> destinations = StandardRemoteGroupPort.formulateDestinationList(clusterNodeInfo, ConnectableType.REMOTE_OUTPUT_PORT); - for ( final PeerStatus peerStatus : destinations ) { - System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort()); - } - } - - @Test - public void testFormulateDestinationListForOutputHugeDifference() throws IOException { - final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation(); - final List<NodeInformation> collection = new ArrayList<>(); - collection.add(new NodeInformation("ShouldGetLittle", 1, 1111, true, 500)); - collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 50000)); - - clusterNodeInfo.setNodeInformation(collection); - final List<PeerStatus> destinations = StandardRemoteGroupPort.formulateDestinationList(clusterNodeInfo, ConnectableType.REMOTE_OUTPUT_PORT); 
- for ( final PeerStatus peerStatus : destinations ) { - System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort()); - } - } - - - - - @Test - public void testFormulateDestinationListForInputPorts() throws IOException { - final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation(); - final List<NodeInformation> collection = new ArrayList<>(); - collection.add(new NodeInformation("ShouldGetMedium", 1, 1111, true, 4096)); - collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 10240)); - collection.add(new NodeInformation("ShouldGetLots", 3, 3333, true, 1024)); - collection.add(new NodeInformation("ShouldGetMedium", 4, 4444, true, 4096)); - collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096)); - - clusterNodeInfo.setNodeInformation(collection); - final List<PeerStatus> destinations = StandardRemoteGroupPort.formulateDestinationList(clusterNodeInfo, ConnectableType.REMOTE_INPUT_PORT); - for ( final PeerStatus peerStatus : destinations ) { - System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort()); - } - } - - @Test - public void testFormulateDestinationListForInputPortsHugeDifference() throws IOException { - final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation(); - final List<NodeInformation> collection = new ArrayList<>(); - collection.add(new NodeInformation("ShouldGetLots", 1, 1111, true, 500)); - collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 50000)); - - clusterNodeInfo.setNodeInformation(collection); - final List<PeerStatus> destinations = StandardRemoteGroupPort.formulateDestinationList(clusterNodeInfo, ConnectableType.REMOTE_INPUT_PORT); - for ( final PeerStatus peerStatus : destinations ) { - System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort()); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/pom.xml ---------------------------------------------------------------------- diff 
--git a/nifi/pom.xml b/nifi/pom.xml index 8eda682..eb395c9 100644 --- a/nifi/pom.xml +++ b/nifi/pom.xml @@ -624,6 +624,11 @@ <artifactId>nifi-utils</artifactId> <version>0.0.2-incubating-SNAPSHOT</version> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-site-to-site-client</artifactId> + <version>0.0.2-incubating-SNAPSHOT</version> + </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-web-utils</artifactId>