This is an automated email from the ASF dual-hosted git repository. valdar pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 58540c6bd631dddfc6b0507dc09482fd7ab16cb9 Author: Andrea Tarocchi <andrea.taroc...@gmail.com> AuthorDate: Fri Oct 28 18:34:07 2022 +0200 Fixed tests. --- .../aws/v2/clients/AWSSQSClient.java | 2 ++ .../aws/v2/sns/sink/CamelSinkAWSSNSITCase.java | 2 +- .../couchbase/sink/CamelSinkCouchbaseITCase.java | 31 ++++++++++++++-------- 3 files changed, 23 insertions(+), 12 deletions(-) diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/clients/AWSSQSClient.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/clients/AWSSQSClient.java index cad0fee33..b4c0d33c8 100644 --- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/clients/AWSSQSClient.java +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/clients/AWSSQSClient.java @@ -62,6 +62,8 @@ public class AWSSQSClient { .queueName(queue) .build(); + LOG.debug("Queue: {} QueueName: {} createFifoQueueRequest: {}", queue, createFifoQueueRequest.queueName(), createFifoQueueRequest); + CreateQueueResponse response = sqs.createQueue(createFifoQueueRequest); if (response.sdkHttpResponse().isSuccessful()) { diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/CamelSinkAWSSNSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/CamelSinkAWSSNSITCase.java index 2389a47be..b3408d7da 100644 --- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/CamelSinkAWSSNSITCase.java +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/CamelSinkAWSSNSITCase.java @@ -115,7 +115,7 @@ public class CamelSinkAWSSNSITCase extends CamelSinkTestSupport { protected void consumeMessages(CountDownLatch latch) { try { - awsSqsClient.receive(sqsQueueUrl, this::checkMessages); + awsSqsClient.receive(queueName, this::checkMessages); } catch (Throwable t) { LOG.error("Failed to consume messages: {}", t.getMessage(), t); fail(t.getMessage()); diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java index 25dc25ce3..4fff797ea 100644 --- a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java +++ b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java @@ -43,6 +43,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,13 +52,13 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.fail; /* - This test is slow and flaky. It tends to fail on systems with limited resources and slow I/O. Therefore, it is - disabled by default. Also, suffers from bugs in the couchbase test container: + This test is slow and potentially flaky. It might fail on systems with limited resources and slow I/O. + Most probably due to this bug in the couchbase test container: - https://github.com/testcontainers/testcontainers-java/issues/2993 - Therefore, this test is marked as flaky and only runs if specifically enabled. + Therefore, it is marked as slow test and must be explicitly enabled to be run. */ -//@EnabledIfSystemProperty(named = "enable.flaky.tests", matches = "true") +@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true") @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class CamelSinkCouchbaseITCase extends CamelSinkTestSupport { @RegisterExtension @@ -116,13 +117,8 @@ public class CamelSinkCouchbaseITCase extends CamelSinkTestSupport { topic = getTopicForTest(this); - try { - String startDelay = System.getProperty("couchbase.test.start.delay", "1000"); - - int delay = Integer.parseInt(startDelay); - Thread.sleep(delay); - } catch (InterruptedException e) { - Thread.currentThread().interrupted(); + if (!TestUtils.waitFor(this::isQueryServiceUp)) { + fail("Query Service failed to become ready in 30 seconds."); } } @@ -191,6 +187,19 @@ public class CamelSinkCouchbaseITCase extends CamelSinkTestSupport { return false; } + private boolean isQueryServiceUp() { + try { + String query = String.format("select count(*) as count from `%s`", bucketName); + QueryResult queryResult = cluster.query(query); + queryResult.rowsAsObject(); + + return true; + } catch (Exception e) { + LOG.warn("Exception while checking if Query service is up: {}", e.getMessage(), e); + return false; + } + } + private void verifyRecords() { String query = String.format("select * from `%s` USE KEYS \"1\"", bucketName); QueryResult queryResult = cluster.query(query);