This is an automated email from the ASF dual-hosted git repository. turcsanyi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push: new c2f46c4 NIFI-7586 In CassandraSesionProvider added properties to set socket-level read timeout and connect timeout. c2f46c4 is described below commit c2f46c44ca29a07a0f418f6d46845f7ae7bccf91 Author: Tamas Palfy <tamas.bertalan.pa...@gmail.com> AuthorDate: Mon Jun 29 16:39:58 2020 +0200 NIFI-7586 In CassandraSesionProvider added properties to set socket-level read timeout and connect timeout. In QueryCassandra when writing flowfile to the sesion it's done on the raw OutputStream. Wrapped it in a BufferedOutputStream for better performance. This closes #4368. Signed-off-by: Peter Turcsanyi <turcsa...@apache.org> --- .../nifi/processors/cassandra/QueryCassandra.java | 5 ++- .../nifi/service/CassandraSessionProvider.java | 43 +++++++++++++++++++++- .../nifi/service/TestCassandraSessionProvider.java | 2 +- 3 files changed, 45 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java index 197e983..ebad736 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/QueryCassandra.java @@ -56,6 +56,7 @@ import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StopWatch; +import java.io.BufferedOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.Charset; @@ -210,8 +211,8 @@ public class QueryCassandra extends AbstractCassandraProcessor { fileToProcess = session.write(fileToProcess, new OutputStreamCallback() { @Override - public void process(final OutputStream out) throws IOException { - try { + public void process(final OutputStream rawOut) throws IOException { + try (final OutputStream out = new BufferedOutputStream(rawOut)) { logger.debug("Executing CQL query {}", new Object[]{selectQuery}); final ResultSet resultSet; if (queryTimeout > 0) { diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/main/java/org/apache/nifi/service/CassandraSessionProvider.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/main/java/org/apache/nifi/service/CassandraSessionProvider.java index e645edc..a917ecb 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/main/java/org/apache/nifi/service/CassandraSessionProvider.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/main/java/org/apache/nifi/service/CassandraSessionProvider.java @@ -25,7 +25,9 @@ import com.datastax.driver.core.Session; import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import javax.net.ssl.SSLContext; +import com.datastax.driver.core.SocketOptions; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; @@ -126,6 +128,24 @@ public class CassandraSessionProvider extends AbstractControllerService implemen .defaultValue("NONE") .build(); + static final PropertyDescriptor READ_TIMEOUT_MS = new PropertyDescriptor.Builder() + .name("read-timeout-ms") + .displayName("Read Timout (ms)") + .description("Read timeout (in milliseconds). 0 means no timeout. If no value is set, the underlying default will be used.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + + static final PropertyDescriptor CONNECT_TIMEOUT_MS = new PropertyDescriptor.Builder() + .name("connect-timeout-ms") + .displayName("Connect Timout (ms)") + .description("Connection timeout (in milliseconds). 0 means no timeout. If no value is set, the underlying default will be used.") + .required(false) + .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY) + .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR) + .build(); + private List<PropertyDescriptor> properties; private Cluster cluster; private Session cassandraSession; @@ -142,6 +162,8 @@ public class CassandraSessionProvider extends AbstractControllerService implemen props.add(USERNAME); props.add(PASSWORD); props.add(PROP_SSL_CONTEXT_SERVICE); + props.add(READ_TIMEOUT_MS); + props.add(CONNECT_TIMEOUT_MS); properties = props; } @@ -238,8 +260,18 @@ public class CassandraSessionProvider extends AbstractControllerService implemen password = null; } + PropertyValue readTimeoutMillisProperty = context.getProperty(READ_TIMEOUT_MS).evaluateAttributeExpressions(); + Optional<Integer> readTimeoutMillisOptional = Optional.ofNullable(readTimeoutMillisProperty) + .filter(PropertyValue::isSet) + .map(PropertyValue::asInteger); + + PropertyValue connectTimeoutMillisProperty = context.getProperty(CONNECT_TIMEOUT_MS).evaluateAttributeExpressions(); + Optional<Integer> connectTimeoutMillisOptional = Optional.ofNullable(connectTimeoutMillisProperty) + .filter(PropertyValue::isSet) + .map(PropertyValue::asInteger); + // Create the cluster and connect to it - Cluster newCluster = createCluster(contactPoints, sslContext, username, password, compressionType); + Cluster newCluster = createCluster(contactPoints, sslContext, username, password, compressionType, readTimeoutMillisOptional, connectTimeoutMillisOptional); PropertyValue keyspaceProperty = context.getProperty(KEYSPACE).evaluateAttributeExpressions(); final Session newSession; if (keyspaceProperty != null) { @@ -277,7 +309,8 @@ public class CassandraSessionProvider extends AbstractControllerService implemen } private Cluster createCluster(List<InetSocketAddress> contactPoints, SSLContext sslContext, - String username, String password, String compressionType) { + String username, String password, String compressionType, + Optional<Integer> readTimeoutMillisOptional, Optional<Integer> connectTimeoutMillisOptional) { Cluster.Builder builder = Cluster.builder().addContactPointsWithPorts(contactPoints); if (sslContext != null) { @@ -297,6 +330,12 @@ public class CassandraSessionProvider extends AbstractControllerService implemen builder = builder.withCompression(ProtocolOptions.Compression.LZ4); } + SocketOptions socketOptions = new SocketOptions(); + readTimeoutMillisOptional.ifPresent(socketOptions::setReadTimeoutMillis); + connectTimeoutMillisOptional.ifPresent(socketOptions::setConnectTimeoutMillis); + + builder.withSocketOptions(socketOptions); + return builder.build(); } } diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/test/java/org/apache/nifi/service/TestCassandraSessionProvider.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/test/java/org/apache/nifi/service/TestCassandraSessionProvider.java index 2f49d02..85d2d9a 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/test/java/org/apache/nifi/service/TestCassandraSessionProvider.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-services/src/test/java/org/apache/nifi/service/TestCassandraSessionProvider.java @@ -46,7 +46,7 @@ public class TestCassandraSessionProvider { public void testGetPropertyDescriptors() { List<PropertyDescriptor> properties = sessionProvider.getPropertyDescriptors(); - assertEquals(8, properties.size()); + assertEquals(10, properties.size()); assertTrue(properties.contains(CassandraSessionProvider.CLIENT_AUTH)); assertTrue(properties.contains(CassandraSessionProvider.CONSISTENCY_LEVEL)); assertTrue(properties.contains(CassandraSessionProvider.CONTACT_POINTS));