NIFI-282: Begin refactoring and creating client
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/fdf75846 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/fdf75846 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/fdf75846 Branch: refs/heads/site-to-site-client Commit: fdf75846002877f7f0c857ff2b18593c6f2d825d Parents: f21b502 Author: Mark Payne <marka...@hotmail.com> Authored: Sun Jan 18 19:22:27 2015 -0500 Committer: Mark Payne <marka...@hotmail.com> Committed: Sun Jan 18 19:22:27 2015 -0500 ---------------------------------------------------------------------- nifi/commons/pom.xml | 9 +- nifi/commons/site-to-site-client/pom.xml | 31 + .../remote/AbstractCommunicationsSession.java | 54 ++ .../main/java/org/apache/nifi/remote/Peer.java | 113 ++++ .../java/org/apache/nifi/remote/PeerStatus.java | 72 +++ .../nifi/remote/PortAuthorizationResult.java | 25 + .../nifi/remote/RemoteAuthorizationState.java | 27 + .../nifi/remote/RemoteResourceInitiator.java | 64 ++ .../apache/nifi/remote/TransferDirection.java | 23 + .../nifi/remote/VersionedRemoteResource.java | 24 + .../apache/nifi/remote/client/DataPacket.java | 28 + .../nifi/remote/client/SiteToSiteClient.java | 27 + .../client/socket/EndpointConnectionState.java | 54 ++ .../socket/EndpointConnectionStatePool.java | 648 +++++++++++++++++++ .../nifi/remote/client/socket/SocketClient.java | 37 ++ .../remote/cluster/AdaptedNodeInformation.java | 66 ++ .../remote/cluster/ClusterNodeInformation.java | 67 ++ .../nifi/remote/cluster/NodeInformant.java | 22 + .../nifi/remote/cluster/NodeInformation.java | 98 +++ .../remote/cluster/NodeInformationAdapter.java | 41 ++ .../apache/nifi/remote/codec/FlowFileCodec.java | 79 +++ .../remote/codec/StandardFlowFileCodec.java | 169 +++++ .../remote/exception/BadRequestException.java | 30 + .../remote/exception/HandshakeException.java | 30 + .../exception/NotAuthorizedException.java | 26 + .../exception/PortNotRunningException.java | 26 + .../remote/exception/ProtocolException.java | 34 + .../exception/RequestExpiredException.java | 26 + .../remote/exception/UnknownPortException.java | 26 + .../exception/UnsupportedCodecException.java | 31 + .../SocketChannelCommunicationsSession.java | 90 +++ .../remote/io/socket/SocketChannelInput.java | 66 ++ .../remote/io/socket/SocketChannelOutput.java | 58 ++ .../SSLSocketChannelCommunicationsSession.java | 93 +++ .../io/socket/ssl/SSLSocketChannelInput.java | 50 ++ .../io/socket/ssl/SSLSocketChannelOutput.java | 44 ++ .../nifi/remote/protocol/ClientProtocol.java | 78 +++ .../remote/protocol/CommunicationsInput.java | 27 + .../remote/protocol/CommunicationsOutput.java | 27 + .../remote/protocol/CommunicationsSession.java | 64 ++ .../nifi/remote/protocol/RequestType.java | 43 ++ .../protocol/socket/HandshakeProperty.java | 23 + .../nifi/remote/protocol/socket/Response.java | 51 ++ .../remote/protocol/socket/ResponseCode.java | 152 +++++ .../protocol/socket/SocketClientProtocol.java | 517 +++++++++++++++ .../nifi/remote/util/PeerStatusCache.java | 43 ++ .../nifi/cluster/manager/ClusterManager.java | 2 +- .../cluster/manager/impl/WebClusterManager.java | 5 +- .../framework-bundle/framework/core-api/pom.xml | 5 + .../nifi/cluster/AdaptedNodeInformation.java | 66 -- .../nifi/cluster/ClusterNodeInformation.java | 67 -- .../org/apache/nifi/cluster/NodeInformant.java | 22 - .../apache/nifi/cluster/NodeInformation.java | 98 --- .../nifi/cluster/NodeInformationAdapter.java | 39 -- .../apache/nifi/groups/RemoteProcessGroup.java | 6 - .../main/java/org/apache/nifi/remote/Peer.java | 107 --- .../java/org/apache/nifi/remote/PeerStatus.java | 72 --- .../nifi/remote/PortAuthorizationResult.java | 25 - .../nifi/remote/RemoteAuthorizationState.java | 27 - .../org/apache/nifi/remote/RemoteGroupPort.java | 22 +- .../apache/nifi/remote/TransferDirection.java | 23 - .../nifi/remote/VersionedRemoteResource.java | 24 - .../apache/nifi/remote/codec/FlowFileCodec.java | 79 --- .../remote/exception/BadRequestException.java | 30 - .../remote/exception/HandshakeException.java | 30 - .../exception/NotAuthorizedException.java | 26 - .../exception/PortNotRunningException.java | 26 - .../remote/exception/ProtocolException.java | 34 - .../exception/RequestExpiredException.java | 26 - .../remote/exception/UnknownPortException.java | 26 - .../nifi/remote/protocol/ClientProtocol.java | 78 --- .../remote/protocol/CommunicationsInput.java | 27 - .../remote/protocol/CommunicationsOutput.java | 27 - .../remote/protocol/CommunicationsSession.java | 64 -- .../nifi/remote/protocol/RequestType.java | 43 -- .../nifi/remote/protocol/ServerProtocol.java | 2 +- .../nifi/remote/StandardRemoteProcessGroup.java | 245 +------ .../framework/site-to-site/pom.xml | 4 + .../remote/AbstractCommunicationsSession.java | 54 -- .../nifi/remote/RemoteResourceFactory.java | 42 +- .../nifi/remote/SocketRemoteSiteListener.java | 2 +- .../nifi/remote/StandardRemoteGroupPort.java | 498 +++----------- .../remote/codec/StandardFlowFileCodec.java | 169 ----- .../exception/UnsupportedCodecException.java | 31 - .../SocketChannelCommunicationsSession.java | 90 --- .../remote/io/socket/SocketChannelInput.java | 66 -- .../remote/io/socket/SocketChannelOutput.java | 58 -- .../SSLSocketChannelCommunicationsSession.java | 93 --- .../io/socket/ssl/SSLSocketChannelInput.java | 50 -- .../io/socket/ssl/SSLSocketChannelOutput.java | 44 -- .../socket/ClusterManagerServerProtocol.java | 7 +- .../protocol/socket/HandshakeProperty.java | 23 - .../nifi/remote/protocol/socket/Response.java | 51 -- .../remote/protocol/socket/ResponseCode.java | 152 ----- .../protocol/socket/SocketClientProtocol.java | 510 --------------- .../socket/SocketFlowFileServerProtocol.java | 3 +- .../remote/TestStandardRemoteGroupPort.java | 6 +- nifi/nar-bundles/framework-bundle/pom.xml | 5 + .../apache/nifi/remote/RemoteDestination.java | 37 ++ 99 files changed, 3606 insertions(+), 3195 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/commons/pom.xml b/nifi/commons/pom.xml index f85e337..43dc0d8 100644 --- a/nifi/commons/pom.xml +++ b/nifi/commons/pom.xml @@ -12,9 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> +--><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> @@ -39,6 +37,7 @@ <module>nifi-utils</module> <module>nifi-web-utils</module> <module>processor-utilities</module> + <module>site-to-site-client</module> <module>wali</module> - </modules> -</project> + </modules> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/pom.xml b/nifi/commons/site-to-site-client/pom.xml new file mode 100644 index 0000000..7719d55 --- /dev/null +++ b/nifi/commons/site-to-site-client/pom.xml @@ -0,0 +1,31 @@ +<?xml version="1.0"?> +<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-commons-parent</artifactId> + <version>0.0.1-incubating-SNAPSHOT</version> + </parent> + + <artifactId>site-to-site-client</artifactId> + <name>NiFi Site-to-Site Client</name> + + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-utils</artifactId> + </dependency> + + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java new file mode 100644 index 0000000..4babb92 --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +import org.apache.nifi.remote.protocol.CommunicationsSession; + +public abstract class AbstractCommunicationsSession implements CommunicationsSession { + private String userDn; + + private volatile String uri; + + public AbstractCommunicationsSession(final String uri) { + this.uri = uri; + } + + @Override + public String toString() { + return uri; + } + + @Override + public void setUri(final String uri) { + this.uri = uri; + } + + @Override + public String getUri() { + return uri; + } + + @Override + public String getUserDn() { + return userDn; + } + + @Override + public void setUserDn(final String dn) { + this.userDn = dn; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java new file mode 100644 index 0000000..e811c68 --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/Peer.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +import java.io.IOException; +import java.net.URI; + +import org.apache.nifi.remote.protocol.CommunicationsSession; + +public class Peer { + + private final CommunicationsSession commsSession; + private final String url; + private final String clusterUrl; + private final String host; + private long penalizationExpiration = 0L; + private boolean closed = false; + + public Peer(final CommunicationsSession commsSession, final String peerUrl, final String clusterUrl) { + this.commsSession = commsSession; + this.url = peerUrl; + this.clusterUrl = clusterUrl; + + try { + this.host = new URI(peerUrl).getHost(); + } catch (final Exception e) { + throw new IllegalArgumentException("Invalid URL: " + peerUrl); + } + } + + public String getUrl() { + return url; + } + + public String getClusterUrl() { + return clusterUrl; + } + + public CommunicationsSession getCommunicationsSession() { + return commsSession; + } + + public void close() throws IOException { + this.closed = true; + + // TODO: Consume the InputStream so that it doesn't linger on the Peer's outgoing socket buffer + commsSession.close(); + } + + public void penalize(final long millis) { + penalizationExpiration = Math.max(penalizationExpiration, System.currentTimeMillis() + millis); + } + + public boolean isPenalized() { + return penalizationExpiration > System.currentTimeMillis(); + } + + public boolean isClosed() { + return closed; + } + + public String getHost() { + return host; + } + + @Override + public int hashCode() { + return 8320 + url.hashCode(); + } + + @Override + public boolean equals(final Object obj) { + if (obj == null) { + return false; + } + if (obj == this) { + return true; + } + if (!(obj instanceof Peer)) { + return false; + } + + final Peer other = (Peer) obj; + return this.url.equals(other.url); + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder(); + sb.append("Peer[url=").append(url); + if (closed) { + sb.append(",CLOSED"); + } else if (isPenalized()) { + sb.append(",PENALIZED"); + } + sb.append("]"); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java new file mode 100644 index 0000000..d1cb076 --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/PeerStatus.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +public class PeerStatus { + + private final String hostname; + private final int port; + private final boolean secure; + private final int numFlowFiles; + + public PeerStatus(final String hostname, final int port, final boolean secure, final int numFlowFiles) { + this.hostname = hostname; + this.port = port; + this.secure = secure; + this.numFlowFiles = numFlowFiles; + } + + public String getHostname() { + return hostname; + } + + public int getPort() { + return port; + } + + public boolean isSecure() { + return secure; + } + + public int getFlowFileCount() { + return numFlowFiles; + } + + @Override + public String toString() { + return "PeerStatus[hostname=" + hostname + ",port=" + port + ",secure=" + secure + ",flowFileCount=" + numFlowFiles + "]"; + } + + @Override + public int hashCode() { + return 9824372 + hostname.hashCode() + port; + } + + @Override + public boolean equals(final Object obj) { + if (obj == null) { + return false; + } + + if (!(obj instanceof PeerStatus)) { + return false; + } + + final PeerStatus other = (PeerStatus) obj; + return port == other.port && hostname.equals(other.hostname); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java new file mode 100644 index 0000000..8f2603a --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +public interface PortAuthorizationResult { + + boolean isAuthorized(); + + String getExplanation(); + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteAuthorizationState.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteAuthorizationState.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteAuthorizationState.java new file mode 100644 index 0000000..12a3d33 --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteAuthorizationState.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +/** + * + */ +public enum RemoteAuthorizationState { + + UNKNOWN, + UNAUTHORIZED, + AUTHORIZED; +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java new file mode 100644 index 0000000..8eb5d8d --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/RemoteResourceInitiator.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.nifi.remote.exception.HandshakeException; + +public class RemoteResourceInitiator { + public static final int RESOURCE_OK = 20; + public static final int DIFFERENT_RESOURCE_VERSION = 21; + public static final int ABORT = 255; + + + public static VersionedRemoteResource initiateResourceNegotiation(final VersionedRemoteResource resource, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException { + // Write the classname of the RemoteStreamCodec, followed by its version + dos.writeUTF(resource.getResourceName()); + final VersionNegotiator negotiator = resource.getVersionNegotiator(); + dos.writeInt(negotiator.getVersion()); + dos.flush(); + + // wait for response from server. + final int statusCode = dis.read(); + switch (statusCode) { + case RESOURCE_OK: // server accepted our proposal of codec name/version + return resource; + case DIFFERENT_RESOURCE_VERSION: // server accepted our proposal of codec name but not the version + // Get server's preferred version + final int newVersion = dis.readInt(); + + // Determine our new preferred version that is no greater than the server's preferred version. + final Integer newPreference = negotiator.getPreferredVersion(newVersion); + // If we could not agree with server on a version, fail now. + if ( newPreference == null ) { + throw new HandshakeException("Could not agree on version for " + resource); + } + + negotiator.setVersion(newPreference); + + // Attempt negotiation of resource based on our new preferred version. + return initiateResourceNegotiation(resource, dis, dos); + case ABORT: + throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF()); + default: + return null; // Unable to negotiate codec + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java new file mode 100644 index 0000000..56432d5 --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/TransferDirection.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +public enum TransferDirection { + + SEND, + RECEIVE; +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java new file mode 100644 index 0000000..bfccd98 --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote; + +public interface VersionedRemoteResource { + + VersionNegotiator getVersionNegotiator(); + + String getResourceName(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/DataPacket.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/DataPacket.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/DataPacket.java new file mode 100644 index 0000000..ec77f2c --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/DataPacket.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.client; + +import java.io.InputStream; +import java.util.Map; + +public interface DataPacket { + + Map<String, String> getAttributes(); + + InputStream getData(); + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java new file mode 100644 index 0000000..47a09be --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/SiteToSiteClient.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.client; + +import java.io.IOException; + +public interface SiteToSiteClient { + + void send(DataPacket dataPacket) throws IOException; + + DataPacket receive() throws IOException; + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionState.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionState.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionState.java new file mode 100644 index 0000000..f4ac727 --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionState.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.client.socket; + +import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.codec.FlowFileCodec; +import org.apache.nifi.remote.protocol.socket.SocketClientProtocol; + +public class EndpointConnectionState { + private final Peer peer; + private final SocketClientProtocol socketClientProtocol; + private final FlowFileCodec codec; + private volatile long lastUsed; + + public EndpointConnectionState(final Peer peer, final SocketClientProtocol socketClientProtocol, final FlowFileCodec codec) { + this.peer = peer; + this.socketClientProtocol = socketClientProtocol; + this.codec = codec; + } + + public FlowFileCodec getCodec() { + return codec; + } + + public SocketClientProtocol getSocketClientProtocol() { + return socketClientProtocol; + } + + public Peer getPeer() { + return peer; + } + + public void setLastTimeUsed() { + lastUsed = System.currentTimeMillis(); + } + + public long getLastTimeUsed() { + return lastUsed; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java new file mode 100644 index 0000000..2dd489d --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionStatePool.java @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.client.socket; + +import java.io.BufferedReader; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.channels.SocketChannel; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; +import java.util.regex.Pattern; + +import javax.net.ssl.SSLContext; +import javax.security.cert.CertificateExpiredException; +import javax.security.cert.CertificateNotYetValidException; + +import org.apache.nifi.events.EventReporter; +import org.apache.nifi.remote.Peer; +import org.apache.nifi.remote.PeerStatus; +import org.apache.nifi.remote.RemoteDestination; +import org.apache.nifi.remote.RemoteResourceInitiator; +import org.apache.nifi.remote.TransferDirection; +import org.apache.nifi.remote.cluster.ClusterNodeInformation; +import org.apache.nifi.remote.cluster.NodeInformation; +import org.apache.nifi.remote.codec.FlowFileCodec; +import org.apache.nifi.remote.exception.BadRequestException; +import org.apache.nifi.remote.exception.HandshakeException; +import org.apache.nifi.remote.exception.PortNotRunningException; +import org.apache.nifi.remote.exception.ProtocolException; +import org.apache.nifi.remote.exception.TransmissionDisabledException; +import org.apache.nifi.remote.exception.UnknownPortException; +import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; +import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession; +import org.apache.nifi.remote.protocol.CommunicationsSession; +import org.apache.nifi.remote.protocol.socket.SocketClientProtocol; +import org.apache.nifi.remote.util.PeerStatusCache; +import org.apache.nifi.reporting.Severity; +import org.apache.nifi.stream.io.BufferedOutputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class EndpointConnectionStatePool { + public static final long PEER_REFRESH_PERIOD = 60000L; + public static final String CATEGORY = "Site-to-Site"; + private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES); + + private static final Logger logger = LoggerFactory.getLogger(EndpointConnectionStatePool.class); + + private final ConcurrentMap<String, BlockingQueue<EndpointConnectionState>> endpointConnectionMap = new ConcurrentHashMap<>(); + private final ConcurrentMap<PeerStatus, Long> peerTimeoutExpirations = new ConcurrentHashMap<>(); + + private final AtomicLong peerIndex = new AtomicLong(0L); + + private final ReentrantLock peerRefreshLock = new ReentrantLock(); + private volatile List<PeerStatus> peerStatuses; + private volatile long peerRefreshTime = 0L; + private volatile PeerStatusCache peerStatusCache; + private final Set<CommunicationsSession> activeCommsChannels = new HashSet<>(); + + private final File peersFile; + private final EventReporter eventReporter; + private final SSLContext sslContext; + + public EndpointConnectionStatePool(final EventReporter eventReporter, final File persistenceFile) { + this(null, eventReporter, persistenceFile); + } + + public EndpointConnectionStatePool(final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile) { + this.sslContext = sslContext; + this.peersFile = persistenceFile; + this.eventReporter = eventReporter; + + Set<PeerStatus> recoveredStatuses; + if ( persistenceFile != null && persistenceFile.exists() ) { + try { + recoveredStatuses = recoverPersistedPeerStatuses(peersFile); + this.peerStatusCache = new PeerStatusCache(recoveredStatuses, peersFile.lastModified()); + } catch (final IOException ioe) { + logger.warn("Failed to recover peer statuses from {} due to {}; will continue without loading information from file", persistenceFile, ioe); + } + } else { + peerStatusCache = null; + } + } + + public EndpointConnectionState getEndpointConnectionState(final String clusterUrl, final RemoteDestination remoteDestination, final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException { + // + // Attempt to get a connection state that already exists for this URL. + // + BlockingQueue<EndpointConnectionState> connectionStateQueue = endpointConnectionMap.get(clusterUrl); + if ( connectionStateQueue == null ) { + connectionStateQueue = new LinkedBlockingQueue<>(); + BlockingQueue<EndpointConnectionState> existingQueue = endpointConnectionMap.putIfAbsent(clusterUrl, connectionStateQueue); + if ( existingQueue != null ) { + connectionStateQueue = existingQueue; + } + } + + FlowFileCodec codec = null; + CommunicationsSession commsSession = null; + SocketClientProtocol protocol = null; + EndpointConnectionState connectionState; + Peer peer = null; + + do { + final PeerStatus peerStatus = getNextPeerStatus(direction); + if ( peerStatus == null ) { + return null; + } + + connectionState = connectionStateQueue.poll(); + logger.debug("{} Connection State for {} = {}", this, clusterUrl, connectionState); + + // if we can't get an existing ConnectionState, create one + if ( connectionState == null ) { + protocol = new SocketClientProtocol(); + protocol.setDestination(remoteDestination); + + try { + commsSession = establishSiteToSiteConnection(peerStatus); + final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); + final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); + try { + RemoteResourceInitiator.initiateResourceNegotiation(protocol, dis, dos); + } catch (final HandshakeException e) { + try { + commsSession.close(); + } catch (final IOException ioe) { + throw e; + } + } + } catch (final IOException e) { + } + + + final String peerUrl = "nifi://" + peerStatus.getHostname() + ":" + peerStatus.getPort(); + peer = new Peer(commsSession, peerUrl, clusterUrl); + + // perform handshake + try { + protocol.handshake(peer); + + // handle error cases + if ( protocol.isDestinationFull() ) { + logger.warn("{} {} indicates that port's destination is full; penalizing peer", this, peer); + penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS)); + connectionStateQueue.offer(connectionState); + continue; + } else if ( protocol.isPortInvalid() ) { + penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS)); + cleanup(protocol, peer); + throw new PortNotRunningException(peer.toString() + " indicates that port " + remoteDestination.getIdentifier() + " is not running"); + } else if ( protocol.isPortUnknown() ) { + penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS)); + cleanup(protocol, peer); + throw new UnknownPortException(peer.toString() + " indicates that port " + remoteDestination.getIdentifier() + " is not known"); + } + + // negotiate the FlowFileCodec to use + codec = protocol.negotiateCodec(peer); + } catch (final PortNotRunningException | UnknownPortException e) { + throw e; + } catch (final Exception e) { + penalize(peer, remoteDestination.getYieldPeriod(TimeUnit.MILLISECONDS)); + cleanup(protocol, peer); + + final String message = String.format("%s failed to communicate with %s due to %s", this, peer == null ? clusterUrl : peer, e.toString()); + logger.error(message); + if ( logger.isDebugEnabled() ) { + logger.error("", e); + } + throw e; + } + + connectionState = new EndpointConnectionState(peer, protocol, codec); + } else { + final long lastTimeUsed = connectionState.getLastTimeUsed(); + final long millisSinceLastUse = System.currentTimeMillis() - lastTimeUsed; + final long timeoutMillis = remoteDestination.getCommunicationsTimeout(TimeUnit.MILLISECONDS); + + if ( timeoutMillis > 0L && millisSinceLastUse >= timeoutMillis ) { + cleanup(connectionState.getSocketClientProtocol(), connectionState.getPeer()); + connectionState = null; + } else { + codec = connectionState.getCodec(); + peer = connectionState.getPeer(); + commsSession = peer.getCommunicationsSession(); + protocol = connectionState.getSocketClientProtocol(); + } + } + } while ( connectionState == null || codec == null || commsSession == null || protocol == null ); + + return connectionState; + } + + + public boolean offer(final EndpointConnectionState endpointConnectionState) { + final Peer peer = endpointConnectionState.getPeer(); + if ( peer == null ) { + return false; + } + + final String url = peer.getUrl(); + if ( url == null ) { + return false; + } + + final BlockingQueue<EndpointConnectionState> queue = endpointConnectionMap.get(url); + if ( queue == null ) { + return false; + } + + return queue.offer(endpointConnectionState); + } + + /** + * Updates internal state map to penalize a PeerStatus that points to the specified peer + * @param peer + */ + public void penalize(final Peer peer, final long penalizationMillis) { + String host; + int port; + try { + final URI uri = new URI(peer.getUrl()); + host = uri.getHost(); + port = uri.getPort(); + } catch (final URISyntaxException e) { + host = peer.getHost(); + port = -1; + } + + final PeerStatus status = new PeerStatus(host, port, true, 1); + Long expiration = peerTimeoutExpirations.get(status); + if ( expiration == null ) { + expiration = Long.valueOf(0L); + } + + final long newExpiration = Math.max(expiration, System.currentTimeMillis() + penalizationMillis); + peerTimeoutExpirations.put(status, Long.valueOf(newExpiration)); + } + + private void cleanup(final SocketClientProtocol protocol, final Peer peer) { + if ( protocol != null && peer != null ) { + try { + protocol.shutdown(peer); + } catch (final TransmissionDisabledException e) { + // User disabled transmission.... do nothing. + logger.debug(this + " Transmission Disabled by User"); + } catch (IOException e1) { + } + } + + if ( peer != null ) { + try { + peer.close(); + } catch (final TransmissionDisabledException e) { + // User disabled transmission.... do nothing. + logger.debug(this + " Transmission Disabled by User"); + } catch (IOException e1) { + } + } + } + + private PeerStatus getNextPeerStatus(final TransferDirection direction) { + List<PeerStatus> peerList = peerStatuses; + if ( (peerList == null || peerList.isEmpty() || System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD) && peerRefreshLock.tryLock() ) { + try { + try { + peerList = createPeerStatusList(direction); + } catch (final Exception e) { + final String message = String.format("%s Failed to update list of peers due to %s", this, e.toString()); + logger.warn(message); + if ( logger.isDebugEnabled() ) { + logger.warn("", e); + } + + eventReporter.reportEvent(Severity.WARNING, CATEGORY, message); + } + + this.peerStatuses = peerList; + peerRefreshTime = System.currentTimeMillis(); + } finally { + peerRefreshLock.unlock(); + } + } + + if ( peerList == null || peerList.isEmpty() ) { + return null; + } + + PeerStatus peerStatus; + for (int i=0; i < peerList.size(); i++) { + final long idx = peerIndex.getAndIncrement(); + final int listIndex = (int) (idx % peerList.size()); + peerStatus = peerList.get(listIndex); + + if ( isPenalized(peerStatus) ) { + logger.debug("{} {} is penalized; will not communicate with this peer", this, peerStatus); + } else { + return peerStatus; + } + } + + logger.debug("{} All peers appear to be penalized; returning null", this); + return null; + } + + private boolean isPenalized(final PeerStatus peerStatus) { + final Long expirationEnd = peerTimeoutExpirations.get(peerStatus); + return (expirationEnd == null ? false : expirationEnd > System.currentTimeMillis() ); + } + + private List<PeerStatus> createPeerStatusList(final TransferDirection direction) throws IOException, BadRequestException, HandshakeException, UnknownPortException, PortNotRunningException { + final Set<PeerStatus> statuses = getPeerStatuses(); + if ( statuses == null ) { + return new ArrayList<>(); + } + + final ClusterNodeInformation clusterNodeInfo = new ClusterNodeInformation(); + final List<NodeInformation> nodeInfos = new ArrayList<>(); + for ( final PeerStatus peerStatus : statuses ) { + final NodeInformation nodeInfo = new NodeInformation(peerStatus.getHostname(), peerStatus.getPort(), 0, peerStatus.isSecure(), peerStatus.getFlowFileCount()); + nodeInfos.add(nodeInfo); + } + clusterNodeInfo.setNodeInformation(nodeInfos); + return formulateDestinationList(clusterNodeInfo, direction); + } + + + public Set<PeerStatus> getPeerStatuses() { + final PeerStatusCache cache = this.peerStatusCache; + if (cache == null || cache.getStatuses() == null || cache.getStatuses().isEmpty()) { + return null; + } + + if (cache.getTimestamp() + PEER_CACHE_MILLIS < System.currentTimeMillis()) { + final Set<PeerStatus> equalizedSet = new HashSet<>(cache.getStatuses().size()); + for (final PeerStatus status : cache.getStatuses()) { + final PeerStatus equalizedStatus = new PeerStatus(status.getHostname(), status.getPort(), status.isSecure(), 1); + equalizedSet.add(equalizedStatus); + } + + return equalizedSet; + } + + return cache.getStatuses(); + } + + private Set<PeerStatus> fetchRemotePeerStatuses(final URI destinationUri, final boolean secure) throws IOException, HandshakeException, UnknownPortException, PortNotRunningException, BadRequestException { + final String hostname = destinationUri.getHost(); + final int port = destinationUri.getPort(); + + final CommunicationsSession commsSession = establishSiteToSiteConnection(hostname, port, secure); + final Peer peer = new Peer(commsSession, "nifi://" + hostname + ":" + port, destinationUri.toString()); + final SocketClientProtocol clientProtocol = new SocketClientProtocol(); + final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); + final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); + try { + RemoteResourceInitiator.initiateResourceNegotiation(clientProtocol, dis, dos); + } catch (final HandshakeException e) { + throw new BadRequestException(e.toString()); + } + + // TODO: Make the 30000 millis configurable + clientProtocol.handshake(peer, null, 30000); + final Set<PeerStatus> peerStatuses = clientProtocol.getPeerStatuses(peer); + persistPeerStatuses(peerStatuses); + + try { + clientProtocol.shutdown(peer); + } catch (final IOException e) { + final String message = String.format("%s Failed to shutdown protocol when updating list of peers due to %s", this, e.toString()); + logger.warn(message); + if (logger.isDebugEnabled()) { + logger.warn("", e); + } + } + + try { + peer.close(); + } catch (final IOException e) { + final String message = String.format("%s Failed to close resources when updating list of peers due to %s", this, e.toString()); + logger.warn(message); + if (logger.isDebugEnabled()) { + logger.warn("", e); + } + } + + return peerStatuses; + } + + + private void persistPeerStatuses(final Set<PeerStatus> statuses) { + if ( peersFile == null ) { + return; + } + + try (final OutputStream fos = new FileOutputStream(peersFile); + final OutputStream out = new BufferedOutputStream(fos)) { + + for (final PeerStatus status : statuses) { + final String line = status.getHostname() + ":" + status.getPort() + ":" + status.isSecure() + "\n"; + out.write(line.getBytes(StandardCharsets.UTF_8)); + } + + } catch (final IOException e) { + logger.error("Failed to persist list of Peers due to {}; if restarted and peer's NCM is down, may be unable to transfer data until communications with NCM are restored", e.toString(), e); + } + } + + private Set<PeerStatus> recoverPersistedPeerStatuses(final File file) throws IOException { + if (!file.exists()) { + return null; + } + + final Set<PeerStatus> statuses = new HashSet<>(); + try (final InputStream fis = new FileInputStream(file); + final BufferedReader reader = new BufferedReader(new InputStreamReader(fis))) { + + String line; + while ((line = reader.readLine()) != null) { + final String[] splits = line.split(Pattern.quote(":")); + if (splits.length != 3) { + continue; + } + + final String hostname = splits[0]; + final int port = Integer.parseInt(splits[1]); + final boolean secure = Boolean.parseBoolean(splits[2]); + + statuses.add(new PeerStatus(hostname, port, secure, 1)); + } + } + + return statuses; + } + + + public CommunicationsSession establishSiteToSiteConnection(final PeerStatus peerStatus) throws IOException { + return establishSiteToSiteConnection(peerStatus.getHostname(), peerStatus.getPort(), peerStatus.isSecure()); + } + + public CommunicationsSession establishSiteToSiteConnection(final String hostname, final int port, final boolean secure) throws IOException { + final String destinationUri = "nifi://" + hostname + ":" + port; + + CommunicationsSession commsSession = null; + try { + if ( secure ) { + if ( sslContext == null ) { + throw new IOException("Unable to communicate with " + hostname + ":" + port + " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications"); + } + + final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, hostname, port, true); + socketChannel.connect(); + + commsSession = new SSLSocketChannelCommunicationsSession(socketChannel, destinationUri); + + try { + commsSession.setUserDn(socketChannel.getDn()); + } catch (final CertificateNotYetValidException | CertificateExpiredException ex) { + throw new IOException(ex); + } + } else { + final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(hostname, port)); + commsSession = new SocketChannelCommunicationsSession(socketChannel, destinationUri); + } + + commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES); + + commsSession.setUri(destinationUri); + } catch (final IOException ioe) { + if ( commsSession != null ) { + commsSession.close(); + } + + throw ioe; + } + + return commsSession; + } + + +// private List<PeerStatus> formulateDestinationList(final ClusterNodeInformation clusterNodeInfo) throws IOException { +// return formulateDestinationList(clusterNodeInfo, getConnectableType()); +// } + + static List<PeerStatus> formulateDestinationList(final ClusterNodeInformation clusterNodeInfo, final TransferDirection direction) { + final Collection<NodeInformation> nodeInfoSet = clusterNodeInfo.getNodeInformation(); + final int numDestinations = Math.max(128, nodeInfoSet.size()); + final Map<NodeInformation, Integer> entryCountMap = new HashMap<>(); + + long totalFlowFileCount = 0L; + for (final NodeInformation nodeInfo : nodeInfoSet) { + totalFlowFileCount += nodeInfo.getTotalFlowFiles(); + } + + int totalEntries = 0; + for (final NodeInformation nodeInfo : nodeInfoSet) { + final int flowFileCount = nodeInfo.getTotalFlowFiles(); + // don't allow any node to get more than 80% of the data + final double percentageOfFlowFiles = Math.min(0.8D, ((double) flowFileCount / (double) totalFlowFileCount)); + final double relativeWeighting = (direction == TransferDirection.RECEIVE) ? (1 - percentageOfFlowFiles) : percentageOfFlowFiles; + final int entries = Math.max(1, (int) (numDestinations * relativeWeighting)); + + entryCountMap.put(nodeInfo, Math.max(1, entries)); + totalEntries += entries; + } + + final List<PeerStatus> destinations = new ArrayList<>(totalEntries); + for (int i=0; i < totalEntries; i++) { + destinations.add(null); + } + for ( final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet() ) { + final NodeInformation nodeInfo = entry.getKey(); + final int numEntries = entry.getValue(); + + int skipIndex = numEntries; + for (int i=0; i < numEntries; i++) { + int n = (skipIndex * i); + while (true) { + final int index = n % destinations.size(); + PeerStatus status = destinations.get(index); + if ( status == null ) { + status = new PeerStatus(nodeInfo.getHostname(), nodeInfo.getSiteToSitePort(), nodeInfo.isSiteToSiteSecure(), nodeInfo.getTotalFlowFiles()); + destinations.set(index, status); + break; + } else { + n++; + } + } + } + } + + final StringBuilder distributionDescription = new StringBuilder(); + distributionDescription.append("New Weighted Distribution of Nodes:"); + for ( final Map.Entry<NodeInformation, Integer> entry : entryCountMap.entrySet() ) { + final double percentage = entry.getValue() * 100D / (double) destinations.size(); + distributionDescription.append("\n").append(entry.getKey()).append(" will receive ").append(percentage).append("% of FlowFiles"); + } + logger.info(distributionDescription.toString()); + + // Jumble the list of destinations. + return destinations; + } + + + public void cleanupExpiredSockets() { + final List<EndpointConnectionState> states = new ArrayList<>(); + + for ( final BlockingQueue<EndpointConnectionState> queue : endpointConnectionMap.values() ) { + states.clear(); + + EndpointConnectionState state; + while ((state = queue.poll()) != null) { + // If the socket has not been used in 10 seconds, shut it down. + final long lastUsed = state.getLastTimeUsed(); + if ( lastUsed < System.currentTimeMillis() - 10000L ) { + try { + state.getSocketClientProtocol().shutdown(state.getPeer()); + } catch (final Exception e) { + logger.debug("Failed to shut down {} using {} due to {}", + new Object[] {state.getSocketClientProtocol(), state.getPeer(), e} ); + } + + cleanup(state.getSocketClientProtocol(), state.getPeer()); + } else { + states.add(state); + } + } + + queue.addAll(states); + } + } + + public void shutdown() { + peerTimeoutExpirations.clear(); + + for ( final CommunicationsSession commsSession : activeCommsChannels ) { + commsSession.interrupt(); + } + + for ( final BlockingQueue<EndpointConnectionState> queue : endpointConnectionMap.values() ) { + EndpointConnectionState state; + while ( (state = queue.poll()) != null) { + cleanup(state.getSocketClientProtocol(), state.getPeer()); + } + } + + endpointConnectionMap.clear(); + } + + public void refreshPeers(final URI targetUri, final boolean secure) { + final PeerStatusCache existingCache = peerStatusCache; + if (existingCache != null && (existingCache.getTimestamp() + PEER_CACHE_MILLIS > System.currentTimeMillis())) { + return; + } + + try { + final Set<PeerStatus> statuses = fetchRemotePeerStatuses(targetUri, secure); + peerStatusCache = new PeerStatusCache(statuses); + logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", this, statuses.size()); + } catch (Exception e) { + logger.warn("{} Unable to refresh Remote Group's peers due to {}", this, e); + if (logger.isDebugEnabled()) { + logger.warn("", e); + } + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java new file mode 100644 index 0000000..48e9cc5 --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.client.socket; + +import java.io.IOException; + +import org.apache.nifi.remote.client.DataPacket; +import org.apache.nifi.remote.client.SiteToSiteClient; + +public class SocketClient implements SiteToSiteClient { + + @Override + public void send(final DataPacket dataPacket) throws IOException { + // TODO Auto-generated method stub + } + + @Override + public DataPacket receive() throws IOException { + // TODO Auto-generated method stub + return null; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/AdaptedNodeInformation.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/AdaptedNodeInformation.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/AdaptedNodeInformation.java new file mode 100644 index 0000000..6ca5812 --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/AdaptedNodeInformation.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.cluster; + +public class AdaptedNodeInformation { + + private String hostname; + private Integer siteToSitePort; + private int apiPort; + private boolean isSiteToSiteSecure; + private int totalFlowFiles; + + public String getHostname() { + return hostname; + } + + public void setHostname(String hostname) { + this.hostname = hostname; + } + + public Integer getSiteToSitePort() { + return siteToSitePort; + } + + public void setSiteToSitePort(Integer siteToSitePort) { + this.siteToSitePort = siteToSitePort; + } + + public int getApiPort() { + return apiPort; + } + + public void setApiPort(int apiPort) { + this.apiPort = apiPort; + } + + public boolean isSiteToSiteSecure() { + return isSiteToSiteSecure; + } + + public void setSiteToSiteSecure(boolean isSiteToSiteSecure) { + this.isSiteToSiteSecure = isSiteToSiteSecure; + } + + public int getTotalFlowFiles() { + return totalFlowFiles; + } + + public void setTotalFlowFiles(int totalFlowFiles) { + this.totalFlowFiles = totalFlowFiles; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/ClusterNodeInformation.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/ClusterNodeInformation.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/ClusterNodeInformation.java new file mode 100644 index 0000000..1bc83b9 --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/ClusterNodeInformation.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.cluster; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Collection; + +import javax.xml.bind.JAXBContext; +import javax.xml.bind.JAXBException; +import javax.xml.bind.Marshaller; +import javax.xml.bind.Unmarshaller; +import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; + +@XmlRootElement +public class ClusterNodeInformation { + + private Collection<NodeInformation> nodeInfo; + + private static final JAXBContext JAXB_CONTEXT; + + static { + try { + JAXB_CONTEXT = JAXBContext.newInstance(ClusterNodeInformation.class); + } catch (JAXBException e) { + throw new RuntimeException("Unable to create JAXBContext.", e); + } + } + + public ClusterNodeInformation() { + this.nodeInfo = null; + } + + public void setNodeInformation(final Collection<NodeInformation> nodeInfo) { + this.nodeInfo = nodeInfo; + } + + @XmlJavaTypeAdapter(NodeInformationAdapter.class) + public Collection<NodeInformation> getNodeInformation() { + return nodeInfo; + } + + public void marshal(final OutputStream os) throws JAXBException { + final Marshaller marshaller = JAXB_CONTEXT.createMarshaller(); + marshaller.marshal(this, os); + } + + public static ClusterNodeInformation unmarshal(final InputStream is) throws JAXBException { + final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller(); + return (ClusterNodeInformation) unmarshaller.unmarshal(is); + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformant.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformant.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformant.java new file mode 100644 index 0000000..e46ff5c --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformant.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.cluster; + +public interface NodeInformant { + + ClusterNodeInformation getNodeInformation(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java new file mode 100644 index 0000000..2041268 --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformation.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.cluster; + +public class NodeInformation { + + private final String hostname; + private final Integer siteToSitePort; + private final int apiPort; + private final boolean isSiteToSiteSecure; + private final int totalFlowFiles; + + public NodeInformation(final String hostname, final Integer siteToSitePort, final int apiPort, + final boolean isSiteToSiteSecure, final int totalFlowFiles) { + this.hostname = hostname; + this.siteToSitePort = siteToSitePort; + this.apiPort = apiPort; + this.isSiteToSiteSecure = isSiteToSiteSecure; + this.totalFlowFiles = totalFlowFiles; + } + + public String getHostname() { + return hostname; + } + + public int getAPIPort() { + return apiPort; + } + + public Integer getSiteToSitePort() { + return siteToSitePort; + } + + public boolean isSiteToSiteSecure() { + return isSiteToSiteSecure; + } + + public int getTotalFlowFiles() { + return totalFlowFiles; + } + + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof NodeInformation)) { + return false; + } + + final NodeInformation other = (NodeInformation) obj; + if (!hostname.equals(other.hostname)) { + return false; + } + if (siteToSitePort == null && other.siteToSitePort != null) { + return false; + } + if (siteToSitePort != null && other.siteToSitePort == null) { + return false; + } else if (siteToSitePort != null && siteToSitePort.intValue() != other.siteToSitePort.intValue()) { + return false; + } + if (apiPort != other.apiPort) { + return false; + } + if (isSiteToSiteSecure != other.isSiteToSiteSecure) { + return false; + } + return true; + } + + @Override + public int hashCode() { + return 83832 + hostname.hashCode() + (siteToSitePort == null ? 8 : siteToSitePort.hashCode()) + apiPort + (isSiteToSiteSecure ? 3829 : 0); + } + + @Override + public String toString() { + return "Node[" + hostname + ":" + apiPort + "]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java new file mode 100644 index 0000000..440463c --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/cluster/NodeInformationAdapter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.cluster; + +import javax.xml.bind.annotation.adapters.XmlAdapter; + +import org.apache.nifi.remote.cluster.NodeInformation; + +public class NodeInformationAdapter extends XmlAdapter<AdaptedNodeInformation, NodeInformation> { + + @Override + public NodeInformation unmarshal(final AdaptedNodeInformation adapted) throws Exception { + return new NodeInformation(adapted.getHostname(), adapted.getSiteToSitePort(), adapted.getApiPort(), adapted.isSiteToSiteSecure(), adapted.getTotalFlowFiles()); + } + + @Override + public AdaptedNodeInformation marshal(final NodeInformation nodeInformation) throws Exception { + final AdaptedNodeInformation adapted = new AdaptedNodeInformation(); + adapted.setHostname(nodeInformation.getHostname()); + adapted.setSiteToSitePort(nodeInformation.getSiteToSitePort()); + adapted.setApiPort(nodeInformation.getAPIPort()); + adapted.setSiteToSiteSecure(nodeInformation.isSiteToSiteSecure()); + adapted.setTotalFlowFiles(nodeInformation.getTotalFlowFiles()); + return adapted; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java new file mode 100644 index 0000000..b4206b3 --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.codec; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.remote.VersionedRemoteResource; +import org.apache.nifi.remote.exception.ProtocolException; +import org.apache.nifi.remote.exception.TransmissionDisabledException; + +/** + * <p> + * Provides a mechanism for encoding and decoding FlowFiles as streams so that + * they can be transferred remotely. + * </p> + */ +public interface FlowFileCodec extends VersionedRemoteResource { + + /** + * Returns a List of all versions that this codec is able to support, in the + * order that they are preferred by the codec + * + * @return + */ + public List<Integer> getSupportedVersions(); + + /** + * Encodes a FlowFile and its content as a single stream of data and writes + * that stream to the output. If checksum is not null, it will be calculated + * as the stream is read + * + * @param flowFile the FlowFile to encode + * @param session a session that can be used to transactionally create and + * transfer flow files + * @param outStream the stream to write the data to + * + * @return the updated FlowFile + * + * @throws IOException + */ + FlowFile encode(FlowFile flowFile, ProcessSession session, OutputStream outStream) throws IOException, TransmissionDisabledException; + + /** + * Decodes the contents of the InputStream, interpreting the data to + * determine the next FlowFile's attributes and content, as well as their + * destinations. If not null, checksum will be used to calculate the + * checksum as the data is read. + * + * @param stream an InputStream containing FlowFiles' contents, attributes, + * and destinations + * @param session + * + * @return the FlowFile that was created, or <code>null</code> if the stream + * was out of data + * + * @throws IOException + * @throws ProtocolException if the input is malformed + */ + FlowFile decode(InputStream stream, ProcessSession session) throws IOException, ProtocolException, TransmissionDisabledException; +}