http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java new file mode 100644 index 0000000..dc3d68f --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/io/socket/ssl/SSLSocketChannelOutput.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.io.socket.ssl; + +import java.io.IOException; +import java.io.OutputStream; + +import org.apache.nifi.stream.io.BufferedOutputStream; +import org.apache.nifi.stream.io.ByteCountingOutputStream; +import org.apache.nifi.remote.protocol.CommunicationsOutput; + +public class SSLSocketChannelOutput implements CommunicationsOutput { + private final OutputStream out; + private final ByteCountingOutputStream countingOut; + + public SSLSocketChannelOutput(final SSLSocketChannel channel) { + countingOut = new ByteCountingOutputStream(new SSLSocketChannelOutputStream(channel)); + out = new BufferedOutputStream(countingOut); + } + + @Override + public OutputStream getOutputStream() throws IOException { + return out; + } + + @Override + public long getBytesWritten() { + return countingOut.getBytesWritten(); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java new file mode 100644 index 0000000..befbdaa --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol; + +import java.io.IOException; +import java.util.Set; + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.PeerStatus; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.VersionedRemoteResource; +import org.apache.nifi.remote.codec.FlowFileCodec; +import org.apache.nifi.remote.exception.HandshakeException; +import org.apache.nifi.remote.exception.PortNotRunningException; +import org.apache.nifi.remote.exception.ProtocolException; +import org.apache.nifi.remote.exception.UnknownPortException; + +public interface ClientProtocol extends VersionedRemoteResource { + + void handshake(Peer peer) throws IOException, HandshakeException, UnknownPortException, PortNotRunningException; + + Set<PeerStatus> getPeerStatuses(Peer currentPeer) throws IOException, ProtocolException; + + FlowFileCodec negotiateCodec(Peer peer) throws IOException, ProtocolException; + + void receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException; + + void transferFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException; + + void shutdown(Peer peer) throws IOException, ProtocolException; + + boolean isReadyForFileTransfer(); + + + + + Transaction startTransaction(Peer peer, FlowFileCodec codec, TransferDirection direction) throws IOException; + + + /** + * returns <code>true</code> if remote instance indicates that the port is + * invalid + * + * @return + * @throws IllegalStateException if a handshake has not successfully + * completed + */ + boolean isPortInvalid() throws IllegalStateException; + + /** + * returns <code>true</code> if remote instance indicates that the port is + * unknown + * + * @return + * @throws IllegalStateException if a handshake has not successfully + * completed + */ + boolean isPortUnknown(); + + /** + * returns <code>true</code> if remote instance indicates that the port's + * destination is full + * + * @return + * @throws IllegalStateException if a handshake has not successfully + * completed + */ + boolean isDestinationFull(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java new file mode 100644 index 0000000..d2e2946 --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol; + +import java.io.IOException; +import java.io.InputStream; + +public interface CommunicationsInput { + + InputStream getInputStream() throws IOException; + + long getBytesRead(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java new file mode 100644 index 0000000..95cab29 --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol; + +import java.io.IOException; +import java.io.OutputStream; + +public interface CommunicationsOutput { + + OutputStream getOutputStream() throws IOException; + + long getBytesWritten(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java new file mode 100644 index 0000000..d009cec --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol; + +import java.io.Closeable; +import java.io.IOException; + +public interface CommunicationsSession extends Closeable { + + public static final byte[] MAGIC_BYTES = {(byte) 'N', (byte) 'i', (byte) 'F', (byte) 'i'}; + + CommunicationsInput getInput(); + + CommunicationsOutput getOutput(); + + void setTimeout(int millis) throws IOException; + + int getTimeout() throws IOException; + + void setUri(String uri); + + String getUri(); + + String getUserDn(); + + void setUserDn(String dn); + + boolean isDataAvailable(); + + long getBytesWritten(); + + long getBytesRead(); + + /** + * Asynchronously interrupts this FlowFileCodec. Implementations must ensure + * that they stop sending and receiving data as soon as possible after this + * method has been called, even if doing so results in sending only partial + * data to the peer. This will usually result in the peer throwing a + * SocketTimeoutException. + */ + void interrupt(); + + /** + * Returns <code>true</code> if the connection is closed, <code>false</code> + * otherwise. + * + * @return + */ + boolean isClosed(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java new file mode 100644 index 0000000..f4fa4d0 --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/DataPacket.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol; + +import java.io.InputStream; +import java.util.Map; + +public interface DataPacket { + + Map<String, String> getAttributes(); + + InputStream getData(); + + long getSize(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/RequestType.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/RequestType.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/RequestType.java new file mode 100644 index 0000000..41334fe --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/RequestType.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +public enum RequestType { + + NEGOTIATE_FLOWFILE_CODEC, + REQUEST_PEER_LIST, + SEND_FLOWFILES, + RECEIVE_FLOWFILES, + SHUTDOWN; + + public void writeRequestType(final DataOutputStream dos) throws IOException { + dos.writeUTF(name()); + } + + public static RequestType readRequestType(final DataInputStream dis) throws IOException { + final String requestTypeVal = dis.readUTF(); + try { + return RequestType.valueOf(requestTypeVal); + } catch (final Exception e) { + throw new IOException("Could not determine RequestType: received invalid value " + requestTypeVal); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java new file mode 100644 index 0000000..41dc276 --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/HandshakeProperty.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol.socket; + + +/** + * Enumeration of Properties that can be used for the Site-to-Site Socket Protocol. + */ +public enum HandshakeProperty { + /** + * Boolean value indicating whether or not the contents of a FlowFile should be + * GZipped when transferred. + */ + GZIP, + + /** + * The unique identifier of the port to communicate with + */ + PORT_IDENTIFIER, + + /** + * Indicates the number of milliseconds after the request was made that the client + * will wait for a response. If no response has been received by the time this value + * expires, the server can move on without attempting to service the request because + * the client will have already disconnected. + */ + REQUEST_EXPIRATION_MILLIS, + + /** + * The preferred number of FlowFiles that the server should send to the client + * when pulling data. This property was introduced in version 5 of the protocol. + */ + BATCH_COUNT, + + /** + * The preferred number of bytes that the server should send to the client when + * pulling data. This property was introduced in version 5 of the protocol. + */ + BATCH_SIZE, + + /** + * The preferred amount of time that the server should send data to the client + * when pulling data. This property was introduced in version 5 of the protocol. + * Value is in milliseconds. + */ + BATCH_DURATION; +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java new file mode 100644 index 0000000..eae1940 --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/Response.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol.socket; + +import java.io.DataInputStream; +import java.io.IOException; + +import org.apache.nifi.remote.exception.ProtocolException; + +public class Response { + private final ResponseCode code; + private final String message; + + private Response(final ResponseCode code, final String explanation) { + this.code = code; + this.message = explanation; + } + + public ResponseCode getCode() { + return code; + } + + public String getMessage() { + return message; + } + + public static Response read(final DataInputStream in) throws IOException, ProtocolException { + final ResponseCode code = ResponseCode.readCode(in); + final String message = code.containsMessage() ? in.readUTF() : null; + return new Response(code, message); + } + + @Override + public String toString() { + return code + ": " + message; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java new file mode 100644 index 0000000..8860e73 --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/ResponseCode.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol.socket; + +import java.io.DataOutputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.nifi.remote.exception.ProtocolException; + + +public enum ResponseCode { + RESERVED(0, "Reserved for Future Use", false), // This will likely be used if we ever need to expand the length of + // ResponseCode, so that we can indicate a 0 followed by some other bytes + + // handshaking properties + PROPERTIES_OK(1, "Properties OK", false), + UNKNOWN_PROPERTY_NAME(230, "Unknown Property Name", true), + ILLEGAL_PROPERTY_VALUE(231, "Illegal Property Value", true), + MISSING_PROPERTY(232, "Missing Property", true), + + // transaction indicators + CONTINUE_TRANSACTION(10, "Continue Transaction", false), + FINISH_TRANSACTION(11, "Finish Transaction", false), + CONFIRM_TRANSACTION(12, "Confirm Transaction", true), // "Explanation" of this code is the checksum + TRANSACTION_FINISHED(13, "Transaction Finished", false), + TRANSACTION_FINISHED_BUT_DESTINATION_FULL(14, "Transaction Finished But Destination is Full", false), + CANCEL_TRANSACTION(15, "Cancel Transaction", true), + BAD_CHECKSUM(19, "Bad Checksum", false), + + // data availability indicators + MORE_DATA(20, "More Data Exists", false), + NO_MORE_DATA(21, "No More Data Exists", false), + + // port state indicators + UNKNOWN_PORT(200, "Unknown Port", false), + PORT_NOT_IN_VALID_STATE(201, "Port Not in a Valid State", true), + PORTS_DESTINATION_FULL(202, "Port's Destination is Full", false), + + // authorization + UNAUTHORIZED(240, "User Not Authorized", true), + + // error indicators + ABORT(250, "Abort", true), + UNRECOGNIZED_RESPONSE_CODE(254, "Unrecognized Response Code", false), + END_OF_STREAM(255, "End of Stream", false); + + private static final ResponseCode[] codeArray = new ResponseCode[256]; + + static { + for ( final ResponseCode responseCode : ResponseCode.values() ) { + codeArray[responseCode.getCode()] = responseCode; + } + } + + private static final byte CODE_SEQUENCE_VALUE_1 = (byte) 'R'; + private static final byte CODE_SEQUENCE_VALUE_2 = (byte) 'C'; + private final int code; + private final byte[] codeSequence; + private final String description; + private final boolean containsMessage; + + private ResponseCode(final int code, final String description, final boolean containsMessage) { + this.codeSequence = new byte[] {CODE_SEQUENCE_VALUE_1, CODE_SEQUENCE_VALUE_2, (byte) code}; + this.code = code; + this.description = description; + this.containsMessage = containsMessage; + } + + public int getCode() { + return code; + } + + public byte[] getCodeSequence() { + return codeSequence; + } + + @Override + public String toString() { + return description; + } + + public boolean containsMessage() { + return containsMessage; + } + + public void writeResponse(final DataOutputStream out) throws IOException { + if ( containsMessage() ) { + throw new IllegalArgumentException("ResponseCode " + code + " expects an explanation"); + } + + out.write(getCodeSequence()); + out.flush(); + } + + public void writeResponse(final DataOutputStream out, final String explanation) throws IOException { + if ( !containsMessage() ) { + throw new IllegalArgumentException("ResponseCode " + code + " does not expect an explanation"); + } + + out.write(getCodeSequence()); + out.writeUTF(explanation); + out.flush(); + } + + static ResponseCode readCode(final InputStream in) throws IOException, ProtocolException { + final int byte1 = in.read(); + if ( byte1 < 0 ) { + throw new EOFException(); + } else if ( byte1 != CODE_SEQUENCE_VALUE_1 ) { + throw new ProtocolException("Expected to receive ResponseCode, but the stream did not have a ResponseCode"); + } + + final int byte2 = in.read(); + if ( byte2 < 0 ) { + throw new EOFException(); + } else if ( byte2 != CODE_SEQUENCE_VALUE_2 ) { + throw new ProtocolException("Expected to receive ResponseCode, but the stream did not have a ResponseCode"); + } + + final int byte3 = in.read(); + if ( byte3 < 0 ) { + throw new EOFException(); + } + + final ResponseCode responseCode = codeArray[byte3]; + if (responseCode == null) { + throw new ProtocolException("Received Response Code of " + byte3 + " but do not recognize this code"); + } + return responseCode; + } + + public static ResponseCode fromSequence(final byte[] value) { + final int code = value[3] & 0xFF; + final ResponseCode responseCode = codeArray[code]; + return (responseCode == null) ? UNRECOGNIZED_RESPONSE_CODE : responseCode; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java new file mode 100644 index 0000000..5f194f8 --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientProtocol.java @@ -0,0 +1,437 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol.socket; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.PeerStatus; +import org.apache.nifi.remote.RemoteDestination; +import org.apache.nifi.remote.RemoteResourceInitiator; +import org.apache.nifi.remote.StandardVersionNegotiator; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.VersionNegotiator; +import org.apache.nifi.remote.codec.FlowFileCodec; +import org.apache.nifi.remote.codec.StandardFlowFileCodec; +import org.apache.nifi.remote.exception.HandshakeException; +import org.apache.nifi.remote.exception.ProtocolException; +import org.apache.nifi.remote.protocol.ClientProtocol; +import org.apache.nifi.remote.protocol.CommunicationsSession; +import org.apache.nifi.remote.protocol.DataPacket; +import org.apache.nifi.remote.protocol.RequestType; +import org.apache.nifi.remote.util.StandardDataPacket; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.StopWatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SocketClientProtocol implements ClientProtocol { + private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(5, 4, 3, 2, 1); + + private RemoteDestination destination; + private boolean useCompression = false; + + private String commsIdentifier; + private boolean handshakeComplete = false; + + private final Logger logger = LoggerFactory.getLogger(SocketClientProtocol.class); + + private Response handshakeResponse = null; + private boolean readyForFileTransfer = false; + private String transitUriPrefix = null; + private int timeoutMillis = 30000; + + private int batchCount; + private long batchSize; + private long batchMillis; + + private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L); // send batches of up to 5 seconds + + public SocketClientProtocol() { + } + + public void setPreferredBatchCount(final int count) { + this.batchCount = count; + } + + public void setPreferredBatchSize(final long bytes) { + this.batchSize = bytes; + } + + public void setPreferredBatchDuration(final long millis) { + this.batchMillis = millis; + } + + public void setDestination(final RemoteDestination destination) { + this.destination = destination; + this.useCompression = destination.isUseCompression(); + } + + public void setTimeout(final int timeoutMillis) { + this.timeoutMillis = timeoutMillis; + } + + @Override + public void handshake(final Peer peer) throws IOException, HandshakeException { + handshake(peer, destination.getIdentifier()); + } + + public void handshake(final Peer peer, final String destinationId) throws IOException, HandshakeException { + if ( handshakeComplete ) { + throw new IllegalStateException("Handshake has already been completed"); + } + commsIdentifier = UUID.randomUUID().toString(); + logger.debug("{} handshaking with {}", this, peer); + + final Map<HandshakeProperty, String> properties = new HashMap<>(); + properties.put(HandshakeProperty.GZIP, String.valueOf(useCompression)); + + if ( destinationId != null ) { + properties.put(HandshakeProperty.PORT_IDENTIFIER, destination.getIdentifier()); + } + + properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, String.valueOf(timeoutMillis) ); + + if ( versionNegotiator.getVersion() >= 5 ) { + if ( batchCount > 0 ) { + properties.put(HandshakeProperty.BATCH_COUNT, String.valueOf(batchCount)); + } + if ( batchSize > 0L ) { + properties.put(HandshakeProperty.BATCH_SIZE, String.valueOf(batchSize)); + } + if ( batchMillis > 0L ) { + properties.put(HandshakeProperty.BATCH_DURATION, String.valueOf(batchMillis)); + } + } + + final CommunicationsSession commsSession = peer.getCommunicationsSession(); + commsSession.setTimeout(timeoutMillis); + final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); + final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); + + dos.writeUTF(commsIdentifier); + + if ( versionNegotiator.getVersion() >= 3 ) { + dos.writeUTF(peer.getUrl()); + transitUriPrefix = peer.getUrl(); + + if ( !transitUriPrefix.endsWith("/") ) { + transitUriPrefix = transitUriPrefix + "/"; + } + } + + dos.writeInt(properties.size()); + for ( final Map.Entry<HandshakeProperty, String> entry : properties.entrySet() ) { + dos.writeUTF(entry.getKey().name()); + dos.writeUTF(entry.getValue()); + } + + dos.flush(); + + try { + handshakeResponse = Response.read(dis); + } catch (final ProtocolException e) { + throw new HandshakeException(e); + } + + switch (handshakeResponse.getCode()) { + case PORT_NOT_IN_VALID_STATE: + case UNKNOWN_PORT: + case PORTS_DESTINATION_FULL: + break; + case PROPERTIES_OK: + readyForFileTransfer = true; + break; + default: + logger.error("{} received unexpected response {} from {} when negotiating Codec", new Object[] { + this, handshakeResponse, peer}); + peer.close(); + throw new HandshakeException("Received unexpected response " + handshakeResponse); + } + + logger.debug("{} Finished handshake with {}", this, peer); + handshakeComplete = true; + } + + public boolean isReadyForFileTransfer() { + return readyForFileTransfer; + } + + public boolean isPortInvalid() { + if ( !handshakeComplete ) { + throw new IllegalStateException("Handshake has not completed successfully"); + } + return handshakeResponse.getCode() == ResponseCode.PORT_NOT_IN_VALID_STATE; + } + + public boolean isPortUnknown() { + if ( !handshakeComplete ) { + throw new IllegalStateException("Handshake has not completed successfully"); + } + return handshakeResponse.getCode() == ResponseCode.UNKNOWN_PORT; + } + + public boolean isDestinationFull() { + if ( !handshakeComplete ) { + throw new IllegalStateException("Handshake has not completed successfully"); + } + return handshakeResponse.getCode() == ResponseCode.PORTS_DESTINATION_FULL; + } + + @Override + public Set<PeerStatus> getPeerStatuses(final Peer peer) throws IOException { + if ( !handshakeComplete ) { + throw new IllegalStateException("Handshake has not been performed"); + } + + logger.debug("{} Get Peer Statuses from {}", this, peer); + final CommunicationsSession commsSession = peer.getCommunicationsSession(); + final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); + final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); + + RequestType.REQUEST_PEER_LIST.writeRequestType(dos); + dos.flush(); + final int numPeers = dis.readInt(); + final Set<PeerStatus> peers = new HashSet<>(numPeers); + for (int i=0; i < numPeers; i++) { + final String hostname = dis.readUTF(); + final int port = dis.readInt(); + final boolean secure = dis.readBoolean(); + final int flowFileCount = dis.readInt(); + peers.add(new PeerStatus(hostname, port, secure, flowFileCount)); + } + + logger.debug("{} Received {} Peer Statuses from {}", this, peers.size(), peer); + return peers; + } + + @Override + public FlowFileCodec negotiateCodec(final Peer peer) throws IOException, ProtocolException { + if ( !handshakeComplete ) { + throw new IllegalStateException("Handshake has not been performed"); + } + + logger.debug("{} Negotiating Codec with {}", this, peer); + final CommunicationsSession commsSession = peer.getCommunicationsSession(); + final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); + final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); + + RequestType.NEGOTIATE_FLOWFILE_CODEC.writeRequestType(dos); + + FlowFileCodec codec = new StandardFlowFileCodec(); + try { + codec = (FlowFileCodec) RemoteResourceInitiator.initiateResourceNegotiation(codec, dis, dos); + } catch (HandshakeException e) { + throw new ProtocolException(e.toString()); + } + logger.debug("{} negotiated FlowFileCodec {} with {}", new Object[] {this, codec, commsSession}); + + return codec; + } + + + @Override + public Transaction startTransaction(final Peer peer, final FlowFileCodec codec, final TransferDirection direction) throws IOException, ProtocolException { + if ( !handshakeComplete ) { + throw new IllegalStateException("Handshake has not been performed"); + } + if ( !readyForFileTransfer ) { + throw new IllegalStateException("Cannot start transaction; handshake resolution was " + handshakeResponse); + } + + return new SocketClientTransaction(versionNegotiator.getVersion(), peer, codec, + direction, useCompression, (int) destination.getYieldPeriod(TimeUnit.MILLISECONDS)); + } + + + @Override + public void receiveFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { + final String userDn = peer.getCommunicationsSession().getUserDn(); + final Transaction transaction = startTransaction(peer, codec, TransferDirection.RECEIVE); + + final StopWatch stopWatch = new StopWatch(true); + final Set<FlowFile> flowFilesReceived = new HashSet<>(); + long bytesReceived = 0L; + + while (true) { + final long start = System.nanoTime(); + final DataPacket dataPacket = transaction.receive(); + if ( dataPacket == null ) { + if ( flowFilesReceived.isEmpty() ) { + peer.penalize(destination.getYieldPeriod(TimeUnit.MILLISECONDS)); + } + break; + } + + FlowFile flowFile = session.create(); + flowFile = session.putAllAttributes(flowFile, dataPacket.getAttributes()); + flowFile = session.importFrom(dataPacket.getData(), flowFile); + final long receiveNanos = System.nanoTime() - start; + + String sourceFlowFileIdentifier = dataPacket.getAttributes().get(CoreAttributes.UUID.key()); + if ( sourceFlowFileIdentifier == null ) { + sourceFlowFileIdentifier = "<Unknown Identifier>"; + } + + final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + sourceFlowFileIdentifier; + session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, TimeUnit.NANOSECONDS.toMillis(receiveNanos)); + + session.transfer(flowFile, Relationship.ANONYMOUS); + bytesReceived += dataPacket.getSize(); + } + + // Confirm that what we received was the correct data. + transaction.confirm(); + + // Commit the session so that we have persisted the data + session.commit(); + + // We want to apply backpressure if the outgoing connections are full. I.e., there are no available relationships. + final boolean applyBackpressure = context.getAvailableRelationships().isEmpty(); + + transaction.complete(applyBackpressure); + logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer); + + if ( flowFilesReceived.isEmpty() ) { + return; + } + + stopWatch.stop(); + final String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles"; + final String uploadDataRate = stopWatch.calculateDataRate(bytesReceived); + final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); + final String dataSize = FormatUtils.formatDataSize(bytesReceived); + logger.info("{} Successfully receveied {} ({}) from {} in {} milliseconds at a rate of {}", new Object[] { + this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate }); + } + + + @Override + public void transferFlowFiles(final Peer peer, final ProcessContext context, final ProcessSession session, final FlowFileCodec codec) throws IOException, ProtocolException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + try { + final String userDn = peer.getCommunicationsSession().getUserDn(); + final long startSendingNanos = System.nanoTime(); + final StopWatch stopWatch = new StopWatch(true); + long bytesSent = 0L; + + final Transaction transaction = startTransaction(peer, codec, TransferDirection.SEND); + + final Set<FlowFile> flowFilesSent = new HashSet<>(); + boolean continueTransaction = true; + while (continueTransaction) { + final long startNanos = System.nanoTime(); + // call codec.encode within a session callback so that we have the InputStream to read the FlowFile + final FlowFile toWrap = flowFile; + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + final DataPacket dataPacket = new StandardDataPacket(toWrap.getAttributes(), in, toWrap.getSize()); + transaction.send(dataPacket); + } + }); + + final long transferNanos = System.nanoTime() - startNanos; + final long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS); + + flowFilesSent.add(flowFile); + bytesSent += flowFile.getSize(); + logger.debug("{} Sent {} to {}", this, flowFile, peer); + + final String transitUri = (transitUriPrefix == null) ? peer.getUrl() : transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key()); + session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transferMillis, false); + session.remove(flowFile); + + final long sendingNanos = System.nanoTime() - startSendingNanos; + if ( sendingNanos < BATCH_SEND_NANOS ) { + flowFile = session.get(); + } else { + flowFile = null; + } + + continueTransaction = (flowFile != null); + } + + transaction.confirm(); + + // consume input stream entirely, ignoring its contents. If we + // don't do this, the Connection will not be returned to the pool + stopWatch.stop(); + final String uploadDataRate = stopWatch.calculateDataRate(bytesSent); + final long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS); + final String dataSize = FormatUtils.formatDataSize(bytesSent); + + session.commit(); + transaction.complete(false); + + final String flowFileDescription = (flowFilesSent.size() < 20) ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles"; + logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[] { + this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate}); + } catch (final Exception e) { + session.rollback(); + throw e; + } + } + + + @Override + public VersionNegotiator getVersionNegotiator() { + return versionNegotiator; + } + + @Override + public void shutdown(final Peer peer) throws IOException { + readyForFileTransfer = false; + final CommunicationsSession commsSession = peer.getCommunicationsSession(); + final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); + + logger.debug("{} Shutting down with {}", this, peer); + // Indicate that we would like to have some data + RequestType.SHUTDOWN.writeRequestType(dos); + dos.flush(); + } + + @Override + public String getResourceName() { + return "SocketFlowFileProtocol"; + } + + @Override + public String toString() { + return "SocketClientProtocol[CommsID=" + commsIdentifier + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java new file mode 100644 index 0000000..edb360e --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/protocol/socket/SocketClientTransaction.java @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.protocol.socket; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.util.zip.CRC32; +import java.util.zip.CheckedInputStream; +import java.util.zip.CheckedOutputStream; + +import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.Transaction; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.codec.FlowFileCodec; +import org.apache.nifi.remote.exception.ProtocolException; +import org.apache.nifi.remote.protocol.DataPacket; +import org.apache.nifi.remote.protocol.RequestType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SocketClientTransaction implements Transaction { + private static final Logger logger = LoggerFactory.getLogger(SocketClientTransaction.class); + + + private final CRC32 crc = new CRC32(); + private final int protocolVersion; + private final FlowFileCodec codec; + private final DataInputStream dis; + private final DataOutputStream dos; + private final TransferDirection direction; + private final boolean compress; + private final Peer peer; + private final int penaltyMillis; + + private boolean dataAvailable = false; + private int transfers = 0; + private TransactionState state; + + SocketClientTransaction(final int protocolVersion, final Peer peer, final FlowFileCodec codec, + final TransferDirection direction, final boolean useCompression, final int penaltyMillis) throws IOException { + this.protocolVersion = protocolVersion; + this.peer = peer; + this.codec = codec; + this.direction = direction; + this.dis = new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream()); + this.dos = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream()); + this.compress = useCompression; + this.state = TransactionState.TRANSACTION_STARTED; + this.penaltyMillis = penaltyMillis; + + initialize(); + } + + private void initialize() throws IOException { + try { + if ( direction == TransferDirection.RECEIVE ) { + // Indicate that we would like to have some data + RequestType.RECEIVE_FLOWFILES.writeRequestType(dos); + dos.flush(); + + final Response dataAvailableCode = Response.read(dis); + switch (dataAvailableCode.getCode()) { + case MORE_DATA: + logger.debug("{} {} Indicates that data is available", this, peer); + this.dataAvailable = true; + break; + case NO_MORE_DATA: + logger.debug("{} No data available from {}", peer); + this.dataAvailable = false; + return; + default: + throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode); + } + + } else { + // Indicate that we would like to have some data + RequestType.SEND_FLOWFILES.writeRequestType(dos); + dos.flush(); + } + } catch (final Exception e) { + error(); + throw e; + } + } + + + @Override + public DataPacket receive() throws IOException { + try { + if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) { + throw new IllegalStateException("Cannot receive data because Transaction State is " + state); + } + + if ( direction == TransferDirection.SEND ) { + throw new IllegalStateException("Attempting to receive data but started a SEND Transaction"); + } + + // if we already know there's no data, just return null + if ( !dataAvailable ) { + return null; + } + + // if we have already received a packet, check if another is available. + if ( transfers > 0 ) { + // Determine if Peer will send us data or has no data to send us + final Response dataAvailableCode = Response.read(dis); + switch (dataAvailableCode.getCode()) { + case CONTINUE_TRANSACTION: + logger.debug("{} {} Indicates Transaction should continue", this, peer); + this.dataAvailable = true; + break; + case FINISH_TRANSACTION: + logger.debug("{} {} Indicates Transaction should finish", peer); + this.dataAvailable = false; + break; + default: + throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode); + } + } + + // if no data available, return null + if ( !dataAvailable ) { + return null; + } + + logger.debug("{} Receiving data from {}", this, peer); + final DataPacket packet = codec.decode(new CheckedInputStream(dis, crc)); + + if ( packet == null ) { + this.dataAvailable = false; + } else { + transfers++; + } + + this.state = TransactionState.DATA_EXCHANGED; + return packet; + } catch (final Exception e) { + error(); + throw e; + } + } + + + @Override + public void send(DataPacket dataPacket) throws IOException { + try { + if ( state != TransactionState.DATA_EXCHANGED && state != TransactionState.TRANSACTION_STARTED) { + throw new IllegalStateException("Cannot send data because Transaction State is " + state); + } + + if ( direction == TransferDirection.RECEIVE ) { + throw new IllegalStateException("Attempting to send data but started a RECEIVE Transaction"); + } + + if ( transfers > 0 ) { + ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos); + } + + logger.debug("{} Sending data to {}", this, peer); + + final OutputStream out = new CheckedOutputStream(dos, crc); + codec.encode(dataPacket, out); + + // need to close the CompressionOutputStream in order to force it write out any remaining bytes. + // Otherwise, do NOT close it because we don't want to close the underlying stream + // (CompressionOutputStream will not close the underlying stream when it's closed) + if ( compress ) { + out.close(); + } + + transfers++; + this.state = TransactionState.DATA_EXCHANGED; + } catch (final Exception e) { + error(); + throw e; + } + } + + + @Override + public void cancel(final String explanation) throws IOException { + if ( state == TransactionState.TRANSACTION_CANCELED || state == TransactionState.TRANSACTION_COMPLETED || state == TransactionState.ERROR ) { + throw new IllegalStateException("Cannot cancel transaction because state is already " + state); + } + + try { + ResponseCode.CANCEL_TRANSACTION.writeResponse(dos, explanation == null ? "<No explanation given>" : explanation); + state = TransactionState.TRANSACTION_CANCELED; + } catch (final IOException ioe) { + error(); + throw ioe; + } + } + + + @Override + public void complete(boolean requestBackoff) throws IOException { + try { + if ( state != TransactionState.TRANSACTION_CONFIRMED ) { + throw new IllegalStateException("Cannot complete transaction because state is " + state + + "; Transaction can only be completed when state is " + TransactionState.TRANSACTION_CONFIRMED); + } + + if ( direction == TransferDirection.RECEIVE ) { + if ( transfers == 0 ) { + state = TransactionState.TRANSACTION_COMPLETED; + return; + } + + if ( requestBackoff ) { + // Confirm that we received the data and the peer can now discard it but that the peer should not + // send any more data for a bit + logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer); + ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos); + } else { + // Confirm that we received the data and the peer can now discard it + logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer); + ResponseCode.TRANSACTION_FINISHED.writeResponse(dos); + } + + state = TransactionState.TRANSACTION_COMPLETED; + } else { + final Response transactionResponse; + try { + transactionResponse = Response.read(dis); + } catch (final IOException e) { + throw new IOException(this + " Failed to receive a response from " + peer + " when expecting a TransactionFinished Indicator. " + + "It is unknown whether or not the peer successfully received/processed the data.", e); + } + + logger.debug("{} Received {} from {}", this, transactionResponse, peer); + if ( transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL ) { + peer.penalize(penaltyMillis); + } else if ( transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED ) { + throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse); + } + + state = TransactionState.TRANSACTION_COMPLETED; + } + } catch (final Exception e) { + error(); + throw e; + } + } + + + @Override + public void confirm() throws IOException { + try { + if ( state == TransactionState.TRANSACTION_STARTED && !dataAvailable && direction == TransferDirection.RECEIVE ) { + // client requested to receive data but no data available. no need to confirm. + state = TransactionState.TRANSACTION_CONFIRMED; + return; + } + + if ( state != TransactionState.DATA_EXCHANGED ) { + throw new IllegalStateException("Cannot confirm Transaction because state is " + state + + "; Transaction can only be confirmed when state is " + TransactionState.DATA_EXCHANGED ); + } + + if ( direction == TransferDirection.RECEIVE ) { + if ( dataAvailable ) { + throw new IllegalStateException("Cannot complete transaction because the sender has already sent more data than client has consumed."); + } + + // we received a FINISH_TRANSACTION indicator. Send back a CONFIRM_TRANSACTION message + // to peer so that we can verify that the connection is still open. This is a two-phase commit, + // which helps to prevent the chances of data duplication. Without doing this, we may commit the + // session and then when we send the response back to the peer, the peer may have timed out and may not + // be listening. As a result, it will re-send the data. By doing this two-phase commit, we narrow the + // Critical Section involved in this transaction so that rather than the Critical Section being the + // time window involved in the entire transaction, it is reduced to a simple round-trip conversation. + logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer); + final String calculatedCRC = String.valueOf(crc.getValue()); + ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC); + + final Response confirmTransactionResponse; + try { + confirmTransactionResponse = Response.read(dis); + } catch (final IOException ioe) { + logger.error("Failed to receive response code from {} when expected confirmation of transaction", peer); + throw ioe; + } + + logger.trace("{} Received {} from {}", this, confirmTransactionResponse, peer); + + switch (confirmTransactionResponse.getCode()) { + case CONFIRM_TRANSACTION: + break; + case BAD_CHECKSUM: + throw new IOException(this + " Received a BadChecksum response from peer " + peer); + default: + throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code"); + } + + state = TransactionState.TRANSACTION_CONFIRMED; + } else { + logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer); + ResponseCode.FINISH_TRANSACTION.writeResponse(dos); + + final String calculatedCRC = String.valueOf(crc.getValue()); + + // we've sent a FINISH_TRANSACTION. Now we'll wait for the peer to send a 'Confirm Transaction' response + final Response transactionConfirmationResponse = Response.read(dis); + if ( transactionConfirmationResponse.getCode() == ResponseCode.CONFIRM_TRANSACTION ) { + // Confirm checksum and echo back the confirmation. + logger.trace("{} Received {} from {}", this, transactionConfirmationResponse, peer); + final String receivedCRC = transactionConfirmationResponse.getMessage(); + + // CRC was not used before version 4 + if ( protocolVersion > 3 ) { + if ( !receivedCRC.equals(calculatedCRC) ) { + ResponseCode.BAD_CHECKSUM.writeResponse(dos); + throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session"); + } + } + + ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, ""); + } else { + throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse); + } + + state = TransactionState.TRANSACTION_CONFIRMED; + } + } catch (final Exception e) { + error(); + throw e; + } + } + + @Override + public void error() { + this.state = TransactionState.ERROR; + } + + @Override + public TransactionState getState() { + return state; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java new file mode 100644 index 0000000..6dab77b --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.util; + +import java.util.Set; + +import org.apache.nifi.remote.PeerStatus; + +public class PeerStatusCache { + private final Set<PeerStatus> statuses; + private final long timestamp; + + public PeerStatusCache(final Set<PeerStatus> statuses) { + this(statuses, System.currentTimeMillis()); + } + + public PeerStatusCache(final Set<PeerStatus> statuses, final long timestamp) { + this.statuses = statuses; + this.timestamp = timestamp; + } + + public Set<PeerStatus> getStatuses() { + return statuses; + } + + public long getTimestamp() { + return timestamp; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java new file mode 100644 index 0000000..b2dbdcd --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/RemoteNiFiUtils.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.util; + +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Map; + +import javax.net.ssl.SSLContext; +import javax.ws.rs.core.MediaType; + +import org.apache.nifi.web.api.dto.ControllerDTO; +import org.apache.nifi.web.api.entity.ControllerEntity; +import org.apache.nifi.web.util.WebUtils; + +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientHandlerException; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.ClientResponse.Status; +import com.sun.jersey.api.client.config.ClientConfig; +import com.sun.jersey.api.client.UniformInterfaceException; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.core.util.MultivaluedMapImpl; + +/** + * + */ +public class RemoteNiFiUtils { + + public static final String CONTROLLER_URI_PATH = "/controller"; + + private static final int CONNECT_TIMEOUT = 10000; + private static final int READ_TIMEOUT = 10000; + + private final Client client; + + public RemoteNiFiUtils(final SSLContext sslContext) { + this.client = getClient(sslContext); + } + + + /** + * Gets the content at the specified URI. + * + * @param uri + * @param timeoutMillis + * @return + * @throws ClientHandlerException + * @throws UniformInterfaceException + */ + public ClientResponse get(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException { + return get(uri, timeoutMillis, null); + } + + /** + * Gets the content at the specified URI using the given query parameters. + * + * @param uri + * @param timeoutMillis + * @param queryParams + * @return + * @throws ClientHandlerException + * @throws UniformInterfaceException + */ + public ClientResponse get(final URI uri, final int timeoutMillis, final Map<String, String> queryParams) throws ClientHandlerException, UniformInterfaceException { + // perform the request + WebResource webResource = client.resource(uri); + if ( queryParams != null ) { + for ( final Map.Entry<String, String> queryEntry : queryParams.entrySet() ) { + webResource = webResource.queryParam(queryEntry.getKey(), queryEntry.getValue()); + } + } + + webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis); + webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis); + + return webResource.accept(MediaType.APPLICATION_JSON).get(ClientResponse.class); + } + + /** + * Performs a HEAD request to the specified URI. + * + * @param uri + * @param timeoutMillis + * @return + * @throws ClientHandlerException + * @throws UniformInterfaceException + */ + public ClientResponse head(final URI uri, final int timeoutMillis) throws ClientHandlerException, UniformInterfaceException { + // perform the request + WebResource webResource = client.resource(uri); + webResource.setProperty(ClientConfig.PROPERTY_READ_TIMEOUT, timeoutMillis); + webResource.setProperty(ClientConfig.PROPERTY_CONNECT_TIMEOUT, timeoutMillis); + return webResource.head(); + } + + /** + * Gets a client based on the specified URI. + * + * @param uri + * @return + */ + private Client getClient(final SSLContext sslContext) { + final Client client; + if (sslContext == null) { + client = WebUtils.createClient(null); + } else { + client = WebUtils.createClient(null, sslContext); + } + + client.setReadTimeout(READ_TIMEOUT); + client.setConnectTimeout(CONNECT_TIMEOUT); + + return client; + } + + + /** + * Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance + * is not configured to use Site-to-Site transfers. + * + * @param uri the base URI of the remote instance. This should include the path only to the nifi-api level, as well as the protocol, host, and port. + * @param timeoutMillis + * @return + * @throws IOException + */ + public Integer getRemoteListeningPort(final String uri, final int timeoutMillis) throws IOException { + try { + final URI uriObject = new URI(uri + CONTROLLER_URI_PATH); + return getRemoteListeningPort(uriObject, timeoutMillis); + } catch (URISyntaxException e) { + throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri); + } + } + + public String getRemoteRootGroupId(final String uri, final int timeoutMillis) throws IOException { + try { + final URI uriObject = new URI(uri + CONTROLLER_URI_PATH); + return getRemoteRootGroupId(uriObject, timeoutMillis); + } catch (URISyntaxException e) { + throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri); + } + } + + public String getRemoteInstanceId(final String uri, final int timeoutMillis) throws IOException { + try { + final URI uriObject = new URI(uri + CONTROLLER_URI_PATH); + return getController(uriObject, timeoutMillis).getInstanceId(); + } catch (URISyntaxException e) { + throw new IOException("Unable to establish connection to remote host because URI is invalid: " + uri); + } + } + + /** + * Returns the port on which the remote instance is listening for Flow File transfers, or <code>null</code> if the remote instance + * is not configured to use Site-to-Site transfers. + * + * @param uri the full URI to fetch, including the path. + * @return + * @throws IOException + */ + private Integer getRemoteListeningPort(final URI uri, final int timeoutMillis) throws IOException { + return getController(uri, timeoutMillis).getRemoteSiteListeningPort(); + } + + private String getRemoteRootGroupId(final URI uri, final int timeoutMillis) throws IOException { + return getController(uri, timeoutMillis).getId(); + } + + public ControllerDTO getController(final URI uri, final int timeoutMillis) throws IOException { + final ClientResponse response = get(uri, timeoutMillis); + + if (Status.OK.getStatusCode() == response.getStatusInfo().getStatusCode()) { + final ControllerEntity entity = response.getEntity(ControllerEntity.class); + return entity.getController(); + } else { + final String responseMessage = response.getEntity(String.class); + throw new IOException("Got HTTP response Code " + response.getStatusInfo().getStatusCode() + ": " + response.getStatusInfo().getReasonPhrase() + " with explanation: " + responseMessage); + } + } + + /** + * Issues a registration request on behalf of the current user. + * + * @param baseApiUri + * @return + */ + public ClientResponse issueRegistrationRequest(String baseApiUri) { + final URI uri = URI.create(String.format("%s/%s", baseApiUri, "/controller/users")); + + // set up the query params + MultivaluedMapImpl entity = new MultivaluedMapImpl(); + entity.add("justification", "A Remote instance of NiFi has attempted to create a reference to this NiFi. This action must be approved first."); + + // create the web resource + WebResource webResource = client.resource(uri); + + // get the client utils and make the request + return webResource.accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_FORM_URLENCODED).entity(entity).post(ClientResponse.class); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java new file mode 100644 index 0000000..bd1b50c --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/util/StandardDataPacket.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.util; + +import java.io.InputStream; +import java.util.Map; + +import org.apache.nifi.remote.protocol.DataPacket; +import org.apache.nifi.stream.io.LimitingInputStream; +import org.apache.nifi.stream.io.MinimumLengthInputStream; + +public class StandardDataPacket implements DataPacket { + + private final Map<String, String> attributes; + private final InputStream stream; + private final long size; + + public StandardDataPacket(final Map<String, String> attributes, final InputStream stream, final long size) { + this.attributes = attributes; + this.stream = new MinimumLengthInputStream(new LimitingInputStream(stream, size), size); + this.size = size; + } + + public Map<String, String> getAttributes() { + return attributes; + } + + public InputStream getData() { + return stream; + } + + public long getSize() { + return size; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java new file mode 100644 index 0000000..d8899ea --- /dev/null +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestEndpointConnectionStatePool.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.client.socket; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.nifi.remote.PeerStatus; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.cluster.ClusterNodeInformation; +import org.apache.nifi.remote.cluster.NodeInformation; +import org.junit.Test; + +public class TestEndpointConnectionStatePool { + + @Test + public void testFormulateDestinationListForOutput() throws IOException { + final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation(); + final List<NodeInformation> collection = new ArrayList<>(); + collection.add(new NodeInformation("ShouldGetMedium", 1, 1111, true, 4096)); + collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 10240)); + collection.add(new NodeInformation("ShouldGetLittle", 3, 3333, true, 1024)); + collection.add(new NodeInformation("ShouldGetMedium", 4, 4444, true, 4096)); + collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096)); + + clusterNodeInfo.setNodeInformation(collection); + final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND); + for ( final PeerStatus peerStatus : destinations ) { + System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort()); + } + } + + @Test + public void testFormulateDestinationListForOutputHugeDifference() throws IOException { + final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation(); + final List<NodeInformation> collection = new ArrayList<>(); + collection.add(new NodeInformation("ShouldGetLittle", 1, 1111, true, 500)); + collection.add(new NodeInformation("ShouldGetLots", 2, 2222, true, 50000)); + + clusterNodeInfo.setNodeInformation(collection); + final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND); + for ( final PeerStatus peerStatus : destinations ) { + System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort()); + } + } + + + + + @Test + public void testFormulateDestinationListForInputPorts() throws IOException { + final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation(); + final List<NodeInformation> collection = new ArrayList<>(); + collection.add(new NodeInformation("ShouldGetMedium", 1, 1111, true, 4096)); + collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 10240)); + collection.add(new NodeInformation("ShouldGetLots", 3, 3333, true, 1024)); + collection.add(new NodeInformation("ShouldGetMedium", 4, 4444, true, 4096)); + collection.add(new NodeInformation("ShouldGetMedium", 5, 5555, true, 4096)); + + clusterNodeInfo.setNodeInformation(collection); + final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND); + for ( final PeerStatus peerStatus : destinations ) { + System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort()); + } + } + + @Test + public void testFormulateDestinationListForInputPortsHugeDifference() throws IOException { + final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation(); + final List<NodeInformation> collection = new ArrayList<>(); + collection.add(new NodeInformation("ShouldGetLots", 1, 1111, true, 500)); + collection.add(new NodeInformation("ShouldGetLittle", 2, 2222, true, 50000)); + + clusterNodeInfo.setNodeInformation(collection); + final List<PeerStatus> destinations = EndpointConnectionStatePool.formulateDestinationList(clusterNodeInfo, TransferDirection.SEND); + for ( final PeerStatus peerStatus : destinations ) { + System.out.println(peerStatus.getHostname() + ":" + peerStatus.getPort()); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/05b64593/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java new file mode 100644 index 0000000..421d579 --- /dev/null +++ b/nifi/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/stream/io/LimitingInputStream.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.stream.io; + +import java.io.IOException; +import java.io.InputStream; + +public class LimitingInputStream extends InputStream { + + private final InputStream in; + private final long limit; + private long bytesRead = 0; + + public LimitingInputStream(final InputStream in, final long limit) { + this.in = in; + this.limit = limit; + } + + @Override + public int read() throws IOException { + if (bytesRead >= limit) { + return -1; + } + + final int val = in.read(); + if (val > -1) { + bytesRead++; + } + return val; + } + + @Override + public int read(final byte[] b) throws IOException { + if (bytesRead >= limit) { + return -1; + } + + final int maxToRead = (int) Math.min(b.length, limit - bytesRead); + + final int val = in.read(b, 0, maxToRead); + if (val > 0) { + bytesRead += val; + } + return val; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (bytesRead >= limit) { + return -1; + } + + final int maxToRead = (int) Math.min(len, limit - bytesRead); + + final int val = in.read(b, off, maxToRead); + if (val > 0) { + bytesRead += val; + } + return val; + } + + @Override + public long skip(final long n) throws IOException { + final long skipped = in.skip(Math.min(n, limit - bytesRead)); + bytesRead += skipped; + return skipped; + } + + @Override + public int available() throws IOException { + return in.available(); + } + + @Override + public void close() throws IOException { + in.close(); + } + + @Override + public void mark(int readlimit) { + in.mark(readlimit); + } + + @Override + public boolean markSupported() { + return in.markSupported(); + } + + @Override + public void reset() throws IOException { + in.reset(); + } + + public long getLimit() { + return limit; + } +}