NIFI-282: Fixed bug that caused client not to be able to communicate with remote NiFi instance
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/5c8a9e22 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/5c8a9e22 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/5c8a9e22 Branch: refs/heads/develop Commit: 5c8a9e22d11007487b00d42455bc630451c76f82 Parents: d1e058c Author: Mark Payne <marka...@hotmail.com> Authored: Thu Feb 12 09:15:07 2015 -0500 Committer: Mark Payne <marka...@hotmail.com> Committed: Thu Feb 12 09:15:07 2015 -0500 ---------------------------------------------------------------------- .../client/socket/EndpointConnectionPool.java | 27 +++++++++---- .../nifi/remote/client/socket/SocketClient.java | 12 ++++-- .../client/socket/TestSiteToSiteClient.java | 41 ++++++++++---------- 3 files changed, 47 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/5c8a9e22/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java index 43bc8e5..c0e4761 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java @@ -38,6 +38,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -114,6 +115,7 @@ public class EndpointConnectionPool { private final SSLContext sslContext; private final ScheduledExecutorService taskExecutor; private final int idleExpirationMillis; + private final RemoteDestination remoteDestination; private final ReadWriteLock listeningPortRWLock = new ReentrantReadWriteLock(); private final Lock remoteInfoReadLock = listeningPortRWLock.readLock(); @@ -128,15 +130,17 @@ public class EndpointConnectionPool { private volatile boolean shutdown = false; - public EndpointConnectionPool(final String clusterUrl, final int commsTimeoutMillis, final int idleExpirationMillis, - final EventReporter eventReporter, final File persistenceFile) + public EndpointConnectionPool(final String clusterUrl, final RemoteDestination remoteDestination, final int commsTimeoutMillis, + final int idleExpirationMillis, final EventReporter eventReporter, final File persistenceFile) { - this(clusterUrl, commsTimeoutMillis, idleExpirationMillis, null, eventReporter, persistenceFile); + this(clusterUrl, remoteDestination, commsTimeoutMillis, idleExpirationMillis, null, eventReporter, persistenceFile); } - public EndpointConnectionPool(final String clusterUrl, final int commsTimeoutMillis, final int idleExpirationMillis, + public EndpointConnectionPool(final String clusterUrl, final RemoteDestination remoteDestination, final int commsTimeoutMillis, final int idleExpirationMillis, final SSLContext sslContext, final EventReporter eventReporter, final File persistenceFile) { + Objects.requireNonNull(clusterUrl, "URL cannot be null"); + Objects.requireNonNull(remoteDestination, "Remote Destination/Port Identifier cannot be null"); try { this.clusterUrl = new URI(clusterUrl); } catch (final URISyntaxException e) { @@ -150,6 +154,7 @@ public class EndpointConnectionPool { } apiUri = this.clusterUrl.getScheme() + "://" + this.clusterUrl.getHost() + ":" + this.clusterUrl.getPort() + uriPath + "-api"; + this.remoteDestination = remoteDestination; this.sslContext = sslContext; this.peersFile = persistenceFile; this.eventReporter = eventReporter; @@ -197,12 +202,12 @@ public class EndpointConnectionPool { } - public EndpointConnection getEndpointConnection(final RemoteDestination remoteDestination, final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException { - return getEndpointConnection(remoteDestination, direction, null); + public EndpointConnection getEndpointConnection(final TransferDirection direction) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException { + return getEndpointConnection(direction, null); } - public EndpointConnection getEndpointConnection(final RemoteDestination remoteDestination, final TransferDirection direction, final SiteToSiteClientConfig config) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException { + public EndpointConnection getEndpointConnection(final TransferDirection direction, final SiteToSiteClientConfig config) throws IOException, HandshakeException, PortNotRunningException, UnknownPortException, ProtocolException { // // Attempt to get a connection state that already exists for this URL. // @@ -419,6 +424,7 @@ public class EndpointConnectionPool { return (peerList == null || peerList.isEmpty() || System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD); } + private PeerStatus getNextPeerStatus(final TransferDirection direction) { List<PeerStatus> peerList = peerStatuses; if ( isPeerRefreshNeeded(peerList) ) { @@ -532,7 +538,12 @@ public class EndpointConnectionPool { RemoteResourceInitiator.initiateResourceNegotiation(clientProtocol, dis, dos); clientProtocol.setTimeout(commsTimeout); - clientProtocol.handshake(peer, null); + if (clientProtocol.getVersionNegotiator().getVersion() < 5) { + clientProtocol.handshake(peer, remoteDestination.getIdentifier()); + } else { + clientProtocol.handshake(peer, null); + } + final Set<PeerStatus> peerStatuses = clientProtocol.getPeerStatuses(peer); persistPeerStatuses(peerStatuses); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/5c8a9e22/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java index aae19b3..016e67f 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java @@ -40,9 +40,11 @@ public class SocketClient implements SiteToSiteClient { private final String portName; private final long penalizationNanos; private volatile String portIdentifier; + private volatile boolean closed = false; public SocketClient(final SiteToSiteClientConfig config) { - pool = new EndpointConnectionPool(config.getUrl(), (int) config.getTimeout(TimeUnit.MILLISECONDS), + pool = new EndpointConnectionPool(config.getUrl(), createRemoteDestination(config.getPortIdentifier()), + (int) config.getTimeout(TimeUnit.MILLISECONDS), (int) config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS), config.getSslContext(), config.getEventReporter(), config.getPeerPersistenceFile()); @@ -107,15 +109,16 @@ public class SocketClient implements SiteToSiteClient { @Override public Transaction createTransaction(final TransferDirection direction) throws IOException { + if ( closed ) { + throw new IllegalStateException("Client is closed"); + } final String portId = getPortIdentifier(direction); if ( portId == null ) { throw new IOException("Could not find Port with name '" + portName + "' for remote NiFi instance"); } - final RemoteDestination remoteDestination = createRemoteDestination(portId); - - final EndpointConnection connectionState = pool.getEndpointConnection(remoteDestination, direction, getConfig()); + final EndpointConnection connectionState = pool.getEndpointConnection(direction, getConfig()); if ( connectionState == null ) { return null; } @@ -196,6 +199,7 @@ public class SocketClient implements SiteToSiteClient { @Override public void close() throws IOException { + closed = true; pool.shutdown(); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/5c8a9e22/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java index 2fd90f8..75becd3 100644 --- a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java +++ b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java @@ -75,28 +75,27 @@ public class TestSiteToSiteClient { public void testSend() throws IOException { System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", "DEBUG"); - final SiteToSiteClient client = new SiteToSiteClient.Builder() - .url("http://10.0.64.63:8080/nifi") - .portName("input") - .nodePenalizationPeriod(10, TimeUnit.MILLISECONDS) - .build(); + final SiteToSiteClient client = new SiteToSiteClient.Builder() + .url("http://localhost:8080/nifi") + .portName("input") + .build(); - try { - final Transaction transaction = client.createTransaction(TransferDirection.SEND); - Assert.assertNotNull(transaction); - - final Map<String, String> attrs = new HashMap<>(); - attrs.put("site-to-site", "yes, please!"); - final byte[] bytes = "Hello".getBytes(); - final ByteArrayInputStream bais = new ByteArrayInputStream(bytes); - final DataPacket packet = new StandardDataPacket(attrs, bais, bytes.length); - transaction.send(packet); - - transaction.confirm(); - transaction.complete(); - } finally { - client.close(); - } + try { + final Transaction transaction = client.createTransaction(TransferDirection.SEND); + Assert.assertNotNull(transaction); + + final Map<String, String> attrs = new HashMap<>(); + attrs.put("site-to-site", "yes, please!"); + final byte[] bytes = "Hello".getBytes(); + final ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + final DataPacket packet = new StandardDataPacket(attrs, bais, bytes.length); + transaction.send(packet); + + transaction.confirm(); + transaction.complete(); + } finally { + client.close(); + } } }