This is an automated email from the ASF dual-hosted git repository. orpiske pushed a commit to branch camel-master in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
The following commit(s) were added to refs/heads/camel-master by this push: new 28db5ba Add integration tests for AWS v2 lambda 28db5ba is described below commit 28db5ba3f9b3b143af97298740de620c6b5cc381 Author: Otavio Rodolfo Piske <opi...@redhat.com> AuthorDate: Thu Feb 25 11:31:36 2021 +0100 Add integration tests for AWS v2 lambda --- tests/itests-aws-v2/pom.xml | 5 + .../lambda/sink/CamelAWSLambdaPropertyFactory.java | 79 +++++++++ .../aws/v2/lambda/sink/CamelSinkLambdaITCase.java | 187 +++++++++++++++++++++ .../v2/lambda/sink/TestLambda2Configuration.java | 35 ++++ .../clients/kafka/ByteProducerPropertyFactory.java | 52 ++++++ .../common/clients/kafka/KafkaClient.java | 14 +- .../common/test/AbstractTestMessageProducer.java | 6 +- 7 files changed, 375 insertions(+), 3 deletions(-) diff --git a/tests/itests-aws-v2/pom.xml b/tests/itests-aws-v2/pom.xml index 31b8a89..185a1c1 100644 --- a/tests/itests-aws-v2/pom.xml +++ b/tests/itests-aws-v2/pom.xml @@ -92,6 +92,11 @@ <groupId>org.apache.camel</groupId> <artifactId>camel-aws2-sns</artifactId> </dependency> + + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-aws2-lambda</artifactId> + </dependency> </dependencies> <build> diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/CamelAWSLambdaPropertyFactory.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/CamelAWSLambdaPropertyFactory.java new file mode 100644 index 0000000..e8a1433 --- /dev/null +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/CamelAWSLambdaPropertyFactory.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.aws.v2.lambda.sink; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.apache.camel.kafkaconnector.aws.v2.common.AWSPropertiesUtils; +import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory; +import org.apache.camel.test.infra.aws.common.AWSConfigs; + +public class CamelAWSLambdaPropertyFactory extends SinkConnectorPropertyFactory<CamelAWSLambdaPropertyFactory> { + public static final Map<String, String> SPRING_STYLE = new HashMap<>(); + public static final Map<String, String> KAFKA_STYLE = new HashMap<>(); + + static { + SPRING_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-lambda.accessKey"); + SPRING_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-lambda.secretKey"); + SPRING_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-lambda.region"); + + KAFKA_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.component.aws2-lambda.access-key"); + KAFKA_STYLE.put(AWSConfigs.SECRET_KEY, "camel.component.aws2-lambda.secret-key"); + KAFKA_STYLE.put(AWSConfigs.REGION, "camel.component.aws2-lambda.region"); + } + + + public CamelAWSLambdaPropertyFactory withAmazonConfig(Properties amazonConfigs) { + return withAmazonConfig(amazonConfigs, this.SPRING_STYLE); + } + + public CamelAWSLambdaPropertyFactory withAmazonConfig(Properties amazonConfigs, Map<String, String> style) { + AWSPropertiesUtils.setCommonProperties(amazonConfigs, style, this); + + return this; + } + + public CamelAWSLambdaPropertyFactory withSinkPathFunction(String value) { + return setProperty("camel.sink.path.function", value); + } + + public CamelAWSLambdaPropertyFactory withSinkEndpointOperation(String value) { + return setProperty("camel.sink.endpoint.operation", value); + } + + public CamelAWSLambdaPropertyFactory withConfiguration(String value) { + return setProperty("camel.component.aws2-lambda.configuration", classRef(value)); + } + + public CamelAWSLambdaPropertyFactory withPojoRequest(boolean value) { + return setProperty("camel.sink.endpoint.pojoRequest", value); + } + + public static CamelAWSLambdaPropertyFactory basic() { + return new CamelAWSLambdaPropertyFactory() + .withTasksMax(1) + .withName("CamelAws2lambdaSinkConnector") + .withConnectorClass("org.apache.camel.kafkaconnector.aws2lambda.CamelAws2lambdaSinkConnector") + .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter") + .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter"); + + } + +} diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/CamelSinkLambdaITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/CamelSinkLambdaITCase.java new file mode 100644 index 0000000..e9911ac --- /dev/null +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/CamelSinkLambdaITCase.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.aws.v2.lambda.sink; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.zip.ZipEntry; +import java.util.zip.ZipOutputStream; + +import org.apache.camel.kafkaconnector.CamelSinkTask; +import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; +import org.apache.camel.kafkaconnector.common.clients.kafka.ByteProducerPropertyFactory; +import org.apache.camel.kafkaconnector.common.clients.kafka.ConsumerPropertyFactory; +import org.apache.camel.kafkaconnector.common.clients.kafka.DefaultConsumerPropertyFactory; +import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient; +import org.apache.camel.kafkaconnector.common.clients.kafka.ProducerPropertyFactory; +import org.apache.camel.kafkaconnector.common.test.AbstractTestMessageProducer; +import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; +import org.apache.camel.kafkaconnector.common.utils.TestUtils; +import org.apache.camel.test.infra.aws.common.services.AWSService; +import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils; +import org.apache.camel.test.infra.aws2.services.AWSServiceFactory; +import org.apache.kafka.common.utils.Bytes; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.lambda.LambdaClient; +import software.amazon.awssdk.services.lambda.model.FunctionConfiguration; +import software.amazon.awssdk.services.lambda.model.ListFunctionsResponse; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; + +@EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true") +public class CamelSinkLambdaITCase extends CamelSinkTestSupport { + @RegisterExtension + public static AWSService awsService = AWSServiceFactory.createLambdaService(); + private static final Logger LOG = LoggerFactory.getLogger(CamelSinkLambdaITCase.class); + + private LambdaClient client; + private String function; + + private volatile int received; + private final int expect = 1; + + + private static class CustomProducer extends AbstractTestMessageProducer<Bytes> { + public CustomProducer(String bootstrapServer, String topicName, int count) { + super(bootstrapServer, topicName, count); + } + + @Override + protected KafkaClient<String, Bytes> createKafkaClient(String bootstrapServer) { + ConsumerPropertyFactory consumerPropertyFactory = new DefaultConsumerPropertyFactory(bootstrapServer); + ProducerPropertyFactory producerPropertyFactory = new ByteProducerPropertyFactory(bootstrapServer); + + return new KafkaClient<>(consumerPropertyFactory, producerPropertyFactory); + } + + @Override + public Bytes testMessageContent(int current) { + + try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { + ZipOutputStream zip = new ZipOutputStream(out); + + ZipEntry entry = new ZipEntry("test"); + zip.putNextEntry(entry); + zip.write("hello test".getBytes()); + zip.closeEntry(); + zip.finish(); + + return Bytes.wrap(out.toByteArray()); + } catch (IOException e) { + LOG.error("I/O error writing zip entry: {}", e.getMessage(), e); + fail("I/O error writing zip entry"); + } + + return null; + } + + @Override + public Map<String, String> messageHeaders(Bytes text, int current) { + Map<String, String> headers = new HashMap<>(); + + headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsLambdaOperation", + "createFunction"); + + headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsLambdaRole", + "admin"); + headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsLambdaRuntime", + "java8"); + headers.put(CamelSinkTask.HEADER_CAMEL_PREFIX + "CamelAwsLambdaHandler", + "org.apache.camel.kafkaconnector.SomeHandler"); + + return headers; + } + } + + @Override + protected String[] getConnectorsInTest() { + return new String[] {"camel-aws2-lambda-kafka-connector"}; + } + + @BeforeEach + public void setUp() { + client = AWSSDKClientUtils.newLambdaClient(); + + function = "function-" + TestUtils.randomWithRange(0, 100); + LOG.debug("Using function {} for the test", function); + } + + @Override + protected void consumeMessages(CountDownLatch latch) { + try { + while (true) { + ListFunctionsResponse response = client.listFunctions(); + + for (FunctionConfiguration functionConfiguration : response.functions()) { + LOG.info("Retrieved function {}", functionConfiguration.functionName()); + + if (functionConfiguration.functionName().equals(function)) { + received = 1; + return; + } + } + + if (!waitForData()) { + break; + } + } + } finally { + latch.countDown(); + } + } + + @Override + protected void verifyMessages(CountDownLatch latch) throws InterruptedException { + if (latch.await(110, TimeUnit.SECONDS)) { + assertEquals(expect, received, "Didn't process the expected amount of messages: " + received + " != " + expect); + } else { + fail(String.format("Failed to receive the messages within the specified time: received %d of %d", + received, expect)); + } + } + + @Test + @Timeout(90) + public void testBasicSendReceive() throws Exception { + Properties amazonProperties = awsService.getConnectionProperties(); + String topicName = TestUtils.getDefaultTestTopic(this.getClass()); + + ConnectorPropertyFactory testProperties = CamelAWSLambdaPropertyFactory + .basic() + .withTopics(topicName) + .withConfiguration(TestLambda2Configuration.class.getName()) + .withAmazonConfig(amazonProperties) + .withSinkPathFunction(function) + .withSinkEndpointOperation("createFunction"); + + runTest(testProperties, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect)); + } + +} diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/TestLambda2Configuration.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/TestLambda2Configuration.java new file mode 100644 index 0000000..cb11efd --- /dev/null +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/lambda/sink/TestLambda2Configuration.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.aws.v2.lambda.sink; + +import org.apache.camel.component.aws2.lambda.Lambda2Configuration; +import org.apache.camel.test.infra.aws2.clients.AWSSDKClientUtils; +import software.amazon.awssdk.services.lambda.LambdaClient; + +public class TestLambda2Configuration extends Lambda2Configuration { + private LambdaClient client; + + @Override + public LambdaClient getAwsLambdaClient() { + if (client == null) { + client = AWSSDKClientUtils.newLambdaClient(); + } + + return client; + } +} diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/ByteProducerPropertyFactory.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/ByteProducerPropertyFactory.java new file mode 100644 index 0000000..2d48d13 --- /dev/null +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/ByteProducerPropertyFactory.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.kafkaconnector.common.clients.kafka; + +import java.util.Properties; +import java.util.UUID; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.BytesSerializer; +import org.apache.kafka.common.serialization.StringSerializer; + +public class ByteProducerPropertyFactory implements ProducerPropertyFactory { + private final String bootstrapServer; + + /** + * Constructs the properties using the given bootstrap server + * @param bootstrapServer the address of the server in the format + * PLAINTEXT://${address}:${port} + */ + public ByteProducerPropertyFactory(String bootstrapServer) { + this.bootstrapServer = bootstrapServer; + } + + @Override + public Properties getProperties() { + Properties props = new Properties(); + + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); + props.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + BytesSerializer.class.getName()); + + return props; + } +} diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/KafkaClient.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/KafkaClient.java index 4e33566..c3cd953 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/KafkaClient.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/clients/kafka/KafkaClient.java @@ -74,8 +74,18 @@ public class KafkaClient<K, V> { * PLAINTEXT://${address}:${port} */ public KafkaClient(String bootstrapServer) { - consumerPropertyFactory = new DefaultConsumerPropertyFactory(bootstrapServer); - producerPropertyFactory = new DefaultProducerPropertyFactory(bootstrapServer); + this(new DefaultConsumerPropertyFactory(bootstrapServer), new DefaultProducerPropertyFactory(bootstrapServer)); + } + + /** + * Constructs the properties using the given bootstrap server + * + * @param consumerPropertyFactory a property factory for Kafka client consumers + * @param producerPropertyFactory a property factory for Kafka client producers + */ + public KafkaClient(ConsumerPropertyFactory consumerPropertyFactory, ProducerPropertyFactory producerPropertyFactory) { + this.consumerPropertyFactory = consumerPropertyFactory; + this.producerPropertyFactory = producerPropertyFactory; producer = new KafkaProducer<>(producerPropertyFactory.getProperties()); consumer = new KafkaConsumer<>(consumerPropertyFactory.getProperties()); diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageProducer.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageProducer.java index 28d3d0d..477fafa 100644 --- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageProducer.java +++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/test/AbstractTestMessageProducer.java @@ -38,11 +38,15 @@ public abstract class AbstractTestMessageProducer<T> implements TestMessageProdu } public AbstractTestMessageProducer(String bootstrapServer, String topicName, int count) { - this.kafkaClient = new KafkaClient<>(bootstrapServer); + this.kafkaClient = createKafkaClient(bootstrapServer); this.topicName = topicName; this.count = count; } + protected KafkaClient<String, T> createKafkaClient(String bootstrapServer) { + return new KafkaClient<>(bootstrapServer); + } + public void produceMessages() throws ExecutionException, InterruptedException { LOG.trace("Producing messages ..."); for (int i = 0; i < count; i++) {