This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 46a5e3f  NIFI-9207 - Added Max Read Size to Distributed Cache Servers
46a5e3f is described below

commit 46a5e3f0968ae16d32b378cd422293e7ad4713bf
Author: Paul Grey <[email protected]>
AuthorDate: Thu Sep 9 11:39:21 2021 -0400

    NIFI-9207 - Added Max Read Size to Distributed Cache Servers
    
    This closes #5420
    
    Signed-off-by: David Handermann <[email protected]>
---
 .../cache/client/CacheClientRequestHandler.java    | 19 ++++++++-
 .../client/DistributedMapCacheClientService.java   |  2 +-
 .../cache/client/adapter/NullInboundAdapter.java   |  2 +-
 .../cache/server/AbstractCacheServer.java          | 43 +++++++++++++++++---
 .../cache/server/DistributedCacheServer.java       |  9 +++++
 .../cache/server/DistributedSetCacheServer.java    |  4 +-
 .../distributed/cache/server/SetCacheServer.java   |  9 ++---
 .../server/map/DistributedMapCacheServer.java      | 10 +++--
 .../cache/server/map/MapCacheServer.java           | 14 ++-----
 .../cache/server/TestServerAndClient.java          | 47 +++++++++++++++++++++-
 10 files changed, 126 insertions(+), 33 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java
