This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 9a4ce2607d NIFI-9878 Added timeout handling for Cache Client 
handshaking
9a4ce2607d is described below

commit 9a4ce2607dfbf6e9a9731a19536aa7a4f5552ffd
Author: Jon Shoemaker <[email protected]>
AuthorDate: Tue Sep 13 15:48:29 2022 +0000

    NIFI-9878 Added timeout handling for Cache Client handshaking
    
    This closes #6414
    
    Co-authored-by: Nissim Shiman <[email protected]>
    Co-authored-by: Jon Shoemaker <[email protected]>
    Signed-off-by: David Handermann <[email protected]>
---
 .../client/CacheClientChannelInitializer.java      |  2 +-
 .../cache/client/CacheClientHandshakeHandler.java  | 38 ++++++++++++++--
 .../cache/client/CacheClientRequestHandler.java    | 24 +++++-----
 .../map/TestDistributedMapServerAndClient.java     | 51 ++++++++++++++++++++++
 4 files changed, 101 insertions(+), 14 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java
index 4f891f448b..13e50bd9f7 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientChannelInitializer.java
@@ -78,7 +78,7 @@ public class CacheClientChannelInitializer extends 
ChannelInitializer<Channel> {
         final VersionNegotiator versionNegotiator = 
versionNegotiatorFactory.create();
         channelPipeline.addFirst(new 
IdleStateHandler(idleTimeout.getSeconds(), idleTimeout.getSeconds(), 
idleTimeout.getSeconds(), TimeUnit.SECONDS));
         channelPipeline.addLast(new 
WriteTimeoutHandler(writeTimeout.toMillis(), TimeUnit.MILLISECONDS));
-        channelPipeline.addLast(new CacheClientHandshakeHandler(channel, 
versionNegotiator));
+        channelPipeline.addLast(new CacheClientHandshakeHandler(channel, 
versionNegotiator, writeTimeout.toMillis()));
         channelPipeline.addLast(new CacheClientRequestHandler());
         channelPipeline.addLast(new CloseContextIdleStateHandler());
     }
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientHandshakeHandler.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientHandshakeHandler.java
index 2dd0be35b7..899ed01523 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientHandshakeHandler.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientHandshakeHandler.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -64,24 +65,37 @@ public class CacheClientHandshakeHandler extends 
ChannelInboundHandlerAdapter {
      */
     private final VersionNegotiator versionNegotiator;
 
+    /**
+     * The network timeout associated with handshake completion
+     */
+    private final long timeoutMillis;
+
     /**
      * Constructor.
      *
      * @param channel           the channel to which this {@link 
io.netty.channel.ChannelHandler} is bound.
      * @param versionNegotiator coordinator used to broker the version of the 
distributed cache protocol with the service
+     * @param timeoutMillis     the network timeout associated with handshake 
completion
      */
-    public CacheClientHandshakeHandler(final Channel channel, final 
VersionNegotiator versionNegotiator) {
+    public CacheClientHandshakeHandler(final Channel channel, final 
VersionNegotiator versionNegotiator,
+                                       final long timeoutMillis) {
         this.promiseHandshakeComplete = channel.newPromise();
         this.protocol = new AtomicInteger(PROTOCOL_UNINITIALIZED);
         this.versionNegotiator = versionNegotiator;
+        this.timeoutMillis = timeoutMillis;
     }
 
     /**
      * API providing client application with visibility into the handshake 
process.  Distributed cache requests
-     * should not be sent using this {@link Channel} until the handshake is 
complete.
+     * should not be sent using this {@link Channel} until the handshake is 
complete.  Since the handshake might fail,
+     * {@link #isSuccess()} should be called after this method completes.
      */
     public void waitHandshakeComplete() {
-        promiseHandshakeComplete.awaitUninterruptibly();
+        promiseHandshakeComplete.awaitUninterruptibly(timeoutMillis, 
TimeUnit.MILLISECONDS);
+        if (!promiseHandshakeComplete.isSuccess()) {
+            HandshakeException ex = new HandshakeException("Handshake timed 
out before completion.");
+            promiseHandshakeComplete.setFailure(ex);
+        }
     }
 
     /**
@@ -157,4 +171,22 @@ public class CacheClientHandshakeHandler extends 
ChannelInboundHandlerAdapter {
             promiseHandshakeComplete.setSuccess();
         }
     }
+
+    /**
+     * Returns whether the handshake completed successfully
+     *
+     * @return success/failure of handshake
+     */
+    public boolean isSuccess() {
+        return promiseHandshakeComplete.isSuccess();
+    }
+
+    /**
+     * Return reason for handshake failure.
+     *
+     * @return cause for handshake failure or null on success
+     */
+    public Throwable cause() {
+        return promiseHandshakeComplete.cause();
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java
index 62eccba1d0..50eeb0de4a 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java
@@ -90,16 +90,20 @@ public class CacheClientRequestHandler extends 
ChannelInboundHandlerAdapter {
     public void invoke(final Channel channel, final OutboundAdapter 
outboundAdapter, final InboundAdapter inboundAdapter) throws IOException {
         final CacheClientHandshakeHandler handshakeHandler = 
channel.pipeline().get(CacheClientHandshakeHandler.class);
         handshakeHandler.waitHandshakeComplete();
-        if (handshakeHandler.getVersionNegotiator().getVersion() < 
outboundAdapter.getMinimumVersion()) {
-            throw new UnsupportedOperationException("Remote cache server 
doesn't support protocol version " + outboundAdapter.getMinimumVersion());
-        }
-        this.inboundAdapter = inboundAdapter;
-        channelPromise = channel.newPromise();
-        
channel.writeAndFlush(Unpooled.wrappedBuffer(outboundAdapter.toBytes()));
-        channelPromise.awaitUninterruptibly();
-        this.inboundAdapter = new NullInboundAdapter();
-        if (channelPromise.cause() != null) {
-            throw new IOException("Request invocation failed", 
channelPromise.cause());
+        if (handshakeHandler.isSuccess()) {
+            if (handshakeHandler.getVersionNegotiator().getVersion() < 
outboundAdapter.getMinimumVersion()) {
+                throw new UnsupportedOperationException("Remote cache server 
doesn't support protocol version " + outboundAdapter.getMinimumVersion());
+            }
+            this.inboundAdapter = inboundAdapter;
+            channelPromise = channel.newPromise();
+            
channel.writeAndFlush(Unpooled.wrappedBuffer(outboundAdapter.toBytes()));
+            channelPromise.awaitUninterruptibly();
+            this.inboundAdapter = new NullInboundAdapter();
+            if (channelPromise.cause() != null) {
+                throw new IOException("Request invocation failed", 
channelPromise.cause());
+            }
+        } else {
+            throw new IOException("Request invocation failed", 
handshakeHandler.cause());
         }
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java
index d32946e83c..46465b55ea 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/map/TestDistributedMapServerAndClient.java
@@ -29,6 +29,14 @@ import 
org.apache.nifi.distributed.cache.protocol.ProtocolVersion;
 import org.apache.nifi.distributed.cache.server.CacheServer;
 import org.apache.nifi.distributed.cache.server.DistributedCacheServer;
 import org.apache.nifi.distributed.cache.server.EvictionPolicy;
+import org.apache.nifi.event.transport.EventServer;
+import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
+import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.message.ByteArrayMessage;
+import 
org.apache.nifi.event.transport.netty.ByteArrayMessageNettyEventServerFactory;
+import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.remote.StandardVersionNegotiator;
@@ -44,6 +52,9 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
 import javax.net.ssl.SSLContext;
 import java.io.File;
 import java.io.IOException;
@@ -52,6 +63,8 @@ import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -298,6 +311,44 @@ public class TestDistributedMapServerAndClient {
         }
     }
 
+    @Test
+    public void testIncompleteHandshakeScenario() throws 
InitializationException, IOException {
+        // Available TCP port shared by the simulated server and the client
+        final int port = NetworkUtils.getAvailableTcpPort();
+
+        // This is used to simulate a DistributedCacheServer that does not 
complete the handshake response
+        final BlockingQueue<ByteArrayMessage> messages = new 
LinkedBlockingQueue<>();
+        final NettyEventServerFactory serverFactory = 
getEventServerFactory(port, messages);
+        final EventServer eventServer = serverFactory.getEventServer();
+
+        DistributedMapCacheClientService client = new 
DistributedMapCacheClientService();
+
+        runner.addControllerService("client", client);
+        runner.setProperty(client, DistributedMapCacheClientService.HOSTNAME, 
"localhost");
+        runner.setProperty(client, DistributedMapCacheClientService.PORT, 
String.valueOf(port));
+        runner.setProperty(client, 
DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, "250 ms");
+        runner.enableControllerService(client);
+
+        final Serializer<String> valueSerializer = new StringSerializer();
+        final Serializer<String> keySerializer = new StringSerializer();
+        final Deserializer<String> deserializer = new StringDeserializer();
+
+        try {
+            assertThrows(IOException.class, () -> 
client.getAndPutIfAbsent("testKey", "test", keySerializer, valueSerializer, 
deserializer));
+        } finally {
+            eventServer.shutdown();
+        }
+    }
+
+    private NettyEventServerFactory getEventServerFactory(final int port, 
final BlockingQueue<ByteArrayMessage> messages) throws UnknownHostException {
+        final ByteArrayMessageNettyEventServerFactory factory = new 
ByteArrayMessageNettyEventServerFactory(Mockito.mock(ComponentLog.class),
+                InetAddress.getByName("127.0.0.1"), port, 
TransportProtocol.TCP, "\n".getBytes(), 1024, messages);
+        factory.setWorkerThreads(1);
+        
factory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
+        factory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
+        return factory;
+    }
+
     private DistributedMapCacheClientService createClient(final int port) 
throws InitializationException {
         final DistributedMapCacheClientService client = new 
DistributedMapCacheClientService();
         final MockControllerServiceInitializationContext clientInitContext = 
new MockControllerServiceInitializationContext(client, "client");

Reply via email to