This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new c2f46c4  NIFI-7586 In CassandraSessionProvider added properties to set 
socket-level read timeout and connect timeout.
c2f46c4 is described below

commit c2f46c44ca29a07a0f418f6d46845f7ae7bccf91
Author: Tamas Palfy <tamas.bertalan.pa...@gmail.com>
AuthorDate: Mon Jun 29 16:39:58 2020 +0200

    NIFI-7586 In CassandraSessionProvider added properties to set socket-level 
read timeout and connect timeout.
    
    In QueryCassandra, the flowfile was written to the session on the raw 
OutputStream.
    Wrapped it in a BufferedOutputStream for better performance.
    
    This closes #4368.
    
    Signed-off-by: Peter Turcsanyi <turcsa...@apache.org>
---
 .../nifi/processors/cassandra/QueryCassandra.java  |  5 ++-
 .../nifi/service/CassandraSessionProvider.java     | 43 +++++++++++++++++++++-
 .../nifi/service/TestCassandraSessionProvider.java |  2 +-
 3 files changed, 45 insertions(+), 5 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
index 197e983..ebad736 100644
--- 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
+++ 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java
@@ -56,6 +56,7 @@ import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.util.StopWatch;
 
+import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.charset.Charset;
@@ -210,8 +211,8 @@ public class QueryCassandra extends 
AbstractCassandraProcessor {
 
             fileToProcess = session.write(fileToProcess, new 
OutputStreamCallback() {
                 @Override
-                public void process(final OutputStream out) throws IOException 
{
-                    try {
+                public void process(final OutputStream rawOut) throws 
IOException {
+                    try (final OutputStream out = new 
BufferedOutputStream(rawOut)) {
                         logger.debug("Executing CQL query {}", new 
Object[]{selectQuery});
                         final ResultSet resultSet;
                         if (queryTimeout > 0) {
diff --git 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/main/java/org/apache/nifi/service/CassandraSessionProvider.java
 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/main/java/org/apache/nifi/service/CassandraSessionProvider.java
index e645edc..a917ecb 100644
--- 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/main/java/org/apache/nifi/service/CassandraSessionProvider.java
+++ 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/main/java/org/apache/nifi/service/CassandraSessionProvider.java
@@ -25,7 +25,9 @@ import com.datastax.driver.core.Session;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import javax.net.ssl.SSLContext;
+import com.datastax.driver.core.SocketOptions;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
@@ -126,6 +128,24 @@ public class CassandraSessionProvider extends 
AbstractControllerService implemen
             .defaultValue("NONE")
             .build();
 
+    static final PropertyDescriptor READ_TIMEOUT_MS = new 
PropertyDescriptor.Builder()
+        .name("read-timeout-ms")
+        .displayName("Read Timout (ms)")
+        .description("Read timeout (in milliseconds). 0 means no timeout. If 
no value is set, the underlying default will be used.")
+        .required(false)
+        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+        .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+        .build();
+
+    static final PropertyDescriptor CONNECT_TIMEOUT_MS = new 
PropertyDescriptor.Builder()
+        .name("connect-timeout-ms")
+        .displayName("Connect Timout (ms)")
+        .description("Connection timeout (in milliseconds). 0 means no 
timeout. If no value is set, the underlying default will be used.")
+        .required(false)
+        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+        .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+        .build();
+
     private List<PropertyDescriptor> properties;
     private Cluster cluster;
     private Session cassandraSession;
@@ -142,6 +162,8 @@ public class CassandraSessionProvider extends 
AbstractControllerService implemen
         props.add(USERNAME);
         props.add(PASSWORD);
         props.add(PROP_SSL_CONTEXT_SERVICE);
+        props.add(READ_TIMEOUT_MS);
+        props.add(CONNECT_TIMEOUT_MS);
 
         properties = props;
     }
@@ -238,8 +260,18 @@ public class CassandraSessionProvider extends 
AbstractControllerService implemen
                 password = null;
             }
 
+            PropertyValue readTimeoutMillisProperty = 
context.getProperty(READ_TIMEOUT_MS).evaluateAttributeExpressions();
+            Optional<Integer> readTimeoutMillisOptional = 
Optional.ofNullable(readTimeoutMillisProperty)
+                .filter(PropertyValue::isSet)
+                .map(PropertyValue::asInteger);
+
+            PropertyValue connectTimeoutMillisProperty = 
context.getProperty(CONNECT_TIMEOUT_MS).evaluateAttributeExpressions();
+            Optional<Integer> connectTimeoutMillisOptional = 
Optional.ofNullable(connectTimeoutMillisProperty)
+                .filter(PropertyValue::isSet)
+                .map(PropertyValue::asInteger);
+
             // Create the cluster and connect to it
-            Cluster newCluster = createCluster(contactPoints, sslContext, 
username, password, compressionType);
+            Cluster newCluster = createCluster(contactPoints, sslContext, 
username, password, compressionType, readTimeoutMillisOptional, 
connectTimeoutMillisOptional);
             PropertyValue keyspaceProperty = 
context.getProperty(KEYSPACE).evaluateAttributeExpressions();
             final Session newSession;
             if (keyspaceProperty != null) {
@@ -277,7 +309,8 @@ public class CassandraSessionProvider extends 
AbstractControllerService implemen
     }
 
     private Cluster createCluster(List<InetSocketAddress> contactPoints, 
SSLContext sslContext,
-                                  String username, String password, String 
compressionType) {
+                                  String username, String password, String 
compressionType,
+                                  Optional<Integer> readTimeoutMillisOptional, 
Optional<Integer> connectTimeoutMillisOptional) {
         Cluster.Builder builder = 
Cluster.builder().addContactPointsWithPorts(contactPoints);
 
         if (sslContext != null) {
@@ -297,6 +330,12 @@ public class CassandraSessionProvider extends 
AbstractControllerService implemen
             builder = builder.withCompression(ProtocolOptions.Compression.LZ4);
         }
 
+        SocketOptions socketOptions = new SocketOptions();
+        
readTimeoutMillisOptional.ifPresent(socketOptions::setReadTimeoutMillis);
+        
connectTimeoutMillisOptional.ifPresent(socketOptions::setConnectTimeoutMillis);
+
+        builder.withSocketOptions(socketOptions);
+
         return builder.build();
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/test/java/org/apache/nifi/service/TestCassandraSessionProvider.java
 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/test/java/org/apache/nifi/service/TestCassandraSessionProvider.java
index 2f49d02..85d2d9a 100644
--- 
a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/test/java/org/apache/nifi/service/TestCassandraSessionProvider.java
+++ 
b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/test/java/org/apache/nifi/service/TestCassandraSessionProvider.java
@@ -46,7 +46,7 @@ public class TestCassandraSessionProvider {
     public void testGetPropertyDescriptors() {
         List<PropertyDescriptor> properties = 
sessionProvider.getPropertyDescriptors();
 
-        assertEquals(8, properties.size());
+        assertEquals(10, properties.size());
         assertTrue(properties.contains(CassandraSessionProvider.CLIENT_AUTH));
         
assertTrue(properties.contains(CassandraSessionProvider.CONSISTENCY_LEVEL));
         
assertTrue(properties.contains(CassandraSessionProvider.CONTACT_POINTS));

Reply via email to