This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c7dff63e8d3 [improve][io] Upgrade to Debezium 3.2.2 (#24712)
c7dff63e8d3 is described below
commit c7dff63e8d38bfdc7ac881b5fec8c1b486346e6e
Author: Enrique Fernández <[email protected]>
AuthorDate: Mon Sep 8 21:18:31 2025 +0200
[improve][io] Upgrade to Debezium 3.2.2 (#24712)
Co-authored-by: Lari Hotari <[email protected]>
---
pom.xml | 13 +++---
.../apache/pulsar/io/debezium/DebeziumSource.java | 20 +++++----
...tabaseHistory.java => PulsarSchemaHistory.java} | 48 +++++++++++-----------
.../org/apache/pulsar/io/debezium/SerDeUtils.java | 2 +-
...storyTest.java => PulsarSchemaHistoryTest.java} | 40 +++++++++---------
.../io/debezium/mongodb/DebeziumMongoDbSource.java | 7 ++++
.../resources/debezium-mongodb-source-config.yaml | 5 ++-
.../io/debezium/mssql/DebeziumMsSqlSource.java | 7 ++++
.../resources/debezium-mssql-source-config.yaml | 7 ++--
pulsar-io/debezium/mysql/pom.xml | 4 +-
.../io/debezium/mysql/DebeziumMysqlSource.java | 7 ++++
.../resources/debezium-mysql-source-config.yaml | 7 ++--
.../io/debezium/oracle/DebeziumOracleSource.java | 7 ++++
.../resources/debezium-oracle-source-config.yaml | 5 ++-
pulsar-io/debezium/pom.xml | 40 ------------------
.../debezium/postgres/DebeziumPostgresSource.java | 7 ++++
.../resources/debezium-postgres-source-config.yaml | 7 ++--
pulsar-io/mongo/pom.xml | 2 +-
.../apache/pulsar/io/mongodb/MongoSinkTest.java | 19 ++++++++-
.../apache/pulsar/io/mongodb/MongoSourceTest.java | 12 ++++--
src/owasp-dependency-check-suppressions.xml | 43 -------------------
tests/integration/pom.xml | 2 +-
.../containers/DebeziumMongoDbContainer.java | 2 +-
.../containers/DebeziumMySQLContainer.java | 2 +-
.../containers/DebeziumPostgreSqlContainer.java | 2 +-
.../io/sources/PulsarIOSourceRunner.java | 15 +++++++
.../tests/integration/io/sources/SourceTester.java | 2 +
.../debezium/DebeziumMongoDbSourceTester.java | 22 +++++-----
.../debezium/DebeziumMsSqlSourceTester.java | 14 ++++---
.../debezium/DebeziumMySqlSourceTester.java | 7 ++--
.../debezium/DebeziumOracleDbSourceTester.java | 6 ++-
.../debezium/DebeziumPostgreSqlSourceTester.java | 10 ++---
.../debezium/PulsarDebeziumSourcesTest.java | 4 +-
tests/scripts/pre-integ-tests.sh | 4 +-
34 files changed, 202 insertions(+), 199 deletions(-)
diff --git a/pom.xml b/pom.xml
index f69f4f3ce86..8c0d4e952db 100644
--- a/pom.xml
+++ b/pom.xml
@@ -240,19 +240,16 @@ flexible messaging model and an intuitive client API.</description>
<jclouds.version>2.6.0</jclouds.version>
<guice.version>5.1.0</guice.version>
<sqlite-jdbc.version>3.47.1.0</sqlite-jdbc.version>
- <mysql-jdbc.version>8.0.11</mysql-jdbc.version>
- <postgresql-jdbc.version>42.5.5</postgresql-jdbc.version>
+ <postgresql-jdbc.version>42.7.7</postgresql-jdbc.version>
<clickhouse-jdbc.version>0.4.6</clickhouse-jdbc.version>
- <mariadb-jdbc.version>2.7.5</mariadb-jdbc.version>
+ <mariadb-jdbc.version>3.5.5</mariadb-jdbc.version>
<openmldb-jdbc.version>0.4.4-hotfix1</openmldb-jdbc.version>
<json-smart.version>2.5.2</json-smart.version>
<opensearch.version>2.16.0</opensearch.version>
<elasticsearch-java.version>8.15.3</elasticsearch-java.version>
- <debezium.version>1.9.7.Final</debezium.version>
- <debezium.postgresql.version>42.5.5</debezium.postgresql.version>
- <debezium.mysql.version>8.0.33</debezium.mysql.version>
- <!-- Override version that brings CVE-2022-3143 with debezium -->
- <wildfly-elytron.version>1.15.16.Final</wildfly-elytron.version>
+ <debezium.version>3.2.2.Final</debezium.version>
+ <debezium.postgresql.version>${postgresql-jdbc.version}</debezium.postgresql.version>
+ <debezium.mysql.version>9.4.0</debezium.mysql.version>
<jsonwebtoken.version>0.11.1</jsonwebtoken.version>
<opencensus.version>0.28.0</opencensus.version>
<hadoop3.version>3.4.0</hadoop3.version>
diff --git
a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
index 6c422c4f036..f0eaf2938d8 100644
---
a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
+++
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
@@ -30,7 +30,7 @@ import
org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig;
@Slf4j
public abstract class DebeziumSource extends KafkaConnectSource {
private static final String DEFAULT_CONVERTER =
"org.apache.kafka.connect.json.JsonConverter";
- private static final String DEFAULT_HISTORY =
"org.apache.pulsar.io.debezium.PulsarDatabaseHistory";
+ private static final String DEFAULT_HISTORY =
"org.apache.pulsar.io.debezium.PulsarSchemaHistory";
private static final String DEFAULT_OFFSET_TOPIC = "debezium-offset-topic";
private static final String DEFAULT_HISTORY_TOPIC =
"debezium-history-topic";
@@ -76,6 +76,8 @@ public abstract class DebeziumSource extends
KafkaConnectSource {
public abstract void setDbConnectorTask(Map<String, Object> config) throws
Exception;
+ public abstract void setDbConnectorClass(Map<String, Object> config)
throws Exception;
+
@Override
public void open(Map<String, Object> config, SourceContext sourceContext)
throws Exception {
setDbConnectorTask(config);
@@ -87,28 +89,28 @@ public abstract class DebeziumSource extends
KafkaConnectSource {
// value.converter
setConfigIfNull(config,
PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER);
- // database.history : implementation class for database history.
- setConfigIfNull(config,
HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY.name(),
DEFAULT_HISTORY);
+ // schema.history : implementation class for schema history.
+ setConfigIfNull(config,
HistorizedRelationalDatabaseConnectorConfig.SCHEMA_HISTORY.name(),
DEFAULT_HISTORY);
- // database.history.pulsar.service.url
- String pulsarUrl = (String)
config.get(PulsarDatabaseHistory.SERVICE_URL.name());
+ // schema.history.internal.pulsar.service.url
+ String pulsarUrl = (String)
config.get(PulsarSchemaHistory.SERVICE_URL.name());
String topicNamespace = topicNamespace(sourceContext);
// topic.namespace
setConfigIfNull(config,
PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG, topicNamespace);
String sourceName = sourceContext.getSourceName();
- // database.history.pulsar.topic: history topic name
- setConfigIfNull(config, PulsarDatabaseHistory.TOPIC.name(),
+ // schema.history.internal.pulsar.topic: history topic name
+ setConfigIfNull(config, PulsarSchemaHistory.TOPIC.name(),
topicNamespace + "/" + sourceName + "-" + DEFAULT_HISTORY_TOPIC);
// offset.storage.topic: offset topic name
setConfigIfNull(config,
PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG,
topicNamespace + "/" + sourceName + "-" + DEFAULT_OFFSET_TOPIC);
- // pass pulsar.client.builder if database.history.pulsar.service.url
is not provided
+ // pass pulsar.client.builder if
schema.history.internal.pulsar.service.url is not provided
if (StringUtils.isEmpty(pulsarUrl)) {
String pulsarClientBuilder =
SerDeUtils.serialize(sourceContext.getPulsarClientBuilder());
- config.put(PulsarDatabaseHistory.CLIENT_BUILDER.name(),
pulsarClientBuilder);
+ config.put(PulsarSchemaHistory.CLIENT_BUILDER.name(),
pulsarClientBuilder);
}
super.open(config, sourceContext);
diff --git
a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarSchemaHistory.java
similarity index 86%
rename from
pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
rename to
pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarSchemaHistory.java
index 7ca0d309cf9..8ae8c663a8e 100644
---
a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
+++
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarSchemaHistory.java
@@ -26,12 +26,12 @@ import io.debezium.annotation.ThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
-import io.debezium.relational.history.AbstractDatabaseHistory;
-import io.debezium.relational.history.DatabaseHistory;
-import io.debezium.relational.history.DatabaseHistoryException;
-import io.debezium.relational.history.DatabaseHistoryListener;
+import io.debezium.relational.history.AbstractSchemaHistory;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
+import io.debezium.relational.history.SchemaHistory;
+import io.debezium.relational.history.SchemaHistoryException;
+import io.debezium.relational.history.SchemaHistoryListener;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
@@ -52,15 +52,15 @@ import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
/**
- * A {@link DatabaseHistory} implementation that records schema changes as
normal pulsar messages on the specified
+ * A {@link SchemaHistory} implementation that records schema changes as
normal pulsar messages on the specified
* topic, and that recovers the history by establishing a Kafka Consumer
re-processing all messages on that topic.
*/
@Slf4j
@ThreadSafe
-public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
+public final class PulsarSchemaHistory extends AbstractSchemaHistory {
public static final Field TOPIC =
Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.topic")
- .withDisplayName("Database history topic name")
+ .withDisplayName("Schema history topic name")
.withType(Type.STRING)
.withWidth(Width.LONG)
.withImportance(Importance.HIGH)
@@ -97,7 +97,7 @@ public final class PulsarDatabaseHistory extends
AbstractDatabaseHistory {
TOPIC,
SERVICE_URL,
CLIENT_BUILDER,
- DatabaseHistory.NAME,
+ SchemaHistory.NAME,
READER_CONFIG);
private final ObjectMapper mapper = new ObjectMapper();
@@ -113,7 +113,7 @@ public final class PulsarDatabaseHistory extends
AbstractDatabaseHistory {
public void configure(
Configuration config,
HistoryRecordComparator comparator,
- DatabaseHistoryListener listener,
+ SchemaHistoryListener listener,
boolean useCatalogBeforeSchema) {
super.configure(config, comparator, listener, useCatalogBeforeSchema);
if (!config.validateAndRecord(ALL_FIELDS, logger::error)) {
@@ -148,9 +148,9 @@ public final class PulsarDatabaseHistory extends
AbstractDatabaseHistory {
}
// Copy the relevant portions of the configuration and add useful
defaults ...
- this.dbHistoryName = config.getString(DatabaseHistory.NAME,
UUID.randomUUID().toString());
+ this.dbHistoryName = config.getString(SchemaHistory.NAME,
UUID.randomUUID().toString());
- log.info("Configure to store the debezium database history {} to
pulsar topic {}",
+ log.info("Configure to store the debezium schema history {} to pulsar
topic {}",
dbHistoryName, topicName);
}
@@ -163,7 +163,7 @@ public final class PulsarDatabaseHistory extends
AbstractDatabaseHistory {
p.send("");
} catch (PulsarClientException pce) {
log.error("Failed to initialize storage", pce);
- throw new RuntimeException("Failed to initialize storage", pce);
+ throw new SchemaHistoryException("Failed to initialize storage",
pce);
}
}
@@ -172,7 +172,7 @@ public final class PulsarDatabaseHistory extends
AbstractDatabaseHistory {
try {
pulsarClient = clientBuilder.build();
} catch (PulsarClientException e) {
- throw new RuntimeException("Failed to create pulsar client to
pulsar cluster", e);
+ throw new SchemaHistoryException("Failed to create pulsar
client to pulsar cluster", e);
}
}
}
@@ -201,18 +201,18 @@ public final class PulsarDatabaseHistory extends
AbstractDatabaseHistory {
}
@Override
- protected void storeRecord(HistoryRecord record) throws
DatabaseHistoryException {
+ protected void storeRecord(HistoryRecord record) throws
SchemaHistoryException {
if (this.producer == null) {
throw new IllegalStateException("No producer is available. Ensure
that 'start()'"
- + " is called before storing database history records.");
+ + " is called before storing schema history records.");
}
if (log.isTraceEnabled()) {
- log.trace("Storing record into database history: {}", record);
+ log.trace("Storing record into schema history: {}", record);
}
try {
producer.send(record.toString());
} catch (PulsarClientException e) {
- throw new DatabaseHistoryException(e);
+ throw new SchemaHistoryException(e);
}
}
@@ -242,7 +242,7 @@ public final class PulsarDatabaseHistory extends
AbstractDatabaseHistory {
protected void recoverRecords(Consumer<HistoryRecord> records) {
setupClientIfNeeded();
try (Reader<String> historyReader = createHistoryReader()) {
- log.info("Scanning the database history topic '{}'", topicName);
+ log.info("Scanning the schema history topic '{}'", topicName);
// Read all messages in the topic ...
MessageId lastProcessedMessageId = null;
@@ -255,15 +255,15 @@ public final class PulsarDatabaseHistory extends
AbstractDatabaseHistory {
if (!isBlank(msg.getValue())) {
HistoryRecord recordObj = new
HistoryRecord(reader.read(msg.getValue()));
if (log.isTraceEnabled()) {
- log.trace("Recovering database history: {}",
recordObj);
+ log.trace("Recovering schema history: {}",
recordObj);
}
if (!recordObj.isValid()) {
- log.warn("Skipping invalid database history
record '{}'. This is often not an issue,"
+ log.warn("Skipping invalid schema history
record '{}'. This is often not an issue,"
+ " but if it happens
repeatedly please check the '{}' topic.",
recordObj, topicName);
} else {
records.accept(recordObj);
- log.trace("Recovered database history: {}",
recordObj);
+ log.trace("Recovered schema history: {}",
recordObj);
}
}
lastProcessedMessageId = msg.getMessageId();
@@ -274,7 +274,7 @@ public final class PulsarDatabaseHistory extends
AbstractDatabaseHistory {
throw e;
}
}
- log.info("Successfully completed scanning the database history
topic '{}'", topicName);
+ log.info("Successfully completed scanning the schema history topic
'{}'", topicName);
} catch (IOException ioe) {
log.error("Encountered issues on recovering history records", ioe);
throw new RuntimeException("Encountered issues on recovering
history records", ioe);
@@ -287,8 +287,8 @@ public final class PulsarDatabaseHistory extends
AbstractDatabaseHistory {
try (Reader<String> historyReader = createHistoryReader()) {
return historyReader.hasMessageAvailable();
} catch (IOException e) {
- log.error("Encountered issues on checking existence of database
history", e);
- throw new RuntimeException("Encountered issues on checking
existence of database history", e);
+ log.error("Encountered issues on checking existence of schema
history", e);
+ throw new RuntimeException("Encountered issues on checking
existence of schema history", e);
}
}
diff --git
a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/SerDeUtils.java
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/SerDeUtils.java
index 3d8dd3bd08d..156b19b3f76 100644
---
a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/SerDeUtils.java
+++
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/SerDeUtils.java
@@ -37,7 +37,7 @@ public class SerDeUtils {
return ois.readObject();
} catch (Exception e) {
throw new RuntimeException(
- "Failed to initialize the pulsar client to store debezium
database history", e);
+ "Failed to initialize the pulsar client to store debezium
schema history", e);
}
}
diff --git
a/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
b/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarSchemaHistoryTest.java
similarity index 87%
rename from
pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
rename to
pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarSchemaHistoryTest.java
index 4d076ebbc0d..c593b464c4a 100644
---
a/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
+++
b/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarSchemaHistoryTest.java
@@ -26,8 +26,8 @@ import io.debezium.config.Configuration;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
import io.debezium.relational.Tables;
import io.debezium.relational.ddl.DdlParser;
-import io.debezium.relational.history.DatabaseHistory;
-import io.debezium.relational.history.DatabaseHistoryListener;
+import io.debezium.relational.history.SchemaHistory;
+import io.debezium.relational.history.SchemaHistoryListener;
import io.debezium.text.ParsingException;
import io.debezium.util.Collect;
import java.io.ByteArrayOutputStream;
@@ -46,11 +46,11 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
/**
- * Test the implementation of {@link PulsarDatabaseHistory}.
+ * Test the implementation of {@link PulsarSchemaHistory}.
*/
-public class PulsarDatabaseHistoryTest extends ProducerConsumerBase {
+public class PulsarSchemaHistoryTest extends ProducerConsumerBase {
- private PulsarDatabaseHistory history;
+ private PulsarSchemaHistory history;
private Map<String, Object> position;
private Map<String, String> source;
private String topicName;
@@ -65,7 +65,7 @@ public class PulsarDatabaseHistoryTest extends
ProducerConsumerBase {
source = Collect.hashMapOf("server", "my-server");
setLogPosition(0);
this.topicName = "persistent://my-property/my-ns/schema-changes-topic";
- this.history = new PulsarDatabaseHistory();
+ this.history = new PulsarSchemaHistory();
}
@AfterMethod(alwaysRun = true)
@@ -78,9 +78,9 @@ public class PulsarDatabaseHistoryTest extends
ProducerConsumerBase {
private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean
testWithClientBuilder,
boolean testWithReaderConfig) throws
Exception {
Configuration.Builder configBuidler = Configuration.create()
- .with(PulsarDatabaseHistory.TOPIC, topicName)
- .with(DatabaseHistory.NAME, "my-db-history")
- .with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS,
skipUnparseableDDL);
+ .with(PulsarSchemaHistory.TOPIC, topicName)
+ .with(SchemaHistory.NAME, "my-db-history")
+ .with(SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS,
skipUnparseableDDL);
if (testWithClientBuilder) {
ClientBuilder builder =
PulsarClient.builder().serviceUrl(brokerUrl.toString());
@@ -89,18 +89,18 @@ public class PulsarDatabaseHistoryTest extends
ProducerConsumerBase {
oos.writeObject(builder);
oos.flush();
byte[] data = bao.toByteArray();
- configBuidler.with(PulsarDatabaseHistory.CLIENT_BUILDER,
Base64.getEncoder().encodeToString(data));
+ configBuidler.with(PulsarSchemaHistory.CLIENT_BUILDER,
Base64.getEncoder().encodeToString(data));
}
} else {
- configBuidler.with(PulsarDatabaseHistory.SERVICE_URL,
brokerUrl.toString());
+ configBuidler.with(PulsarSchemaHistory.SERVICE_URL,
brokerUrl.toString());
}
if (testWithReaderConfig) {
- configBuidler.with(PulsarDatabaseHistory.READER_CONFIG,
"{\"subscriptionName\":\"my-subscription\"}");
+ configBuidler.with(PulsarSchemaHistory.READER_CONFIG,
"{\"subscriptionName\":\"my-subscription\"}");
}
// Start up the history ...
- history.configure(configBuidler.build(), null,
DatabaseHistoryListener.NOOP, true);
+ history.configure(configBuidler.build(), null,
SchemaHistoryListener.NOOP, true);
history.start();
// Should be able to call start more than once ...
@@ -159,8 +159,8 @@ public class PulsarDatabaseHistoryTest extends
ProducerConsumerBase {
// Stop the history (which should stop the producer) ...
history.stop();
- history = new PulsarDatabaseHistory();
- history.configure(configBuidler.build(), null,
DatabaseHistoryListener.NOOP, true);
+ history = new PulsarSchemaHistory();
+ history.configure(configBuidler.build(), null,
SchemaHistoryListener.NOOP, true);
// no need to start
// Recover from the very beginning to just past the first change ...
@@ -244,13 +244,13 @@ public class PulsarDatabaseHistoryTest extends
ProducerConsumerBase {
// Set history to use dummy topic
Configuration config = Configuration.create()
- .with(PulsarDatabaseHistory.SERVICE_URL, brokerUrl.toString())
- .with(PulsarDatabaseHistory.TOPIC,
"persistent://my-property/my-ns/dummytopic")
- .with(DatabaseHistory.NAME, "my-db-history")
- .with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true)
+ .with(PulsarSchemaHistory.SERVICE_URL, brokerUrl.toString())
+ .with(PulsarSchemaHistory.TOPIC,
"persistent://my-property/my-ns/dummytopic")
+ .with(SchemaHistory.NAME, "my-db-history")
+ .with(SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true)
.build();
- history.configure(config, null, DatabaseHistoryListener.NOOP, true);
+ history.configure(config, null, SchemaHistoryListener.NOOP, true);
history.start();
// dummytopic should not exist yet
diff --git
a/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java
b/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java
index 88acbc61c77..996aa5ac725 100644
---
a/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java
+++
b/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.io.debezium.mongodb;
import java.util.Map;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.pulsar.io.core.SourceContext;
import org.apache.pulsar.io.debezium.DebeziumSource;
@@ -27,8 +28,14 @@ import org.apache.pulsar.io.debezium.DebeziumSource;
* A pulsar source that runs debezium mongodb source.
*/
public class DebeziumMongoDbSource extends DebeziumSource {
+ private static final String DEFAULT_CONNECTOR =
"io.debezium.connector.mongodb.MongoDbConnector";
private static final String DEFAULT_TASK =
"io.debezium.connector.mongodb.MongoDbConnectorTask";
+ @Override
+ public void setDbConnectorClass(Map<String, Object> config) throws
Exception {
+ throwExceptionIfConfigNotMatch(config,
ConnectorConfig.CONNECTOR_CLASS_CONFIG, DEFAULT_CONNECTOR);
+ }
+
@Override
public void setDbConnectorTask(Map<String, Object> config) throws
Exception {
throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG,
DEFAULT_TASK);
diff --git
a/pulsar-io/debezium/mongodb/src/main/resources/debezium-mongodb-source-config.yaml
b/pulsar-io/debezium/mongodb/src/main/resources/debezium-mongodb-source-config.yaml
index 8c1564bf4e6..54e0bbd7580 100644
---
a/pulsar-io/debezium/mongodb/src/main/resources/debezium-mongodb-source-config.yaml
+++
b/pulsar-io/debezium/mongodb/src/main/resources/debezium-mongodb-source-config.yaml
@@ -28,10 +28,11 @@ parallelism: 1
configs:
## config for pg, docker image: debezium/example-mongodb:0.8
mongodb.hosts: "rs0/mongodb:27017"
- mongodb.name: "dbserver1"
mongodb.user: "debezium"
mongodb.password: "dbz"
mongodb.task.id: "1"
database.whitelist: "inventory"
+ topic.prefix: "dbserver1"
- database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
+ schema.history.internal.pulsar.service.url: "pulsar://127.0.0.1:6650"
+ connector.class: "io.debezium.connector.mongodb.MongoDbConnector"
diff --git
a/pulsar-io/debezium/mssql/src/main/java/org/apache/pulsar/io/debezium/mssql/DebeziumMsSqlSource.java
b/pulsar-io/debezium/mssql/src/main/java/org/apache/pulsar/io/debezium/mssql/DebeziumMsSqlSource.java
index f60653e7aae..0df5eb6444a 100644
---
a/pulsar-io/debezium/mssql/src/main/java/org/apache/pulsar/io/debezium/mssql/DebeziumMsSqlSource.java
+++
b/pulsar-io/debezium/mssql/src/main/java/org/apache/pulsar/io/debezium/mssql/DebeziumMsSqlSource.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.io.debezium.mssql;
import java.util.Map;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.pulsar.io.debezium.DebeziumSource;
@@ -27,8 +28,14 @@ import org.apache.pulsar.io.debezium.DebeziumSource;
* A pulsar source that runs debezium mssql source.
*/
public class DebeziumMsSqlSource extends DebeziumSource {
+ private static final String DEFAULT_CONNECTOR =
"io.debezium.connector.sqlserver.SqlServerConnector";
private static final String DEFAULT_TASK =
"io.debezium.connector.sqlserver.SqlServerConnectorTask";
+ @Override
+ public void setDbConnectorClass(Map<String, Object> config) throws
Exception {
+ throwExceptionIfConfigNotMatch(config,
ConnectorConfig.CONNECTOR_CLASS_CONFIG, DEFAULT_CONNECTOR);
+ }
+
@Override
public void setDbConnectorTask(Map<String, Object> config) throws
Exception {
throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG,
DEFAULT_TASK);
diff --git
a/pulsar-io/debezium/mssql/src/main/resources/debezium-mssql-source-config.yaml
b/pulsar-io/debezium/mssql/src/main/resources/debezium-mssql-source-config.yaml
index c99fb48eab0..de09fedcb59 100644
---
a/pulsar-io/debezium/mssql/src/main/resources/debezium-mssql-source-config.yaml
+++
b/pulsar-io/debezium/mssql/src/main/resources/debezium-mssql-source-config.yaml
@@ -30,7 +30,8 @@ configs:
database.port: "1521"
database.user: "sa"
database.password: "MyP@ssword1"
- database.dbname: "MyDB"
- database.server.name: "mssql"
+ database.names: "MyDB"
+ topic.prefix: "mssql"
- database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
+ schema.history.internal.pulsar.service.url: "pulsar://127.0.0.1:6650"
+ connector.class: "io.debezium.connector.sqlserver.SqlServerConnector"
diff --git a/pulsar-io/debezium/mysql/pom.xml b/pulsar-io/debezium/mysql/pom.xml
index 7efc8d68655..536015c7913 100644
--- a/pulsar-io/debezium/mysql/pom.xml
+++ b/pulsar-io/debezium/mysql/pom.xml
@@ -55,8 +55,8 @@
<dependencyManagement>
<dependencies>
<dependency>
- <groupId>mysql</groupId>
- <artifactId>mysql-connector-java</artifactId>
+ <groupId>com.mysql</groupId>
+ <artifactId>mysql-connector-j</artifactId>
<version>${debezium.mysql.version}</version>
</dependency>
</dependencies>
diff --git
a/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java
b/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java
index 633535512d2..7a89fa13193 100644
---
a/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java
+++
b/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.io.debezium.mysql;
import java.util.Map;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.pulsar.io.debezium.DebeziumSource;
@@ -26,8 +27,14 @@ import org.apache.pulsar.io.debezium.DebeziumSource;
* A pulsar source that runs debezium mysql source.
*/
public class DebeziumMysqlSource extends DebeziumSource {
+ private static final String DEFAULT_CONNECTOR =
"io.debezium.connector.mysql.MySqlConnector";
private static final String DEFAULT_TASK =
"io.debezium.connector.mysql.MySqlConnectorTask";
+ @Override
+ public void setDbConnectorClass(Map<String, Object> config) throws
Exception {
+ throwExceptionIfConfigNotMatch(config,
ConnectorConfig.CONNECTOR_CLASS_CONFIG, DEFAULT_CONNECTOR);
+ }
+
@Override
public void setDbConnectorTask(Map<String, Object> config) throws
Exception {
throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG,
DEFAULT_TASK);
diff --git
a/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml
b/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml
index 7a51b0092ac..e278b2276aa 100644
---
a/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml
+++
b/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml
@@ -32,11 +32,12 @@ configs:
database.user: "debezium"
database.password: "dbz"
database.server.id: "184054"
- database.server.name: "dbserver1"
database.whitelist: "inventory"
+ topic.prefix: "dbserver1"
- database.history.pulsar.topic: "mysql-history-topic"
- database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
+ schema.history.internal.pulsar.topic: "mysql-history-topic"
+ schema.history.internal.pulsar.service.url: "pulsar://127.0.0.1:6650"
offset.storage.topic: "mysql-offset-topic"
+ connector.class: "io.debezium.connector.mysql.MySqlConnector"
diff --git
a/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumOracleSource.java
b/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumOracleSource.java
index cb7363ac8cd..7b17203760b 100644
---
a/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumOracleSource.java
+++
b/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumOracleSource.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.io.debezium.oracle;
import java.util.Map;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.pulsar.io.debezium.DebeziumSource;
@@ -27,8 +28,14 @@ import org.apache.pulsar.io.debezium.DebeziumSource;
* A pulsar source that runs debezium oracle source.
*/
public class DebeziumOracleSource extends DebeziumSource {
+ private static final String DEFAULT_CONNECTOR =
"io.debezium.connector.oracle.OracleConnector";
private static final String DEFAULT_TASK =
"io.debezium.connector.oracle.OracleConnectorTask";
+ @Override
+ public void setDbConnectorClass(Map<String, Object> config) throws
Exception {
+ throwExceptionIfConfigNotMatch(config,
ConnectorConfig.CONNECTOR_CLASS_CONFIG, DEFAULT_CONNECTOR);
+ }
+
@Override
public void setDbConnectorTask(Map<String, Object> config) throws
Exception {
throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG,
DEFAULT_TASK);
diff --git
a/pulsar-io/debezium/oracle/src/main/resources/debezium-oracle-source-config.yaml
b/pulsar-io/debezium/oracle/src/main/resources/debezium-oracle-source-config.yaml
index 94173d68a11..8955585979c 100644
---
a/pulsar-io/debezium/oracle/src/main/resources/debezium-oracle-source-config.yaml
+++
b/pulsar-io/debezium/oracle/src/main/resources/debezium-oracle-source-config.yaml
@@ -33,6 +33,7 @@ configs:
database.user: "sysdba"
database.password: "oracle"
database.dbname: "XE"
- database.server.name: "XE"
+ topic.prefix: "XE"
- database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
+ schema.history.internal.pulsar.service.url: "pulsar://127.0.0.1:6650"
+ connector.class: "io.debezium.connector.oracle.OracleConnector"
diff --git a/pulsar-io/debezium/pom.xml b/pulsar-io/debezium/pom.xml
index ebe54943e1f..ece17286e0f 100644
--- a/pulsar-io/debezium/pom.xml
+++ b/pulsar-io/debezium/pom.xml
@@ -31,46 +31,6 @@
<artifactId>pulsar-io-debezium</artifactId>
<name>Pulsar IO :: Debezium</name>
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.wildfly.security</groupId>
- <artifactId>wildfly-elytron-sasl-digest</artifactId>
- <version>${wildfly-elytron.version}</version>
- </dependency>
- <dependency>
- <groupId>org.wildfly.security</groupId>
- <artifactId>wildfly-elytron-sasl-external</artifactId>
- <version>${wildfly-elytron.version}</version>
- </dependency>
- <dependency>
- <groupId>org.wildfly.security</groupId>
- <artifactId>wildfly-elytron-sasl-gs2</artifactId>
- <version>${wildfly-elytron.version}</version>
- </dependency>
- <dependency>
- <groupId>org.wildfly.security</groupId>
- <artifactId>wildfly-elytron-sasl-oauth2</artifactId>
- <version>${wildfly-elytron.version}</version>
- </dependency>
- <dependency>
- <groupId>org.wildfly.security</groupId>
- <artifactId>wildfly-elytron-sasl-plain</artifactId>
- <version>${wildfly-elytron.version}</version>
- </dependency>
- <dependency>
- <groupId>org.wildfly.security</groupId>
- <artifactId>wildfly-elytron-sasl-scram</artifactId>
- <version>${wildfly-elytron.version}</version>
- </dependency>
- <dependency>
- <groupId>org.wildfly.security</groupId>
- <artifactId>wildfly-elytron-password-impl</artifactId>
- <version>${wildfly-elytron.version}</version>
- </dependency>
- </dependencies>
- </dependencyManagement>
-
<modules>
<module>core</module>
<module>mysql</module>
diff --git
a/pulsar-io/debezium/postgres/src/main/java/org/apache/pulsar/io/debezium/postgres/DebeziumPostgresSource.java
b/pulsar-io/debezium/postgres/src/main/java/org/apache/pulsar/io/debezium/postgres/DebeziumPostgresSource.java
index ef645aad6b1..54ab110edd0 100644
---
a/pulsar-io/debezium/postgres/src/main/java/org/apache/pulsar/io/debezium/postgres/DebeziumPostgresSource.java
+++
b/pulsar-io/debezium/postgres/src/main/java/org/apache/pulsar/io/debezium/postgres/DebeziumPostgresSource.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.io.debezium.postgres;
import java.util.Map;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.pulsar.io.debezium.DebeziumSource;
@@ -27,8 +28,14 @@ import org.apache.pulsar.io.debezium.DebeziumSource;
* A pulsar source that runs debezium postgres source.
*/
public class DebeziumPostgresSource extends DebeziumSource {
+ private static final String DEFAULT_CONNECTOR =
"io.debezium.connector.postgresql.PostgresConnector";
private static final String DEFAULT_TASK =
"io.debezium.connector.postgresql.PostgresConnectorTask";
+ @Override
+ public void setDbConnectorClass(Map<String, Object> config) throws
Exception {
+ throwExceptionIfConfigNotMatch(config,
ConnectorConfig.CONNECTOR_CLASS_CONFIG, DEFAULT_CONNECTOR);
+ }
+
@Override
public void setDbConnectorTask(Map<String, Object> config) throws
Exception {
throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG,
DEFAULT_TASK);
diff --git
a/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml
b/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml
index 3f6b7eaaba2..4048c73e53b 100644
---
a/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml
+++
b/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml
@@ -32,7 +32,8 @@ configs:
database.user: "postgres"
database.password: "postgres"
database.dbname: "postgres"
- database.server.name: "dbserver1"
- schema.whitelist: "inventory"
+ schema.allow.list: "inventory"
+ topic.prefix: "dbserver1"
- database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
+ schema.history.internal.pulsar.service.url: "pulsar://127.0.0.1:6650"
+ connector.class: "io.debezium.connector.postgresql.PostgresConnector"
diff --git a/pulsar-io/mongo/pom.xml b/pulsar-io/mongo/pom.xml
index 3666dbef54f..9306bf2355a 100644
--- a/pulsar-io/mongo/pom.xml
+++ b/pulsar-io/mongo/pom.xml
@@ -33,7 +33,7 @@
<name>Pulsar IO :: MongoDB</name>
<properties>
- <mongo-reactivestreams.version>4.1.2</mongo-reactivestreams.version>
+ <mongo-reactivestreams.version>5.2.0</mongo-reactivestreams.version>
</properties>
<dependencies>
diff --git
a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java
b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java
index e442ed0bcad..ea62e546abc 100644
---
a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java
+++
b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java
@@ -26,13 +26,22 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.mongodb.MongoBulkWriteException;
+import com.mongodb.ServerAddress;
import com.mongodb.bulk.BulkWriteError;
+import com.mongodb.bulk.BulkWriteInsert;
+import com.mongodb.bulk.BulkWriteResult;
+import com.mongodb.bulk.BulkWriteUpsert;
+import com.mongodb.bulk.WriteConcernError;
+import com.mongodb.internal.bulk.WriteRequest;
import com.mongodb.reactivestreams.client.MongoClient;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.SinkContext;
import org.bson.BsonDocument;
@@ -97,7 +106,15 @@ public class MongoSinkTest {
if (throwBulkError) {
List<BulkWriteError> writeErrors = Arrays.asList(
new BulkWriteError(0, "error", new BsonDocument(), 1));
- exc = new MongoBulkWriteException(null, writeErrors, null,
null);
+ BulkWriteResult result = BulkWriteResult.acknowledged(
+ WriteRequest.Type.INSERT, 1, 0,
+ Collections.<BulkWriteUpsert>emptyList(),
+ Collections.<BulkWriteInsert>emptyList());
+ WriteConcernError writeConcernError = null;
+ ServerAddress serverAddress = new ServerAddress("localhost",
27017);
+ Set<String> errorLabels = new HashSet<>();
+ exc = new MongoBulkWriteException(result, writeErrors,
writeConcernError,
+ serverAddress, errorLabels);
}
subscriber.onError(exc);
return null;
diff --git
a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceTest.java
b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceTest.java
index 6da4b1123fd..7243b365e88 100644
---
a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceTest.java
+++
b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceTest.java
@@ -110,17 +110,21 @@ public class MongoSourceTest {
source.open(map, mockSourceContext);
- subscriber.onNext(new ChangeStreamDocument<>(
- OperationType.INSERT,
+ subscriber.onNext(new ChangeStreamDocument<Document>(
+ OperationType.INSERT.getValue(),
BsonDocument.parse("{token: true}"),
BsonDocument.parse("{db: \"hello\", coll: \"pulsar\"}"),
BsonDocument.parse("{db: \"hello2\", coll: \"pulsar2\"}"),
new Document("hello", "pulsar"),
+ new Document("hello", "pulsar"), // documentKey
BsonDocument.parse("{_id: 1}"),
new BsonTimestamp(1234, 2),
- null,
+ null, // UpdateDescription
new BsonInt64(1),
- BsonDocument.parse("{id: 1, uid: 1}")));
+ BsonDocument.parse("{id: 1, uid: 1}"),
+ null, // BsonDateTime
+ null, // SplitEvent
+ BsonDocument.parse("{extra: 1}")));
Record<byte[]> record = source.read();
diff --git a/src/owasp-dependency-check-suppressions.xml
b/src/owasp-dependency-check-suppressions.xml
index 1ce7392a489..f302c251f96 100644
--- a/src/owasp-dependency-check-suppressions.xml
+++ b/src/owasp-dependency-check-suppressions.xml
@@ -364,49 +364,6 @@
<cpe>cpe:/a:apache:solr</cpe>
</suppress>
- <!-- debezium-related misdetections -->
- <suppress>
- <notes><![CDATA[
- file name: debezium-connector-mysql-1.9.7.Final.jar
- ]]></notes>
- <sha1>74c623b4a9b231e2f0e8f6053b38abd3bc487ce2</sha1>
- <cve>CVE-2017-15945</cve>
- </suppress>
- <suppress>
- <notes><![CDATA[
- file name: mysql-binlog-connector-java-0.27.2.jar
- ]]></notes>
- <sha1>23294cd730e29c00b8ddfbde517dfc955aba090f</sha1>
- <cve>CVE-2017-15945</cve>
- </suppress>
- <suppress>
- <notes><![CDATA[
- file name: debezium-connector-postgres-1.9.7.Final.jar
- ]]></notes>
- <sha1>300ff0bbf795643e914b7c8a6d6ba812a8354d62</sha1>
- <cve>CVE-2015-0241</cve>
- <cve>CVE-2015-0242</cve>
- <cve>CVE-2015-0243</cve>
- <cve>CVE-2015-0244</cve>
- <cve>CVE-2015-3166</cve>
- <cve>CVE-2015-3167</cve>
- <cve>CVE-2016-0766</cve>
- <cve>CVE-2016-0768</cve>
- <cve>CVE-2016-0773</cve>
- <cve>CVE-2016-5423</cve>
- <cve>CVE-2016-5424</cve>
- <cve>CVE-2016-7048</cve>
- <cve>CVE-2017-14798</cve>
- <cve>CVE-2017-7484</cve>
- <cve>CVE-2018-1115</cve>
- <cve>CVE-2019-10127</cve>
- <cve>CVE-2019-10128</cve>
- <cve>CVE-2019-10210</cve>
- <cve>CVE-2019-10211</cve>
- <cve>CVE-2020-25694</cve>
- <cve>CVE-2020-25695</cve>
- <cve>CVE-2021-23214</cve>
- </suppress>
<suppress>
<notes><![CDATA[
file name: protostream-types-4.4.1.Final.jar
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 0bf0c0186c7..fdad69444bb 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -34,7 +34,7 @@
<properties>
<integrationTestSuiteFile>pulsar.xml</integrationTestSuiteFile>
- <mongo-reactivestreams.version>4.1.2</mongo-reactivestreams.version>
+ <mongo-reactivestreams.version>5.2.0</mongo-reactivestreams.version>
<inttest.asyncprofiler.opts>event=cpu,lock=1ms,alloc=2m,jfrsync=profile</inttest.asyncprofiler.opts>
<inttest.asyncprofiler.outputformat>jfr</inttest.asyncprofiler.outputformat>
<inttest.asyncprofiler.dir></inttest.asyncprofiler.dir>
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
index 481725d145b..6fa2e9ff471 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
@@ -25,7 +25,7 @@ public class DebeziumMongoDbContainer extends
ChaosContainer<DebeziumMongoDbCont
public static final String NAME = "debezium-mongodb-example";
public static final Integer[] PORTS = { 27017 };
- private static final String IMAGE_NAME = "debezium/example-mongodb:0.10";
+ private static final String IMAGE_NAME =
"debezium/example-mongodb:3.0.0.Final";
public DebeziumMongoDbContainer(String clusterName) {
super(clusterName, IMAGE_NAME);
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
index 27d624a6c82..cf59cda868d 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
@@ -26,7 +26,7 @@ public class DebeziumMySQLContainer extends
ChaosContainer<DebeziumMySQLContaine
public static final String NAME = "debezium-mysql-example";
static final Integer[] PORTS = { 3306 };
- private static final String IMAGE_NAME = "debezium/example-mysql:0.8";
+ private static final String IMAGE_NAME =
"debezium/example-mysql:3.0.0.Final";
public DebeziumMySQLContainer(String clusterName) {
super(clusterName, IMAGE_NAME);
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
index 479869c4183..4fd391fd926 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
@@ -26,7 +26,7 @@ public class DebeziumPostgreSqlContainer extends
ChaosContainer<DebeziumPostgreS
public static final String NAME = "debezium-postgresql-example";
static final Integer[] PORTS = { 5432 };
- private static final String IMAGE_NAME = "debezium/example-postgres:0.10";
+ private static final String IMAGE_NAME =
"debezium/example-postgres:3.0.0.Final";
public DebeziumPostgreSqlContainer(String clusterName) {
super(clusterName, IMAGE_NAME);
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/PulsarIOSourceRunner.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/PulsarIOSourceRunner.java
index 92918a14006..7cd1c3ebf66 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/PulsarIOSourceRunner.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/PulsarIOSourceRunner.java
@@ -272,6 +272,21 @@ public class PulsarIOSourceRunner extends
PulsarIOTestRunner {
result.getStdout()
);
result.assertNoStderr();
+
+ final String[] packageCommands = {
+ PulsarCluster.ADMIN_SCRIPT,
+ "packages",
+ "delete",
+ "source://" + tenant + "/" + namespace + "/" + sourceName + "@0"
+ };
+
+ try {
+ ContainerExecResult packageResult =
pulsarCluster.getAnyWorker().execCmd(packageCommands);
+ log.info("Package metadata deletion result: {}",
packageResult.getStdout());
+ } catch (Exception e) {
+ log.warn("Failed to delete package metadata for
source://{}/{}/{}@0: {}",
+ tenant, namespace, sourceName, e.getMessage());
+ }
}
protected void getSourceInfoNotFound(String tenant, String namespace,
String sourceName) throws Exception {
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java
index 95b075e660a..bd2c95b5772 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java
@@ -58,6 +58,8 @@ public abstract class SourceTester<ServiceContainerT extends
GenericContainer> i
add("source");
add("op");
add("ts_ms");
+ add("ts_us");
+ add("ts_ns");
add("transaction");
}};
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
index 4c562f3e244..1826d78f351 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
@@ -42,15 +42,17 @@ public class DebeziumMongoDbSourceTester extends
SourceTester<DebeziumMongoDbCon
this.pulsarCluster = cluster;
pulsarServiceUrl = "pulsar://pulsar-proxy:" +
PulsarContainer.BROKER_PORT;
- sourceConfig.put("mongodb.hosts", "rs0/" +
DebeziumMongoDbContainer.NAME + ":27017");
- sourceConfig.put("mongodb.name", "dbserver1");
+ sourceConfig.put("mongodb.connection.string",
+ "mongodb://debezium:dbz@" + DebeziumMongoDbContainer.NAME +
":27017/admin?replicaSet=rs0");
sourceConfig.put("mongodb.user", "debezium");
sourceConfig.put("mongodb.password", "dbz");
sourceConfig.put("mongodb.task.id", "1");
+ sourceConfig.put("topic.prefix", "dbserver1");
sourceConfig.put("database.include.list", "inventory");
- sourceConfig.put("database.history.pulsar.service.url",
pulsarServiceUrl);
+ sourceConfig.put("schema.history.internal.pulsar.service.url",
pulsarServiceUrl);
sourceConfig.put("topic.namespace", "debezium/mongodb");
- sourceConfig.put("capture.mode", "oplog");
+ sourceConfig.put("capture.mode", "change_streams_update_full");
+ sourceConfig.put("connector.class",
"io.debezium.connector.mongodb.MongoDbConnector");
}
@Override
@@ -69,10 +71,10 @@ public class DebeziumMongoDbSourceTester extends
SourceTester<DebeziumMongoDbCon
@Override
public void prepareInsertEvent() throws Exception {
this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
- "mongo -u debezium -p dbz --authenticationDatabase admin
localhost:27017/inventory "
+ "mongosh -u debezium -p dbz --authenticationDatabase admin
localhost:27017/inventory "
+ "--eval 'db.products.find()'");
this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
- "mongo -u debezium -p dbz --authenticationDatabase admin
localhost:27017/inventory "
+ "mongosh -u debezium -p dbz --authenticationDatabase admin
localhost:27017/inventory "
+ "--eval 'db.products.insert({ "
+ "_id : NumberLong(\"110\"),"
+ "name : \"test-debezium\","
@@ -84,20 +86,20 @@ public class DebeziumMongoDbSourceTester extends
SourceTester<DebeziumMongoDbCon
@Override
public void prepareDeleteEvent() throws Exception {
this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
- "mongo -u debezium -p dbz --authenticationDatabase admin
localhost:27017/inventory "
+ "mongosh -u debezium -p dbz --authenticationDatabase admin
localhost:27017/inventory "
+ "--eval 'db.products.find()'");
this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
- "mongo -u debezium -p dbz --authenticationDatabase admin
localhost:27017/inventory "
+ "mongosh -u debezium -p dbz --authenticationDatabase admin
localhost:27017/inventory "
+ "--eval 'db.products.deleteOne({name :
\"test-debezium-update\"})'");
}
@Override
public void prepareUpdateEvent() throws Exception {
this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
- "mongo -u debezium -p dbz --authenticationDatabase admin
localhost:27017/inventory "
+ "mongosh -u debezium -p dbz --authenticationDatabase admin
localhost:27017/inventory "
+ "--eval 'db.products.find()'");
this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
- "mongo -u debezium -p dbz --authenticationDatabase admin
localhost:27017/inventory "
+ "mongosh -u debezium -p dbz --authenticationDatabase admin
localhost:27017/inventory "
+ "--eval 'db.products.update({"
+ "_id : 110},"
+ "{$set:{name:\"test-debezium-update\", description:
\"this is update description\"}})'");
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java
index 25a7544f52b..2295a670660 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java
@@ -58,11 +58,15 @@ public class DebeziumMsSqlSourceTester extends
SourceTester<DebeziumMsSqlContain
sourceConfig.put("database.port", "1433");
sourceConfig.put("database.user", "sa");
sourceConfig.put("database.password",
DebeziumMsSqlContainer.SA_PASSWORD);
- sourceConfig.put("database.server.name", "mssql");
- sourceConfig.put("database.dbname", "TestDB");
+ sourceConfig.put("database.names", "TestDB");
+ sourceConfig.put("database.encrypt", "false");
sourceConfig.put("snapshot.mode", "schema_only");
- sourceConfig.put("database.history.pulsar.service.url",
pulsarServiceUrl);
+ sourceConfig.put("schema.history.internal.pulsar.topic",
"debezium-schema-history-mssql");
+ sourceConfig.put("schema.history.internal.pulsar.service.url",
pulsarServiceUrl);
+ sourceConfig.put("topic.prefix", "mssql");
sourceConfig.put("topic.namespace", "debezium/mssql");
+ sourceConfig.put("task.id", "1");
+ sourceConfig.put("connector.class",
"io.debezium.connector.sqlserver.SqlServerConnector");
}
@Override
@@ -145,12 +149,12 @@ public class DebeziumMsSqlSourceTester extends
SourceTester<DebeziumMsSqlContain
@Override
public String keyContains() {
- return "mssql.dbo.customers.Key";
+ return "TestDB";
}
@Override
public String valueContains() {
- return "mssql.dbo.customers.Value";
+ return "TestDB";
}
@Override
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
index 2c457cd2fb9..7f9ccbf5d20 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
@@ -58,15 +58,16 @@ public class DebeziumMySqlSourceTester extends
SourceTester<DebeziumMySQLContain
sourceConfig.put("database.user", "debezium");
sourceConfig.put("database.password", "dbz");
sourceConfig.put("database.server.id", "184054");
- sourceConfig.put("database.server.name", "dbserver1");
- sourceConfig.put("database.whitelist", "inventory");
+ sourceConfig.put("topic.prefix", "dbserver1");
+ sourceConfig.put("database.include.list", "inventory");
if (!testWithClientBuilder) {
- sourceConfig.put("database.history.pulsar.service.url",
pulsarServiceUrl);
+ sourceConfig.put("schema.history.internal.pulsar.service.url",
pulsarServiceUrl);
}
sourceConfig.put("key.converter", converterClassName);
sourceConfig.put("value.converter", converterClassName);
sourceConfig.put("topic.namespace", "debezium/mysql-"
+ (converterClassName.endsWith("AvroConverter") ? "avro" :
"json"));
+ sourceConfig.put("connector.class",
"io.debezium.connector.mysql.MySqlConnector");
}
@Override
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
index 41db0a7cc18..39b4361ee8a 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
@@ -58,13 +58,15 @@ public class DebeziumOracleDbSourceTester extends
SourceTester<DebeziumOracleDbC
sourceConfig.put("database.port", "1521");
sourceConfig.put("database.user", "dbzuser");
sourceConfig.put("database.password", "dbz");
- sourceConfig.put("database.server.name", "XE");
+ sourceConfig.put("topic.prefix", "XE");
sourceConfig.put("database.dbname", "XE");
sourceConfig.put("snapshot.mode", "schema_only");
sourceConfig.put("schema.include.list", "inv");
- sourceConfig.put("database.history.pulsar.service.url",
pulsarServiceUrl);
+ sourceConfig.put("schema.history.internal.pulsar.service.url",
pulsarServiceUrl);
sourceConfig.put("topic.namespace", "debezium/oracle");
+
+ sourceConfig.put("connector.class",
"io.debezium.connector.oracle.OracleConnector");
}
@Override
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java
index 75b071c0bd8..e23a5f1ad50 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java
@@ -70,13 +70,13 @@ public class DebeziumPostgreSqlSourceTester extends
SourceTester<DebeziumPostgre
sourceConfig.put("database.port", "5432");
sourceConfig.put("database.user", "postgres");
sourceConfig.put("database.password", "postgres");
- sourceConfig.put("database.server.id", "184055");
- sourceConfig.put("database.server.name", "dbserver1");
+ sourceConfig.put("topic.prefix", "dbserver1");
sourceConfig.put("database.dbname", "postgres");
- sourceConfig.put("schema.whitelist", "inventory");
- sourceConfig.put("table.blacklist",
"inventory.spatial_ref_sys,inventory.geom");
- sourceConfig.put("database.history.pulsar.service.url",
pulsarServiceUrl);
+ sourceConfig.put("schema.include.list", "inventory");
+ sourceConfig.put("table.exclude.list",
"inventory.spatial_ref_sys,inventory.geom");
+ sourceConfig.put("schema.history.internal.pulsar.service.url",
pulsarServiceUrl);
sourceConfig.put("topic.namespace", "debezium/postgresql");
+ sourceConfig.put("connector.class",
"io.debezium.connector.postgresql.PostgresConnector");
}
@Override
diff --git
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
index 50160d94eef..eb229410391 100644
---
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
+++
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
@@ -91,7 +91,7 @@ public class PulsarDebeziumSourcesTest extends
PulsarIOTestBase {
+ "-" + functionRuntimeType + "-" + randomName(8);
// This is the binlog count that contained in mysql container.
- final int numMessages = 47;
+ final int numMessages = 52;
@Cleanup
PulsarClient client = PulsarClient.builder()
@@ -214,7 +214,7 @@ public class PulsarDebeziumSourcesTest extends
PulsarIOTestBase {
final String tenant = TopicName.PUBLIC_TENANT;
final String namespace = TopicName.DEFAULT_NAMESPACE;
final String outputTopicName = "debe-output-topic-name-" +
testId.getAndIncrement();
- final String consumeTopicName = "debezium/mssql/mssql.dbo.customers";
+ final String consumeTopicName =
"debezium/mssql/mssql.TestDB.dbo.customers";
final String sourceName = "test-source-debezium-mssql-" +
functionRuntimeType + "-" + randomName(8);
final int numMessages = 1;
diff --git a/tests/scripts/pre-integ-tests.sh b/tests/scripts/pre-integ-tests.sh
index 9f564a3f5f9..f8ba1bdc701 100755
--- a/tests/scripts/pre-integ-tests.sh
+++ b/tests/scripts/pre-integ-tests.sh
@@ -30,5 +30,5 @@ docker pull apachepulsar/s3mock:latest
docker pull alpine/socat:latest
docker pull cassandra:3
docker pull confluentinc/cp-kafka:4.0.0
-docker pull debezium/example-mysql:0.8
-docker pull mysql:5.7.22
+docker pull debezium/example-mysql:3.0.0.Final
+docker pull mysql:9.1.0