This is an automated email from the ASF dual-hosted git repository.

orpiske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 11ca8b0f84da9047cdca3dfd0131fceff3ea75c9
Author: Otavio Rodolfo Piske <opi...@redhat.com>
AuthorDate: Fri Jan 22 10:08:47 2021 +0100

    Created a base test class for AWS sink tests
---
 .../aws/v2/common/CamelSinkAWSTestSupport.java     | 70 ++++++++++++++++++++++
 .../aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java     | 62 +++++--------------
 2 files changed, 86 insertions(+), 46 deletions(-)

diff --git 
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java
 
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java
new file mode 100644
index 0000000..c42cb36
--- /dev/null
+++ 
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.kafkaconnector.aws.v2.common;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
+import org.apache.camel.kafkaconnector.common.utils.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.fail;
+
+public abstract class CamelSinkAWSTestSupport extends AbstractKafkaTest {
+    private static final Logger LOG = 
LoggerFactory.getLogger(CamelSinkAWSTestSupport.class);
+
+
+    protected void produceMessages(int count)  {
+        try {
+            KafkaClient<String, String> kafkaClient = new 
KafkaClient<>(getKafkaService().getBootstrapServers());
+
+            for (int i = 0; i < count; i++) {
+                
kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test 
message " + i);
+            }
+        } catch (Throwable t) {
+            LOG.error("Unable to publish messages to the broker: {}", 
t.getMessage(), t);
+            fail(String.format("Unable to publish messages to the broker: %s", 
t.getMessage()));
+        }
+    }
+
+    public void runTest(ConnectorPropertyFactory connectorPropertyFactory, int 
count) throws Exception {
+        connectorPropertyFactory.log();
+        
getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 
1);
+
+        LOG.debug("Creating the consumer ...");
+        ExecutorService service = Executors.newCachedThreadPool();
+
+        CountDownLatch latch = new CountDownLatch(1);
+        service.submit(() -> consumeMessages(latch));
+
+        LOG.debug("Creating the producer and sending messages ...");
+        produceMessages(count);
+
+        LOG.debug("Waiting for the test to complete");
+        verifyMessages(latch);
+    }
+
+    protected abstract void consumeMessages(CountDownLatch latch);
+
+    protected abstract void verifyMessages(CountDownLatch latch) throws 
InterruptedException;
+}
diff --git 
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
 
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
index 0f770bc..6cc9b79 100644
--- 
a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
+++ 
b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java
@@ -20,14 +20,11 @@ package org.apache.camel.kafkaconnector.aws.v2.sqs.sink;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.kafkaconnector.aws.v2.clients.AWSSQSClient;
-import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
+import org.apache.camel.kafkaconnector.aws.v2.common.CamelSinkAWSTestSupport;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.test.infra.aws.common.AWSCommon;
 import org.apache.camel.test.infra.aws.common.AWSConfigs;
@@ -53,13 +50,12 @@ import static org.junit.jupiter.api.Assertions.fail;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 @EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true")
-public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
+public class CamelSinkAWSSQSITCase extends CamelSinkAWSTestSupport {
 
     @RegisterExtension
     public static AWSService awsService = AWSServiceFactory.createSQSService();
     private static final Logger LOG = 
LoggerFactory.getLogger(CamelSinkAWSSQSITCase.class);
 
-
     private AWSSQSClient awssqsClient;
     private String queueName;
 
@@ -91,6 +87,16 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest 
{
         }
     }
 
+    @Override
+    protected void verifyMessages(CountDownLatch latch) throws 
InterruptedException {
+        if (latch.await(110, TimeUnit.SECONDS)) {
+            assertEquals(expect, received, "Didn't process the expected amount 
of messages: " + received + " != " + expect);
+        } else {
+            fail(String.format("Failed to receive the messages within the 
specified time: received %d of %d",
+                    received, expect));
+        }
+    }
+
     private boolean checkMessages(List<Message> messages) {
         for (Message message : messages) {
             LOG.info("Received: {}", message.body());
@@ -106,7 +112,7 @@ public class CamelSinkAWSSQSITCase extends 
AbstractKafkaTest {
     }
 
 
-    private void consumeMessages(CountDownLatch latch) {
+    protected void consumeMessages(CountDownLatch latch) {
         try {
             awssqsClient.receive(queueName, this::checkMessages);
         } catch (Throwable t) {
@@ -116,42 +122,6 @@ public class CamelSinkAWSSQSITCase extends 
AbstractKafkaTest {
         }
     }
 
-    private void produceMessages()  {
-        try {
-            KafkaClient<String, String> kafkaClient = new 
KafkaClient<>(getKafkaService().getBootstrapServers());
-
-            for (int i = 0; i < expect; i++) {
-                
kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test 
message " + i);
-            }
-        } catch (Throwable t) {
-            LOG.error("Unable to publish messages to the broker: {}", 
t.getMessage(), t);
-            fail(String.format("Unable to publish messages to the broker: %s", 
t.getMessage()));
-        }
-    }
-
-    public void runTest(ConnectorPropertyFactory connectorPropertyFactory) 
throws Exception {
-        connectorPropertyFactory.log();
-        
getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 
1);
-
-        LOG.debug("Creating the consumer ...");
-        ExecutorService service = Executors.newCachedThreadPool();
-
-        CountDownLatch latch = new CountDownLatch(1);
-        service.submit(() -> consumeMessages(latch));
-
-        LOG.debug("Creating the producer and sending messages ...");
-        produceMessages();
-
-        LOG.debug("Waiting for the test to complete");
-        if (latch.await(110, TimeUnit.SECONDS)) {
-            assertEquals(expect, received, "Didn't process the expected amount 
of messages: " + received + " != " + expect);
-        } else {
-            fail(String.format("Failed to receive the messages within the 
specified time: received %d of %d",
-                    received, expect));
-        }
-    }
-
-
     @Test
     @Timeout(value = 120)
     public void testBasicSendReceive() {
@@ -165,7 +135,7 @@ public class CamelSinkAWSSQSITCase extends 
AbstractKafkaTest {
                     .withAmazonConfig(amazonProperties)
                     .withQueueNameOrArn(queueName);
 
-            runTest(testProperties);
+            runTest(testProperties, expect);
         } catch (Exception e) {
             LOG.error("Amazon SQS test failed: {}", e.getMessage(), e);
             fail(e.getMessage());
@@ -186,7 +156,7 @@ public class CamelSinkAWSSQSITCase extends 
AbstractKafkaTest {
                     .withAmazonConfig(amazonProperties, 
CamelAWSSQSPropertyFactory.KAFKA_STYLE)
                     .withQueueNameOrArn(queueName);
 
-            runTest(testProperties);
+            runTest(testProperties, expect);
 
         } catch (Exception e) {
             LOG.error("Amazon SQS test failed: {}", e.getMessage(), e);
@@ -214,7 +184,7 @@ public class CamelSinkAWSSQSITCase extends 
AbstractKafkaTest {
                         .append("amazonAWSHost", 
amazonProperties.getProperty(AWSConfigs.AMAZON_AWS_HOST))
                         .buildUrl();
 
-            runTest(testProperties);
+            runTest(testProperties, expect);
 
         } catch (Exception e) {
             LOG.error("Amazon SQS test failed: {}", e.getMessage(), e);

Reply via email to