This is an automated email from the ASF dual-hosted git repository. valdar pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit deedb4c97061ddee4c192a8f6e2165d9a9e7a131 Author: Andrea Tarocchi <andrea.taroc...@gmail.com> AuthorDate: Mon Sep 12 08:16:37 2022 +0200 Reneabled tests. --- connectors/camel-syslog-kafka-connector/pom.xml | 36 ++++++++ .../CamelStreamCacheConverterLoader.java | 52 ++++++++++++ .../services/org/apache/camel/TypeConverterLoader | 2 + .../converters/CamelStreamCacheConverter.java | 25 ++++-- .../source/CamelSourceAWSKinesisITCase.java | 2 - .../aws/v2/sns/sink/CamelSinkAWSSNSITCase.java | 1 - .../v2/sqs/sink/CamelAWSSQSPropertyFactory.java | 4 +- .../aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java | 2 - .../v2/sqs/source/CamelAWSSQSPropertyFactory.java | 4 +- .../aws/v2/sqs/source/CamelSourceAWSSQSITCase.java | 2 - tests/itests-cassandra/pom.xml | 8 ++ .../sink/CamelCassandraPropertyFactory.java | 32 +++----- .../cassandra/sink/CamelSinkCassandraITCase.java | 28 +++---- .../source/CamelCassandraPropertyFactory.java | 28 +++---- .../source/CamelSourceCassandraITCase.java | 8 +- tests/itests-couchbase/pom.xml | 2 +- .../sink/CamelCouchbasePropertyFactory.java | 34 +++----- .../couchbase/sink/CamelSinkCouchbaseITCase.java | 27 +----- .../sink/CamelSinkElasticSearchITCase.java | 5 -- .../common/CkcMongoDBLocalContainerService.java | 95 ++++++++++++++++++++++ .../common/MongoDBEnvVarServiceFactory.java | 5 +- .../common/MongoDBLocalContainerEnvVarService.java | 10 ++- .../mongodb/sink/CamelMongoDBPropertyFactory.java | 4 +- .../mongodb/sink/CamelSinkMongoDBITCase.java | 22 ++--- .../source/CamelMongoDBPropertyFactory.java | 4 +- .../mongodb/source/CamelSourceMongoDBITCase.java | 11 +-- tests/itests-parent/pom.xml | 2 +- .../source/CamelRabbitMQPropertyFactory.java | 4 + .../rabbitmq/source/RabbitMQSourceITCase.java | 37 +++++++-- .../sql/source/CamelSourceSQLITCase.java | 9 +- .../sql/source/CamelSqlPropertyFactory.java | 2 +- .../ssh/sink/CamelSinkSshITCase.java | 4 - .../ssh/sink/CamelSshPropertyFactory.java | 2 +- .../ssh/source/CamelSourceSshITCase.java | 3 - .../syslog/sink/CamelSinkSyslogITCase.java | 2 - 35 files changed, 343 insertions(+), 175 deletions(-) diff --git a/connectors/camel-syslog-kafka-connector/pom.xml b/connectors/camel-syslog-kafka-connector/pom.xml index 46e0c7f78..c9967330d 100644 --- a/connectors/camel-syslog-kafka-connector/pom.xml +++ b/connectors/camel-syslog-kafka-connector/pom.xml @@ -89,6 +89,42 @@ <target>1.8</target> </configuration> </plugin> + <plugin> + <groupId>org.apache.camel</groupId> + <artifactId>camel-component-maven-plugin</artifactId> + <executions> + <execution> + <id>generate</id> + <goals> + <goal>generate</goal> + </goals> + <phase>process-classes</phase> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <executions> + <execution> + <phase>initialize</phase> + <goals> + <goal>add-source</goal> + <goal>add-resource</goal> + </goals> + <configuration> + <sources> + <source>src/generated/java</source> + </sources> + <resources> + <resource> + <directory>src/generated/resources</directory> + </resource> + </resources> + </configuration> + </execution> + </executions> + </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <version>2.5.3</version> diff --git a/connectors/camel-syslog-kafka-connector/src/generated/java/org/apache/camel/kafkaconnector/syslog/converters/CamelStreamCacheConverterLoader.java b/connectors/camel-syslog-kafka-connector/src/generated/java/org/apache/camel/kafkaconnector/syslog/converters/CamelStreamCacheConverterLoader.java new file mode 100644 index 000000000..2fdcd612e --- /dev/null +++ b/connectors/camel-syslog-kafka-connector/src/generated/java/org/apache/camel/kafkaconnector/syslog/converters/CamelStreamCacheConverterLoader.java @@ -0,0 +1,52 @@ +/* Generated by camel build tools - do NOT edit this file! */ +package org.apache.camel.kafkaconnector.syslog.converters; + +import org.apache.camel.CamelContext; +import org.apache.camel.CamelContextAware; +import org.apache.camel.DeferredContextBinding; +import org.apache.camel.Exchange; +import org.apache.camel.TypeConversionException; +import org.apache.camel.TypeConverterLoaderException; +import org.apache.camel.spi.TypeConverterLoader; +import org.apache.camel.spi.TypeConverterRegistry; +import org.apache.camel.support.SimpleTypeConverter; +import org.apache.camel.support.TypeConverterSupport; +import org.apache.camel.util.DoubleMap; + +/** + * Generated by camel build tools - do NOT edit this file! + */ +@SuppressWarnings("unchecked") +@DeferredContextBinding +public final class CamelStreamCacheConverterLoader implements TypeConverterLoader, CamelContextAware { + + private CamelContext camelContext; + + public CamelStreamCacheConverterLoader() { + } + + @Override + public void setCamelContext(CamelContext camelContext) { + this.camelContext = camelContext; + } + + @Override + public CamelContext getCamelContext() { + return camelContext; + } + + @Override + public void load(TypeConverterRegistry registry) throws TypeConverterLoaderException { + registerConverters(registry); + } + + private void registerConverters(TypeConverterRegistry registry) { + addTypeConverter(registry, io.netty.buffer.ByteBuf.class, org.apache.camel.StreamCache.class, false, + (type, exchange, value) -> org.apache.camel.kafkaconnector.syslog.converters.CamelStreamCacheConverter.toByteBuf((org.apache.camel.StreamCache) value)); + } + + private static void addTypeConverter(TypeConverterRegistry registry, Class<?> toType, Class<?> fromType, boolean allowNull, SimpleTypeConverter.ConversionMethod method) { + registry.addTypeConverter(toType, fromType, new SimpleTypeConverter(allowNull, method)); + } + +} diff --git a/connectors/camel-syslog-kafka-connector/src/generated/resources/META-INF/services/org/apache/camel/TypeConverterLoader b/connectors/camel-syslog-kafka-connector/src/generated/resources/META-INF/services/org/apache/camel/TypeConverterLoader new file mode 100644 index 000000000..541d7123f --- /dev/null +++ b/connectors/camel-syslog-kafka-connector/src/generated/resources/META-INF/services/org/apache/camel/TypeConverterLoader @@ -0,0 +1,2 @@ +# Generated by camel build tools - do NOT edit this file! +org.apache.camel.kafkaconnector.syslog.converters.CamelStreamCacheConverterLoader diff --git a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/common/MongoDBLocalContainerEnvVarService.java b/connectors/camel-syslog-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/syslog/converters/CamelStreamCacheConverter.java similarity index 53% copy from tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/common/MongoDBLocalContainerEnvVarService.java copy to connectors/camel-syslog-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/syslog/converters/CamelStreamCacheConverter.java index 65a650d05..bec532cb1 100644 --- a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/common/MongoDBLocalContainerEnvVarService.java +++ b/connectors/camel-syslog-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/syslog/converters/CamelStreamCacheConverter.java @@ -14,12 +14,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.kafkaconnector.mongodb.common; +package org.apache.camel.kafkaconnector.syslog.converters; -import org.apache.camel.test.infra.mongodb.services.MongoDBLocalContainerService; +import java.io.IOException; -public class MongoDBLocalContainerEnvVarService extends MongoDBLocalContainerService { - public void addEnvProperty(String property, String value) { - getContainer().addEnv(property, value); +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.ByteBufOutputStream; +import org.apache.camel.Converter; +import org.apache.camel.StreamCache; + +@Converter(generateLoader = true) +public final class CamelStreamCacheConverter { + private CamelStreamCacheConverter() { + + } + + @Converter + public static ByteBuf toByteBuf(StreamCache streamCache) throws IOException { + ByteBufOutputStream buf = new ByteBufOutputStream(ByteBufAllocator.DEFAULT.buffer((int) streamCache.length())); + streamCache.writeTo(buf); + buf.close(); + return buf.buffer(); } } diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java index d1fead05e..8f824e228 100644 --- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java @@ -30,7 +30,6 @@ import org.apache.camel.test.infra.aws2.services.AWSServiceFactory; import org.apache.camel.test.infra.common.TestUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; @@ -43,7 +42,6 @@ import static org.apache.camel.kafkaconnector.aws.v2.kinesis.common.KinesisUtils import static org.apache.camel.kafkaconnector.aws.v2.kinesis.common.KinesisUtils.putRecords; import static org.junit.jupiter.api.Assertions.assertEquals; -@Disabled("Until https://github.com/apache/camel-kamelets/issues/908 is resolved") @TestInstance(TestInstance.Lifecycle.PER_CLASS) @EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true") public class CamelSourceAWSKinesisITCase extends CamelSourceTestSupport { diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/CamelSinkAWSSNSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/CamelSinkAWSSNSITCase.java index 3421778d6..2389a47be 100644 --- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/CamelSinkAWSSNSITCase.java +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sns/sink/CamelSinkAWSSNSITCase.java @@ -135,7 +135,6 @@ public class CamelSinkAWSSNSITCase extends CamelSinkTestSupport { .withName("CamelAWSSNSSinkConnectorDefault") .withTopics(topicName) .withTopicOrArn(queueName) -// .withSubscribeSNStoSQS(sqsQueueUrl) .withConfiguration(TestSnsConfiguration.class.getName()) .withAutoCreateTopic(true) .withAmazonConfig(amazonProperties); diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelAWSSQSPropertyFactory.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelAWSSQSPropertyFactory.java index 01f5758c8..1ff9000c8 100644 --- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelAWSSQSPropertyFactory.java +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelAWSSQSPropertyFactory.java @@ -35,8 +35,8 @@ final class CamelAWSSQSPropertyFactory extends SinkConnectorPropertyFactory<Came SPRING_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.kamelet.aws-sqs-sink.accessKey"); SPRING_STYLE.put(AWSConfigs.SECRET_KEY, "camel.kamelet.aws-sqs-sink.secretKey"); SPRING_STYLE.put(AWSConfigs.REGION, "camel.kamelet.aws-sqs-sink.region"); - SPRING_STYLE.put(AWSConfigs.PROTOCOL, "camel.component.aws2-sqs.protocol"); - SPRING_STYLE.put(AWSConfigs.AMAZON_AWS_HOST, "camel.component.aws2-sqs.amazonAWSHost"); + SPRING_STYLE.put(AWSConfigs.PROTOCOL, "camel.kamelet.aws-sqs-sink.protocol"); + SPRING_STYLE.put(AWSConfigs.AMAZON_AWS_HOST, "camel.kamelet.aws-sqs-sink.amazonAWSHost"); } private CamelAWSSQSPropertyFactory() { diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java index 8ca59de84..e21b5d24a 100644 --- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/sink/CamelSinkAWSSQSITCase.java @@ -32,7 +32,6 @@ import org.apache.camel.test.infra.aws2.services.AWSServiceFactory; import org.apache.camel.test.infra.common.TestUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; @@ -45,7 +44,6 @@ import software.amazon.awssdk.services.sqs.model.Message; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; -@Disabled @TestInstance(TestInstance.Lifecycle.PER_CLASS) @EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true") public class CamelSinkAWSSQSITCase extends CamelSinkTestSupport { diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelAWSSQSPropertyFactory.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelAWSSQSPropertyFactory.java index d238ebad8..16a8590d9 100644 --- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelAWSSQSPropertyFactory.java +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelAWSSQSPropertyFactory.java @@ -35,8 +35,8 @@ final class CamelAWSSQSPropertyFactory extends SourceConnectorPropertyFactory<Ca SPRING_STYLE.put(AWSConfigs.ACCESS_KEY, "camel.kamelet.aws-sqs-source.accessKey"); SPRING_STYLE.put(AWSConfigs.SECRET_KEY, "camel.kamelet.aws-sqs-source.secretKey"); SPRING_STYLE.put(AWSConfigs.REGION, "camel.kamelet.aws-sqs-source.region"); - SPRING_STYLE.put(AWSConfigs.PROTOCOL, "camel.component.aws2-sqs.protocol"); - SPRING_STYLE.put(AWSConfigs.AMAZON_AWS_HOST, "camel.component.aws2-sqs.amazonAWSHost"); + SPRING_STYLE.put(AWSConfigs.PROTOCOL, "camel.kamelet.aws-sqs-source.protocol"); + SPRING_STYLE.put(AWSConfigs.AMAZON_AWS_HOST, "camel.kamelet.aws-sqs-source.amazonAWSHost"); } private CamelAWSSQSPropertyFactory() { diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelSourceAWSSQSITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelSourceAWSSQSITCase.java index 283ae2a06..7ee73bd5d 100644 --- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelSourceAWSSQSITCase.java +++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/sqs/source/CamelSourceAWSSQSITCase.java @@ -30,7 +30,6 @@ import org.apache.camel.test.infra.aws2.services.AWSServiceFactory; import org.apache.camel.test.infra.common.TestUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; @@ -42,7 +41,6 @@ import org.slf4j.LoggerFactory; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; -@Disabled @TestInstance(TestInstance.Lifecycle.PER_CLASS) @EnabledIfSystemProperty(named = "enable.slow.tests", matches = "true") public class CamelSourceAWSSQSITCase extends CamelSourceTestSupport { diff --git a/tests/itests-cassandra/pom.xml b/tests/itests-cassandra/pom.xml index 2dbccd9e5..08d8f4c8f 100644 --- a/tests/itests-cassandra/pom.xml +++ b/tests/itests-cassandra/pom.xml @@ -58,6 +58,14 @@ <groupId>org.apache.camel</groupId> <artifactId>camel-cassandraql</artifactId> </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-bean</artifactId> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-jackson</artifactId> + </dependency> <dependency> <groupId>org.apache.camel</groupId> diff --git a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelCassandraPropertyFactory.java b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelCassandraPropertyFactory.java index 6f8e598ca..460a7071b 100644 --- a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelCassandraPropertyFactory.java +++ b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelCassandraPropertyFactory.java @@ -17,7 +17,6 @@ package org.apache.camel.kafkaconnector.cassandra.sink; -import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder; import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory; final class CamelCassandraPropertyFactory extends SinkConnectorPropertyFactory<CamelCassandraPropertyFactory> { @@ -26,17 +25,15 @@ final class CamelCassandraPropertyFactory extends SinkConnectorPropertyFactory<C } public CamelCassandraPropertyFactory withKeySpace(String keySpace) { - return setProperty("camel.sink.path.keyspace", keySpace); + return setProperty("camel.kamelet.cassandra-sink.keyspace", keySpace); } - public CamelCassandraPropertyFactory withCql(String cql) { - // RAW is required as the endpoint URI builder encodes the URI - // TODO: remove once https://issues.apache.org/jira/browse/CAMEL-15722 get fixed - return setProperty("camel.sink.endpoint.cql", "RAW(" + cql + ")"); + public CamelCassandraPropertyFactory withQuery(String query) { + return setProperty("camel.kamelet.cassandra-sink.query", query); } public CamelCassandraPropertyFactory withHosts(String hosts) { - return setProperty("camel.sink.path.hosts", hosts); + return setProperty("camel.kamelet.cassandra-sink.connectionHost", hosts); } public CamelCassandraPropertyFactory withPort(int port) { @@ -44,26 +41,17 @@ final class CamelCassandraPropertyFactory extends SinkConnectorPropertyFactory<C } public CamelCassandraPropertyFactory withPort(String port) { - return setProperty("camel.sink.path.port", port); + return setProperty("camel.kamelet.cassandra-sink.connectionPort", port); } - public CamelCassandraPropertyFactory withCluster(String cluster) { - return setProperty("camel.sink.endpoint.cluster", cluster); - } - - public EndpointUrlBuilder<CamelCassandraPropertyFactory> withUrl(String host, String keySpace) { - String queueUrl = String.format("cql://%s/%s", host, keySpace); - - return new EndpointUrlBuilder<>(this::withSinkUrl, queueUrl); - } - - public static CamelCassandraPropertyFactory basic() { return new CamelCassandraPropertyFactory() - .withName("CamelCqlSinkConnector") + .withName("CamelCassandraSinkConnector") .withTasksMax(1) - .withConnectorClass("org.apache.camel.kafkaconnector.cql.CamelCqlSinkConnector") + .withConnectorClass("org.apache.camel.kafkaconnector.cassandrasink.CamelCassandrasinkSinkConnector") .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter") - .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter"); + .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter") + .setProperty("camel.kamelet.cassandra-sink.prepareStatements", "false") + .setProperty("camel.component.kamelet.location", "kamelets"); } } diff --git a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java index df7df1928..a6e2e749d 100644 --- a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java +++ b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java @@ -24,13 +24,13 @@ import org.apache.camel.kafkaconnector.cassandra.clients.CassandraClient; import org.apache.camel.kafkaconnector.cassandra.clients.dao.TestDataDao; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.test.CamelSinkTestSupport; +import org.apache.camel.kafkaconnector.common.test.StringMessageProducer; import org.apache.camel.test.infra.cassandra.services.CassandraService; import org.apache.camel.test.infra.cassandra.services.CassandraServiceFactory; import org.apache.camel.test.infra.common.TestUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; @@ -42,7 +42,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; @TestInstance(TestInstance.Lifecycle.PER_CLASS) -@Disabled("TODO: Enable and convert once https://github.com/apache/camel-kamelets/pull/636 is published in kamelet-catalog") public class CamelSinkCassandraITCase extends CamelSinkTestSupport { @RegisterExtension public static CassandraService cassandraService = CassandraServiceFactory.createService(); @@ -58,7 +57,7 @@ public class CamelSinkCassandraITCase extends CamelSinkTestSupport { @Override protected String[] getConnectorsInTest() { - return new String[] {"camel-cql-kafka-connector"}; + return new String[] {"camel-cassandra-sink-kafka-connector"}; } @BeforeAll @@ -127,21 +126,20 @@ public class CamelSinkCassandraITCase extends CamelSinkTestSupport { .withHosts(cassandraService.getCassandraHost()) .withPort(cassandraService.getCQL3Port()) .withKeySpace(TestDataDao.KEY_SPACE) - .withCql(testDataDao.getInsertStatement()); + .withQuery(testDataDao.getInsertStatement()); - runTest(connectorPropertyFactory, topicName, expect); + runTest(connectorPropertyFactory, new CassandraStringMessageProducer(getKafkaService().getBootstrapServers(), topicName, expect)); } - @Timeout(90) - @Test - public void testFetchFromCassandraWithUrl() throws Exception { - ConnectorPropertyFactory connectorPropertyFactory = CamelCassandraPropertyFactory - .basic() - .withTopics(topicName) - .withUrl(cassandraService.getCQL3Endpoint(), TestDataDao.KEY_SPACE) - .append("cql", testDataDao.getInsertStatement()) - .buildUrl(); + private class CassandraStringMessageProducer extends StringMessageProducer { - runTest(connectorPropertyFactory, topicName, expect); + public CassandraStringMessageProducer(String bootStrapServer, String topicName, int count) { + super(bootStrapServer, topicName, count); + } + + @Override + public String testMessageContent(int current) { + return "[{ \"message\": " + current + " }]"; + } } } diff --git a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelCassandraPropertyFactory.java b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelCassandraPropertyFactory.java index 3022a09c6..486c17a9c 100644 --- a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelCassandraPropertyFactory.java +++ b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelCassandraPropertyFactory.java @@ -17,7 +17,7 @@ package org.apache.camel.kafkaconnector.cassandra.source; -import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder; + import org.apache.camel.kafkaconnector.common.SourceConnectorPropertyFactory; final class CamelCassandraPropertyFactory extends SourceConnectorPropertyFactory<CamelCassandraPropertyFactory> { @@ -26,12 +26,12 @@ final class CamelCassandraPropertyFactory extends SourceConnectorPropertyFactory } - public CamelCassandraPropertyFactory withCql(String cql) { - return setProperty("camel.source.endpoint.cql", cql); + public CamelCassandraPropertyFactory withQuery(String query) { + return setProperty("camel.kamelet.cassandra-source.query", query); } public CamelCassandraPropertyFactory withHosts(String hosts) { - return setProperty("camel.source.path.hosts", hosts); + return setProperty("camel.kamelet.cassandra-source.connectionHost", hosts); } public CamelCassandraPropertyFactory withPort(int port) { @@ -39,29 +39,25 @@ final class CamelCassandraPropertyFactory extends SourceConnectorPropertyFactory } public CamelCassandraPropertyFactory withPort(String port) { - return setProperty("camel.source.path.port", port); + return setProperty("camel.kamelet.cassandra-source.connectionPort", port); } public CamelCassandraPropertyFactory withKeySpace(String value) { - return setProperty("camel.source.path.keyspace", value); + return setProperty("camel.kamelet.cassandra-source.keyspace", value); } public CamelCassandraPropertyFactory withResultSetConversionStrategy(String value) { - return setProperty("camel.source.endpoint.resultSetConversionStrategy", value); - } - - public EndpointUrlBuilder<CamelCassandraPropertyFactory> withUrl(String host, String keySpace) { - String url = String.format("cql://%s/%s", host, keySpace); - - return new EndpointUrlBuilder<>(this::withSourceUrl, url); + return setProperty("camel.endpoint.cql.resultSetConversionStrategy", value); } public static CamelCassandraPropertyFactory basic() { return new CamelCassandraPropertyFactory() - .withName("CamelCassandraQLSourceConnector") + .withName("CamelCassandraSourceConnector") .withTasksMax(1) - .withConnectorClass("org.apache.camel.kafkaconnector.cql.CamelCqlSourceConnector") + .withConnectorClass("org.apache.camel.kafkaconnector.cassandrasource.CamelCassandrasourceSourceConnector") .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter") - .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter"); + .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter") + .setProperty("camel.kamelet.cassandra-source.consistencyLevel", "ONE") + .setProperty("camel.component.kamelet.location", "kamelets"); } } diff --git a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java index eac92420f..5754b8570 100644 --- a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java +++ b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java @@ -30,7 +30,6 @@ import org.apache.camel.test.infra.cassandra.services.CassandraServiceFactory; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; @@ -42,7 +41,6 @@ import static org.apache.camel.kafkaconnector.common.BasicConnectorPropertyFacto import static org.junit.jupiter.api.Assertions.assertEquals; @TestInstance(TestInstance.Lifecycle.PER_CLASS) -@Disabled("TODO: Enable and convert once https://github.com/apache/camel-kamelets/pull/636 is published in kamelet-catalog") public class CamelSourceCassandraITCase extends CamelSourceTestSupport { @RegisterExtension public static CassandraService cassandraService = CassandraServiceFactory.createService(); @@ -57,7 +55,7 @@ public class CamelSourceCassandraITCase extends CamelSourceTestSupport { @Override protected String[] getConnectorsInTest() { - return new String[] {"camel-cql-kafka-connector"}; + return new String[] {"camel-cassandra-source-kafka-connector"}; } @BeforeAll @@ -113,7 +111,7 @@ public class CamelSourceCassandraITCase extends CamelSourceTestSupport { .withPort(cassandraService.getCQL3Port()) .withKeySpace(TestDataDao.KEY_SPACE) .withResultSetConversionStrategy("ONE") - .withCql(testDataDao.getSelectStatement()); + .withQuery(testDataDao.getSelectStatement()); runTest(connectorPropertyFactory, topicName, expect); } @@ -128,7 +126,7 @@ public class CamelSourceCassandraITCase extends CamelSourceTestSupport { .withPort(cassandraService.getCQL3Port()) .withKeySpace(TestDataDao.KEY_SPACE) .withResultSetConversionStrategy(classRef(TestResultSetConversionStrategy.class.getName())) - .withCql(testDataDao.getSelectStatement()); + .withQuery(testDataDao.getSelectStatement()); runTest(connectorPropertyFactory, topicName, expect); } diff --git a/tests/itests-couchbase/pom.xml b/tests/itests-couchbase/pom.xml index f2245cd05..0eb564c69 100644 --- a/tests/itests-couchbase/pom.xml +++ b/tests/itests-couchbase/pom.xml @@ -68,7 +68,7 @@ <!-- These tests are flaky and depend on some fragile timeout logic on Couchbase --> - <rerunFailingTestsCount>2</rerunFailingTestsCount> + <rerunFailingTestsCount>1</rerunFailingTestsCount> </configuration> </plugin> </plugins> diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelCouchbasePropertyFactory.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelCouchbasePropertyFactory.java index 64d66aaf2..e97623901 100644 --- a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelCouchbasePropertyFactory.java +++ b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelCouchbasePropertyFactory.java @@ -17,55 +17,41 @@ package org.apache.camel.kafkaconnector.couchbase.sink; -import org.apache.camel.kafkaconnector.common.EndpointUrlBuilder; import org.apache.camel.kafkaconnector.common.SinkConnectorPropertyFactory; public class CamelCouchbasePropertyFactory extends SinkConnectorPropertyFactory<CamelCouchbasePropertyFactory> { public CamelCouchbasePropertyFactory withProtocol(String value) { - return setProperty("camel.sink.path.protocol", value); + return setProperty("camel.kamelet.couchbase-sink.protocol", value); } public CamelCouchbasePropertyFactory withHostname(String value) { - return setProperty("camel.sink.path.hostname", value); + return setProperty("camel.kamelet.couchbase-sink.couchbaseHostname", value); } public CamelCouchbasePropertyFactory withPort(int value) { - return setProperty("camel.sink.path.port", value); + return setProperty("camel.kamelet.couchbase-sink.couchbasePort", value); } public CamelCouchbasePropertyFactory withBucket(String value) { - return setProperty("camel.sink.endpoint.bucket", value); - } - - public CamelCouchbasePropertyFactory withCollection(String value) { - return setProperty("camel.sink.endpoint.collection", value); - } - - public CamelCouchbasePropertyFactory withOperation(String value) { - return setProperty("camel.sink.endpoint.operation", value); + return setProperty("camel.kamelet.couchbase-sink.bucket", value); } public CamelCouchbasePropertyFactory withUsername(String value) { - return setProperty("camel.sink.endpoint.username", value); + return setProperty("camel.kamelet.couchbase-sink.username", value); } public CamelCouchbasePropertyFactory withPassword(String value) { - return setProperty("camel.sink.endpoint.password", value); - } - - public EndpointUrlBuilder<CamelCouchbasePropertyFactory> withUrl(String protocol, String hostname, int port) { - String sinkUrl = String.format("couchbase:%s://%s:%d", protocol, hostname, port); - - return new EndpointUrlBuilder<>(this::withSinkUrl, sinkUrl); + return setProperty("camel.kamelet.couchbase-sink.password", value); } public static CamelCouchbasePropertyFactory basic() { return new CamelCouchbasePropertyFactory() .withTasksMax(1) - .withName("CamelCouchbaseSinkConnector") - .withConnectorClass("org.apache.camel.kafkaconnector.couchbase.CamelCouchbaseSinkConnector") + .withName("CamelCouchbasesinkSinkConnector") + .withConnectorClass("org.apache.camel.kafkaconnector.couchbasesink.CamelCouchbasesinkSinkConnector") .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter") - .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter"); + .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter") + .setProperty("camel.component.kamelet.location", "kamelets"); } } diff --git a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java index 8c26ddd3c..25dc25ce3 100644 --- a/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java +++ b/tests/itests-couchbase/src/test/java/org/apache/camel/kafkaconnector/couchbase/sink/CamelSinkCouchbaseITCase.java @@ -40,12 +40,9 @@ import org.apache.camel.test.infra.couchbase.services.CouchbaseService; import org.apache.camel.test.infra.couchbase.services.CouchbaseServiceFactory; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; -import org.junit.jupiter.api.condition.EnabledIfSystemProperty; import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,7 +57,7 @@ import static org.junit.jupiter.api.Assertions.fail; Therefore, this test is marked as flaky and only runs if specifically enabled. */ -@EnabledIfSystemProperty(named = "enable.flaky.tests", matches = "true") +//@EnabledIfSystemProperty(named = "enable.flaky.tests", matches = "true") @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class CamelSinkCouchbaseITCase extends CamelSinkTestSupport { @RegisterExtension @@ -99,7 +96,7 @@ public class CamelSinkCouchbaseITCase extends CamelSinkTestSupport { @Override protected String[] getConnectorsInTest() { - return new String[] {"camel-couchbase-kafka-connector"}; + return new String[] {"camel-couchbase-sink-kafka-connector"}; } @BeforeEach @@ -204,7 +201,6 @@ public class CamelSinkCouchbaseITCase extends CamelSinkTestSupport { LOG.debug("Received record: {}", results.get(0)); } - @Disabled("Not formatting the URL correctly - issue #629") @Test @Timeout(90) public void testBasicSendReceive() throws Exception { @@ -219,23 +215,4 @@ public class CamelSinkCouchbaseITCase extends CamelSinkTestSupport { runTest(factory, new CustomProducer(getKafkaService().getBootstrapServers(), topic, expect)); } - - @RepeatedTest(10) - @Timeout(90) - public void testBasicSendReceiveUsingUrl() throws Exception { - ConnectorPropertyFactory factory = CamelCouchbasePropertyFactory.basic() - .withTopics(topic) - .withUrl("http", service.getHostname(), service.getPort()) - .append("bucket", bucketName) - .append("username", service.getUsername()) - .append("password", service.getPassword()) - .append("connectTimeout", 5000) - .append("queryTimeout", 5000) - .append("producerRetryAttempts", 10) - .append("producerRetryPause", 7500) - .buildUrl(); - - - runTest(factory, new CustomProducer(getKafkaService().getBootstrapServers(), topic, expect)); - } } diff --git a/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java index e6ba773f4..1c3a51528 100644 --- a/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java +++ b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java @@ -33,10 +33,8 @@ import org.apache.camel.test.infra.elasticsearch.services.ElasticSearchServiceFa import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; -import org.junit.jupiter.api.condition.DisabledIfSystemProperty; import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,9 +44,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.fail; -@Disabled("TODO: Wait for xxx to be released in the kamelet catalog before enabling") -@DisabledIfSystemProperty(named = "kafka.instance.type", matches = "local-(kafka|strimzi)-container", - disabledReason = "Hangs when running with the embedded Kafka Connect instance") public class CamelSinkElasticSearchITCase extends CamelSinkTestSupport { @RegisterExtension public static ElasticSearchService elasticSearch = ElasticSearchServiceFactory.createService(); diff --git a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/common/CkcMongoDBLocalContainerService.java b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/common/CkcMongoDBLocalContainerService.java new file mode 100644 index 000000000..b1a7c34bd --- /dev/null +++ b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/common/CkcMongoDBLocalContainerService.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.mongodb.common; + +import org.apache.camel.test.infra.common.services.ContainerService; +import org.apache.camel.test.infra.mongodb.services.MongoDBLocalContainerService; +import org.apache.camel.test.infra.mongodb.services.MongoDBService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; + +public class CkcMongoDBLocalContainerService implements MongoDBService, ContainerService<GenericContainer> { + private static final Logger LOG = LoggerFactory.getLogger(MongoDBLocalContainerService.class); + private static final int DEFAULT_MONGODB_PORT = 27017; + private final GenericContainer container; + private final String username; + private final String password; + + public CkcMongoDBLocalContainerService(String username, String password) { + this(System.getProperty("mongodb.container"), username, password); + } + + public CkcMongoDBLocalContainerService() { + this(System.getProperty("mongodb.container")); + } + + public CkcMongoDBLocalContainerService(String imageName) { + this.container = this.initContainer(imageName); + this.password = null; + this.username = null; + } + + public CkcMongoDBLocalContainerService(String imageName, String username, String password) { + this.container = this.initContainer(imageName); + this.password = password; + this.username = username; + } + + public CkcMongoDBLocalContainerService(GenericContainer container) { + this.container = container; + this.password = null; + this.username = null; + } + + protected GenericContainer initContainer(String imageName) { + return imageName != null && !imageName.isEmpty() ? new GenericContainer(imageName).withExposedPorts(DEFAULT_MONGODB_PORT) : new GenericContainer(); + } + + public String getReplicaSetUrl() { + if (username == null || password == null) { + return String.format("mongodb://%s:%s", this.container.getContainerIpAddress(), this.container.getMappedPort(DEFAULT_MONGODB_PORT)); + } else { + return String.format("mongodb://%s:%s@%s:%s", username, password, this.container.getContainerIpAddress(), this.container.getMappedPort(DEFAULT_MONGODB_PORT)); + } + } + + public String getConnectionAddress() { + return this.container.getContainerIpAddress() + ":" + this.container.getMappedPort(DEFAULT_MONGODB_PORT); + } + + public void registerProperties() { + System.setProperty("mongodb.url", this.getReplicaSetUrl()); + System.setProperty("mongodb.connection.address", this.getConnectionAddress()); + } + + public void initialize() { + LOG.info("Trying to start the MongoDB service"); + this.container.start(); + this.registerProperties(); + LOG.info("MongoDB service running at {}", getReplicaSetUrl()); + } + + public void shutdown() { + LOG.info("Stopping the MongoDB container"); + this.container.stop(); + } + + public GenericContainer getContainer() { + return this.container; + } +} diff --git a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/common/MongoDBEnvVarServiceFactory.java b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/common/MongoDBEnvVarServiceFactory.java index d4c6077ac..137eba1e1 100644 --- a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/common/MongoDBEnvVarServiceFactory.java +++ b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/common/MongoDBEnvVarServiceFactory.java @@ -35,13 +35,14 @@ public final class MongoDBEnvVarServiceFactory { } public static MongoDBService createService(Supplier<MongoDBService> localMapping) { - return (MongoDBService)builder().addLocalMapping(localMapping).addRemoteMapping(MongoDBRemoteService::new).build(); + return builder().addLocalMapping(localMapping).addRemoteMapping(MongoDBRemoteService::new).build(); } public static MongoDBService createService(String username, String password) { - MongoDBLocalContainerEnvVarService mongoDBLocalContainerEnvVarService = new MongoDBLocalContainerEnvVarService(); + MongoDBLocalContainerEnvVarService mongoDBLocalContainerEnvVarService = new MongoDBLocalContainerEnvVarService(username, password); mongoDBLocalContainerEnvVarService.addEnvProperty("MONGO_INITDB_ROOT_USERNAME", username); mongoDBLocalContainerEnvVarService.addEnvProperty("MONGO_INITDB_ROOT_PASSWORD", password); + mongoDBLocalContainerEnvVarService.addEnvProperty("MONGO_INITDB_DATABASE", "test"); return createService(() -> mongoDBLocalContainerEnvVarService); } } diff --git a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/common/MongoDBLocalContainerEnvVarService.java b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/common/MongoDBLocalContainerEnvVarService.java index 65a650d05..82c858af4 100644 --- a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/common/MongoDBLocalContainerEnvVarService.java +++ b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/common/MongoDBLocalContainerEnvVarService.java @@ -16,9 +16,15 @@ */ package org.apache.camel.kafkaconnector.mongodb.common; -import org.apache.camel.test.infra.mongodb.services.MongoDBLocalContainerService; +public class MongoDBLocalContainerEnvVarService extends CkcMongoDBLocalContainerService { + public MongoDBLocalContainerEnvVarService(String username, String password) { + super(username, password); + } + + public MongoDBLocalContainerEnvVarService() { + super(); + } -public class MongoDBLocalContainerEnvVarService extends MongoDBLocalContainerService { public void addEnvProperty(String property, String value) { getContainer().addEnv(property, value); } diff --git a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelMongoDBPropertyFactory.java b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelMongoDBPropertyFactory.java index a86c50bd5..5db4a4467 100644 --- a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelMongoDBPropertyFactory.java +++ b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelMongoDBPropertyFactory.java @@ -41,8 +41,8 @@ final class CamelMongoDBPropertyFactory extends SinkConnectorPropertyFactory<Cam return setProperty("camel.kamelet.mongodb-sink.database", database); } - public CamelMongoDBPropertyFactory withCollection(String connection) { - return setProperty("camel.kamelet.mongodb-sink.collection", connection); + public CamelMongoDBPropertyFactory withCollection(String collection) { + return setProperty("camel.kamelet.mongodb-sink.collection", collection); } public static CamelMongoDBPropertyFactory basic() { diff --git a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelSinkMongoDBITCase.java b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelSinkMongoDBITCase.java index c22c24104..f082d52d5 100644 --- a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelSinkMongoDBITCase.java +++ b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/sink/CamelSinkMongoDBITCase.java @@ -31,7 +31,6 @@ import org.apache.camel.test.infra.common.TestUtils; import org.apache.camel.test.infra.mongodb.services.MongoDBService; import org.bson.Document; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; @@ -42,13 +41,14 @@ import org.slf4j.LoggerFactory; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; -@Disabled("Waiting for https://github.com/apache/camel-kamelets/pull/485 to be merged and published.") @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class CamelSinkMongoDBITCase extends CamelSinkTestSupport { - @RegisterExtension - public static MongoDBService mongoDBService = MongoDBEnvVarServiceFactory.createService("root", "password"); - private static final Logger LOG = LoggerFactory.getLogger(CamelMongoDBPropertyFactory.class); + private static final String USERNAME = "root"; + private static final String PASSWORD = "password"; + + @RegisterExtension + public static MongoDBService mongoDBService = MongoDBEnvVarServiceFactory.createService(USERNAME, PASSWORD); private MongoClient mongoClient; private String topicName; @@ -78,6 +78,8 @@ public class CamelSinkMongoDBITCase extends CamelSinkTestSupport { public void setUp() { topicName = getTopicForTest(this); mongoClient = MongoClients.create(mongoDBService.getReplicaSetUrl()); + + mongoClient.getDatabase(databaseName).createCollection(collectionName); } @Override @@ -96,8 +98,6 @@ public class CamelSinkMongoDBITCase extends CamelSinkTestSupport { @Override protected void verifyMessages(CountDownLatch latch) throws InterruptedException { if (latch.await(15, TimeUnit.SECONDS)) { - String databaseName = "testDB"; - String collectionName = "testRecords"; verifyDocuments(databaseName, collectionName); } else { @@ -123,10 +123,10 @@ public class CamelSinkMongoDBITCase extends CamelSinkTestSupport { CamelMongoDBPropertyFactory factory = CamelMongoDBPropertyFactory.basic() .withTopics(topicName) - .withDatabase("testDB") - .withCollection("testRecords") - .withUsername("root") - .withPassword("password") + .withDatabase(databaseName) + .withCollection(collectionName) + .withUsername(USERNAME) + .withPassword(PASSWORD) .withHosts(mongoDBService.getConnectionAddress()); runTest(factory, new CustomProducer(getKafkaService().getBootstrapServers(), topicName, expect)); diff --git a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/source/CamelMongoDBPropertyFactory.java b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/source/CamelMongoDBPropertyFactory.java index 2c3b45e86..70d94ac42 100644 --- a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/source/CamelMongoDBPropertyFactory.java +++ b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/source/CamelMongoDBPropertyFactory.java @@ -42,8 +42,8 @@ final class CamelMongoDBPropertyFactory extends SourceConnectorPropertyFactory<C return setProperty("camel.kamelet.mongodb-source.database", database); } - public CamelMongoDBPropertyFactory withCollection(String connection) { - return setProperty("camel.source.endpoint.collection", connection); + public CamelMongoDBPropertyFactory withCollection(String collection) { + return setProperty("camel.kamelet.mongodb-source.collection", collection); } public static CamelMongoDBPropertyFactory basic() { diff --git a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/source/CamelSourceMongoDBITCase.java b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/source/CamelSourceMongoDBITCase.java index 17257b5f0..1698ffb0c 100644 --- a/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/source/CamelSourceMongoDBITCase.java +++ b/tests/itests-mongodb/src/test/java/org/apache/camel/kafkaconnector/mongodb/source/CamelSourceMongoDBITCase.java @@ -34,7 +34,6 @@ import org.apache.camel.test.infra.mongodb.services.MongoDBService; import org.bson.Document; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; @@ -42,11 +41,13 @@ import org.junit.jupiter.api.extension.RegisterExtension; import static org.junit.jupiter.api.Assertions.assertEquals; -@Disabled("Waiting for https://github.com/apache/camel-kamelets/pull/486 to be merged and published.") @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class CamelSourceMongoDBITCase extends CamelSourceTestSupport { + private static final String USERNAME = "root"; + private static final String PASSWORD = "password"; + @RegisterExtension - public static MongoDBService mongoDBService = MongoDBEnvVarServiceFactory.createService("root", "password"); + public static MongoDBService mongoDBService = MongoDBEnvVarServiceFactory.createService(USERNAME, PASSWORD); private MongoClient mongoClient; private String topicName; @@ -113,8 +114,8 @@ public class CamelSourceMongoDBITCase extends CamelSourceTestSupport { .withKafkaTopic(topicName) .withDatabase("testDatabase") .withCollection("testCollection") - .withUsername("root") - .withPassword("password") + .withUsername(USERNAME) + .withPassword(PASSWORD) .withHosts(mongoDBService.getConnectionAddress()); runTest(factory, topicName, expect); diff --git a/tests/itests-parent/pom.xml b/tests/itests-parent/pom.xml index 7dd7750b2..f3f9f9152 100644 --- a/tests/itests-parent/pom.xml +++ b/tests/itests-parent/pom.xml @@ -37,7 +37,7 @@ <failsafe.args.jul>-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager</failsafe.args.jul> <failsafe.args.strimzi>-Ditest.strimzi.container.image=${itest.strimzi.container.image} -Ditest.zookeeper.container.image=${itest.zookeeper.container.image}</failsafe.args.strimzi> - <common.failsafe.args>${failsafe.args.jul} ${failsafe.args.basedir} ${failsafe.args.strimzi} -Dmongodb.container=mongo:5.0.2</common.failsafe.args> + <common.failsafe.args>${failsafe.args.jul} ${failsafe.args.basedir} ${failsafe.args.strimzi} -Dmongodb.container=mongo:5.0.11</common.failsafe.args> </properties> <dependencies> diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/CamelRabbitMQPropertyFactory.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/CamelRabbitMQPropertyFactory.java index 769e3fa42..8b6faa118 100644 --- a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/CamelRabbitMQPropertyFactory.java +++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/CamelRabbitMQPropertyFactory.java @@ -40,6 +40,10 @@ public class CamelRabbitMQPropertyFactory extends SourceConnectorPropertyFactory return setProperty("camel.kamelet.rabbitmq-source.exchangeName", value); } + public CamelRabbitMQPropertyFactory withQueue(String queue) { + return setProperty("camel.kamelet.rabbitmq-source.queue", queue); + } + public static CamelRabbitMQPropertyFactory basic() { return new CamelRabbitMQPropertyFactory() .withTasksMax(1) diff --git a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java index 8b094e8ce..4ee89151b 100644 --- a/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java +++ b/tests/itests-rabbitmq/src/test/java/org/apache/camel/kafkaconnector/rabbitmq/source/RabbitMQSourceITCase.java @@ -18,6 +18,9 @@ package org.apache.camel.kafkaconnector.rabbitmq.source; import java.util.concurrent.ExecutionException; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.ConnectionFactory; import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory; import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport; import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer; @@ -25,7 +28,6 @@ import org.apache.camel.kafkaconnector.rabbitmq.clients.RabbitMQClient; import org.apache.camel.test.infra.rabbitmq.services.RabbitMQService; import org.apache.camel.test.infra.rabbitmq.services.RabbitMQServiceFactory; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; @@ -34,8 +36,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; -@Disabled("Until https://github.com/apache/camel-kamelets/pull/502 is merged and published") @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class RabbitMQSourceITCase extends CamelSourceTestSupport { @RegisterExtension @@ -56,9 +58,33 @@ public class RabbitMQSourceITCase extends CamelSourceTestSupport { @BeforeEach public void setUp() { topicName = getTopicForTest(this); - rabbitMQClient = new RabbitMQClient(rabbitmqService.getAmqpUrl()); + Connection connection = null; + try { + LOG.debug("Creating the connection"); + ConnectionFactory factory = new ConnectionFactory(); + factory.setUri(rabbitmqService.getAmqpUrl()); + connection = factory.newConnection(); + LOG.debug("Connection created successfully"); + + LOG.debug("Creating the Channel"); + Channel channel = connection.createChannel(); + LOG.debug("Channel created successfully"); + channel.queueDeclare(DEFAULT_RABBITMQ_QUEUE, true, false, true, null); + } catch (Throwable t) { + LOG.trace("Something wrong happened while initializing the RabbitMQ client: {}", t.getMessage(), t); + fail(); + } finally { + if (connection != null) { + LOG.debug("Closing the connection"); + try { + connection.close(); + } catch (Throwable nestedT) { + LOG.warn("Error closing the {}: {}", "connection", nestedT.getMessage(), nestedT); + } + } + } - rabbitMQClient.createQueue(DEFAULT_RABBITMQ_QUEUE); + rabbitMQClient = new RabbitMQClient(rabbitmqService.getAmqpUrl()); } @Override @@ -83,7 +109,8 @@ public class RabbitMQSourceITCase extends CamelSourceTestSupport { .withAddresses(rabbitmqService.connectionProperties().hostname() + ":" + rabbitmqService.connectionProperties().port()) .withPassword(rabbitmqService.connectionProperties().password()) .withUsername(rabbitmqService.connectionProperties().username()) - .withExchangeName("default"); + .withExchangeName("default") + .withQueue(DEFAULT_RABBITMQ_QUEUE); runTest(factory, topicName, expect); } diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSourceSQLITCase.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSourceSQLITCase.java index 3617d0cd8..6a08de683 100644 --- a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSourceSQLITCase.java +++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSourceSQLITCase.java @@ -23,7 +23,6 @@ import org.apache.camel.kafkaconnector.common.test.CamelSourceTestSupport; import org.apache.camel.kafkaconnector.common.test.TestMessageConsumer; import org.apache.camel.test.infra.jdbc.services.JDBCService; import org.apache.camel.test.infra.jdbc.services.JDBCServiceBuilder; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; @@ -32,7 +31,7 @@ import org.testcontainers.containers.PostgreSQLContainer; import static org.junit.jupiter.api.Assertions.assertEquals; -@Disabled("Database connection fails with connection refused.") +//@Disabled("Database connection fails with connection refused.") public class CamelSourceSQLITCase extends CamelSourceTestSupport { private static final String DATABASE_NAME = "camel"; private static final String USERNAME = "ckc"; @@ -48,9 +47,9 @@ public class CamelSourceSQLITCase extends CamelSourceTestSupport { public CamelSourceSQLITCase() { JdbcDatabaseContainer<?> container = new PostgreSQLContainer<>("postgres:13.0") - .withDatabaseName("camel") - .withUsername("ckc") - .withPassword("ckcDevel123") + .withDatabaseName(DATABASE_NAME) + .withUsername(USERNAME) + .withPassword(PASSWORD) .withInitScript("schema.sql") .withStartupTimeoutSeconds(60); diff --git a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSqlPropertyFactory.java b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSqlPropertyFactory.java index f53781758..28b73be28 100644 --- a/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSqlPropertyFactory.java +++ b/tests/itests-sql/src/test/java/org/apache/camel/kafkaconnector/sql/source/CamelSqlPropertyFactory.java @@ -45,7 +45,7 @@ public final class CamelSqlPropertyFactory extends SinkConnectorPropertyFactory< } public CamelSqlPropertyFactory withPort(String port) { - return setProperty("camel.kamelet.postgresql-source.port", port); + return setProperty("camel.kamelet.postgresql-source.serverPort", port); } public static CamelSqlPropertyFactory basic() { diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java index 0aa3f6f70..bc0561727 100644 --- a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java +++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSinkSshITCase.java @@ -26,7 +26,6 @@ import org.apache.camel.kafkaconnector.common.test.StringMessageProducer; import org.apache.camel.kafkaconnector.ssh.services.SshService; import org.apache.camel.kafkaconnector.ssh.services.SshServiceFactory; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; @@ -36,9 +35,6 @@ import org.slf4j.LoggerFactory; import static org.junit.jupiter.api.Assertions.fail; -@Disabled("Sink ssh kamelet needs to be implemented see: https://github.com/apache/camel-kamelets/issues/504") -//@DisabledIfSystemProperty(named = "kafka.instance.type", matches = "local-(kafka|strimzi)-container", -// disabledReason = "Hangs when running with the embedded Kafka Connect instance") @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class CamelSinkSshITCase extends CamelSinkTestSupport { @RegisterExtension diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSshPropertyFactory.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSshPropertyFactory.java index 02d4cf3ff..8ad9ef678 100644 --- a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSshPropertyFactory.java +++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/sink/CamelSshPropertyFactory.java @@ -44,7 +44,7 @@ final class CamelSshPropertyFactory extends SinkConnectorPropertyFactory<CamelSs public static CamelSshPropertyFactory basic() { return new CamelSshPropertyFactory().withName("CamelSshSourceConnector") .withTasksMax(1) - .withConnectorClass("org.apache.camel.kafkaconnector.sshsink.CamelSshsinkSinkonnector") + .withConnectorClass("org.apache.camel.kafkaconnector.sshsink.CamelSshsinkSinkConnector") .withKeyConverterClass("org.apache.kafka.connect.storage.StringConverter") .withValueConverterClass("org.apache.kafka.connect.storage.StringConverter") .setProperty("camel.component.kamelet.location", "kamelets") diff --git a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java index c78fcb0ef..19652b651 100644 --- a/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java +++ b/tests/itests-ssh/src/test/java/org/apache/camel/kafkaconnector/ssh/source/CamelSourceSshITCase.java @@ -29,13 +29,10 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.Timeout; -import org.junit.jupiter.api.condition.DisabledIfSystemProperty; import org.junit.jupiter.api.extension.RegisterExtension; import static org.junit.jupiter.api.Assertions.assertEquals; -@DisabledIfSystemProperty(named = "kafka.instance.type", matches = "local-(kafka|strimzi)-container", - disabledReason = "Hangs when running with the embedded Kafka Connect instance") @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class CamelSourceSshITCase extends CamelSourceTestSupport { @RegisterExtension diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java index 544659816..b5bc282ed 100644 --- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java +++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java @@ -27,7 +27,6 @@ import org.apache.camel.kafkaconnector.common.test.StringMessageProducer; import org.apache.camel.kafkaconnector.common.utils.NetworkUtils; import org.apache.camel.kafkaconnector.syslog.services.SyslogService; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.extension.RegisterExtension; @@ -41,7 +40,6 @@ import static org.junit.jupiter.api.Assertions.fail; * A simple test case that checks whether the syslog send the expected number of * messages */ -@Disabled("Netty component seems to be failing on 3.18 in this scenario") public class CamelSinkSyslogITCase extends CamelSinkTestSupport { private static final String HOST = NetworkUtils.getHostname(); private static final String PROTOCOL = "udp";