This is an automated email from the ASF dual-hosted git repository. ppalaga pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
commit a17a5c3f04749fd9b5af67766b4e00f5413d32a8 Author: JiriOndrusek <[email protected]> AuthorDate: Fri Dec 11 15:27:53 2020 +0100 Debezium MongoDB Connector native support #1190 --- .../reference/extensions/debezium-mongodb.adoc | 8 +- .../reference/components/debezium-mongodb.adoc | 6 +- .../debezium-mongodb/integration-test/pom.xml | 66 --------- .../mongodb/it/DebeziumMongodbResource.java | 51 ------- extensions-jvm/pom.xml | 1 - .../debezium-mongodb/deployment/pom.xml | 14 +- .../deployment/DebeziumMongodbProcessor.java | 23 ++- .../debezium-mongodb/pom.xml | 1 - .../debezium-mongodb/runtime/pom.xml | 2 +- .../main/resources/META-INF/quarkus-extension.yaml | 3 +- extensions/pom.xml | 1 + integration-tests/debezium/pom.xml | 24 ++++ .../common/it/DebeziumMongodbResource.java | 69 +++++++++ .../quarkus/component/debezium/common/it/Type.java | 2 +- .../debezium/common/it/AbstractDebeziumTest.java | 41 +++--- .../common/it/AbstractDebeziumTestResource.java | 13 +- .../common/it/mongodb/DebeziumMongodbIT.java | 18 +-- .../common/it/mongodb/DebeziumMongodbTest.java | 160 +++++++++++++++++++++ .../it/mongodb/DebeziumMongodbTestResource.java | 95 ++++++++++++ .../sqlserver/DebeziumSqlserverTestResource.java | 5 +- .../debezium/src/test/resources/initMongodb.txt | 36 +++++ pom.xml | 2 +- poms/bom-test/pom.xml | 5 + poms/bom/pom.xml | 11 +- 24 files changed, 465 insertions(+), 192 deletions(-) diff --git a/docs/modules/ROOT/pages/reference/extensions/debezium-mongodb.adoc b/docs/modules/ROOT/pages/reference/extensions/debezium-mongodb.adoc index 31e93f1..92ed46c 100644 --- a/docs/modules/ROOT/pages/reference/extensions/debezium-mongodb.adoc +++ b/docs/modules/ROOT/pages/reference/extensions/debezium-mongodb.adoc @@ -3,15 +3,15 @@ = Debezium MongoDB Connector :page-aliases: extensions/debezium-mongodb.adoc :cq-artifact-id: camel-quarkus-debezium-mongodb -:cq-native-supported: false -:cq-status: Preview +:cq-native-supported: true +:cq-status: Stable :cq-description: Capture changes from a MongoDB database. :cq-deprecated: false :cq-jvm-since: 1.0.0 -:cq-native-since: 1.0.0 +:cq-native-since: 1.6.0 [.badges] -[.badge-key]##JVM since##[.badge-supported]##1.0.0## [.badge-key]##Native##[.badge-unsupported]##unsupported## +[.badge-key]##JVM since##[.badge-supported]##1.0.0## [.badge-key]##Native since##[.badge-supported]##1.6.0## Capture changes from a MongoDB database. diff --git a/docs/modules/ROOT/partials/reference/components/debezium-mongodb.adoc b/docs/modules/ROOT/partials/reference/components/debezium-mongodb.adoc index e53e90c..e4c1c31 100644 --- a/docs/modules/ROOT/partials/reference/components/debezium-mongodb.adoc +++ b/docs/modules/ROOT/partials/reference/components/debezium-mongodb.adoc @@ -2,11 +2,11 @@ // This file was generated by camel-quarkus-maven-plugin:update-extension-doc-page :cq-artifact-id: camel-quarkus-debezium-mongodb :cq-artifact-id-base: debezium-mongodb -:cq-native-supported: false -:cq-status: Preview +:cq-native-supported: true +:cq-status: Stable :cq-deprecated: false :cq-jvm-since: 1.0.0 -:cq-native-since: 1.0.0 +:cq-native-since: 1.6.0 :cq-camel-part-name: debezium-mongodb :cq-camel-part-title: Debezium MongoDB Connector :cq-camel-part-description: Capture changes from a MongoDB database. diff --git a/extensions-jvm/debezium-mongodb/integration-test/pom.xml b/extensions-jvm/debezium-mongodb/integration-test/pom.xml deleted file mode 100644 index 80ed99c..0000000 --- a/extensions-jvm/debezium-mongodb/integration-test/pom.xml +++ /dev/null @@ -1,66 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - ---> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - <parent> - <groupId>org.apache.camel.quarkus</groupId> - <artifactId>camel-quarkus-build-parent-it</artifactId> - <version>1.6.0-SNAPSHOT</version> - <relativePath>../../../poms/build-parent-it/pom.xml</relativePath> - </parent> - - <artifactId>camel-quarkus-debezium-mongodb-integration-test</artifactId> - <name>Camel Quarkus :: Debezium MongoDB Connector :: Integration Test</name> - <description>Integration tests for Camel Quarkus Debezium MongoDB Connector extension</description> - - <properties> - <!-- mvnd, a.k.a. Maven Daemon: https://github.com/mvndaemon/mvnd --> - <!-- The following rule tells mvnd to build the listed deployment modules before this module. --> - <!-- This is important because mvnd builds modules in parallel by default. The deployment modules are not --> - <!-- explicit dependencies of this module in the Maven sense, although they are required by the Quarkus Maven plugin. --> - <!-- Please update the rule whenever you change the dependencies of this module by running --> - <!-- mvn process-resources -Pformat from the root directory --> - <mvnd.builder.rule>camel-quarkus-debezium-mongodb-deployment</mvnd.builder.rule> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.camel.quarkus</groupId> - <artifactId>camel-quarkus-debezium-mongodb</artifactId> - </dependency> - <dependency> - <groupId>io.quarkus</groupId> - <artifactId>quarkus-resteasy</artifactId> - </dependency> - - <!-- test dependencies --> - <dependency> - <groupId>io.quarkus</groupId> - <artifactId>quarkus-junit5</artifactId> - <scope>test</scope> - </dependency> - <dependency> - <groupId>io.rest-assured</groupId> - <artifactId>rest-assured</artifactId> - <scope>test</scope> - </dependency> - </dependencies> - -</project> diff --git a/extensions-jvm/debezium-mongodb/integration-test/src/main/java/org/apache/camel/quarkus/component/debezium/mongodb/it/DebeziumMongodbResource.java b/extensions-jvm/debezium-mongodb/integration-test/src/main/java/org/apache/camel/quarkus/component/debezium/mongodb/it/DebeziumMongodbResource.java deleted file mode 100644 index 6341702..0000000 --- a/extensions-jvm/debezium-mongodb/integration-test/src/main/java/org/apache/camel/quarkus/component/debezium/mongodb/it/DebeziumMongodbResource.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.quarkus.component.debezium.mongodb.it; - -import javax.enterprise.context.ApplicationScoped; -import javax.inject.Inject; -import javax.ws.rs.GET; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; - -import org.apache.camel.CamelContext; -import org.jboss.logging.Logger; - -@Path("/debezium-mongodb") -@ApplicationScoped -public class DebeziumMongodbResource { - - private static final Logger LOG = Logger.getLogger(DebeziumMongodbResource.class); - - private static final String COMPONENT_DEBEZIUM_MONGODB = "debezium-mongodb"; - @Inject - CamelContext context; - - @Path("/load/component/debezium-mongodb") - @GET - @Produces(MediaType.TEXT_PLAIN) - public Response loadComponentDebeziumMongodb() throws Exception { - /* This is an autogenerated test */ - if (context.getComponent(COMPONENT_DEBEZIUM_MONGODB) != null) { - return Response.ok().build(); - } - LOG.warnf("Could not load [%s] from the Camel context", COMPONENT_DEBEZIUM_MONGODB); - return Response.status(500, COMPONENT_DEBEZIUM_MONGODB + " could not be loaded from the Camel context").build(); - } -} diff --git a/extensions-jvm/pom.xml b/extensions-jvm/pom.xml index 6d803fe..bb4ec67 100644 --- a/extensions-jvm/pom.xml +++ b/extensions-jvm/pom.xml @@ -59,7 +59,6 @@ <module>cometd</module> <module>corda</module> <module>couchbase</module> - <module>debezium-mongodb</module> <module>digitalocean</module> <module>djl</module> <module>dns</module> diff --git a/extensions-jvm/debezium-mongodb/deployment/pom.xml b/extensions/debezium-mongodb/deployment/pom.xml similarity index 87% rename from extensions-jvm/debezium-mongodb/deployment/pom.xml rename to extensions/debezium-mongodb/deployment/pom.xml index e13bb64..d4523ba 100644 --- a/extensions-jvm/debezium-mongodb/deployment/pom.xml +++ b/extensions/debezium-mongodb/deployment/pom.xml @@ -17,7 +17,9 @@ limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.apache.camel.quarkus</groupId> @@ -36,15 +38,19 @@ </dependency> <dependency> <groupId>org.apache.camel.quarkus</groupId> - <artifactId>camel-quarkus-support-debezium-deployment</artifactId> + <artifactId>camel-quarkus-debezium-mongodb</artifactId> </dependency> <dependency> <groupId>org.apache.camel.quarkus</groupId> - <artifactId>camel-quarkus-mongodb-deployment</artifactId> + <artifactId>camel-quarkus-support-debezium-deployment</artifactId> + </dependency> + <dependency> + <groupId>io.quarkus</groupId> + <artifactId>quarkus-mongodb-client-deployment</artifactId> </dependency> <dependency> <groupId>org.apache.camel.quarkus</groupId> - <artifactId>camel-quarkus-debezium-mongodb</artifactId> + <artifactId>camel-quarkus-mongodb-deployment</artifactId> </dependency> </dependencies> diff --git a/extensions-jvm/debezium-mongodb/deployment/src/main/java/org/apache/camel/quarkus/component/debezium/mongodb/deployment/DebeziumMongodbProcessor.java b/extensions/debezium-mongodb/deployment/src/main/java/org/apache/camel/quarkus/component/debezium/mongodb/deployment/DebeziumMongodbProcessor.java similarity index 62% rename from extensions-jvm/debezium-mongodb/deployment/src/main/java/org/apache/camel/quarkus/component/debezium/mongodb/deployment/DebeziumMongodbProcessor.java rename to extensions/debezium-mongodb/deployment/src/main/java/org/apache/camel/quarkus/component/debezium/mongodb/deployment/DebeziumMongodbProcessor.java index 2231405..45c7089 100644 --- a/extensions-jvm/debezium-mongodb/deployment/src/main/java/org/apache/camel/quarkus/component/debezium/mongodb/deployment/DebeziumMongodbProcessor.java +++ b/extensions/debezium-mongodb/deployment/src/main/java/org/apache/camel/quarkus/component/debezium/mongodb/deployment/DebeziumMongodbProcessor.java @@ -16,16 +16,13 @@ */ package org.apache.camel.quarkus.component.debezium.mongodb.deployment; +import io.debezium.connector.mongodb.MongoDbConnector; +import io.debezium.connector.mongodb.MongoDbConnectorTask; import io.quarkus.deployment.annotations.BuildStep; -import io.quarkus.deployment.annotations.ExecutionTime; -import io.quarkus.deployment.annotations.Record; import io.quarkus.deployment.builditem.FeatureBuildItem; -import io.quarkus.deployment.pkg.steps.NativeBuild; -import org.apache.camel.quarkus.core.JvmOnlyRecorder; -import org.jboss.logging.Logger; +import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; class DebeziumMongodbProcessor { - private static final Logger LOG = Logger.getLogger(DebeziumMongodbProcessor.class); private static final String FEATURE = "camel-debezium-mongodb"; @@ -34,14 +31,10 @@ class DebeziumMongodbProcessor { return new FeatureBuildItem(FEATURE); } - /** - * Remove this once this extension starts supporting the native mode. - */ - @BuildStep(onlyIf = NativeBuild.class) - @Record(value = ExecutionTime.RUNTIME_INIT) - void warnJvmInNative(JvmOnlyRecorder recorder) { - JvmOnlyRecorder.warnJvmInNative(LOG, FEATURE); // warn at build time - recorder.warnJvmInNative(FEATURE); // warn at runtime + @BuildStep + ReflectiveClassBuildItem reflectiveClasses() { + return new ReflectiveClassBuildItem(false, false, + new String[] { MongoDbConnector.class.getName(), + MongoDbConnectorTask.class.getName() }); } - } diff --git a/extensions-jvm/debezium-mongodb/pom.xml b/extensions/debezium-mongodb/pom.xml similarity index 97% rename from extensions-jvm/debezium-mongodb/pom.xml rename to extensions/debezium-mongodb/pom.xml index 2d64168..8d2cdd8 100644 --- a/extensions-jvm/debezium-mongodb/pom.xml +++ b/extensions/debezium-mongodb/pom.xml @@ -33,6 +33,5 @@ <modules> <module>deployment</module> <module>runtime</module> - <module>integration-test</module> </modules> </project> diff --git a/extensions-jvm/debezium-mongodb/runtime/pom.xml b/extensions/debezium-mongodb/runtime/pom.xml similarity index 98% rename from extensions-jvm/debezium-mongodb/runtime/pom.xml rename to extensions/debezium-mongodb/runtime/pom.xml index 60820b7..28165cf 100644 --- a/extensions-jvm/debezium-mongodb/runtime/pom.xml +++ b/extensions/debezium-mongodb/runtime/pom.xml @@ -31,7 +31,7 @@ <properties> <camel.quarkus.jvmSince>1.0.0</camel.quarkus.jvmSince> - <camel.quarkus.nativeSince>1.0.0</camel.quarkus.nativeSince> + <camel.quarkus.nativeSince>1.6.0</camel.quarkus.nativeSince> </properties> <dependencyManagement> diff --git a/extensions-jvm/debezium-mongodb/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/extensions/debezium-mongodb/runtime/src/main/resources/META-INF/quarkus-extension.yaml similarity index 97% rename from extensions-jvm/debezium-mongodb/runtime/src/main/resources/META-INF/quarkus-extension.yaml rename to extensions/debezium-mongodb/runtime/src/main/resources/META-INF/quarkus-extension.yaml index ce828ea..d6d0ac4 100644 --- a/extensions-jvm/debezium-mongodb/runtime/src/main/resources/META-INF/quarkus-extension.yaml +++ b/extensions/debezium-mongodb/runtime/src/main/resources/META-INF/quarkus-extension.yaml @@ -24,9 +24,8 @@ name: "Camel Debezium MongoDB Connector" description: "Capture changes from a MongoDB database" metadata: - unlisted: true guide: "https://camel.apache.org/camel-quarkus/latest/reference/extensions/debezium-mongodb.html" categories: - "integration" status: - - "preview" + - "stable" diff --git a/extensions/pom.xml b/extensions/pom.xml index 445b245..6ea25f0 100644 --- a/extensions/pom.xml +++ b/extensions/pom.xml @@ -91,6 +91,7 @@ <module>csimple</module> <module>csv</module> <module>dataformat</module> + <module>debezium-mongodb</module> <module>debezium-mysql</module> <module>debezium-postgres</module> <module>debezium-sqlserver</module> diff --git a/integration-tests/debezium/pom.xml b/integration-tests/debezium/pom.xml index e17770d..c81fb2b 100644 --- a/integration-tests/debezium/pom.xml +++ b/integration-tests/debezium/pom.xml @@ -57,6 +57,10 @@ <groupId>org.apache.camel.quarkus</groupId> <artifactId>camel-quarkus-debezium-postgres</artifactId> </dependency> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-debezium-mongodb</artifactId> + </dependency> <dependency> <groupId>io.quarkus</groupId> @@ -98,6 +102,26 @@ <artifactId>mssqlserver</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>mongodb</artifactId> + <scope>test</scope> + </dependency> + + <!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory --> + <dependency> + <groupId>org.apache.camel.quarkus</groupId> + <artifactId>camel-quarkus-debezium-mongodb-deployment</artifactId> + <version>${project.version}</version> + <type>pom</type> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> <!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory --> <dependency> diff --git a/integration-tests/debezium/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/DebeziumMongodbResource.java b/integration-tests/debezium/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/DebeziumMongodbResource.java new file mode 100644 index 0000000..336374b --- /dev/null +++ b/integration-tests/debezium/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/DebeziumMongodbResource.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.debezium.common.it; + +import javax.enterprise.context.ApplicationScoped; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +@Path("/debezium-mongodb") +@ApplicationScoped +public class DebeziumMongodbResource extends AbstractDebeziumResource { + + public DebeziumMongodbResource() { + super(Type.mongodb); + } + + @Path("/receiveAsRecord") + @GET + @Produces(MediaType.APPLICATION_JSON) + public Record receiveAsRecord() { + return super.receiveAsRecord(); + } + + @Path("/receive") + @GET + @Produces(MediaType.TEXT_PLAIN) + public String receive() { + return super.receive(); + } + + @Path("/receiveOperation") + @GET + @Produces(MediaType.TEXT_PLAIN) + public String receiveOperation() { + Record record = receiveAsRecord(); + + if (record == null) { + return null; + } + return record.getOperation(); + } + + @Override + String getEndpoinUrl(String hostname, String port, String username, String password, String databaseServerName, + String offsetStorageFileName) { + return Type.mongodb.getComponent() + ":localhost?" + + "offsetStorageFileName=" + offsetStorageFileName + + "&mongodbUser=" + System.getProperty(Type.mongodb.getPropertyUsername()) + + "&mongodbPassword=" + System.getProperty(Type.mongodb.getPropertyPassword()) + + "&mongodbName=docker-rs" + + "&mongodbHosts=" + hostname + ":" + port; + } +} diff --git a/integration-tests/debezium/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/Type.java b/integration-tests/debezium/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/Type.java index 13c046e..503e6a0 100644 --- a/integration-tests/debezium/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/Type.java +++ b/integration-tests/debezium/src/main/java/org/apache/camel/quarkus/component/debezium/common/it/Type.java @@ -23,7 +23,7 @@ package org.apache.camel.quarkus.component.debezium.common.it; */ public enum Type { - postgres, mysql, sqlserver; + postgres, mysql, sqlserver, mongodb; /** name of the camel component */ public String getComponent() { diff --git a/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/AbstractDebeziumTest.java b/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/AbstractDebeziumTest.java index 8438b85..26b0c30 100644 --- a/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/AbstractDebeziumTest.java +++ b/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/AbstractDebeziumTest.java @@ -39,10 +39,10 @@ import static org.hamcrest.Matchers.is; public abstract class AbstractDebeziumTest { private static final Logger LOG = Logger.getLogger(AbstractDebeziumTest.class); - private static String COMPANY_1 = "Best Company"; - private static String COMPANY_2 = "Even Better Company"; - private static String CITY_1 = "Prague"; - private static String CITY_2 = "Paris"; + protected static String COMPANY_1 = "Best Company"; + protected static String COMPANY_2 = "Even Better Company"; + protected static String CITY_1 = "Prague"; + protected static String CITY_2 = "Paris"; public static int REPEAT_COUNT = 3; /** @@ -63,16 +63,12 @@ public abstract class AbstractDebeziumTest { @Test @Order(1) public void testInsert() throws SQLException { - if (getConnection() == null) { - LOG.warn("Test 'testInsert' is skipped, because container is not running."); - return; - } + isInitialized("Test 'testInsert' is skipped, because container is not running."); int i = 0; while (i++ < REPEAT_COUNT) { - executeUpdate(String.format("INSERT INTO %s (name, city) VALUES ('%s', '%s')", getCompanyTableName(), - COMPANY_1 + "_" + i, CITY_1)); + insertCompany(COMPANY_1 + "_" + i, CITY_1); Response response = receiveResponse(); @@ -94,13 +90,19 @@ public abstract class AbstractDebeziumTest { i < REPEAT_COUNT); } + protected void isInitialized(String s) { + Assert.assertNotNull(s, getConnection()); + } + + protected void insertCompany(String name, String city) throws SQLException { + executeUpdate(String.format("INSERT INTO %s (name, city) VALUES ('%s', '%s')", getCompanyTableName(), + name, city)); + } + @Test @Order(2) public void testUpdate() throws SQLException { - if (getConnection() == null) { - LOG.warn("Test 'testUpdate' is skipped, because container is not running."); - return; - } + isInitialized("Test 'testUpdate' is skipped, because container is not running."); executeUpdate(String.format("INSERT INTO %s (name, city) VALUES ('%s', '%s')", getCompanyTableName(), COMPANY_2, CITY_2)); @@ -120,10 +122,7 @@ public abstract class AbstractDebeziumTest { @Test @Order(3) public void testDelete() throws SQLException { - if (getConnection() == null) { - LOG.warn("Test 'testDelete' is skipped, because container is not running."); - return; - } + isInitialized("Test 'testDelete' is skipped, because container is not running."); int res = executeUpdate("DELETE FROM " + getCompanyTableName()); int i = 0; @@ -148,6 +147,12 @@ public abstract class AbstractDebeziumTest { .body(stringMatcher); } + protected void receiveResponse(int statusCode, Matcher<String> stringMatcher, String methodName) { + receiveResponse(methodName).then() + .statusCode(statusCode) + .body(stringMatcher); + } + protected int executeUpdate(String sql) throws SQLException { try (Statement statement = getConnection().createStatement()) { return statement.executeUpdate(sql); diff --git a/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/AbstractDebeziumTestResource.java b/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/AbstractDebeziumTestResource.java index 86be357..68f168a 100644 --- a/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/AbstractDebeziumTestResource.java +++ b/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/AbstractDebeziumTestResource.java @@ -61,16 +61,19 @@ public abstract class AbstractDebeziumTestResource<T extends GenericContainer> i container = createContainer(); - container.start(); + startContainer(); Map<String, String> map = CollectionHelper.mapOf( type.getPropertyHostname(), container.getContainerIpAddress(), type.getPropertyPort(), container.getMappedPort(getPort()) + "", - type.getPropertyUsername(), getUsername(), - type.getPropertyPassword(), getPassword(), type.getPropertyOffsetFileName(), storeFile.toString(), type.getPropertyJdbc(), getJdbcUrl()); + if (getUsername() != null) { + map.put(type.getPropertyUsername(), getUsername()); + map.put(type.getPropertyPassword(), getPassword()); + } + return map; } catch (Exception e) { @@ -79,6 +82,10 @@ public abstract class AbstractDebeziumTestResource<T extends GenericContainer> i } } + protected void startContainer() throws Exception { + container.start(); + } + @Override public void stop() { try { diff --git a/extensions-jvm/debezium-mongodb/integration-test/src/test/java/org/apache/camel/quarkus/component/debezium/mongodb/it/DebeziumMongodbTest.java b/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/mongodb/DebeziumMongodbIT.java similarity index 63% rename from extensions-jvm/debezium-mongodb/integration-test/src/test/java/org/apache/camel/quarkus/component/debezium/mongodb/it/DebeziumMongodbTest.java rename to integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/mongodb/DebeziumMongodbIT.java index f14194a..25b04d6 100644 --- a/extensions-jvm/debezium-mongodb/integration-test/src/test/java/org/apache/camel/quarkus/component/debezium/mongodb/it/DebeziumMongodbTest.java +++ b/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/mongodb/DebeziumMongodbIT.java @@ -14,21 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.camel.quarkus.component.debezium.mongodb.it; +package org.apache.camel.quarkus.component.debezium.common.it.mongodb; -import io.quarkus.test.junit.QuarkusTest; -import io.restassured.RestAssured; -import org.junit.jupiter.api.Test; +import io.quarkus.test.junit.NativeImageTest; -@QuarkusTest -class DebeziumMongodbTest { - - @Test - public void loadComponentDebeziumMongodb() { - /* A simple autogenerated test */ - RestAssured.get("/debezium-mongodb/load/component/debezium-mongodb") - .then() - .statusCode(200); - } +@NativeImageTest +class DebeziumMongodbIT extends DebeziumMongodbTest { } diff --git a/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/mongodb/DebeziumMongodbTest.java b/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/mongodb/DebeziumMongodbTest.java new file mode 100644 index 0000000..dc7c161 --- /dev/null +++ b/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/mongodb/DebeziumMongodbTest.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.debezium.common.it.mongodb; + +import java.sql.Connection; +import java.sql.SQLException; + +import com.mongodb.client.MongoClient; +import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoCollection; +import com.mongodb.client.MongoDatabase; +import com.mongodb.client.result.DeleteResult; +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; +import org.apache.camel.quarkus.component.debezium.common.it.AbstractDebeziumTest; +import org.apache.camel.quarkus.component.debezium.common.it.Type; +import org.bson.Document; +import org.jboss.logging.Logger; +import org.junit.Assert; +import org.junit.Before; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.MethodOrderer; +import org.junit.jupiter.api.Order; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestMethodOrder; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; + +@QuarkusTest +@QuarkusTestResource(DebeziumMongodbTestResource.class) +@TestMethodOrder(MethodOrderer.OrderAnnotation.class) +class DebeziumMongodbTest extends AbstractDebeziumTest { + private static final Logger LOG = Logger.getLogger(DebeziumMongodbTest.class); + + //constant with value of Type.mongodb.getJdbcProperty + public static final String PROPERTY_JDBC = "mongodb_jdbc"; + + private static MongoClient mongoClient; + + private static MongoCollection companies; + + public DebeziumMongodbTest() { + super(Type.mongodb); + } + + @BeforeAll + public static void setUp() throws SQLException { + final String mongoUrl = System.getProperty(Type.mongodb.getPropertyJdbc()); + + if (mongoUrl != null) { + mongoClient = MongoClients.create(mongoUrl); + } else { + LOG.warn("Container is not running. Connection is not created."); + } + + org.junit.Assume.assumeTrue(mongoClient != null); + + MongoDatabase db = mongoClient.getDatabase("test"); + + companies = db.getCollection("companies"); + } + + @Before + public void before() { + org.junit.Assume.assumeTrue(mongoClient != null); + } + + @AfterAll + public static void cleanUp() throws SQLException { + if (mongoClient != null) { + mongoClient.close(); + } + } + + @Override + protected Connection getConnection() { + throw new IllegalStateException("Not used"); + } + + @Override + protected String getCompanyTableName() { + throw new IllegalStateException("Not used"); + } + + @Test + @Order(0) + @EnabledIfSystemProperty(named = PROPERTY_JDBC, matches = ".*") + public void testReceiveInit() { + receiveResponse() + .then() + .statusCode(200) + .body(containsString("init")); + } + + @Override + protected void insertCompany(String name, String city) { + Document doc = new Document(); + doc.put("name", name); + doc.put("city", city); + companies.insertOne(doc); + } + + @Override + protected void isInitialized(String s) { + Assert.assertNotNull(s, mongoClient); + } + + @Test + @Order(1) + @EnabledIfSystemProperty(named = PROPERTY_JDBC, matches = ".*") + public void testInsert() throws SQLException { + super.testInsert(); + } + + @Test + @Order(2) + @EnabledIfSystemProperty(named = PROPERTY_JDBC, matches = ".*") + public void testUpdate() throws SQLException { + Document doc = new Document().append("name", COMPANY_2).append("city", CITY_2); + companies.insertOne(doc); + + //validate that event is received + receiveResponse(200, containsString(COMPANY_2)); + + Document searchQuery = new Document().append("name", COMPANY_2); + Document updateQuery = new Document().append("$set", new Document().append("city", CITY_2 + "_changed")); + companies.updateMany(searchQuery, updateQuery); + + //validate that event for create is in queue + receiveResponse(200, containsString(CITY_2 + "_changed")); + } + + @Test + @Order(3) + @EnabledIfSystemProperty(named = PROPERTY_JDBC, matches = ".*") + public void testDelete() throws SQLException { + DeleteResult dr = companies.deleteMany(new Document().append("name", COMPANY_2)); + Assert.assertEquals("Only one company should be deleted.", 1, dr.getDeletedCount()); + + //validate that event for delete is in queue + receiveResponse(200, equalTo("d"), "/receiveOperation"); + } +} diff --git a/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/mongodb/DebeziumMongodbTestResource.java b/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/mongodb/DebeziumMongodbTestResource.java new file mode 100644 index 0000000..f604aa4 --- /dev/null +++ b/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/mongodb/DebeziumMongodbTestResource.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.quarkus.component.debezium.common.it.mongodb; + +import java.nio.file.Files; +import java.nio.file.Paths; + +import org.apache.camel.quarkus.component.debezium.common.it.AbstractDebeziumTestResource; +import org.apache.camel.quarkus.component.debezium.common.it.Type; +import org.jboss.logging.Logger; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.Wait; + +public class DebeziumMongodbTestResource extends AbstractDebeziumTestResource<GenericContainer> { + private static final Logger LOG = Logger.getLogger(DebeziumMongodbTestResource.class); + + private static final String PRIVATE_HOST = "mongodb_private"; + private static final String DB_USERNAME = "debezium"; + private static final String DB_PASSWORD = "dbz"; + private static int DB_PORT = 27017; + + public DebeziumMongodbTestResource() { + super(Type.mongodb); + } + + private Network net = Network.newNetwork();; + + @Override + protected GenericContainer createContainer() { + return new GenericContainer("mongo") + .withExposedPorts(DB_PORT) + .withCommand("--replSet", "my-mongo-set") + .withNetwork(net) + .withNetworkAliases(PRIVATE_HOST) + .waitingFor( + Wait.forLogMessage(".*Waiting for connections.*", 1)); + + } + + @Override + protected void startContainer() throws Exception { + super.startContainer(); + + execScriptInContainer("initMongodb.txt"); + } + + private void execScriptInContainer(String script) throws Exception { + String cmd = new String(Files.readAllBytes(Paths.get(getClass().getResource("/" + script).toURI()))); + String[] cmds = cmd.split("\\n\\n"); + + for (int i = 0; i < cmds.length; i++) { + Container.ExecResult er = container.execInContainer(new String[] { "mongo", "--eval", cmds[i] }); + } + } + + @Override + protected String getJdbcUrl() { + final String jdbcUrl = String.format("mongodb://%s:%s@%s:%d", DB_USERNAME, DB_PASSWORD, container.getHost(), + container.getMappedPort(DB_PORT)); + + return jdbcUrl; + } + + @Override + protected String getUsername() { + return DB_USERNAME; + } + + @Override + protected String getPassword() { + return DB_PASSWORD; + } + + @Override + protected int getPort() { + return DB_PORT; + } +} diff --git a/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/sqlserver/DebeziumSqlserverTestResource.java b/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/sqlserver/DebeziumSqlserverTestResource.java index 2fee946..e32258d 100644 --- a/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/sqlserver/DebeziumSqlserverTestResource.java +++ b/integration-tests/debezium/src/test/java/org/apache/camel/quarkus/component/debezium/common/it/sqlserver/DebeziumSqlserverTestResource.java @@ -29,6 +29,7 @@ import org.apache.camel.quarkus.component.debezium.common.it.DebeziumSqlserverRe import org.apache.camel.quarkus.component.debezium.common.it.Type; import org.jboss.logging.Logger; import org.testcontainers.containers.MSSQLServerContainer; +import org.testcontainers.containers.wait.strategy.Wait; public class DebeziumSqlserverTestResource extends AbstractDebeziumTestResource<MSSQLServerContainer> { private static final Logger LOG = Logger.getLogger(DebeziumSqlserverTestResource.class); @@ -44,7 +45,9 @@ public class DebeziumSqlserverTestResource extends AbstractDebeziumTestResource< @Override protected MSSQLServerContainer createContainer() { return new MSSQLServerContainer<>().withEnv(Collections.singletonMap("MSSQL_AGENT_ENABLED", "True")) - .withInitScript("initSqlserver.sql"); + .withInitScript("initSqlserver.sql") + .waitingFor( + Wait.forLogMessage(".*xp_sqlagent_notify.*", 1)); } @Override diff --git a/integration-tests/debezium/src/test/resources/initMongodb.txt b/integration-tests/debezium/src/test/resources/initMongodb.txt new file mode 100644 index 0000000..d9cfc06 --- /dev/null +++ b/integration-tests/debezium/src/test/resources/initMongodb.txt @@ -0,0 +1,36 @@ +rs.initiate( { + '_id' : 'my-mongo-set', + 'members' : [{ + '_id' : 0, + 'host' : 'mongodb_private:27017', + 'priority': 2}] + }); + +db.getSiblingDB('admin').runCommand( { + createRole: 'listDatabases', + privileges: [ { + resource: { cluster : true }, + actions: [ 'listDatabases'] + } ] , + roles: [] +}); + +db.getSiblingDB('admin').createUser({ + user: "debezium", + pwd:"dbz", + roles: [ { + role: "userAdminAnyDatabase", + db: "admin" + }, { + role: "dbAdminAnyDatabase", + db: "admin" + }, { + role: "readWriteAnyDatabase", + db:"admin" + }, { + role: "clusterAdmin", + db: "admin" + }] +}); + +db.test.insert({'name':'init'}) \ No newline at end of file diff --git a/pom.xml b/pom.xml index ab17719..1f33bbc 100644 --- a/pom.xml +++ b/pom.xml @@ -56,7 +56,7 @@ <commons-lang.version>2.6</commons-lang.version><!-- used by hbase, should be pretty stable as commons-lang is not developed actively anymore --> <commons-math3.version>3.6.1</commons-math3.version><!-- Mess in the transitive dependencies of Spark and hbase-testing-util --> <curator.version>4.3.0</curator.version><!-- Mess in the transitive dependencies of Spark, Zookeeper and other hadoop related components --> - <debezium.version>1.3.0.Final</debezium.version> + <debezium.version>1.4.0.Final</debezium.version> <derby.version>10.15.2.0</derby.version><!-- Spark --> <freemarker.version>2.3.30</freemarker.version> <fommil.netlib.core.version>1.1.2</fommil.netlib.core.version><!-- Mess in Weka transitive deps --> diff --git a/poms/bom-test/pom.xml b/poms/bom-test/pom.xml index 76ccd47..ad0f9d2 100644 --- a/poms/bom-test/pom.xml +++ b/poms/bom-test/pom.xml @@ -159,6 +159,11 @@ </dependency> <dependency> <groupId>org.testcontainers</groupId> + <artifactId>mongodb</artifactId> + <version>${testcontainers.version}</version> + </dependency> + <dependency> + <groupId>org.testcontainers</groupId> <artifactId>kafka</artifactId> <version>${testcontainers.version}</version> </dependency> diff --git a/poms/bom/pom.xml b/poms/bom/pom.xml index 60f6143..c531eab 100644 --- a/poms/bom/pom.xml +++ b/poms/bom/pom.xml @@ -705,12 +705,6 @@ <groupId>org.apache.camel</groupId> <artifactId>camel-debezium-mongodb</artifactId> <version>${camel.version}</version> - <exclusions> - <exclusion> - <groupId>org.mongodb</groupId> - <artifactId>*</artifactId> - </exclusion> - </exclusions> </dependency> <dependency> <groupId>org.apache.camel</groupId> @@ -5586,6 +5580,11 @@ </dependency> <dependency> <groupId>io.debezium</groupId> + <artifactId>debezium-connector-mongodb</artifactId> + <version>${debezium.version}</version> + </dependency> + <dependency> + <groupId>io.debezium</groupId> <artifactId>debezium-connector-mysql</artifactId> <version>${debezium.version}</version> </dependency>
