This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9d44bc0e973cca6d03ed222158b7b5ce70306ab0
Author: Etienne Chauchot <echauc...@apache.org>
AuthorDate: Tue Jan 25 16:16:04 2022 +0100

    [FLINK-25771][connectors][Cassandra][test] Raise all read/write/miscellaneous requests timeouts
---
 .../cassandra/CassandraConnectorITCase.java        | 41 ++++++++++++++++++++--
 1 file changed, 38 insertions(+), 3 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index cd85989..c1681d5 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -66,11 +66,16 @@ import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.CassandraContainer;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -81,6 +86,8 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+
 import scala.collection.JavaConverters;
 import scala.collection.Seq;
 
@@ -97,15 +104,17 @@ public class CassandraConnectorITCase
                 Tuple3<String, Integer, Integer>,
                 CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>>> {
 
-    @ClassRule
-    public static final CassandraContainer CASSANDRA_CONTAINER = createCassandraContainer();
-
     private static final int MAX_CONNECTION_RETRY = 3;
     private static final long CONNECTION_RETRY_DELAY = 500L;
     private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class);
+    private static final Slf4jLogConsumer LOG_CONSUMER = new Slf4jLogConsumer(LOG);
     private static final String TABLE_POJO = "test";
     private static final String TABLE_POJO_NO_ANNOTATED_KEYSPACE = "testPojoNoAnnotatedKeyspace";
 
+    @ClassRule
+    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+    @ClassRule
+    public static final CassandraContainer CASSANDRA_CONTAINER = createCassandraContainer();
     @Rule public final RetryRule retryRule = new RetryRule();
 
     private static final int PORT = 9042;
@@ -199,11 +208,13 @@ public class CassandraConnectorITCase
     public static CassandraContainer createCassandraContainer() {
         CassandraContainer cassandra = new CassandraContainer(DockerImageVersions.CASSANDRA_3);
         cassandra.withJmxReporting(false);
+        cassandra.withLogConsumer(LOG_CONSUMER);
         return cassandra;
     }
 
     @BeforeClass
     public static void startAndInitializeCassandra() {
+        raiseCassandraRequestsTimeouts();
         // CASSANDRA_CONTAINER#start() already contains retrials
         CASSANDRA_CONTAINER.start();
         cluster = CASSANDRA_CONTAINER.getCluster();
@@ -237,6 +248,30 @@ public class CassandraConnectorITCase
                 CREATE_TABLE_QUERY.replace(TABLE_NAME_VARIABLE, TABLE_NAME_PREFIX + "initial"));
     }
 
+    private static void raiseCassandraRequestsTimeouts() {
+        try {
+            final Path configurationPath = TEMPORARY_FOLDER.newFile().toPath();
+            CASSANDRA_CONTAINER.copyFileFromContainer(
+                    "/etc/cassandra/cassandra.yaml", configurationPath.toAbsolutePath().toString());
+            String configuration =
+                    new String(Files.readAllBytes(configurationPath), StandardCharsets.UTF_8);
+            String patchedConfiguration =
+                    configuration
+                            .replaceAll("request_timeout_in_ms: [0-9]+", "request_timeout_in_ms: 30000")
+                            .replaceAll(
+                                    "read_request_timeout_in_ms: [0-9]+",
+                                    "read_request_timeout_in_ms: 15000")
+                            .replaceAll(
+                                    "write_request_timeout_in_ms: [0-9]+",
+                                    "write_request_timeout_in_ms: 6000");
+            Files.write(configurationPath, patchedConfiguration.getBytes(StandardCharsets.UTF_8));
+            CASSANDRA_CONTAINER.withConfigurationOverride(
+                    configurationPath.toAbsolutePath().toString());
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to open Cassandra configuration file ", e);
+        }
+    }
+
     @Before
     public void createTable() {
         tableID = random.nextInt(Integer.MAX_VALUE);

Reply via email to