index 93acb45..62eccba 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/CacheClientRequestHandler.java
@@ -45,7 +45,7 @@ public class CacheClientRequestHandler extends 
ChannelInboundHandlerAdapter {
     private ChannelPromise channelPromise;
 
     @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
IOException {
+    public void channelRead(final ChannelHandlerContext ctx, final Object msg) 
{
         final ByteBuf byteBuf = (ByteBuf) msg;
         try {
             final byte[] bytes = new byte[byteBuf.readableBytes()];
@@ -57,13 +57,25 @@ public class CacheClientRequestHandler extends 
ChannelInboundHandlerAdapter {
     }
 
     @Override
-    public void channelReadComplete(ChannelHandlerContext ctx) throws 
IOException {
+    public void channelReadComplete(final ChannelHandlerContext ctx) throws 
IOException {
         inboundAdapter.dequeue();
         if (inboundAdapter.isComplete() && !channelPromise.isSuccess()) {
             channelPromise.setSuccess();
         }
     }
 
+    @Override
+    public void channelUnregistered(final ChannelHandlerContext ctx) {
+        if (!inboundAdapter.isComplete()) {
+            channelPromise.setFailure(new IOException("Channel unregistered 
before processing completed: " + ctx.channel().toString()));
+        }
+    }
+
+    @Override
+    public void exceptionCaught(final ChannelHandlerContext ctx, final 
Throwable cause) {
+        channelPromise.setFailure(cause);
+    }
+
     /**
      * Perform a synchronous method call to the server.  The server is 
expected to write
      * a byte stream response to the channel, which may be deserialized into a 
Java object
@@ -86,5 +98,8 @@ public class CacheClientRequestHandler extends 
ChannelInboundHandlerAdapter {
         
channel.writeAndFlush(Unpooled.wrappedBuffer(outboundAdapter.toBytes()));
         channelPromise.awaitUninterruptibly();
         this.inboundAdapter = new NullInboundAdapter();
+        if (channelPromise.cause() != null) {
+            throw new IOException("Request invocation failed", 
channelPromise.cause());
+        }
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
index 7c08fd0..c62b63b 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/DistributedMapCacheClientService.java
@@ -84,7 +84,7 @@ public class DistributedMapCacheClientService extends 
AbstractControllerService
         .build();
 
     /**
-     * The implementation of the business logic for {@link 
DistributedSetCacheClientService}.
+     * The implementation of the business logic for {@link 
DistributedMapCacheClientService}.
      */
     private volatile NettyDistributedMapCacheClient cacheClient = null;
 
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/adapter/NullInboundAdapter.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/adapter/NullInboundAdapter.java
index eed1f84..9288bb8 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/adapter/NullInboundAdapter.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-client-service/src/main/java/org/apache/nifi/distributed/cache/client/adapter/NullInboundAdapter.java
@@ -32,7 +32,7 @@ public class NullInboundAdapter implements InboundAdapter {
 
     @Override
     public boolean isComplete() {
-        return false;
+        return true;
     }
 
     @Override
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
index fba868f..fb3d597 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/AbstractCacheServer.java
@@ -18,6 +18,7 @@ package org.apache.nifi.distributed.cache.server;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -49,16 +50,18 @@ public abstract class AbstractCacheServer implements 
CacheServer {
 
     private final String identifier;
     private final int port;
+    private final int maxReadSize;
     private final SSLContext sslContext;
     protected volatile boolean stopped = false;
     private final Set<Thread> processInputThreads = new 
CopyOnWriteArraySet<>();
 
     private volatile ServerSocketChannel serverSocketChannel;
 
-    public AbstractCacheServer(final String identifier, final SSLContext 
sslContext, final int port) {
+    public AbstractCacheServer(final String identifier, final SSLContext 
sslContext, final int port, final int maxReadSize) {
         this.identifier = identifier;
         this.port = port;
         this.sslContext = sslContext;
+        this.maxReadSize = maxReadSize;
     }
 
     @Override
@@ -108,14 +111,14 @@ public abstract class AbstractCacheServer implements 
CacheServer {
                                     rawInputStream = new 
SSLSocketChannelInputStream(sslSocketChannel);
                                     rawOutputStream = new 
SSLSocketChannelOutputStream(sslSocketChannel);
                                 }
-                            } catch (IOException e) {
+                            } catch (final IOException e) {
                                 logger.error("Cannot create input and/or 
output streams for {}", new Object[]{identifier}, e);
                                 if (logger.isDebugEnabled()) {
                                     logger.error("", e);
                                 }
                                 try {
                                     socketChannel.close();
-                                } catch (IOException swallow) {
+                                } catch (final IOException swallow) {
                                 }
 
                                 return;
@@ -179,19 +182,19 @@ public abstract class AbstractCacheServer implements 
CacheServer {
         if (serverSocketChannel != null && serverSocketChannel.isOpen()) {
             try {
                 serverSocketChannel.close();
-            } catch (IOException e) {
+            } catch (final IOException e) {
                 logger.warn("Server Socket Close Failed", e);
             }
         }
         // need to close out the created SocketChannels...this is done by 
interrupting
         // the created threads that loop on listen().
-        for (Thread processInputThread : processInputThreads) {
+        for (final Thread processInputThread : processInputThreads) {
             processInputThread.interrupt();
             int i = 0;
             while (!processInputThread.isInterrupted() && i++ < 5) {
                 try {
                     Thread.sleep(50); // allow thread to gracefully terminate
-                } catch (InterruptedException e) {
+                } catch (final InterruptedException e) {
                 }
             }
         }
@@ -213,4 +216,32 @@ public abstract class AbstractCacheServer implements 
CacheServer {
      * @throws IOException ex
      */
     protected abstract boolean listen(InputStream in, OutputStream out, int 
version) throws IOException;
+
+    /**
+     * Read a length-prefixed value from the {@link DataInputStream}.
+     *
+     * @param dis the {@link DataInputStream} from which to read the value
+     * @return the serialized representation of the value
+     * @throws IOException on failure to read from the input stream
+     */
+    protected byte[] readValue(final DataInputStream dis) throws IOException {
+        final int numBytes = validateSize(dis.readInt());
+        final byte[] buffer = new byte[numBytes];
+        dis.readFully(buffer);
+        return buffer;
+    }
+
+    /**
+     * Validate a size value received from the {@link DataInputStream} against 
the configured maximum.
+     *
+     * @param size the size value received from the {@link DataInputStream}
+     * @return the size value, iff it passes validation; otherwise, an 
exception is thrown
+     */
+    protected int validateSize(final int size) {
+        if (size <= maxReadSize) {
+            return size;
+        } else {
+            throw new IllegalStateException(String.format("Size [%d] exceeds 
maximum configured read [%d]", size, maxReadSize));
+        }
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
index 0643c1b..9d48096 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedCacheServer.java
@@ -68,6 +68,14 @@ public abstract class DistributedCacheServer extends 
AbstractControllerService {
         .required(false)
         .addValidator(StandardValidators.createDirectoryExistsValidator(true, 
true))
         .build();
+    public static final PropertyDescriptor MAX_READ_SIZE = new 
PropertyDescriptor.Builder()
+        .name("maximum-read-size")
+        .displayName("Maximum Read Size")
+        .description("The maximum number of network bytes to read for a single 
cache item")
+        .required(false)
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .defaultValue("1 MB")
+        .build();
 
     private volatile CacheServer cacheServer;
 
@@ -79,6 +87,7 @@ public abstract class DistributedCacheServer extends 
AbstractControllerService {
         properties.add(EVICTION_POLICY);
         properties.add(PERSISTENCE_PATH);
         properties.add(SSL_CONTEXT_SERVICE);
+        properties.add(MAX_READ_SIZE);
         return properties;
     }
 
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java
index a6ab0dc..cb81725 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/DistributedSetCacheServer.java
@@ -21,6 +21,7 @@ import javax.net.ssl.SSLContext;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.ssl.SSLContextService;
 
 @Tags({"distributed", "set", "distinct", "cache", "server"})
@@ -35,6 +36,7 @@ public class DistributedSetCacheServer extends 
DistributedCacheServer {
         final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
         final int maxSize = context.getProperty(MAX_CACHE_ENTRIES).asInteger();
         final String evictionPolicyName = 
context.getProperty(EVICTION_POLICY).getValue();
+        final int maxReadSize = 
context.getProperty(MAX_READ_SIZE).asDataSize(DataUnit.B).intValue();
 
         final SSLContext sslContext;
         if (sslContextService == null) {
@@ -61,7 +63,7 @@ public class DistributedSetCacheServer extends 
DistributedCacheServer {
         try {
             final File persistenceDir = persistencePath == null ? null : new 
File(persistencePath);
 
-            return new SetCacheServer(getIdentifier(), sslContext, port, 
maxSize, evictionPolicy, persistenceDir);
+            return new SetCacheServer(getIdentifier(), sslContext, port, 
maxSize, evictionPolicy, persistenceDir, maxReadSize);
         } catch (final Exception e) {
             throw new RuntimeException(e);
         }
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
index 3e6e7a9..5e1fb03 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/SetCacheServer.java
@@ -36,8 +36,8 @@ public class SetCacheServer extends AbstractCacheServer {
     private final SetCache cache;
 
     public SetCacheServer(final String identifier, final SSLContext 
sslContext, final int port, final int maxSize,
-            final EvictionPolicy evictionPolicy, final File persistencePath) 
throws IOException {
-        super(identifier, sslContext, port);
+                          final EvictionPolicy evictionPolicy, final File 
persistencePath, final int maxReadSize) throws IOException {
+        super(identifier, sslContext, port, maxReadSize);
 
         final SetCache simpleCache = new SimpleSetCache(identifier, maxSize, 
evictionPolicy);
 
@@ -60,9 +60,7 @@ public class SetCacheServer extends AbstractCacheServer {
             return false;
         }
 
-        final int valueLength = dis.readInt();
-        final byte[] value = new byte[valueLength];
-        dis.readFully(value);
+        final byte[] value = readValue(dis);
         final ByteBuffer valueBuffer = ByteBuffer.wrap(value);
 
         final SetCacheResult response;
@@ -101,5 +99,4 @@ public class SetCacheServer extends AbstractCacheServer {
             stop();
         }
     }
-
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
index b07b12a..7987e14 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/DistributedMapCacheServer.java
@@ -26,6 +26,7 @@ import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.distributed.cache.server.CacheServer;
 import org.apache.nifi.distributed.cache.server.DistributedCacheServer;
 import org.apache.nifi.distributed.cache.server.EvictionPolicy;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.ssl.SSLContextService;
 
 @Tags({"distributed", "cluster", "map", "cache", "server", "key/value"})
@@ -41,6 +42,7 @@ public class DistributedMapCacheServer extends 
DistributedCacheServer {
         final SSLContextService sslContextService = 
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
         final int maxSize = context.getProperty(MAX_CACHE_ENTRIES).asInteger();
         final String evictionPolicyName = 
context.getProperty(EVICTION_POLICY).getValue();
+        final int maxReadSize = 
context.getProperty(MAX_READ_SIZE).asDataSize(DataUnit.B).intValue();
 
         final SSLContext sslContext;
         if (sslContextService == null) {
@@ -67,14 +69,16 @@ public class DistributedMapCacheServer extends 
DistributedCacheServer {
         try {
             final File persistenceDir = persistencePath == null ? null : new 
File(persistencePath);
 
-            return createMapCacheServer(port, maxSize, sslContext, 
evictionPolicy, persistenceDir);
+            return createMapCacheServer(port, maxSize, sslContext, 
evictionPolicy, persistenceDir, maxReadSize);
         } catch (final Exception e) {
             throw new RuntimeException(e);
         }
     }
 
-    protected MapCacheServer createMapCacheServer(int port, int maxSize, 
SSLContext sslContext, EvictionPolicy evictionPolicy, File persistenceDir) 
throws IOException {
-        return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, 
evictionPolicy, persistenceDir);
+    protected MapCacheServer createMapCacheServer(
+            final int port, final int maxSize, final SSLContext sslContext, 
final EvictionPolicy evictionPolicy,
+            final File persistenceDir, final int maxReadSize) throws 
IOException {
+        return new MapCacheServer(getIdentifier(), sslContext, port, maxSize, 
evictionPolicy, persistenceDir, maxReadSize);
     }
 
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
index 57af28e..8986183 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/main/java/org/apache/nifi/distributed/cache/server/map/MapCacheServer.java
@@ -38,8 +38,8 @@ public class MapCacheServer extends AbstractCacheServer {
     private final MapCache cache;
 
     public MapCacheServer(final String identifier, final SSLContext 
sslContext, final int port, final int maxSize,
-            final EvictionPolicy evictionPolicy, final File persistencePath) 
throws IOException {
-        super(identifier, sslContext, port);
+            final EvictionPolicy evictionPolicy, final File persistencePath, 
final int maxReadSize) throws IOException {
+        super(identifier, sslContext, port, maxReadSize);
 
         final MapCache simpleCache = new SimpleMapCache(identifier, maxSize, 
evictionPolicy);
 
@@ -123,7 +123,7 @@ public class MapCacheServer extends AbstractCacheServer {
                 break;
             }
             case "subMap": {
-                final int numKeys = dis.readInt();
+                final int numKeys = validateSize(dis.readInt());
                 for(int i=0;i<numKeys;i++) {
                     final byte[] key = readValue(dis);
                     final ByteBuffer existingValue = 
cache.get(ByteBuffer.wrap(key));
@@ -249,12 +249,4 @@ public class MapCacheServer extends AbstractCacheServer {
             stop();
         }
     }
-
-    private byte[] readValue(final DataInputStream dis) throws IOException {
-        final int numBytes = dis.readInt();
-        final byte[] buffer = new byte[numBytes];
-        dis.readFully(buffer);
-        return buffer;
-    }
-
 }
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
index 49ca249..916908b 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-distributed-cache-services-bundle/nifi-distributed-cache-server/src/test/java/org/apache/nifi/distributed/cache/server/TestServerAndClient.java
@@ -19,6 +19,7 @@ package org.apache.nifi.distributed.cache.server;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -33,6 +34,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
 import org.apache.nifi.distributed.cache.client.Deserializer;
@@ -42,12 +44,14 @@ import org.apache.nifi.distributed.cache.client.Serializer;
 import 
org.apache.nifi.distributed.cache.client.exception.DeserializationException;
 import org.apache.nifi.distributed.cache.server.map.DistributedMapCacheServer;
 import org.apache.nifi.distributed.cache.server.map.MapCacheServer;
+import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.remote.StandardVersionNegotiator;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.util.MockConfigurationContext;
 import org.apache.nifi.util.MockControllerServiceInitializationContext;
+import org.apache.nifi.util.MockPropertyValue;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
@@ -597,8 +601,8 @@ public class TestServerAndClient {
         // Create a server that only supports protocol version 1.
         final DistributedMapCacheServer server = new MapServer() {
             @Override
-            protected MapCacheServer createMapCacheServer(int port, int 
maxSize, SSLContext sslContext, EvictionPolicy evictionPolicy, File 
persistenceDir) throws IOException {
-                return new MapCacheServer(getIdentifier(), sslContext, port, 
maxSize, evictionPolicy, persistenceDir) {
+            protected MapCacheServer createMapCacheServer(int port, int 
maxSize, SSLContext sslContext, EvictionPolicy evictionPolicy, File 
persistenceDir, int maxReadSize) throws IOException {
+                return new MapCacheServer(getIdentifier(), sslContext, port, 
maxSize, evictionPolicy, persistenceDir, maxReadSize) {
                     @Override
                     protected StandardVersionNegotiator getVersionNegotiator() 
{
                         return new StandardVersionNegotiator(1);
@@ -666,6 +670,45 @@ public class TestServerAndClient {
         server.shutdownServer();
     }
 
+    @Test
+    public void testLimitServiceReadSizeMap() throws InitializationException, 
IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(Mockito.mock(Processor.class));
+        final DistributedMapCacheServer server = new MapServer();
+        runner.addControllerService("server", server);
+        runner.enableControllerService(server);
+
+        final DistributedMapCacheClientService client = 
createMapClient(server.getPort());
+        final Serializer<String> serializer = new StringSerializer();
+
+        final String key = "key";
+        final int maxReadSize = new 
MockPropertyValue(DistributedCacheServer.MAX_READ_SIZE.getDefaultValue()).asDataSize(DataUnit.B).intValue();
+        final int belowThreshold = maxReadSize / key.length();
+        final int aboveThreshold = belowThreshold + 1;
+        final String keyBelowThreshold = StringUtils.repeat(key, 
belowThreshold);
+        final String keyAboveThreshold = StringUtils.repeat(key, 
aboveThreshold);
+        assertFalse(client.containsKey(keyBelowThreshold, serializer));
+        assertThrows(IOException.class, () -> 
client.containsKey(keyAboveThreshold, serializer));
+    }
+
+    @Test
+    public void testLimitServiceReadSizeSet() throws InitializationException, 
IOException {
+        final TestRunner runner = 
TestRunners.newTestRunner(Mockito.mock(Processor.class));
+        final DistributedSetCacheServer server = new SetServer();
+        runner.addControllerService("server", server);
+        runner.enableControllerService(server);
+
+        final DistributedSetCacheClientService client = 
createClient(server.getPort());
+        final Serializer<String> serializer = new StringSerializer();
+
+        final String value = "value";
+        final int maxReadSize = new 
MockPropertyValue(DistributedCacheServer.MAX_READ_SIZE.getDefaultValue()).asDataSize(DataUnit.B).intValue();
+        final int belowThreshold = maxReadSize / value.length();
+        final int aboveThreshold = belowThreshold + 1;
+        final String valueBelowThreshold = StringUtils.repeat(value, 
belowThreshold);
+        final String valueAboveThreshold = StringUtils.repeat(value, 
aboveThreshold);
+        assertFalse(client.contains(valueBelowThreshold, serializer));
+        assertThrows(IOException.class, () -> 
client.contains(valueAboveThreshold, serializer));
+    }
 
     private void waitABit() {
         try {

Reply via email to