http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java ---------------------------------------------------------------------- diff --git a/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java new file mode 100644 index 0000000..6dab77b --- /dev/null +++ b/nifi/commons/site-to-site-client/src/main/java/org/apache/nifi/remote/util/PeerStatusCache.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.remote.util; + +import java.util.Set; + +import org.apache.nifi.remote.PeerStatus; + +public class PeerStatusCache { + private final Set<PeerStatus> statuses; + private final long timestamp; + + public PeerStatusCache(final Set<PeerStatus> statuses) { + this(statuses, System.currentTimeMillis()); + } + + public PeerStatusCache(final Set<PeerStatus> statuses, final long timestamp) { + this.statuses = statuses; + this.timestamp = timestamp; + } + + public Set<PeerStatus> getStatuses() { + return statuses; + } + + public long getTimestamp() { + return timestamp; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java index 0fcac8c..00a7687 100644 --- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java +++ b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/ClusterManager.java @@ -23,7 +23,6 @@ import org.apache.nifi.cluster.manager.exception.IneligiblePrimaryNodeException; import org.apache.nifi.cluster.manager.exception.IllegalNodeDeletionException; import org.apache.nifi.cluster.manager.exception.IllegalNodeReconnectionException; import org.apache.nifi.cluster.manager.exception.NodeDisconnectionException; -import org.apache.nifi.cluster.NodeInformant; import org.apache.nifi.cluster.event.Event; import org.apache.nifi.cluster.node.Node; import org.apache.nifi.cluster.node.Node.Status; @@ -33,6 +32,7 @@ import org.apache.nifi.cluster.protocol.Heartbeat; import org.apache.nifi.cluster.protocol.NodeIdentifier; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.diagnostics.SystemDiagnostics; +import org.apache.nifi.remote.cluster.NodeInformant; import org.apache.nifi.reporting.BulletinRepository; import java.util.List; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java index 511bb7d..ea523b0 100644 --- a/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java +++ b/nifi/nar-bundles/framework-bundle/framework/cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java @@ -65,9 +65,7 @@ import javax.xml.validation.Validator; import org.apache.nifi.admin.service.AuditService; import org.apache.nifi.cluster.BulletinsPayload; -import org.apache.nifi.cluster.ClusterNodeInformation; import org.apache.nifi.cluster.HeartbeatPayload; -import org.apache.nifi.cluster.NodeInformation; import org.apache.nifi.cluster.context.ClusterContext; import org.apache.nifi.cluster.context.ClusterContextImpl; import org.apache.nifi.cluster.event.Event; @@ -155,6 +153,8 @@ import org.apache.nifi.processor.StandardValidationContextFactory; import org.apache.nifi.remote.RemoteResourceManager; import org.apache.nifi.remote.RemoteSiteListener; import org.apache.nifi.remote.SocketRemoteSiteListener; +import org.apache.nifi.remote.cluster.ClusterNodeInformation; +import org.apache.nifi.remote.cluster.NodeInformation; import org.apache.nifi.remote.protocol.socket.ClusterManagerServerProtocol; import org.apache.nifi.reporting.Bulletin; import org.apache.nifi.reporting.BulletinRepository; @@ -191,7 +191,6 @@ import org.apache.nifi.web.api.entity.ProvenanceEventEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupEntity; import org.apache.nifi.web.api.entity.RemoteProcessGroupsEntity; import org.apache.nifi.web.util.WebUtils; - import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/pom.xml b/nifi/nar-bundles/framework-bundle/framework/core-api/pom.xml index f8d8e13..643121e 100644 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/pom.xml +++ b/nifi/nar-bundles/framework-bundle/framework/core-api/pom.xml @@ -34,6 +34,11 @@ </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>site-to-site-client</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-runtime</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/AdaptedNodeInformation.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/AdaptedNodeInformation.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/AdaptedNodeInformation.java deleted file mode 100644 index 0092f7a..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/AdaptedNodeInformation.java +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster; - -public class AdaptedNodeInformation { - - private String hostname; - private Integer siteToSitePort; - private int apiPort; - private boolean isSiteToSiteSecure; - private int totalFlowFiles; - - public String getHostname() { - return hostname; - } - - public void setHostname(String hostname) { - this.hostname = hostname; - } - - public Integer getSiteToSitePort() { - return siteToSitePort; - } - - public void setSiteToSitePort(Integer siteToSitePort) { - this.siteToSitePort = siteToSitePort; - } - - public int getApiPort() { - return apiPort; - } - - public void setApiPort(int apiPort) { - this.apiPort = apiPort; - } - - public boolean isSiteToSiteSecure() { - return isSiteToSiteSecure; - } - - public void setSiteToSiteSecure(boolean isSiteToSiteSecure) { - this.isSiteToSiteSecure = isSiteToSiteSecure; - } - - public int getTotalFlowFiles() { - return totalFlowFiles; - } - - public void setTotalFlowFiles(int totalFlowFiles) { - this.totalFlowFiles = totalFlowFiles; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/ClusterNodeInformation.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/ClusterNodeInformation.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/ClusterNodeInformation.java deleted file mode 100644 index 5751c32..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/ClusterNodeInformation.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster; - -import java.io.InputStream; -import java.io.OutputStream; -import java.util.Collection; - -import javax.xml.bind.JAXBContext; -import javax.xml.bind.JAXBException; -import javax.xml.bind.Marshaller; -import javax.xml.bind.Unmarshaller; -import javax.xml.bind.annotation.XmlRootElement; -import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; - -@XmlRootElement -public class ClusterNodeInformation { - - private Collection<NodeInformation> nodeInfo; - - private static final JAXBContext JAXB_CONTEXT; - - static { - try { - JAXB_CONTEXT = JAXBContext.newInstance(ClusterNodeInformation.class); - } catch (JAXBException e) { - throw new RuntimeException("Unable to create JAXBContext.", e); - } - } - - public ClusterNodeInformation() { - this.nodeInfo = null; - } - - public void setNodeInformation(final Collection<NodeInformation> nodeInfo) { - this.nodeInfo = nodeInfo; - } - - @XmlJavaTypeAdapter(NodeInformationAdapter.class) - public Collection<NodeInformation> getNodeInformation() { - return nodeInfo; - } - - public void marshal(final OutputStream os) throws JAXBException { - final Marshaller marshaller = JAXB_CONTEXT.createMarshaller(); - marshaller.marshal(this, os); - } - - public static ClusterNodeInformation unmarshal(final InputStream is) throws JAXBException { - final Unmarshaller unmarshaller = JAXB_CONTEXT.createUnmarshaller(); - return (ClusterNodeInformation) unmarshaller.unmarshal(is); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformant.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformant.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformant.java deleted file mode 100644 index 987ff65..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformant.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster; - -public interface NodeInformant { - - ClusterNodeInformation getNodeInformation(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformation.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformation.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformation.java deleted file mode 100644 index 848eb7e..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformation.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster; - -public class NodeInformation { - - private final String hostname; - private final Integer siteToSitePort; - private final int apiPort; - private final boolean isSiteToSiteSecure; - private final int totalFlowFiles; - - public NodeInformation(final String hostname, final Integer siteToSitePort, final int apiPort, - final boolean isSiteToSiteSecure, final int totalFlowFiles) { - this.hostname = hostname; - this.siteToSitePort = siteToSitePort; - this.apiPort = apiPort; - this.isSiteToSiteSecure = isSiteToSiteSecure; - this.totalFlowFiles = totalFlowFiles; - } - - public String getHostname() { - return hostname; - } - - public int getAPIPort() { - return apiPort; - } - - public Integer getSiteToSitePort() { - return siteToSitePort; - } - - public boolean isSiteToSiteSecure() { - return isSiteToSiteSecure; - } - - public int getTotalFlowFiles() { - return totalFlowFiles; - } - - @Override - public boolean equals(final Object obj) { - if (obj == this) { - return true; - } - if (obj == null) { - return false; - } - if (!(obj instanceof NodeInformation)) { - return false; - } - - final NodeInformation other = (NodeInformation) obj; - if (!hostname.equals(other.hostname)) { - return false; - } - if (siteToSitePort == null && other.siteToSitePort != null) { - return false; - } - if (siteToSitePort != null && other.siteToSitePort == null) { - return false; - } else if (siteToSitePort != null && siteToSitePort.intValue() != other.siteToSitePort.intValue()) { - return false; - } - if (apiPort != other.apiPort) { - return false; - } - if (isSiteToSiteSecure != other.isSiteToSiteSecure) { - return false; - } - return true; - } - - @Override - public int hashCode() { - return 83832 + hostname.hashCode() + (siteToSitePort == null ? 8 : siteToSitePort.hashCode()) + apiPort + (isSiteToSiteSecure ? 3829 : 0); - } - - @Override - public String toString() { - return "Node[" + hostname + ":" + apiPort + "]"; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformationAdapter.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformationAdapter.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformationAdapter.java deleted file mode 100644 index 630631f..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/cluster/NodeInformationAdapter.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cluster; - -import javax.xml.bind.annotation.adapters.XmlAdapter; - -public class NodeInformationAdapter extends XmlAdapter<AdaptedNodeInformation, NodeInformation> { - - @Override - public NodeInformation unmarshal(final AdaptedNodeInformation adapted) throws Exception { - return new NodeInformation(adapted.getHostname(), adapted.getSiteToSitePort(), adapted.getApiPort(), adapted.isSiteToSiteSecure(), adapted.getTotalFlowFiles()); - } - - @Override - public AdaptedNodeInformation marshal(final NodeInformation nodeInformation) throws Exception { - final AdaptedNodeInformation adapted = new AdaptedNodeInformation(); - adapted.setHostname(nodeInformation.getHostname()); - adapted.setSiteToSitePort(nodeInformation.getSiteToSitePort()); - adapted.setApiPort(nodeInformation.getAPIPort()); - adapted.setSiteToSiteSecure(nodeInformation.isSiteToSiteSecure()); - adapted.setTotalFlowFiles(nodeInformation.getTotalFlowFiles()); - return adapted; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java index e0cca64..2e35422 100644 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java +++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java @@ -27,7 +27,6 @@ import org.apache.nifi.controller.exception.CommunicationsException; import org.apache.nifi.events.EventReporter; import org.apache.nifi.remote.PeerStatus; import org.apache.nifi.remote.RemoteGroupPort; -import org.apache.nifi.remote.protocol.CommunicationsSession; public interface RemoteProcessGroup { @@ -211,11 +210,6 @@ public interface RemoteProcessGroup { */ void removeNonExistentPort(final RemoteGroupPort port); - /** - * - * @return @throws IOException - */ - CommunicationsSession establishSiteToSiteConnection() throws IOException; /** * Called whenever RemoteProcessGroup is removed from the flow, so that any http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/Peer.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/Peer.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/Peer.java deleted file mode 100644 index 2422fe1..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/Peer.java +++ /dev/null @@ -1,107 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote; - -import java.io.IOException; -import java.net.URI; - -import org.apache.nifi.remote.protocol.CommunicationsSession; - -public class Peer { - - private final CommunicationsSession commsSession; - private final String url; - private final String host; - private long penalizationExpiration = 0L; - private boolean closed = false; - - public Peer(final CommunicationsSession commsSession, final String url) { - this.commsSession = commsSession; - this.url = url; - - try { - this.host = new URI(url).getHost(); - } catch (final Exception e) { - throw new IllegalArgumentException("Invalid URL: " + url); - } - } - - public String getUrl() { - return url; - } - - public CommunicationsSession getCommunicationsSession() { - return commsSession; - } - - public void close() throws IOException { - this.closed = true; - - // TODO: Consume the InputStream so that it doesn't linger on the Peer's outgoing socket buffer - commsSession.close(); - } - - public void penalize(final long millis) { - penalizationExpiration = Math.max(penalizationExpiration, System.currentTimeMillis() + millis); - } - - public boolean isPenalized() { - return penalizationExpiration > System.currentTimeMillis(); - } - - public boolean isClosed() { - return closed; - } - - public String getHost() { - return host; - } - - @Override - public int hashCode() { - return 8320 + url.hashCode(); - } - - @Override - public boolean equals(final Object obj) { - if (obj == null) { - return false; - } - if (obj == this) { - return true; - } - if (!(obj instanceof Peer)) { - return false; - } - - final Peer other = (Peer) obj; - return this.url.equals(other.url); - } - - @Override - public String toString() { - final StringBuilder sb = new StringBuilder(); - sb.append("Peer[url=").append(url); - if (closed) { - sb.append(",CLOSED"); - } else if (isPenalized()) { - sb.append(",PENALIZED"); - } - sb.append("]"); - return sb.toString(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PeerStatus.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PeerStatus.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PeerStatus.java deleted file mode 100644 index d1cb076..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PeerStatus.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote; - -public class PeerStatus { - - private final String hostname; - private final int port; - private final boolean secure; - private final int numFlowFiles; - - public PeerStatus(final String hostname, final int port, final boolean secure, final int numFlowFiles) { - this.hostname = hostname; - this.port = port; - this.secure = secure; - this.numFlowFiles = numFlowFiles; - } - - public String getHostname() { - return hostname; - } - - public int getPort() { - return port; - } - - public boolean isSecure() { - return secure; - } - - public int getFlowFileCount() { - return numFlowFiles; - } - - @Override - public String toString() { - return "PeerStatus[hostname=" + hostname + ",port=" + port + ",secure=" + secure + ",flowFileCount=" + numFlowFiles + "]"; - } - - @Override - public int hashCode() { - return 9824372 + hostname.hashCode() + port; - } - - @Override - public boolean equals(final Object obj) { - if (obj == null) { - return false; - } - - if (!(obj instanceof PeerStatus)) { - return false; - } - - final PeerStatus other = (PeerStatus) obj; - return port == other.port && hostname.equals(other.hostname); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java deleted file mode 100644 index 8f2603a..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/PortAuthorizationResult.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote; - -public interface PortAuthorizationResult { - - boolean isAuthorized(); - - String getExplanation(); - -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteAuthorizationState.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteAuthorizationState.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteAuthorizationState.java deleted file mode 100644 index 12a3d33..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteAuthorizationState.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote; - -/** - * - */ -public enum RemoteAuthorizationState { - - UNKNOWN, - UNAUTHORIZED, - AUTHORIZED; -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java index d4ad374..f08277c 100644 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java +++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java @@ -16,20 +16,28 @@ */ package org.apache.nifi.remote; +import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Port; +import org.apache.nifi.controller.AbstractPort; +import org.apache.nifi.controller.ProcessScheduler; +import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.RemoteProcessGroup; -public interface RemoteGroupPort extends Port { +public abstract class RemoteGroupPort extends AbstractPort implements Port, RemoteDestination { - RemoteProcessGroup getRemoteProcessGroup(); + public RemoteGroupPort(String id, String name, ProcessGroup processGroup, ConnectableType type, ProcessScheduler scheduler) { + super(id, name, processGroup, type, scheduler); + } - TransferDirection getTransferDirection(); + public abstract RemoteProcessGroup getRemoteProcessGroup(); - boolean isUseCompression(); + public abstract TransferDirection getTransferDirection(); - void setUseCompression(boolean useCompression); + public abstract boolean isUseCompression(); - boolean getTargetExists(); + public abstract void setUseCompression(boolean useCompression); - boolean isTargetRunning(); + public abstract boolean getTargetExists(); + + public abstract boolean isTargetRunning(); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java deleted file mode 100644 index 56432d5..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/TransferDirection.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote; - -public enum TransferDirection { - - SEND, - RECEIVE; -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java deleted file mode 100644 index bfccd98..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/VersionedRemoteResource.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote; - -public interface VersionedRemoteResource { - - VersionNegotiator getVersionNegotiator(); - - String getResourceName(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java deleted file mode 100644 index b4206b3..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/codec/FlowFileCodec.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.codec; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.List; - -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.remote.VersionedRemoteResource; -import org.apache.nifi.remote.exception.ProtocolException; -import org.apache.nifi.remote.exception.TransmissionDisabledException; - -/** - * <p> - * Provides a mechanism for encoding and decoding FlowFiles as streams so that - * they can be transferred remotely. - * </p> - */ -public interface FlowFileCodec extends VersionedRemoteResource { - - /** - * Returns a List of all versions that this codec is able to support, in the - * order that they are preferred by the codec - * - * @return - */ - public List<Integer> getSupportedVersions(); - - /** - * Encodes a FlowFile and its content as a single stream of data and writes - * that stream to the output. If checksum is not null, it will be calculated - * as the stream is read - * - * @param flowFile the FlowFile to encode - * @param session a session that can be used to transactionally create and - * transfer flow files - * @param outStream the stream to write the data to - * - * @return the updated FlowFile - * - * @throws IOException - */ - FlowFile encode(FlowFile flowFile, ProcessSession session, OutputStream outStream) throws IOException, TransmissionDisabledException; - - /** - * Decodes the contents of the InputStream, interpreting the data to - * determine the next FlowFile's attributes and content, as well as their - * destinations. If not null, checksum will be used to calculate the - * checksum as the data is read. - * - * @param stream an InputStream containing FlowFiles' contents, attributes, - * and destinations - * @param session - * - * @return the FlowFile that was created, or <code>null</code> if the stream - * was out of data - * - * @throws IOException - * @throws ProtocolException if the input is malformed - */ - FlowFile decode(InputStream stream, ProcessSession session) throws IOException, ProtocolException, TransmissionDisabledException; -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java deleted file mode 100644 index f6c2f4f..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/BadRequestException.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.exception; - -public class BadRequestException extends Exception { - - private static final long serialVersionUID = -8034602852256106560L; - - public BadRequestException(final String message) { - super(message); - } - - public BadRequestException(final Throwable cause) { - super(cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java deleted file mode 100644 index b61fc65..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/HandshakeException.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.exception; - -public class HandshakeException extends Exception { - - private static final long serialVersionUID = 178192341908726L; - - public HandshakeException(final String message) { - super(message); - } - - public HandshakeException(final Throwable cause) { - super(cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java deleted file mode 100644 index 24ff3a5..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/NotAuthorizedException.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.exception; - -public class NotAuthorizedException extends Exception { - - private static final long serialVersionUID = 2952623568114035498L; - - public NotAuthorizedException(final String message) { - super(message); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java deleted file mode 100644 index af0f467..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/PortNotRunningException.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.exception; - -public class PortNotRunningException extends Exception { - - private static final long serialVersionUID = -2790940982005516375L; - - public PortNotRunningException(final String message) { - super(message); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java deleted file mode 100644 index 0f50b98..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/ProtocolException.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.exception; - -public class ProtocolException extends Exception { - - private static final long serialVersionUID = 5763900324505818495L; - - public ProtocolException(final String message, final Throwable cause) { - super(message, cause); - } - - public ProtocolException(final String message) { - super(message); - } - - public ProtocolException(final Throwable cause) { - super(cause); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java deleted file mode 100644 index dd675b3..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/RequestExpiredException.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.exception; - -/** - * Used to indicate that by the time the request was serviced, it had already - * expired - */ -public class RequestExpiredException extends Exception { - - private static final long serialVersionUID = -7037025330562827852L; -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java deleted file mode 100644 index e6a0fe7..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/exception/UnknownPortException.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.exception; - -public class UnknownPortException extends Exception { - - private static final long serialVersionUID = -2790940982005516375L; - - public UnknownPortException(final String message) { - super(message); - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java deleted file mode 100644 index 32274eb..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ClientProtocol.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.protocol; - -import java.io.IOException; -import java.util.Set; - -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.remote.Peer; -import org.apache.nifi.remote.PeerStatus; -import org.apache.nifi.remote.VersionedRemoteResource; -import org.apache.nifi.remote.codec.FlowFileCodec; -import org.apache.nifi.remote.exception.HandshakeException; -import org.apache.nifi.remote.exception.PortNotRunningException; -import org.apache.nifi.remote.exception.ProtocolException; -import org.apache.nifi.remote.exception.UnknownPortException; - -public interface ClientProtocol extends VersionedRemoteResource { - - void handshake(Peer peer) throws IOException, HandshakeException, UnknownPortException, PortNotRunningException; - - Set<PeerStatus> getPeerStatuses(Peer currentPeer) throws IOException, ProtocolException; - - FlowFileCodec negotiateCodec(Peer peer) throws IOException, ProtocolException; - - void receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException; - - void transferFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException; - - void shutdown(Peer peer) throws IOException, ProtocolException; - - boolean isReadyForFileTransfer(); - - /** - * returns <code>true</code> if remote instance indicates that the port is - * invalid - * - * @return - * @throws IllegalStateException if a handshake has not successfully - * completed - */ - boolean isPortInvalid() throws IllegalStateException; - - /** - * returns <code>true</code> if remote instance indicates that the port is - * unknown - * - * @return - * @throws IllegalStateException if a handshake has not successfully - * completed - */ - boolean isPortUnknown(); - - /** - * returns <code>true</code> if remote instance indicates that the port's - * destination is full - * - * @return - * @throws IllegalStateException if a handshake has not successfully - * completed - */ - boolean isDestinationFull(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java deleted file mode 100644 index d2e2946..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsInput.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.protocol; - -import java.io.IOException; -import java.io.InputStream; - -public interface CommunicationsInput { - - InputStream getInputStream() throws IOException; - - long getBytesRead(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java deleted file mode 100644 index 95cab29..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsOutput.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.protocol; - -import java.io.IOException; -import java.io.OutputStream; - -public interface CommunicationsOutput { - - OutputStream getOutputStream() throws IOException; - - long getBytesWritten(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java deleted file mode 100644 index d009cec..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/CommunicationsSession.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.protocol; - -import java.io.Closeable; -import java.io.IOException; - -public interface CommunicationsSession extends Closeable { - - public static final byte[] MAGIC_BYTES = {(byte) 'N', (byte) 'i', (byte) 'F', (byte) 'i'}; - - CommunicationsInput getInput(); - - CommunicationsOutput getOutput(); - - void setTimeout(int millis) throws IOException; - - int getTimeout() throws IOException; - - void setUri(String uri); - - String getUri(); - - String getUserDn(); - - void setUserDn(String dn); - - boolean isDataAvailable(); - - long getBytesWritten(); - - long getBytesRead(); - - /** - * Asynchronously interrupts this FlowFileCodec. Implementations must ensure - * that they stop sending and receiving data as soon as possible after this - * method has been called, even if doing so results in sending only partial - * data to the peer. This will usually result in the peer throwing a - * SocketTimeoutException. - */ - void interrupt(); - - /** - * Returns <code>true</code> if the connection is closed, <code>false</code> - * otherwise. - * - * @return - */ - boolean isClosed(); -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java deleted file mode 100644 index 41334fe..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/RequestType.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote.protocol; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; - -public enum RequestType { - - NEGOTIATE_FLOWFILE_CODEC, - REQUEST_PEER_LIST, - SEND_FLOWFILES, - RECEIVE_FLOWFILES, - SHUTDOWN; - - public void writeRequestType(final DataOutputStream dos) throws IOException { - dos.writeUTF(name()); - } - - public static RequestType readRequestType(final DataInputStream dis) throws IOException { - final String requestTypeVal = dis.readUTF(); - try { - return RequestType.valueOf(requestTypeVal); - } catch (final Exception e) { - throw new IOException("Could not determine RequestType: received invalid value " + requestTypeVal); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java index 0d18f2e..0118534 100644 --- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java +++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java @@ -18,13 +18,13 @@ package org.apache.nifi.remote.protocol; import java.io.IOException; -import org.apache.nifi.cluster.NodeInformant; import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.remote.Peer; import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.remote.VersionedRemoteResource; +import org.apache.nifi.remote.cluster.NodeInformant; import org.apache.nifi.remote.codec.FlowFileCodec; import org.apache.nifi.remote.exception.BadRequestException; import org.apache.nifi.remote.exception.HandshakeException; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java index 2c1b085..bfa3d25 100644 --- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java +++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java @@ -18,21 +18,10 @@ package org.apache.nifi.remote; import static java.util.Objects.requireNonNull; -import java.io.BufferedReader; -import java.io.DataInputStream; -import java.io.DataOutputStream; import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; -import java.nio.channels.SocketChannel; -import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Date; import java.util.HashMap; @@ -49,11 +38,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.regex.Pattern; import javax.net.ssl.SSLContext; -import javax.security.cert.CertificateExpiredException; -import javax.security.cert.CertificateNotYetValidException; import javax.ws.rs.core.Response; import org.apache.nifi.connectable.ConnectableType; @@ -72,16 +58,7 @@ import org.apache.nifi.groups.ProcessGroup; import org.apache.nifi.groups.ProcessGroupCounts; import org.apache.nifi.groups.RemoteProcessGroup; import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor; -import org.apache.nifi.stream.io.BufferedOutputStream; -import org.apache.nifi.remote.exception.BadRequestException; -import org.apache.nifi.remote.exception.HandshakeException; -import org.apache.nifi.remote.exception.PortNotRunningException; -import org.apache.nifi.remote.exception.UnknownPortException; -import org.apache.nifi.remote.io.socket.SocketChannelCommunicationsSession; -import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; -import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelCommunicationsSession; -import org.apache.nifi.remote.protocol.CommunicationsSession; -import org.apache.nifi.remote.protocol.socket.SocketClientProtocol; +import org.apache.nifi.remote.client.socket.EndpointConnectionStatePool; import org.apache.nifi.reporting.BulletinRepository; import org.apache.nifi.reporting.Severity; import org.apache.nifi.util.FormatUtils; @@ -156,9 +133,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { private volatile String authorizationIssue; - private volatile PeerStatusCache peerStatusCache; - private static final long PEER_CACHE_MILLIS = TimeUnit.MILLISECONDS.convert(1, TimeUnit.MINUTES); - + private final EndpointConnectionStatePool endpointConnectionPool; private final ScheduledExecutorService backgroundThreadExecutor; public StandardRemoteProcessGroup(final String id, final String targetUri, final ProcessGroup processGroup, @@ -218,45 +193,23 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { } }; - try { - final File peersFile = getPeerPersistenceFile(); - this.peerStatusCache = new PeerStatusCache(recoverPersistedPeerStatuses(peersFile), peersFile.lastModified()); - } catch (final IOException e) { - logger.error("{} Failed to recover persisted Peer Statuses due to {}", this, e); - } + endpointConnectionPool = new EndpointConnectionStatePool(sslContext, eventReporter, getPeerPersistenceFile()); final Runnable refreshPeers = new Runnable() { @Override public void run() { - final PeerStatusCache existingCache = peerStatusCache; - if (existingCache != null && (existingCache.getTimestamp() + PEER_CACHE_MILLIS > System.currentTimeMillis())) { - return; - } - - Set<RemoteGroupPort> ports = getInputPorts(); - if (ports.isEmpty()) { - ports = getOutputPorts(); - } - - if (ports.isEmpty()){ - return; - } - - // it doesn't really matter which port we use. Since we are just getting the Peer Status, - // if the server indicates that the port cannot receive data for whatever reason, we will - // simply ignore the error. - final RemoteGroupPort port = ports.iterator().next(); - - try { - final Set<PeerStatus> statuses = fetchRemotePeerStatuses(port); - peerStatusCache = new PeerStatusCache(statuses); - logger.info("{} Successfully refreshed Peer Status; remote instance consists of {} peers", StandardRemoteProcessGroup.this, statuses.size()); - } catch (Exception e) { - logger.warn("{} Unable to refresh Remote Group's peers due to {}", StandardRemoteProcessGroup.this, e); - if (logger.isDebugEnabled()) { - logger.warn("", e); - } - } + final boolean secure; + try { + secure = isSecure(); + } catch (CommunicationsException e) { + logger.warn("{} Unable to determine if remote instance {} is configured for secure site-to-site due to {}; will not refresh list of peers", new Object[] {this, getTargetUri(), e.toString()}); + if ( logger.isDebugEnabled() ) { + logger.warn("", e); + } + return; + } + + endpointConnectionPool.refreshPeers(getTargetUri(), secure); } }; @@ -1255,52 +1208,6 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { } } - @Override - public CommunicationsSession establishSiteToSiteConnection() throws IOException { - final URI uri = apiUri; - final String destinationUri = uri.toString(); - CommunicationsSession commsSession = null; - try { - if (isSecure()) { - if (sslContext == null) { - throw new IOException("Unable to communicate with " + getTargetUri() + " because it requires Secure Site-to-Site communications, but this instance is not configured for secure communications"); - } - - final Integer listeningPort = getListeningPort(); - if (listeningPort == null) { - throw new IOException("Remote instance is not configured to allow incoming Site-to-Site connections"); - } - - final SSLSocketChannel socketChannel = new SSLSocketChannel(sslContext, uri.getHost(), listeningPort, true); - socketChannel.connect(); - commsSession = new SSLSocketChannelCommunicationsSession(socketChannel, destinationUri); - - try { - commsSession.setUserDn(socketChannel.getDn()); - } catch (final CertificateNotYetValidException | CertificateExpiredException ex) { - throw new IOException(ex); - } - } else { - final SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(uri.getHost(), getListeningPort())); - - commsSession = new SocketChannelCommunicationsSession(socketChannel, destinationUri); - } - - commsSession.getOutput().getOutputStream().write(CommunicationsSession.MAGIC_BYTES); - - commsSession.setUri("nifi://" + uri.getHost() + ":" + uri.getPort()); - } catch (final IOException e) { - if (commsSession != null) { - try { - commsSession.close(); - } catch (final IOException ignore) { - } - } - - throw e; - } - return commsSession; - } @Override public EventReporter getEventReporter() { @@ -1489,133 +1396,13 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup { @Override public Set<PeerStatus> getPeerStatuses() { - final PeerStatusCache cache = this.peerStatusCache; - if (cache == null || cache.getStatuses() == null || cache.getStatuses().isEmpty()) { - return null; - } - - if (cache.getTimestamp() + PEER_CACHE_MILLIS < System.currentTimeMillis()) { - final Set<PeerStatus> equalizedSet = new HashSet<>(cache.getStatuses().size()); - for (final PeerStatus status : cache.getStatuses()) { - final PeerStatus equalizedStatus = new PeerStatus(status.getHostname(), status.getPort(), status.isSecure(), 1); - equalizedSet.add(equalizedStatus); - } - - return equalizedSet; - } - - return cache.getStatuses(); + return endpointConnectionPool.getPeerStatuses(); } - private Set<PeerStatus> fetchRemotePeerStatuses(final RemoteGroupPort port) throws IOException, HandshakeException, UnknownPortException, PortNotRunningException, BadRequestException { - final CommunicationsSession commsSession = establishSiteToSiteConnection(); - final Peer peer = new Peer(commsSession, "nifi://" + getTargetUri().getHost() + ":" + getListeningPort()); - final SocketClientProtocol clientProtocol = new SocketClientProtocol(); - clientProtocol.setPort(port); - final DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream()); - final DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream()); - try { - RemoteResourceFactory.initiateResourceNegotiation(clientProtocol, dis, dos); - } catch (final HandshakeException e) { - throw new BadRequestException(e.toString()); - } - - clientProtocol.handshake(peer); - final Set<PeerStatus> peerStatuses = clientProtocol.getPeerStatuses(peer); - persistPeerStatuses(peerStatuses); - - try { - clientProtocol.shutdown(peer); - } catch (final IOException e) { - final String message = String.format("%s Failed to shutdown protocol when updating list of peers due to %s", this, e.toString()); - logger.warn(message); - if (logger.isDebugEnabled()) { - logger.warn("", e); - } - getEventReporter().reportEvent(Severity.WARNING, "Site to Site", message); - } - - try { - peer.close(); - } catch (final IOException e) { - final String message = String.format("%s Failed to close resources when updating list of peers due to %s", this, e.toString()); - logger.warn(message); - if (logger.isDebugEnabled()) { - logger.warn("", e); - } - getEventReporter().reportEvent(Severity.WARNING, "Site to Site", message); - } - - return peerStatuses; - } private File getPeerPersistenceFile() { final File stateDir = NiFiProperties.getInstance().getPersistentStateDirectory(); return new File(stateDir, getIdentifier() + ".peers"); } - private void persistPeerStatuses(final Set<PeerStatus> statuses) { - final File peersFile = getPeerPersistenceFile(); - try (final OutputStream fos = new FileOutputStream(peersFile); - final OutputStream out = new BufferedOutputStream(fos)) { - - for (final PeerStatus status : statuses) { - final String line = status.getHostname() + ":" + status.getPort() + ":" + status.isSecure() + "\n"; - out.write(line.getBytes(StandardCharsets.UTF_8)); - } - - } catch (final IOException e) { - logger.error("Failed to persist list of Peers due to {}; if restarted and peer's NCM is down, may be unable to transfer data until communications with NCM are restored", e.toString(), e); - } - } - - private Set<PeerStatus> recoverPersistedPeerStatuses(final File file) throws IOException { - if (!file.exists()) { - return null; - } - - final Set<PeerStatus> statuses = new HashSet<>(); - try (final InputStream fis = new FileInputStream(file); - final BufferedReader reader = new BufferedReader(new InputStreamReader(fis))) { - - String line; - while ((line = reader.readLine()) != null) { - final String[] splits = line.split(Pattern.quote(":")); - if (splits.length != 3) { - continue; - } - - final String hostname = splits[0]; - final int port = Integer.parseInt(splits[1]); - final boolean secure = Boolean.parseBoolean(splits[2]); - - statuses.add(new PeerStatus(hostname, port, secure, 1)); - } - } - - return statuses; - } - - private static class PeerStatusCache { - - private final Set<PeerStatus> statuses; - private final long timestamp; - - public PeerStatusCache(final Set<PeerStatus> statuses) { - this(statuses, System.currentTimeMillis()); - } - - public PeerStatusCache(final Set<PeerStatus> statuses, final long timestamp) { - this.statuses = statuses; - this.timestamp = timestamp; - } - - public Set<PeerStatus> getStatuses() { - return statuses; - } - - public long getTimestamp() { - return timestamp; - } - } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/pom.xml b/nifi/nar-bundles/framework-bundle/framework/site-to-site/pom.xml index 1c8d2c0..a7909a3 100644 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/pom.xml +++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/pom.xml @@ -62,6 +62,10 @@ <artifactId>nifi-utils</artifactId> </dependency> <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>site-to-site-client</artifactId> + </dependency> + <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java deleted file mode 100644 index 4babb92..0000000 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/AbstractCommunicationsSession.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.remote; - -import org.apache.nifi.remote.protocol.CommunicationsSession; - -public abstract class AbstractCommunicationsSession implements CommunicationsSession { - private String userDn; - - private volatile String uri; - - public AbstractCommunicationsSession(final String uri) { - this.uri = uri; - } - - @Override - public String toString() { - return uri; - } - - @Override - public void setUri(final String uri) { - this.uri = uri; - } - - @Override - public String getUri() { - return uri; - } - - @Override - public String getUserDn() { - return userDn; - } - - @Override - public void setUserDn(final String dn) { - this.userDn = dn; - } -} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/fdf75846/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java index 49d3c3c..922d4e7 100644 --- a/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java +++ b/nifi/nar-bundles/framework-bundle/framework/site-to-site/src/main/java/org/apache/nifi/remote/RemoteResourceFactory.java @@ -26,48 +26,8 @@ import org.apache.nifi.remote.exception.HandshakeException; import org.apache.nifi.remote.protocol.ClientProtocol; import org.apache.nifi.remote.protocol.ServerProtocol; -public class RemoteResourceFactory { +public class RemoteResourceFactory extends RemoteResourceInitiator { - public static final int RESOURCE_OK = 20; - public static final int DIFFERENT_RESOURCE_VERSION = 21; - public static final int ABORT = 255; - - - public static VersionedRemoteResource initiateResourceNegotiation(final VersionedRemoteResource resource, final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException { - // Write the classname of the RemoteStreamCodec, followed by its version - dos.writeUTF(resource.getResourceName()); - final VersionNegotiator negotiator = resource.getVersionNegotiator(); - dos.writeInt(negotiator.getVersion()); - dos.flush(); - - // wait for response from server. - final int statusCode = dis.read(); - switch (statusCode) { - case RESOURCE_OK: // server accepted our proposal of codec name/version - return resource; - case DIFFERENT_RESOURCE_VERSION: // server accepted our proposal of codec name but not the version - // Get server's preferred version - final int newVersion = dis.readInt(); - - // Determine our new preferred version that is no greater than the server's preferred version. - final Integer newPreference = negotiator.getPreferredVersion(newVersion); - // If we could not agree with server on a version, fail now. - if ( newPreference == null ) { - throw new HandshakeException("Could not agree on version for " + resource); - } - - negotiator.setVersion(newPreference); - - // Attempt negotiation of resource based on our new preferred version. - return initiateResourceNegotiation(resource, dis, dos); - case ABORT: - throw new HandshakeException("Remote destination aborted connection with message: " + dis.readUTF()); - default: - return null; // Unable to negotiate codec - } - } - - @SuppressWarnings("unchecked") public static <T extends FlowFileCodec> T receiveCodecNegotiation(final DataInputStream dis, final DataOutputStream dos) throws IOException, HandshakeException { final String codecName = dis.readUTF();