[ 
https://issues.apache.org/jira/browse/BEAM-10017?focusedWorklogId=435417&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-435417
 ]

ASF GitHub Bot logged work on BEAM-10017:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/May/20 10:18
            Start Date: 20/May/20 10:18
    Worklog Time Spent: 10m 
      Work Description: echauchot commented on a change in pull request #11732:
URL: https://github.com/apache/beam/pull/11732#discussion_r427861203



##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -106,19 +107,13 @@
  *
  * <h3>Cassandra Socket Options</h3>
  *
- * <p>The following example illustrates various options for tuning client socket:
+ * <p>The following example illustrates setting timeouts for the Cassandra client:

Review comment:
       Can you also fix a leftover in the Javadoc so that it reads: An IO to 
read **and write** **from/to** Apache Cassandra?

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -330,6 +331,24 @@ private CassandraIO() {}
       return builder().setMinNumberOfSplits(minNumberOfSplits).build();
     }
 
+    /** Cassandra client socket option to set the connect timeout. */
+    public Read<T> withConnectTimeout(Integer timeout) {
+      return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public Read<T> withConnectTimeout(ValueProvider<Integer> timeout) {

Review comment:
       - You need Javadoc on both methods, as both are public (there are two 
versions for backward compatibility, since ValueProvider was introduced recently).
   
   - Specify that the values are in milliseconds.
   
   - Add links to the Cassandra SocketOptions setConnectTimeoutMillis method.
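
As a rough illustration of the three points above, the documented overloads might look like the sketch below. `ValueProviderSketch` and `ReadSketch` are hypothetical stand-ins for Beam's `ValueProvider` and `CassandraIO.Read`, reduced so the snippet compiles on its own. The Javadoc wording is an assumption, not the text of the pull request.

```java
/** Hypothetical stand-in for Beam's ValueProvider, reduced to a single getter. */
interface ValueProviderSketch<T> {
  T get();

  /** Wraps a value that is already known when the pipeline is constructed. */
  static <T> ValueProviderSketch<T> of(T value) {
    return () -> value;
  }
}

/** Hypothetical stand-in for CassandraIO.Read that holds only the connect timeout. */
final class ReadSketch {
  private final ValueProviderSketch<Integer> connectTimeout;

  ReadSketch(ValueProviderSketch<Integer> connectTimeout) {
    this.connectTimeout = connectTimeout;
  }

  /**
   * Sets the Cassandra client connect timeout, in milliseconds.
   *
   * @see com.datastax.driver.core.SocketOptions#setConnectTimeoutMillis(int)
   */
  ReadSketch withConnectTimeout(Integer timeoutMillis) {
    return withConnectTimeout(ValueProviderSketch.of(timeoutMillis));
  }

  /**
   * Sets the Cassandra client connect timeout, in milliseconds, from a value
   * that may only become available when the pipeline runs.
   *
   * @see com.datastax.driver.core.SocketOptions#setConnectTimeoutMillis(int)
   */
  ReadSketch withConnectTimeout(ValueProviderSketch<Integer> timeoutMillis) {
    return new ReadSketch(timeoutMillis);
  }

  /** Returns the configured connect timeout in milliseconds. */
  Integer connectTimeoutMillis() {
    return connectTimeout.get();
  }
}
```

Both overloads carry their own Javadoc, name the unit, and point at the driver method they ultimately configure.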

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -330,6 +331,24 @@ private CassandraIO() {}
       return builder().setMinNumberOfSplits(minNumberOfSplits).build();
     }
 
+    /** Cassandra client socket option to set the connect timeout. */
+    public Read<T> withConnectTimeout(Integer timeout) {
+      return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));

Review comment:
       Add an input check (!= null && > 0) using a checkArgument call, as in 
the other methods. Since the ValueProvider version of the method relies on this 
version, put the checkArgument here. Do not forget to add the same validation 
to the methods for the other parameters.
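
A minimal sketch of the requested check, assuming a helper shared by all timeout setters. `TimeoutValidation` and `validTimeoutMillis` are hypothetical names, and the local `checkArgument` only mimics Guava's `Preconditions.checkArgument(boolean, Object)` so the snippet has no dependencies.

```java
/** Hypothetical helper showing the null and positivity check requested above. */
final class TimeoutValidation {
  private TimeoutValidation() {}

  /** Local stand-in for Guava's Preconditions.checkArgument(boolean, Object). */
  static void checkArgument(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException(message);
    }
  }

  /** Returns the timeout unchanged if it is non-null and strictly positive. */
  static int validTimeoutMillis(Integer timeoutMillis, String name) {
    // The null check runs first, so the comparison below never unboxes null.
    checkArgument(timeoutMillis != null, name + " can not be null");
    checkArgument(timeoutMillis > 0, name + " must be > 0, but was: " + timeoutMillis);
    return timeoutMillis;
  }
}
```

Calling such a helper from the `Integer` overload rejects bad values when the pipeline is built rather than when the connection is opened.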

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -330,6 +331,24 @@ private CassandraIO() {}
       return builder().setMinNumberOfSplits(minNumberOfSplits).build();
     }
 
+    /** Cassandra client socket option to set the connect timeout. */
+    public Read<T> withConnectTimeout(Integer timeout) {
+      return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public Read<T> withConnectTimeout(ValueProvider<Integer> timeout) {
+      return builder().setConnectTimeout(timeout).build();
+    }
+
+    /** Cassandra client socket option to set the read timeout. */
+    public Read<T> withReadTimeout(Integer timeout) {
+      return withReadTimeout(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public Read<T> withReadTimeout(ValueProvider<Integer> timeout) {

Review comment:
       same as above

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -948,6 +980,15 @@ public T getCurrent() throws NoSuchElementException {
       return builder().setConsistencyLevel(consistencyLevel).build();
     }
 
+    /** Cassandra client socket option for connect timeout. */
+    public Write<T> withConnectTimeout(Integer timeout) {
+      return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public Write<T> withConnectTimeout(ValueProvider<Integer> timeout) {
+      return builder().setConnectTimeout(timeout).build();

Review comment:
       add read timeout

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1023,6 +1065,8 @@ private String getMutationTypeName() {
 
       abstract Builder<T> setMutationType(MutationType mutationType);
 
+      abstract Builder<T> setConnectTimeout(ValueProvider<Integer> timeout);

Review comment:
       add read timeout

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -948,6 +980,15 @@ public T getCurrent() throws NoSuchElementException {
       return builder().setConsistencyLevel(consistencyLevel).build();
     }
 
+    /** Cassandra client socket option for connect timeout. */
+    public Write<T> withConnectTimeout(Integer timeout) {
+      return withConnectTimeout(ValueProvider.StaticValueProvider.of(timeout));
+    }
+
+    public Write<T> withConnectTimeout(ValueProvider<Integer> timeout) {

Review comment:
       Add withReadTimeout (cf. global review comment)

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -811,6 +840,9 @@ public T getCurrent() throws NoSuchElementException {
 
     abstract MutationType mutationType();
 
+    @Nullable

Review comment:
       Add readTimeout (cf. global review comment)

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1116,6 +1163,18 @@ private static Cluster getCluster(
           new QueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel.get())));
     }
 
+    SocketOptions socketOptions = new SocketOptions();
+
+    builder.withSocketOptions(socketOptions);
+
+    if (connectTimeout != null) {

Review comment:
       No null check is needed any more if both timeouts are set as part of 
Read and Write and if you add input validation in the with* methods.
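
For illustration only, the simplification could look like the sketch below. It assumes both timeouts always reach this point as validated, non-null values. `SocketOptionsSketch` is a hypothetical stand-in for the driver's `SocketOptions`, reduced to the two fluent setters and their getters, and `ClusterConfigSketch` is likewise a made-up name.

```java
/** Hypothetical stand-in for the driver's SocketOptions, limited to the two timeouts. */
final class SocketOptionsSketch {
  private int connectTimeoutMillis;
  private int readTimeoutMillis;

  SocketOptionsSketch setConnectTimeoutMillis(int millis) {
    this.connectTimeoutMillis = millis;
    return this;
  }

  SocketOptionsSketch setReadTimeoutMillis(int millis) {
    this.readTimeoutMillis = millis;
    return this;
  }

  int getConnectTimeoutMillis() {
    return connectTimeoutMillis;
  }

  int getReadTimeoutMillis() {
    return readTimeoutMillis;
  }
}

/** Hypothetical helper: builds socket options without any null checks. */
final class ClusterConfigSketch {
  private ClusterConfigSketch() {}

  static SocketOptionsSketch socketOptions(int connectTimeoutMillis, int readTimeoutMillis) {
    // Assumption: both values were validated in the with* methods, so they
    // can be applied unconditionally here.
    return new SocketOptionsSketch()
        .setConnectTimeoutMillis(connectTimeoutMillis)
        .setReadTimeoutMillis(readTimeoutMillis);
  }
}
```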

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1116,6 +1163,18 @@ private static Cluster getCluster(
           new 
QueryOptions().setConsistencyLevel(ConsistencyLevel.valueOf(consistencyLevel.get())));
     }
 
+    SocketOptions socketOptions = new SocketOptions();
+
+    builder.withSocketOptions(socketOptions);
+
+    if (connectTimeout != null) {
+      socketOptions.setConnectTimeoutMillis(connectTimeout.get());
+    }
+
+    if (readTimeout != null) {

Review comment:
       same as above

##########
File path: 
sdks/java/io/cassandra/src/main/java/org/apache/beam/sdk/io/cassandra/CassandraIO.java
##########
@@ -1142,7 +1201,9 @@ private static Cluster getCluster(
               spec.username(),
               spec.password(),
               spec.localDc(),
-              spec.consistencyLevel());
+              spec.consistencyLevel(),
+              spec.connectTimeout(),
+              null);

Review comment:
       pass the read timeout




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 435417)
    Time Spent: 1h  (was: 50m)

> Expose SocketOptions timeouts in CassandraIO
> --------------------------------------------
>
>                 Key: BEAM-10017
>                 URL: https://issues.apache.org/jira/browse/BEAM-10017
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-cassandra
>            Reporter: Nathan Fisher
>            Priority: P3
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> Currently there are no options to tune the configuration of the CassandraIO 
> reader/writer. Such tuning can be useful for slow clusters, large queries, or 
> high-latency links.
> The intent is to expose the following configuration elements as setters 
> on the CassandraIO builder, similar to withKeyspace and other methods.
>  
> Methods of SocketOptions
> (https://docs.datastax.com/en/drivers/java/3.8/com/datastax/driver/core/SocketOptions.html):
>
> - setConnectTimeoutMillis(int connectTimeoutMillis): Sets the connection timeout in milliseconds.
>   https://docs.datastax.com/en/drivers/java/3.8/com/datastax/driver/core/SocketOptions.html#setConnectTimeoutMillis-int-
> - setKeepAlive(boolean keepAlive): Sets whether to enable TCP keepalive.
>   https://docs.datastax.com/en/drivers/java/3.8/com/datastax/driver/core/SocketOptions.html#setKeepAlive-boolean-
> - setReadTimeoutMillis(int readTimeoutMillis): Sets the per-host read timeout in milliseconds.
>   https://docs.datastax.com/en/drivers/java/3.8/com/datastax/driver/core/SocketOptions.html#setReadTimeoutMillis-int-
> - setReceiveBufferSize(int receiveBufferSize): Sets a hint to the size of the underlying buffers for incoming network I/O.
>   https://docs.datastax.com/en/drivers/java/3.8/com/datastax/driver/core/SocketOptions.html#setReceiveBufferSize-int-
> - setReuseAddress(boolean reuseAddress): Sets whether to enable reuse-address.
>   https://docs.datastax.com/en/drivers/java/3.8/com/datastax/driver/core/SocketOptions.html#setReuseAddress-boolean-
> - setSendBufferSize(int sendBufferSize): Sets a hint to the size of the underlying buffers for outgoing network I/O.
>   https://docs.datastax.com/en/drivers/java/3.8/com/datastax/driver/core/SocketOptions.html#setSendBufferSize-int-
> - setSoLinger(int soLinger): Sets the linger-on-close timeout.
>   https://docs.datastax.com/en/drivers/java/3.8/com/datastax/driver/core/SocketOptions.html#setSoLinger-int-
> - setTcpNoDelay(boolean tcpNoDelay): Sets whether to disable Nagle's algorithm.
>   https://docs.datastax.com/en/drivers/java/3.8/com/datastax/driver/core/SocketOptions.html#setTcpNoDelay-boolean-



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
