This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 11ca8b0f84da9047cdca3dfd0131fceff3ea75c9 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Fri Jan 22 10:08:47 2021 +0100 Created a base test class for AWS sink tests --- .../aws/v2/common/CamelSinkAWSTestSupport.java | 70 ++++++++++++++++++++++ .../aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java | 62 +++++-------------- 2 files changed, 86 insertions(+), 46 deletions(-) diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java new file mode 100644 index 0000000..c42cb36 --- /dev/null +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/common/CamelSinkAWSTestSupport.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.aws.v2.common; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; +import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.jupiter.api.Assertions.fail; + +public abstract class CamelSinkAWSTestSupport extends AbstractKafkaTest { + private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSTestSupport.class); + + + protected void produceMessages(int count) { + try { + KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); + + for (int i = 0; i < count; i++) { + kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test message " + i); + } + } catch (Throwable t) { + LOG.error("Unable to publish messages to the broker: {}", t.getMessage(), t); + fail(String.format("Unable to publish messages to the broker: %s", t.getMessage())); + } + } + + public void runTest(ConnectorPropertyFactory connectorPropertyFactory, int count) throws Exception { + connectorPropertyFactory.log(); + getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1); + + LOG.debug("Creating the consumer ..."); + ExecutorService service = Executors.newCachedThreadPool(); + + CountDownLatch latch = new CountDownLatch(1); + service.submit(() -> consumeMessages(latch)); + + LOG.debug("Creating the producer and sending messages ..."); + produceMessages(count); + + LOG.debug("Waiting for the test to complete"); + verifyMessages(latch); + } + + protected abstract void consumeMessages(CountDownLatch latch); + + protected abstract void verifyMessages(CountDownLatch latch) throws InterruptedException; +} diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java index 0f770bc..6cc9b79 100644 --- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java @@ -20,14 +20,11 @@ package org.apache.camel.kafkaconnector.aws.v2.sqs.sink; import java.util.List; import java.util.Properties; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.camel.kafkaconnector.aws.v2.clients.AWSSQSClient; -import org.apache.camel.kafkaconnector.common.AbstractKafkaTest; +import org.apache.camel.kafkaconnector.aws.v2.common.CamelSinkAWSTestSupport; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; -import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; import org.apache.camel.kafkaconnector.common.utils.TestUtils; import org.apache.camel.test.infra.aws.common.AWSCommon; import org.apache.camel.test.infra.aws.common.AWSConfigs; @@ -53,13 +50,12 @@ import static org.junit.jupiter.api.Assertions.fail; @TestInstance(TestInstance.Lifecycle.PER_CLASS) @EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true") -public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { +public class CamelSinkAWSSQSITCase extends CamelSinkAWSTestSupport { @RegisterExtension public static AWSService awsService = AWSServiceFactory.createSQSService(); private static final Logger LOG = LoggerFactory.getLogger(CamelSinkAWSSQSITCase.class); - private AWSSQSClient awssqsClient; private String queueName; @@ -91,6 +87,16 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { } } + @Override + protected void verifyMessages(CountDownLatch latch) throws InterruptedException { + if (latch.await(110, TimeUnit.SECONDS)) { + assertEquals(expect, received, "Didn't process the expected amount of messages: " + received + " != " + expect); + } else { + fail(String.format("Failed to receive the messages within the specified time: received %d of %d", + received, expect)); + } + } + private boolean checkMessages(List<Message> messages) { for (Message message : messages) { LOG.info("Received: {}", message.body()); @@ -106,7 +112,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { } - private void consumeMessages(CountDownLatch latch) { + protected void consumeMessages(CountDownLatch latch) { try { awssqsClient.receive(queueName, this::checkMessages); } catch (Throwable t) { @@ -116,42 +122,6 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { } } - private void produceMessages() { - try { - KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers()); - - for (int i = 0; i < expect; i++) { - kafkaClient.produce(TestUtils.getDefaultTestTopic(this.getClass()), "Sink test message " + i); - } - } catch (Throwable t) { - LOG.error("Unable to publish messages to the broker: {}", t.getMessage(), t); - fail(String.format("Unable to publish messages to the broker: %s", t.getMessage())); - } - } - - public void runTest(ConnectorPropertyFactory connectorPropertyFactory) throws Exception { - connectorPropertyFactory.log(); - getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1); - - LOG.debug("Creating the consumer ..."); - ExecutorService service = Executors.newCachedThreadPool(); - - CountDownLatch latch = new CountDownLatch(1); - service.submit(() -> consumeMessages(latch)); - - LOG.debug("Creating the producer and sending messages ..."); - produceMessages(); - - LOG.debug("Waiting for the test to complete"); - if (latch.await(110, TimeUnit.SECONDS)) { - assertEquals(expect, received, "Didn't process the expected amount of messages: " + received + " != " + expect); - } else { - fail(String.format("Failed to receive the messages within the specified time: received %d of %d", - received, expect)); - } - } - - @Test @Timeout(value = 120) public void testBasicSendReceive() { @@ -165,7 +135,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { .withAmazonConfig(amazonProperties) .withQueueNameOrArn(queueName); - runTest(testProperties); + runTest(testProperties, expect); } catch (Exception e) { LOG.error("Amazon SQS test failed: {}", e.getMessage(), e); fail(e.getMessage()); @@ -186,7 +156,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { .withAmazonConfig(amazonProperties, CamelAWSSQSPropertyFactory.KAFKA_STYLE) .withQueueNameOrArn(queueName); - runTest(testProperties); + runTest(testProperties, expect); } catch (Exception e) { LOG.error("Amazon SQS test failed: {}", e.getMessage(), e); @@ -214,7 +184,7 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest { .append("amazonAWSHost", amazonProperties.getProperty(AWSConfigs.AMAZON_AWS_HOST)) .buildUrl(); - runTest(testProperties); + runTest(testProperties, expect); } catch (Exception e) { LOG.error("Amazon SQS test failed: {}", e.getMessage(), e);