This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 95777d1b7e4be231aaf6cec5a7b26c75206f8c50 Author: Etienne Chauchot <echauc...@apache.org> AuthorDate: Mon Dec 27 18:13:41 2021 +0100 [FLINK-25415] Add retries to CassandraConnectorITCase Add 3 retries to all tests and to the startAndInitializeCassandra() method in CassandraConnectorITCase upon NoHostAvailableException, which happens under load when cluster.connect() is called. --- .../cassandra/CassandraConnectorITCase.java | 44 +++++++++++++++++++--- 1 file changed, 39 insertions(+), 5 deletions(-) diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java index 78ce453..50a137d 100644 --- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java +++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java @@ -46,6 +46,8 @@ import org.apache.flink.streaming.api.functions.sink.SinkContextUtil; import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.api.internal.TableEnvironmentInternal; +import org.apache.flink.testutils.junit.RetryOnException; +import org.apache.flink.testutils.junit.RetryRule; import org.apache.flink.types.Row; import org.apache.flink.util.DockerImageVersions; @@ -54,14 +56,17 @@ import com.datastax.driver.core.ConsistencyLevel; import com.datastax.driver.core.QueryOptions; import com.datastax.driver.core.ResultSet; import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.NoHostAvailableException; import com.datastax.driver.mapping.Mapper; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; 
import org.junit.BeforeClass; import org.junit.ClassRule; -import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testcontainers.containers.CassandraContainer; import java.net.InetSocketAddress; @@ -83,7 +88,8 @@ import static org.junit.Assert.assertTrue; /** IT cases for all cassandra sinks. */ @SuppressWarnings("serial") -@Ignore(value = "Flaky test") +// NoHostAvailableException is raised by Cassandra client under load while connecting to the cluster +@RetryOnException(times = 3, exception = NoHostAvailableException.class) public class CassandraConnectorITCase extends WriteAheadSinkTestBase< Tuple3<String, Integer, Integer>, @@ -92,6 +98,12 @@ public class CassandraConnectorITCase @ClassRule public static final CassandraContainer CASSANDRA_CONTAINER = createCassandraContainer(); + private static final int MAX_CONNECTION_RETRY = 3; + private static final long CONNECTION_RETRY_DELAY = 500L; + private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectorITCase.class); + + @Rule public final RetryRule retryRule = new RetryRule(); + private static final int PORT = 9042; private static Cluster cluster; @@ -126,8 +138,7 @@ public class CassandraConnectorITCase private static final String TABLE_NAME_VARIABLE = "$TABLE"; private static final String CREATE_KEYSPACE_QUERY = "CREATE KEYSPACE flink WITH replication= {'class':'SimpleStrategy', 'replication_factor':1};"; - private static final String DROP_KEYSPACE_QUERY = - "DROP KEYSPACE IF EXISTS flink ;"; + private static final String DROP_KEYSPACE_QUERY = "DROP KEYSPACE IF EXISTS flink ;"; private static final String CREATE_TABLE_QUERY = "CREATE TABLE flink." 
+ TABLE_NAME_VARIABLE @@ -168,7 +179,30 @@ public class CassandraConnectorITCase // CASSANDRA_CONTAINER#start() already contains retrials CASSANDRA_CONTAINER.start(); cluster = CASSANDRA_CONTAINER.getCluster(); - session = cluster.connect(); + int retried = 0; + while (retried < MAX_CONNECTION_RETRY) { + try { + session = cluster.connect(); + break; + } catch (NoHostAvailableException e) { + retried++; + LOG.debug( + "Connection failed with NoHostAvailableException : retry number {}, will retry to connect within {} ms", + retried, + CONNECTION_RETRY_DELAY); + if (retried == MAX_CONNECTION_RETRY) { + throw new RuntimeException( + String.format( + "Failed to connect to Cassandra cluster after %d retries every %d ms", + retried, CONNECTION_RETRY_DELAY), + e); + } + try { + Thread.sleep(CONNECTION_RETRY_DELAY); + } catch (InterruptedException ignored) { + } + } + } session.execute(DROP_KEYSPACE_QUERY); session.execute(CREATE_KEYSPACE_QUERY); session.execute(