NIFI-282: Fixed bug that caused client not to be able to communicate with 
remote NiFi instance


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/5c8a9e22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/5c8a9e22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/5c8a9e22

Branch: refs/heads/develop
Commit: 5c8a9e22d11007487b00d42455bc630451c76f82
Parents: d1e058c
Author: Mark Payne <marka...@hotmail.com>
Authored: Thu Feb 12 09:15:07 2015 -0500
Committer: Mark Payne <marka...@hotmail.com>
Committed: Thu Feb 12 09:15:07 2015 -0500

----------------------------------------------------------------------
 .../client/socket/EndpointConnectionPool.java   | 27 +++++++++----
 .../nifi/remote/client/socket/SocketClient.java | 12 ++++--
 .../client/socket/TestSiteToSiteClient.java     | 41 ++++++++++----------
 3 files changed, 47 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/5c8a9e22/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
index 43bc8e5..c0e4761 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/EndpointConnectionPool.java
@@ -38,6 +38,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -114,6 +115,7 @@ public class EndpointConnectionPool {
     private final SSLContext sslContext;
     private final ScheduledExecutorService taskExecutor;
     private final int idleExpirationMillis;
+    private final RemoteDestination remoteDestination;
     
     private final ReadWriteLock listeningPortRWLock = new 
ReentrantReadWriteLock();
     private final Lock remoteInfoReadLock = listeningPortRWLock.readLock();
@@ -128,15 +130,17 @@ public class EndpointConnectionPool {
     private volatile boolean shutdown = false;
     
     
-    public EndpointConnectionPool(final String clusterUrl, final int 
commsTimeoutMillis, final int idleExpirationMillis, 
-            final EventReporter eventReporter, final File persistenceFile) 
+    public EndpointConnectionPool(final String clusterUrl, final 
RemoteDestination remoteDestination, final int commsTimeoutMillis, 
+            final int idleExpirationMillis, final EventReporter eventReporter, 
final File persistenceFile) 
     {
-       this(clusterUrl, commsTimeoutMillis, idleExpirationMillis, null, 
eventReporter, persistenceFile);
+       this(clusterUrl, remoteDestination, commsTimeoutMillis, 
idleExpirationMillis, null, eventReporter, persistenceFile);
     }
     
-    public EndpointConnectionPool(final String clusterUrl, final int 
commsTimeoutMillis, final int idleExpirationMillis,
+    public EndpointConnectionPool(final String clusterUrl, final 
RemoteDestination remoteDestination, final int commsTimeoutMillis, final int 
idleExpirationMillis,
             final SSLContext sslContext, final EventReporter eventReporter, 
final File persistenceFile) 
     {
+        Objects.requireNonNull(clusterUrl, "URL cannot be null");
+        Objects.requireNonNull(remoteDestination, "Remote Destination/Port 
Identifier cannot be null");
        try {
                this.clusterUrl = new URI(clusterUrl);
        } catch (final URISyntaxException e) {
@@ -150,6 +154,7 @@ public class EndpointConnectionPool {
         }
         apiUri = this.clusterUrl.getScheme() + "://" + 
this.clusterUrl.getHost() + ":" + this.clusterUrl.getPort() + uriPath + "-api";
         
+        this.remoteDestination = remoteDestination;
        this.sslContext = sslContext;
        this.peersFile = persistenceFile;
        this.eventReporter = eventReporter;
@@ -197,12 +202,12 @@ public class EndpointConnectionPool {
     }
     
     
-    public EndpointConnection getEndpointConnection(final RemoteDestination 
remoteDestination, final TransferDirection direction) throws IOException, 
HandshakeException, PortNotRunningException, UnknownPortException, 
ProtocolException {
-        return getEndpointConnection(remoteDestination, direction, null);
+    public EndpointConnection getEndpointConnection(final TransferDirection 
direction) throws IOException, HandshakeException, PortNotRunningException, 
UnknownPortException, ProtocolException {
+        return getEndpointConnection(direction, null);
     }
     
     
-    public EndpointConnection getEndpointConnection(final RemoteDestination 
remoteDestination, final TransferDirection direction, final 
SiteToSiteClientConfig config) throws IOException, HandshakeException, 
PortNotRunningException, UnknownPortException, ProtocolException {
+    public EndpointConnection getEndpointConnection(final TransferDirection 
direction, final SiteToSiteClientConfig config) throws IOException, 
HandshakeException, PortNotRunningException, UnknownPortException, 
ProtocolException {
        //
         // Attempt to get a connection state that already exists for this URL.
         //
@@ -419,6 +424,7 @@ public class EndpointConnectionPool {
         return (peerList == null || peerList.isEmpty() || 
System.currentTimeMillis() > peerRefreshTime + PEER_REFRESH_PERIOD);
     }
     
+    
     private PeerStatus getNextPeerStatus(final TransferDirection direction) {
         List<PeerStatus> peerList = peerStatuses;
         if ( isPeerRefreshNeeded(peerList) ) {
@@ -532,7 +538,12 @@ public class EndpointConnectionPool {
         RemoteResourceInitiator.initiateResourceNegotiation(clientProtocol, 
dis, dos);
 
         clientProtocol.setTimeout(commsTimeout);
-        clientProtocol.handshake(peer, null);
+        if (clientProtocol.getVersionNegotiator().getVersion() < 5) {
+            clientProtocol.handshake(peer, remoteDestination.getIdentifier());
+        } else {
+            clientProtocol.handshake(peer, null);
+        }
+        
         final Set<PeerStatus> peerStatuses = 
clientProtocol.getPeerStatuses(peer);
         persistPeerStatuses(peerStatuses);
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/5c8a9e22/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
index aae19b3..016e67f 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/client/socket/SocketClient.java
@@ -40,9 +40,11 @@ public class SocketClient implements SiteToSiteClient {
        private final String portName;
        private final long penalizationNanos;
        private volatile String portIdentifier;
+       private volatile boolean closed = false;
        
        public SocketClient(final SiteToSiteClientConfig config) {
-               pool = new EndpointConnectionPool(config.getUrl(), (int) 
config.getTimeout(TimeUnit.MILLISECONDS),
+               pool = new EndpointConnectionPool(config.getUrl(), 
createRemoteDestination(config.getPortIdentifier()), 
+                       (int) config.getTimeout(TimeUnit.MILLISECONDS),
                        (int) 
config.getIdleConnectionExpiration(TimeUnit.MILLISECONDS),
                                config.getSslContext(), 
config.getEventReporter(), config.getPeerPersistenceFile());
                
@@ -107,15 +109,16 @@ public class SocketClient implements SiteToSiteClient {
        
        @Override
        public Transaction createTransaction(final TransferDirection direction) 
throws IOException {
+           if ( closed ) {
+               throw new IllegalStateException("Client is closed");
+           }
                final String portId = getPortIdentifier(direction);
                
                if ( portId == null ) {
                        throw new IOException("Could not find Port with name '" 
+ portName + "' for remote NiFi instance");
                }
                
-               final RemoteDestination remoteDestination = 
createRemoteDestination(portId);
-               
-               final EndpointConnection connectionState = 
pool.getEndpointConnection(remoteDestination, direction, getConfig());
+               final EndpointConnection connectionState = 
pool.getEndpointConnection(direction, getConfig());
                if ( connectionState == null ) {
                    return null;
                }
@@ -196,6 +199,7 @@ public class SocketClient implements SiteToSiteClient {
        
        @Override
        public void close() throws IOException {
+           closed = true;
                pool.shutdown();
        }
        

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/5c8a9e22/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
 
b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
index 2fd90f8..75becd3 100644
--- 
a/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
+++ 
b/nifi/nifi-commons/nifi-site-to-site-client/src/test/java/org/apache/nifi/remote/client/socket/TestSiteToSiteClient.java
@@ -75,28 +75,27 @@ public class TestSiteToSiteClient {
     public void testSend() throws IOException {
         
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.remote", 
"DEBUG");
         
-        final SiteToSiteClient client = new SiteToSiteClient.Builder()
-            .url("http://10.0.64.63:8080/nifi";)
-            .portName("input")
-            .nodePenalizationPeriod(10, TimeUnit.MILLISECONDS)
-            .build();
+            final SiteToSiteClient client = new SiteToSiteClient.Builder()
+                .url("http://localhost:8080/nifi";)
+                .portName("input")
+                .build();
         
-        try {
-            final Transaction transaction = 
client.createTransaction(TransferDirection.SEND);
-            Assert.assertNotNull(transaction);
-            
-            final Map<String, String> attrs = new HashMap<>();
-            attrs.put("site-to-site", "yes, please!");
-            final byte[] bytes = "Hello".getBytes();
-            final ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
-            final DataPacket packet = new StandardDataPacket(attrs, bais, 
bytes.length);
-            transaction.send(packet);
-
-            transaction.confirm();
-            transaction.complete();
-        } finally {
-            client.close();
-        }
+            try {
+                final Transaction transaction = 
client.createTransaction(TransferDirection.SEND);
+                Assert.assertNotNull(transaction);
+                
+                    final Map<String, String> attrs = new HashMap<>();
+                    attrs.put("site-to-site", "yes, please!");
+                    final byte[] bytes = "Hello".getBytes();
+                    final ByteArrayInputStream bais = new 
ByteArrayInputStream(bytes);
+                    final DataPacket packet = new StandardDataPacket(attrs, 
bais, bytes.length);
+                    transaction.send(packet);
+                
+                transaction.confirm();
+                transaction.complete();
+            } finally {
+                client.close();
+            }
     }
     
 }

Reply via email to