This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c7dff63e8d3 [improve][io] Upgrade to Debezium 3.2.2 (#24712)
c7dff63e8d3 is described below

commit c7dff63e8d38bfdc7ac881b5fec8c1b486346e6e
Author: Enrique Fernández <[email protected]>
AuthorDate: Mon Sep 8 21:18:31 2025 +0200

    [improve][io] Upgrade to Debezium 3.2.2 (#24712)
    
    Co-authored-by: Lari Hotari <[email protected]>
---
 pom.xml                                            | 13 +++---
 .../apache/pulsar/io/debezium/DebeziumSource.java  | 20 +++++----
 ...tabaseHistory.java => PulsarSchemaHistory.java} | 48 +++++++++++-----------
 .../org/apache/pulsar/io/debezium/SerDeUtils.java  |  2 +-
 ...storyTest.java => PulsarSchemaHistoryTest.java} | 40 +++++++++---------
 .../io/debezium/mongodb/DebeziumMongoDbSource.java |  7 ++++
 .../resources/debezium-mongodb-source-config.yaml  |  5 ++-
 .../io/debezium/mssql/DebeziumMsSqlSource.java     |  7 ++++
 .../resources/debezium-mssql-source-config.yaml    |  7 ++--
 pulsar-io/debezium/mysql/pom.xml                   |  4 +-
 .../io/debezium/mysql/DebeziumMysqlSource.java     |  7 ++++
 .../resources/debezium-mysql-source-config.yaml    |  7 ++--
 .../io/debezium/oracle/DebeziumOracleSource.java   |  7 ++++
 .../resources/debezium-oracle-source-config.yaml   |  5 ++-
 pulsar-io/debezium/pom.xml                         | 40 ------------------
 .../debezium/postgres/DebeziumPostgresSource.java  |  7 ++++
 .../resources/debezium-postgres-source-config.yaml |  7 ++--
 pulsar-io/mongo/pom.xml                            |  2 +-
 .../apache/pulsar/io/mongodb/MongoSinkTest.java    | 19 ++++++++-
 .../apache/pulsar/io/mongodb/MongoSourceTest.java  | 12 ++++--
 src/owasp-dependency-check-suppressions.xml        | 43 -------------------
 tests/integration/pom.xml                          |  2 +-
 .../containers/DebeziumMongoDbContainer.java       |  2 +-
 .../containers/DebeziumMySQLContainer.java         |  2 +-
 .../containers/DebeziumPostgreSqlContainer.java    |  2 +-
 .../io/sources/PulsarIOSourceRunner.java           | 15 +++++++
 .../tests/integration/io/sources/SourceTester.java |  2 +
 .../debezium/DebeziumMongoDbSourceTester.java      | 22 +++++-----
 .../debezium/DebeziumMsSqlSourceTester.java        | 14 ++++---
 .../debezium/DebeziumMySqlSourceTester.java        |  7 ++--
 .../debezium/DebeziumOracleDbSourceTester.java     |  6 ++-
 .../debezium/DebeziumPostgreSqlSourceTester.java   | 10 ++---
 .../debezium/PulsarDebeziumSourcesTest.java        |  4 +-
 tests/scripts/pre-integ-tests.sh                   |  4 +-
 34 files changed, 202 insertions(+), 199 deletions(-)

diff --git a/pom.xml b/pom.xml
index f69f4f3ce86..8c0d4e952db 100644
--- a/pom.xml
+++ b/pom.xml
@@ -240,19 +240,16 @@ flexible messaging model and an intuitive client API.</description>
     <jclouds.version>2.6.0</jclouds.version>
     <guice.version>5.1.0</guice.version>
     <sqlite-jdbc.version>3.47.1.0</sqlite-jdbc.version>
-    <mysql-jdbc.version>8.0.11</mysql-jdbc.version>
-    <postgresql-jdbc.version>42.5.5</postgresql-jdbc.version>
+    <postgresql-jdbc.version>42.7.7</postgresql-jdbc.version>
     <clickhouse-jdbc.version>0.4.6</clickhouse-jdbc.version>
-    <mariadb-jdbc.version>2.7.5</mariadb-jdbc.version>
+    <mariadb-jdbc.version>3.5.5</mariadb-jdbc.version>
     <openmldb-jdbc.version>0.4.4-hotfix1</openmldb-jdbc.version>
     <json-smart.version>2.5.2</json-smart.version>
     <opensearch.version>2.16.0</opensearch.version>
     <elasticsearch-java.version>8.15.3</elasticsearch-java.version>
-    <debezium.version>1.9.7.Final</debezium.version>
-    <debezium.postgresql.version>42.5.5</debezium.postgresql.version>
-    <debezium.mysql.version>8.0.33</debezium.mysql.version>
-    <!-- Override version that brings CVE-2022-3143 with debezium -->
-    <wildfly-elytron.version>1.15.16.Final</wildfly-elytron.version>
+    <debezium.version>3.2.2.Final</debezium.version>
+    <debezium.postgresql.version>${postgresql-jdbc.version}</debezium.postgresql.version>
+    <debezium.mysql.version>9.4.0</debezium.mysql.version>
     <jsonwebtoken.version>0.11.1</jsonwebtoken.version>
     <opencensus.version>0.28.0</opencensus.version>
     <hadoop3.version>3.4.0</hadoop3.version>
diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
index 6c422c4f036..f0eaf2938d8 100644
--- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
+++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/DebeziumSource.java
@@ -30,7 +30,7 @@ import org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig;
 @Slf4j
 public abstract class DebeziumSource extends KafkaConnectSource {
     private static final String DEFAULT_CONVERTER = "org.apache.kafka.connect.json.JsonConverter";
-    private static final String DEFAULT_HISTORY = "org.apache.pulsar.io.debezium.PulsarDatabaseHistory";
+    private static final String DEFAULT_HISTORY = "org.apache.pulsar.io.debezium.PulsarSchemaHistory";
     private static final String DEFAULT_OFFSET_TOPIC = "debezium-offset-topic";
     private static final String DEFAULT_HISTORY_TOPIC = "debezium-history-topic";
 
@@ -76,6 +76,8 @@ public abstract class DebeziumSource extends 
KafkaConnectSource {
 
     public abstract void setDbConnectorTask(Map<String, Object> config) throws 
Exception;
 
+    public abstract void setDbConnectorClass(Map<String, Object> config) 
throws Exception;
+
     @Override
     public void open(Map<String, Object> config, SourceContext sourceContext) 
throws Exception {
         setDbConnectorTask(config);
@@ -87,28 +89,28 @@ public abstract class DebeziumSource extends 
KafkaConnectSource {
         // value.converter
         setConfigIfNull(config, 
PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER);
 
-        // database.history : implementation class for database history.
-        setConfigIfNull(config, 
HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY.name(), 
DEFAULT_HISTORY);
+        // schema.history : implementation class for schema history.
+        setConfigIfNull(config, 
HistorizedRelationalDatabaseConnectorConfig.SCHEMA_HISTORY.name(), 
DEFAULT_HISTORY);
 
-        // database.history.pulsar.service.url
-        String pulsarUrl = (String) 
config.get(PulsarDatabaseHistory.SERVICE_URL.name());
+        // schema.history.internal.pulsar.service.url
+        String pulsarUrl = (String) 
config.get(PulsarSchemaHistory.SERVICE_URL.name());
 
         String topicNamespace = topicNamespace(sourceContext);
         // topic.namespace
         setConfigIfNull(config, 
PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG, topicNamespace);
 
         String sourceName = sourceContext.getSourceName();
-        // database.history.pulsar.topic: history topic name
-        setConfigIfNull(config, PulsarDatabaseHistory.TOPIC.name(),
+        // schema.history.internal.pulsar.topic: history topic name
+        setConfigIfNull(config, PulsarSchemaHistory.TOPIC.name(),
             topicNamespace + "/" + sourceName + "-" + DEFAULT_HISTORY_TOPIC);
         // offset.storage.topic: offset topic name
         setConfigIfNull(config, 
PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG,
             topicNamespace + "/" + sourceName + "-" + DEFAULT_OFFSET_TOPIC);
 
-        // pass pulsar.client.builder if database.history.pulsar.service.url 
is not provided
+        // pass pulsar.client.builder if 
schema.history.internal.pulsar.service.url is not provided
         if (StringUtils.isEmpty(pulsarUrl)) {
             String pulsarClientBuilder = 
SerDeUtils.serialize(sourceContext.getPulsarClientBuilder());
-            config.put(PulsarDatabaseHistory.CLIENT_BUILDER.name(), 
pulsarClientBuilder);
+            config.put(PulsarSchemaHistory.CLIENT_BUILDER.name(), 
pulsarClientBuilder);
         }
 
         super.open(config, sourceContext);
diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarSchemaHistory.java
similarity index 86%
rename from pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
rename to pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarSchemaHistory.java
index 7ca0d309cf9..8ae8c663a8e 100644
--- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
+++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarSchemaHistory.java
@@ -26,12 +26,12 @@ import io.debezium.annotation.ThreadSafe;
 import io.debezium.config.Configuration;
 import io.debezium.config.Field;
 import io.debezium.document.DocumentReader;
-import io.debezium.relational.history.AbstractDatabaseHistory;
-import io.debezium.relational.history.DatabaseHistory;
-import io.debezium.relational.history.DatabaseHistoryException;
-import io.debezium.relational.history.DatabaseHistoryListener;
+import io.debezium.relational.history.AbstractSchemaHistory;
 import io.debezium.relational.history.HistoryRecord;
 import io.debezium.relational.history.HistoryRecordComparator;
+import io.debezium.relational.history.SchemaHistory;
+import io.debezium.relational.history.SchemaHistoryException;
+import io.debezium.relational.history.SchemaHistoryListener;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -52,15 +52,15 @@ import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
 
 /**
- * A {@link DatabaseHistory} implementation that records schema changes as 
normal pulsar messages on the specified
+ * A {@link SchemaHistory} implementation that records schema changes as 
normal pulsar messages on the specified
  * topic, and that recovers the history by establishing a Kafka Consumer 
re-processing all messages on that topic.
  */
 @Slf4j
 @ThreadSafe
-public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
+public final class PulsarSchemaHistory extends AbstractSchemaHistory {
 
     public static final Field TOPIC = 
Field.create(CONFIGURATION_FIELD_PREFIX_STRING + "pulsar.topic")
-        .withDisplayName("Database history topic name")
+        .withDisplayName("Schema history topic name")
         .withType(Type.STRING)
         .withWidth(Width.LONG)
         .withImportance(Importance.HIGH)
@@ -97,7 +97,7 @@ public final class PulsarDatabaseHistory extends 
AbstractDatabaseHistory {
         TOPIC,
         SERVICE_URL,
         CLIENT_BUILDER,
-        DatabaseHistory.NAME,
+        SchemaHistory.NAME,
         READER_CONFIG);
 
     private final ObjectMapper mapper = new ObjectMapper();
@@ -113,7 +113,7 @@ public final class PulsarDatabaseHistory extends 
AbstractDatabaseHistory {
     public void configure(
             Configuration config,
             HistoryRecordComparator comparator,
-            DatabaseHistoryListener listener,
+            SchemaHistoryListener listener,
             boolean useCatalogBeforeSchema) {
         super.configure(config, comparator, listener, useCatalogBeforeSchema);
         if (!config.validateAndRecord(ALL_FIELDS, logger::error)) {
@@ -148,9 +148,9 @@ public final class PulsarDatabaseHistory extends 
AbstractDatabaseHistory {
         }
 
         // Copy the relevant portions of the configuration and add useful 
defaults ...
-        this.dbHistoryName = config.getString(DatabaseHistory.NAME, 
UUID.randomUUID().toString());
+        this.dbHistoryName = config.getString(SchemaHistory.NAME, 
UUID.randomUUID().toString());
 
-        log.info("Configure to store the debezium database history {} to 
pulsar topic {}",
+        log.info("Configure to store the debezium schema history {} to pulsar 
topic {}",
             dbHistoryName, topicName);
     }
 
@@ -163,7 +163,7 @@ public final class PulsarDatabaseHistory extends 
AbstractDatabaseHistory {
             p.send("");
         } catch (PulsarClientException pce) {
             log.error("Failed to initialize storage", pce);
-            throw new RuntimeException("Failed to initialize storage", pce);
+            throw new SchemaHistoryException("Failed to initialize storage", 
pce);
         }
     }
 
@@ -172,7 +172,7 @@ public final class PulsarDatabaseHistory extends 
AbstractDatabaseHistory {
             try {
                 pulsarClient = clientBuilder.build();
             } catch (PulsarClientException e) {
-                throw new RuntimeException("Failed to create pulsar client to 
pulsar cluster", e);
+                throw new SchemaHistoryException("Failed to create pulsar 
client to pulsar cluster", e);
             }
         }
     }
@@ -201,18 +201,18 @@ public final class PulsarDatabaseHistory extends 
AbstractDatabaseHistory {
     }
 
     @Override
-    protected void storeRecord(HistoryRecord record) throws 
DatabaseHistoryException {
+    protected void storeRecord(HistoryRecord record) throws 
SchemaHistoryException {
         if (this.producer == null) {
             throw new IllegalStateException("No producer is available. Ensure 
that 'start()'"
-                    + " is called before storing database history records.");
+                    + " is called before storing schema history records.");
         }
         if (log.isTraceEnabled()) {
-            log.trace("Storing record into database history: {}", record);
+            log.trace("Storing record into schema history: {}", record);
         }
         try {
             producer.send(record.toString());
         } catch (PulsarClientException e) {
-            throw new DatabaseHistoryException(e);
+            throw new SchemaHistoryException(e);
         }
     }
 
@@ -242,7 +242,7 @@ public final class PulsarDatabaseHistory extends 
AbstractDatabaseHistory {
     protected void recoverRecords(Consumer<HistoryRecord> records) {
         setupClientIfNeeded();
         try (Reader<String> historyReader = createHistoryReader()) {
-            log.info("Scanning the database history topic '{}'", topicName);
+            log.info("Scanning the schema history topic '{}'", topicName);
 
             // Read all messages in the topic ...
             MessageId lastProcessedMessageId = null;
@@ -255,15 +255,15 @@ public final class PulsarDatabaseHistory extends 
AbstractDatabaseHistory {
                         if (!isBlank(msg.getValue())) {
                             HistoryRecord recordObj = new 
HistoryRecord(reader.read(msg.getValue()));
                             if (log.isTraceEnabled()) {
-                                log.trace("Recovering database history: {}", 
recordObj);
+                                log.trace("Recovering schema history: {}", 
recordObj);
                             }
                             if (!recordObj.isValid()) {
-                                log.warn("Skipping invalid database history 
record '{}'. This is often not an issue,"
+                                log.warn("Skipping invalid schema history 
record '{}'. This is often not an issue,"
                                                 + " but if it happens 
repeatedly please check the '{}' topic.",
                                     recordObj, topicName);
                             } else {
                                 records.accept(recordObj);
-                                log.trace("Recovered database history: {}", 
recordObj);
+                                log.trace("Recovered schema history: {}", 
recordObj);
                             }
                         }
                         lastProcessedMessageId = msg.getMessageId();
@@ -274,7 +274,7 @@ public final class PulsarDatabaseHistory extends 
AbstractDatabaseHistory {
                     throw e;
                 }
             }
-            log.info("Successfully completed scanning the database history 
topic '{}'", topicName);
+            log.info("Successfully completed scanning the schema history topic 
'{}'", topicName);
         } catch (IOException ioe) {
             log.error("Encountered issues on recovering history records", ioe);
             throw new RuntimeException("Encountered issues on recovering 
history records", ioe);
@@ -287,8 +287,8 @@ public final class PulsarDatabaseHistory extends 
AbstractDatabaseHistory {
         try (Reader<String> historyReader = createHistoryReader()) {
             return historyReader.hasMessageAvailable();
         } catch (IOException e) {
-            log.error("Encountered issues on checking existence of database 
history", e);
-            throw new RuntimeException("Encountered issues on checking 
existence of database history", e);
+            log.error("Encountered issues on checking existence of schema 
history", e);
+            throw new RuntimeException("Encountered issues on checking 
existence of schema history", e);
         }
     }
 
diff --git 
a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/SerDeUtils.java
 
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/SerDeUtils.java
index 3d8dd3bd08d..156b19b3f76 100644
--- 
a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/SerDeUtils.java
+++ 
b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/SerDeUtils.java
@@ -37,7 +37,7 @@ public class SerDeUtils {
            return ois.readObject();
         } catch (Exception e) {
             throw new RuntimeException(
-                    "Failed to initialize the pulsar client to store debezium 
database history", e);
+                    "Failed to initialize the pulsar client to store debezium 
schema history", e);
         }
     }
 
diff --git 
a/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
 
b/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarSchemaHistoryTest.java
similarity index 87%
rename from 
pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
rename to 
pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarSchemaHistoryTest.java
index 4d076ebbc0d..c593b464c4a 100644
--- 
a/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java
+++ 
b/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarSchemaHistoryTest.java
@@ -26,8 +26,8 @@ import io.debezium.config.Configuration;
 import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
 import io.debezium.relational.Tables;
 import io.debezium.relational.ddl.DdlParser;
-import io.debezium.relational.history.DatabaseHistory;
-import io.debezium.relational.history.DatabaseHistoryListener;
+import io.debezium.relational.history.SchemaHistory;
+import io.debezium.relational.history.SchemaHistoryListener;
 import io.debezium.text.ParsingException;
 import io.debezium.util.Collect;
 import java.io.ByteArrayOutputStream;
@@ -46,11 +46,11 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 /**
- * Test the implementation of {@link PulsarDatabaseHistory}.
+ * Test the implementation of {@link PulsarSchemaHistory}.
  */
-public class PulsarDatabaseHistoryTest extends ProducerConsumerBase {
+public class PulsarSchemaHistoryTest extends ProducerConsumerBase {
 
-    private PulsarDatabaseHistory history;
+    private PulsarSchemaHistory history;
     private Map<String, Object> position;
     private Map<String, String> source;
     private String topicName;
@@ -65,7 +65,7 @@ public class PulsarDatabaseHistoryTest extends 
ProducerConsumerBase {
         source = Collect.hashMapOf("server", "my-server");
         setLogPosition(0);
         this.topicName = "persistent://my-property/my-ns/schema-changes-topic";
-        this.history = new PulsarDatabaseHistory();
+        this.history = new PulsarSchemaHistory();
     }
 
     @AfterMethod(alwaysRun = true)
@@ -78,9 +78,9 @@ public class PulsarDatabaseHistoryTest extends 
ProducerConsumerBase {
     private void testHistoryTopicContent(boolean skipUnparseableDDL, boolean 
testWithClientBuilder,
                                          boolean testWithReaderConfig) throws 
Exception {
         Configuration.Builder configBuidler = Configuration.create()
-                .with(PulsarDatabaseHistory.TOPIC, topicName)
-                .with(DatabaseHistory.NAME, "my-db-history")
-                .with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, 
skipUnparseableDDL);
+                .with(PulsarSchemaHistory.TOPIC, topicName)
+                .with(SchemaHistory.NAME, "my-db-history")
+                .with(SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, 
skipUnparseableDDL);
 
         if (testWithClientBuilder) {
             ClientBuilder builder = 
PulsarClient.builder().serviceUrl(brokerUrl.toString());
@@ -89,18 +89,18 @@ public class PulsarDatabaseHistoryTest extends 
ProducerConsumerBase {
                 oos.writeObject(builder);
                 oos.flush();
                 byte[] data = bao.toByteArray();
-                configBuidler.with(PulsarDatabaseHistory.CLIENT_BUILDER, 
Base64.getEncoder().encodeToString(data));
+                configBuidler.with(PulsarSchemaHistory.CLIENT_BUILDER, 
Base64.getEncoder().encodeToString(data));
             }
         } else {
-            configBuidler.with(PulsarDatabaseHistory.SERVICE_URL, 
brokerUrl.toString());
+            configBuidler.with(PulsarSchemaHistory.SERVICE_URL, 
brokerUrl.toString());
         }
 
         if (testWithReaderConfig) {
-            configBuidler.with(PulsarDatabaseHistory.READER_CONFIG, 
"{\"subscriptionName\":\"my-subscription\"}");
+            configBuidler.with(PulsarSchemaHistory.READER_CONFIG, 
"{\"subscriptionName\":\"my-subscription\"}");
         }
 
         // Start up the history ...
-        history.configure(configBuidler.build(), null, 
DatabaseHistoryListener.NOOP, true);
+        history.configure(configBuidler.build(), null, 
SchemaHistoryListener.NOOP, true);
         history.start();
 
         // Should be able to call start more than once ...
@@ -159,8 +159,8 @@ public class PulsarDatabaseHistoryTest extends 
ProducerConsumerBase {
 
         // Stop the history (which should stop the producer) ...
         history.stop();
-        history = new PulsarDatabaseHistory();
-        history.configure(configBuidler.build(), null, 
DatabaseHistoryListener.NOOP, true);
+        history = new PulsarSchemaHistory();
+        history.configure(configBuidler.build(), null, 
SchemaHistoryListener.NOOP, true);
         // no need to start
 
         // Recover from the very beginning to just past the first change ...
@@ -244,13 +244,13 @@ public class PulsarDatabaseHistoryTest extends 
ProducerConsumerBase {
 
         // Set history to use dummy topic
         Configuration config = Configuration.create()
-            .with(PulsarDatabaseHistory.SERVICE_URL, brokerUrl.toString())
-            .with(PulsarDatabaseHistory.TOPIC, 
"persistent://my-property/my-ns/dummytopic")
-            .with(DatabaseHistory.NAME, "my-db-history")
-            .with(DatabaseHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true)
+            .with(PulsarSchemaHistory.SERVICE_URL, brokerUrl.toString())
+            .with(PulsarSchemaHistory.TOPIC, 
"persistent://my-property/my-ns/dummytopic")
+            .with(SchemaHistory.NAME, "my-db-history")
+            .with(SchemaHistory.SKIP_UNPARSEABLE_DDL_STATEMENTS, true)
             .build();
 
-        history.configure(config, null, DatabaseHistoryListener.NOOP, true);
+        history.configure(config, null, SchemaHistoryListener.NOOP, true);
         history.start();
 
         // dummytopic should not exist yet
diff --git 
a/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java
 
b/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java
index 88acbc61c77..996aa5ac725 100644
--- 
a/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java
+++ 
b/pulsar-io/debezium/mongodb/src/main/java/org/apache/pulsar/io/debezium/mongodb/DebeziumMongoDbSource.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.io.debezium.mongodb;
 
 import java.util.Map;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.debezium.DebeziumSource;
@@ -27,8 +28,14 @@ import org.apache.pulsar.io.debezium.DebeziumSource;
  * A pulsar source that runs debezium mongodb source.
  */
 public class DebeziumMongoDbSource extends DebeziumSource {
+    private static final String DEFAULT_CONNECTOR = 
"io.debezium.connector.mongodb.MongoDbConnector";
     private static final String DEFAULT_TASK = 
"io.debezium.connector.mongodb.MongoDbConnectorTask";
 
+    @Override
+    public void setDbConnectorClass(Map<String, Object> config) throws 
Exception {
+        throwExceptionIfConfigNotMatch(config, 
ConnectorConfig.CONNECTOR_CLASS_CONFIG, DEFAULT_CONNECTOR);
+    }
+
     @Override
     public void setDbConnectorTask(Map<String, Object> config) throws 
Exception {
         throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG, 
DEFAULT_TASK);
diff --git 
a/pulsar-io/debezium/mongodb/src/main/resources/debezium-mongodb-source-config.yaml
 
b/pulsar-io/debezium/mongodb/src/main/resources/debezium-mongodb-source-config.yaml
index 8c1564bf4e6..54e0bbd7580 100644
--- 
a/pulsar-io/debezium/mongodb/src/main/resources/debezium-mongodb-source-config.yaml
+++ 
b/pulsar-io/debezium/mongodb/src/main/resources/debezium-mongodb-source-config.yaml
@@ -28,10 +28,11 @@ parallelism: 1
 configs:
   ## config for pg, docker image: debezium/example-mongodb:0.8
   mongodb.hosts: "rs0/mongodb:27017"
-  mongodb.name: "dbserver1"
   mongodb.user: "debezium"
   mongodb.password: "dbz"
   mongodb.task.id: "1"
   database.whitelist: "inventory"
+  topic.prefix: "dbserver1"
 
-  database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
+  schema.history.internal.pulsar.service.url: "pulsar://127.0.0.1:6650"
+  connector.class: "io.debezium.connector.mongodb.MongoDbConnector"
diff --git 
a/pulsar-io/debezium/mssql/src/main/java/org/apache/pulsar/io/debezium/mssql/DebeziumMsSqlSource.java
 
b/pulsar-io/debezium/mssql/src/main/java/org/apache/pulsar/io/debezium/mssql/DebeziumMsSqlSource.java
index f60653e7aae..0df5eb6444a 100644
--- 
a/pulsar-io/debezium/mssql/src/main/java/org/apache/pulsar/io/debezium/mssql/DebeziumMsSqlSource.java
+++ 
b/pulsar-io/debezium/mssql/src/main/java/org/apache/pulsar/io/debezium/mssql/DebeziumMsSqlSource.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.io.debezium.mssql;
 
 import java.util.Map;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.pulsar.io.debezium.DebeziumSource;
 
@@ -27,8 +28,14 @@ import org.apache.pulsar.io.debezium.DebeziumSource;
  * A pulsar source that runs debezium mssql source.
  */
 public class DebeziumMsSqlSource extends DebeziumSource {
+    private static final String DEFAULT_CONNECTOR = 
"io.debezium.connector.sqlserver.SqlServerConnector";
     private static final String DEFAULT_TASK = 
"io.debezium.connector.sqlserver.SqlServerConnectorTask";
 
+    @Override
+    public void setDbConnectorClass(Map<String, Object> config) throws 
Exception {
+        throwExceptionIfConfigNotMatch(config, 
ConnectorConfig.CONNECTOR_CLASS_CONFIG, DEFAULT_CONNECTOR);
+    }
+
     @Override
     public void setDbConnectorTask(Map<String, Object> config) throws 
Exception {
         throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG, 
DEFAULT_TASK);
diff --git 
a/pulsar-io/debezium/mssql/src/main/resources/debezium-mssql-source-config.yaml 
b/pulsar-io/debezium/mssql/src/main/resources/debezium-mssql-source-config.yaml
index c99fb48eab0..de09fedcb59 100644
--- 
a/pulsar-io/debezium/mssql/src/main/resources/debezium-mssql-source-config.yaml
+++ 
b/pulsar-io/debezium/mssql/src/main/resources/debezium-mssql-source-config.yaml
@@ -30,7 +30,8 @@ configs:
   database.port: "1521"
   database.user: "sa"
   database.password: "MyP@ssword1"
-  database.dbname: "MyDB"
-  database.server.name: "mssql"
+  database.names: "MyDB"
+  topic.prefix: "mssql"
 
-  database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
+  schema.history.internal.pulsar.service.url: "pulsar://127.0.0.1:6650"
+  connector.class: "io.debezium.connector.sqlserver.SqlServerConnector"
diff --git a/pulsar-io/debezium/mysql/pom.xml b/pulsar-io/debezium/mysql/pom.xml
index 7efc8d68655..536015c7913 100644
--- a/pulsar-io/debezium/mysql/pom.xml
+++ b/pulsar-io/debezium/mysql/pom.xml
@@ -55,8 +55,8 @@
   <dependencyManagement>
     <dependencies>
       <dependency>
-        <groupId>mysql</groupId>
-        <artifactId>mysql-connector-java</artifactId>
+        <groupId>com.mysql</groupId>
+        <artifactId>mysql-connector-j</artifactId>
         <version>${debezium.mysql.version}</version>
       </dependency>
     </dependencies>
diff --git 
a/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java
 
b/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java
index 633535512d2..7a89fa13193 100644
--- 
a/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java
+++ 
b/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.io.debezium.mysql;
 
 import java.util.Map;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.pulsar.io.debezium.DebeziumSource;
 
@@ -26,8 +27,14 @@ import org.apache.pulsar.io.debezium.DebeziumSource;
  * A pulsar source that runs debezium mysql source.
  */
 public class DebeziumMysqlSource extends DebeziumSource {
+    private static final String DEFAULT_CONNECTOR = 
"io.debezium.connector.mysql.MySqlConnector";
     private static final String DEFAULT_TASK = 
"io.debezium.connector.mysql.MySqlConnectorTask";
 
+    @Override
+    public void setDbConnectorClass(Map<String, Object> config) throws 
Exception {
+        throwExceptionIfConfigNotMatch(config, 
ConnectorConfig.CONNECTOR_CLASS_CONFIG, DEFAULT_CONNECTOR);
+    }
+
     @Override
     public void setDbConnectorTask(Map<String, Object> config) throws 
Exception {
         throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG, 
DEFAULT_TASK);
diff --git 
a/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml 
b/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml
index 7a51b0092ac..e278b2276aa 100644
--- 
a/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml
+++ 
b/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml
@@ -32,11 +32,12 @@ configs:
   database.user: "debezium"
   database.password: "dbz"
   database.server.id: "184054"
-  database.server.name: "dbserver1"
   database.whitelist: "inventory"
+  topic.prefix: "dbserver1"
 
-  database.history.pulsar.topic: "mysql-history-topic"
-  database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
+  schema.history.internal.pulsar.topic: "mysql-history-topic"
+  schema.history.internal.pulsar.service.url: "pulsar://127.0.0.1:6650"
   offset.storage.topic: "mysql-offset-topic"
+  connector.class: "io.debezium.connector.mysql.MySqlConnector"
 
 
diff --git 
a/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumOracleSource.java
 
b/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumOracleSource.java
index cb7363ac8cd..7b17203760b 100644
--- 
a/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumOracleSource.java
+++ 
b/pulsar-io/debezium/oracle/src/main/java/org/apache/pulsar/io/debezium/oracle/DebeziumOracleSource.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.io.debezium.oracle;
 
 import java.util.Map;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.pulsar.io.debezium.DebeziumSource;
 
@@ -27,8 +28,14 @@ import org.apache.pulsar.io.debezium.DebeziumSource;
  * A pulsar source that runs debezium oracle source.
  */
 public class DebeziumOracleSource extends DebeziumSource {
+    private static final String DEFAULT_CONNECTOR = 
"io.debezium.connector.oracle.OracleConnector";
     private static final String DEFAULT_TASK = 
"io.debezium.connector.oracle.OracleConnectorTask";
 
+    @Override
+    public void setDbConnectorClass(Map<String, Object> config) throws 
Exception {
+        throwExceptionIfConfigNotMatch(config, 
ConnectorConfig.CONNECTOR_CLASS_CONFIG, DEFAULT_CONNECTOR);
+    }
+
     @Override
     public void setDbConnectorTask(Map<String, Object> config) throws 
Exception {
         throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG, 
DEFAULT_TASK);
diff --git 
a/pulsar-io/debezium/oracle/src/main/resources/debezium-oracle-source-config.yaml
 
b/pulsar-io/debezium/oracle/src/main/resources/debezium-oracle-source-config.yaml
index 94173d68a11..8955585979c 100644
--- 
a/pulsar-io/debezium/oracle/src/main/resources/debezium-oracle-source-config.yaml
+++ 
b/pulsar-io/debezium/oracle/src/main/resources/debezium-oracle-source-config.yaml
@@ -33,6 +33,7 @@ configs:
   database.user: "sysdba"
   database.password: "oracle"
   database.dbname: "XE"
-  database.server.name: "XE"
+  topic.prefix: "XE"
 
-  database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
+  schema.history.internal.pulsar.service.url: "pulsar://127.0.0.1:6650"
+  connector.class: "io.debezium.connector.oracle.OracleConnector"
diff --git a/pulsar-io/debezium/pom.xml b/pulsar-io/debezium/pom.xml
index ebe54943e1f..ece17286e0f 100644
--- a/pulsar-io/debezium/pom.xml
+++ b/pulsar-io/debezium/pom.xml
@@ -31,46 +31,6 @@
   <artifactId>pulsar-io-debezium</artifactId>
   <name>Pulsar IO :: Debezium</name>
 
-  <dependencyManagement>
-    <dependencies>
-      <dependency>
-        <groupId>org.wildfly.security</groupId>
-        <artifactId>wildfly-elytron-sasl-digest</artifactId>
-        <version>${wildfly-elytron.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.wildfly.security</groupId>
-        <artifactId>wildfly-elytron-sasl-external</artifactId>
-        <version>${wildfly-elytron.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.wildfly.security</groupId>
-        <artifactId>wildfly-elytron-sasl-gs2</artifactId>
-        <version>${wildfly-elytron.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.wildfly.security</groupId>
-        <artifactId>wildfly-elytron-sasl-oauth2</artifactId>
-        <version>${wildfly-elytron.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.wildfly.security</groupId>
-        <artifactId>wildfly-elytron-sasl-plain</artifactId>
-        <version>${wildfly-elytron.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.wildfly.security</groupId>
-        <artifactId>wildfly-elytron-sasl-scram</artifactId>
-        <version>${wildfly-elytron.version}</version>
-      </dependency>
-      <dependency>
-        <groupId>org.wildfly.security</groupId>
-        <artifactId>wildfly-elytron-password-impl</artifactId>
-        <version>${wildfly-elytron.version}</version>
-      </dependency>
-    </dependencies>
-  </dependencyManagement>
-
   <modules>
     <module>core</module>
     <module>mysql</module>
diff --git 
a/pulsar-io/debezium/postgres/src/main/java/org/apache/pulsar/io/debezium/postgres/DebeziumPostgresSource.java
 
b/pulsar-io/debezium/postgres/src/main/java/org/apache/pulsar/io/debezium/postgres/DebeziumPostgresSource.java
index ef645aad6b1..54ab110edd0 100644
--- 
a/pulsar-io/debezium/postgres/src/main/java/org/apache/pulsar/io/debezium/postgres/DebeziumPostgresSource.java
+++ 
b/pulsar-io/debezium/postgres/src/main/java/org/apache/pulsar/io/debezium/postgres/DebeziumPostgresSource.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.io.debezium.postgres;
 
 import java.util.Map;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.pulsar.io.debezium.DebeziumSource;
 
@@ -27,8 +28,14 @@ import org.apache.pulsar.io.debezium.DebeziumSource;
  * A pulsar source that runs debezium postgres source.
  */
 public class DebeziumPostgresSource extends DebeziumSource {
+    private static final String DEFAULT_CONNECTOR = 
"io.debezium.connector.postgresql.PostgresConnector";
     private static final String DEFAULT_TASK = 
"io.debezium.connector.postgresql.PostgresConnectorTask";
 
+    @Override
+    public void setDbConnectorClass(Map<String, Object> config) throws 
Exception {
+        throwExceptionIfConfigNotMatch(config, 
ConnectorConfig.CONNECTOR_CLASS_CONFIG, DEFAULT_CONNECTOR);
+    }
+
     @Override
     public void setDbConnectorTask(Map<String, Object> config) throws 
Exception {
         throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG, 
DEFAULT_TASK);
diff --git 
a/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml
 
b/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml
index 3f6b7eaaba2..4048c73e53b 100644
--- 
a/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml
+++ 
b/pulsar-io/debezium/postgres/src/main/resources/debezium-postgres-source-config.yaml
@@ -32,7 +32,8 @@ configs:
   database.user: "postgres"
   database.password: "postgres"
   database.dbname: "postgres"
-  database.server.name: "dbserver1"
-  schema.whitelist: "inventory"
+  schema.include.list: "inventory"
+  topic.prefix: "dbserver1"
 
-  database.history.pulsar.service.url: "pulsar://127.0.0.1:6650"
+  schema.history.internal.pulsar.service.url: "pulsar://127.0.0.1:6650"
+  connector.class: "io.debezium.connector.postgresql.PostgresConnector"
diff --git a/pulsar-io/mongo/pom.xml b/pulsar-io/mongo/pom.xml
index 3666dbef54f..9306bf2355a 100644
--- a/pulsar-io/mongo/pom.xml
+++ b/pulsar-io/mongo/pom.xml
@@ -33,7 +33,7 @@
   <name>Pulsar IO :: MongoDB</name>
 
   <properties>
-    <mongo-reactivestreams.version>4.1.2</mongo-reactivestreams.version>
+    <mongo-reactivestreams.version>5.2.0</mongo-reactivestreams.version>
   </properties>
 
   <dependencies>
diff --git 
a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java 
b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java
index e442ed0bcad..ea62e546abc 100644
--- 
a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java
+++ 
b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSinkTest.java
@@ -26,13 +26,22 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import com.mongodb.MongoBulkWriteException;
+import com.mongodb.ServerAddress;
 import com.mongodb.bulk.BulkWriteError;
+import com.mongodb.bulk.BulkWriteInsert;
+import com.mongodb.bulk.BulkWriteResult;
+import com.mongodb.bulk.BulkWriteUpsert;
+import com.mongodb.bulk.WriteConcernError;
+import com.mongodb.internal.bulk.WriteRequest;
 import com.mongodb.reactivestreams.client.MongoClient;
 import com.mongodb.reactivestreams.client.MongoCollection;
 import com.mongodb.reactivestreams.client.MongoDatabase;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.SinkContext;
 import org.bson.BsonDocument;
@@ -97,7 +106,15 @@ public class MongoSinkTest {
             if (throwBulkError) {
                 List<BulkWriteError> writeErrors = Arrays.asList(
                         new BulkWriteError(0, "error", new BsonDocument(), 1));
-                exc = new MongoBulkWriteException(null, writeErrors, null, 
null);
+                BulkWriteResult result = BulkWriteResult.acknowledged(
+                        WriteRequest.Type.INSERT, 1, 0,
+                        Collections.<BulkWriteUpsert>emptyList(),
+                        Collections.<BulkWriteInsert>emptyList());
+                WriteConcernError writeConcernError = null;
+                ServerAddress serverAddress = new ServerAddress("localhost", 
27017);
+                Set<String> errorLabels = new HashSet<>();
+                exc = new MongoBulkWriteException(result, writeErrors, 
writeConcernError,
+                        serverAddress, errorLabels);
             }
             subscriber.onError(exc);
             return null;
diff --git 
a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceTest.java
 
b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceTest.java
index 6da4b1123fd..7243b365e88 100644
--- 
a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceTest.java
+++ 
b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceTest.java
@@ -110,17 +110,21 @@ public class MongoSourceTest {
 
         source.open(map, mockSourceContext);
 
-        subscriber.onNext(new ChangeStreamDocument<>(
-                OperationType.INSERT,
+        subscriber.onNext(new ChangeStreamDocument<Document>(
+                OperationType.INSERT.getValue(),
                 BsonDocument.parse("{token: true}"),
                 BsonDocument.parse("{db: \"hello\", coll: \"pulsar\"}"),
                 BsonDocument.parse("{db: \"hello2\", coll: \"pulsar2\"}"),
                 new Document("hello", "pulsar"),
+                new Document("hello", "pulsar"), // fullDocumentBeforeChange
                 BsonDocument.parse("{_id: 1}"),
                 new BsonTimestamp(1234, 2),
-                null,
+                null, // UpdateDescription
                 new BsonInt64(1),
-                BsonDocument.parse("{id: 1, uid: 1}")));
+                BsonDocument.parse("{id: 1, uid: 1}"),
+                null, // BsonDateTime
+                null, // SplitEvent
+                BsonDocument.parse("{extra: 1}")));
 
         Record<byte[]> record = source.read();
 
diff --git a/src/owasp-dependency-check-suppressions.xml 
b/src/owasp-dependency-check-suppressions.xml
index 1ce7392a489..f302c251f96 100644
--- a/src/owasp-dependency-check-suppressions.xml
+++ b/src/owasp-dependency-check-suppressions.xml
@@ -364,49 +364,6 @@
         <cpe>cpe:/a:apache:solr</cpe>
     </suppress>
 
-    <!-- debezium-related misdetections -->
-    <suppress>
-        <notes><![CDATA[
-       file name: debezium-connector-mysql-1.9.7.Final.jar
-       ]]></notes>
-        <sha1>74c623b4a9b231e2f0e8f6053b38abd3bc487ce2</sha1>
-        <cve>CVE-2017-15945</cve>
-    </suppress>
-    <suppress>
-        <notes><![CDATA[
-       file name: mysql-binlog-connector-java-0.27.2.jar
-       ]]></notes>
-        <sha1>23294cd730e29c00b8ddfbde517dfc955aba090f</sha1>
-        <cve>CVE-2017-15945</cve>
-    </suppress>
-    <suppress>
-        <notes><![CDATA[
-       file name: debezium-connector-postgres-1.9.7.Final.jar
-       ]]></notes>
-        <sha1>300ff0bbf795643e914b7c8a6d6ba812a8354d62</sha1>
-        <cve>CVE-2015-0241</cve>
-        <cve>CVE-2015-0242</cve>
-        <cve>CVE-2015-0243</cve>
-        <cve>CVE-2015-0244</cve>
-        <cve>CVE-2015-3166</cve>
-        <cve>CVE-2015-3167</cve>
-        <cve>CVE-2016-0766</cve>
-        <cve>CVE-2016-0768</cve>
-        <cve>CVE-2016-0773</cve>
-        <cve>CVE-2016-5423</cve>
-        <cve>CVE-2016-5424</cve>
-        <cve>CVE-2016-7048</cve>
-        <cve>CVE-2017-14798</cve>
-        <cve>CVE-2017-7484</cve>
-        <cve>CVE-2018-1115</cve>
-        <cve>CVE-2019-10127</cve>
-        <cve>CVE-2019-10128</cve>
-        <cve>CVE-2019-10210</cve>
-        <cve>CVE-2019-10211</cve>
-        <cve>CVE-2020-25694</cve>
-        <cve>CVE-2020-25695</cve>
-        <cve>CVE-2021-23214</cve>
-    </suppress>
     <suppress>
         <notes><![CDATA[
        file name: protostream-types-4.4.1.Final.jar
diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml
index 0bf0c0186c7..fdad69444bb 100644
--- a/tests/integration/pom.xml
+++ b/tests/integration/pom.xml
@@ -34,7 +34,7 @@
 
   <properties>
     <integrationTestSuiteFile>pulsar.xml</integrationTestSuiteFile>
-    <mongo-reactivestreams.version>4.1.2</mongo-reactivestreams.version>
+    <mongo-reactivestreams.version>5.2.0</mongo-reactivestreams.version>
     
<inttest.asyncprofiler.opts>event=cpu,lock=1ms,alloc=2m,jfrsync=profile</inttest.asyncprofiler.opts>
     
<inttest.asyncprofiler.outputformat>jfr</inttest.asyncprofiler.outputformat>
     <inttest.asyncprofiler.dir></inttest.asyncprofiler.dir>
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
index 481725d145b..6fa2e9ff471 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMongoDbContainer.java
@@ -25,7 +25,7 @@ public class DebeziumMongoDbContainer extends 
ChaosContainer<DebeziumMongoDbCont
     public static final String NAME = "debezium-mongodb-example";
 
     public static final Integer[] PORTS = { 27017 };
-    private static final String IMAGE_NAME = "debezium/example-mongodb:0.10";
+    private static final String IMAGE_NAME = 
"debezium/example-mongodb:3.0.0.Final";
 
     public DebeziumMongoDbContainer(String clusterName) {
         super(clusterName, IMAGE_NAME);
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
index 27d624a6c82..cf59cda868d 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java
@@ -26,7 +26,7 @@ public class DebeziumMySQLContainer extends 
ChaosContainer<DebeziumMySQLContaine
     public static final String NAME = "debezium-mysql-example";
     static final Integer[] PORTS = { 3306 };
 
-    private static final String IMAGE_NAME = "debezium/example-mysql:0.8";
+    private static final String IMAGE_NAME = 
"debezium/example-mysql:3.0.0.Final";
 
     public DebeziumMySQLContainer(String clusterName) {
         super(clusterName, IMAGE_NAME);
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
index 479869c4183..4fd391fd926 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumPostgreSqlContainer.java
@@ -26,7 +26,7 @@ public class DebeziumPostgreSqlContainer extends 
ChaosContainer<DebeziumPostgreS
     public static final String NAME = "debezium-postgresql-example";
     static final Integer[] PORTS = { 5432 };
 
-    private static final String IMAGE_NAME = "debezium/example-postgres:0.10";
+    private static final String IMAGE_NAME = 
"debezium/example-postgres:3.0.0.Final";
 
     public DebeziumPostgreSqlContainer(String clusterName) {
         super(clusterName, IMAGE_NAME);
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/PulsarIOSourceRunner.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/PulsarIOSourceRunner.java
index 92918a14006..7cd1c3ebf66 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/PulsarIOSourceRunner.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/PulsarIOSourceRunner.java
@@ -272,6 +272,21 @@ public class PulsarIOSourceRunner extends 
PulsarIOTestRunner {
             result.getStdout()
         );
         result.assertNoStderr();
+
+        final String[] packageCommands = {
+            PulsarCluster.ADMIN_SCRIPT,
+            "packages",
+            "delete",
+            "source://" + tenant + "/" + namespace + "/" + sourceName + "@0"
+        };
+
+        try {
+            ContainerExecResult packageResult = 
pulsarCluster.getAnyWorker().execCmd(packageCommands);
+            log.info("Package metadata deletion result: {}", 
packageResult.getStdout());
+        } catch (Exception e) {
+            log.warn("Failed to delete package metadata for 
source://{}/{}/{}@0: {}",
+                     tenant, namespace, sourceName, e.getMessage());
+        }
     }
 
     protected void getSourceInfoNotFound(String tenant, String namespace, 
String sourceName) throws Exception {
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java
index 95b075e660a..bd2c95b5772 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/SourceTester.java
@@ -58,6 +58,8 @@ public abstract class SourceTester<ServiceContainerT extends 
GenericContainer> i
         add("source");
         add("op");
         add("ts_ms");
+        add("ts_us");
+        add("ts_ns");
         add("transaction");
     }};
 
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
index 4c562f3e244..1826d78f351 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMongoDbSourceTester.java
@@ -42,15 +42,17 @@ public class DebeziumMongoDbSourceTester extends 
SourceTester<DebeziumMongoDbCon
         this.pulsarCluster = cluster;
         pulsarServiceUrl = "pulsar://pulsar-proxy:" + 
PulsarContainer.BROKER_PORT;
 
-        sourceConfig.put("mongodb.hosts", "rs0/" + 
DebeziumMongoDbContainer.NAME + ":27017");
-        sourceConfig.put("mongodb.name", "dbserver1");
+        sourceConfig.put("mongodb.connection.string",
+                "mongodb://debezium:dbz@" + DebeziumMongoDbContainer.NAME + 
":27017/admin?replicaSet=rs0");
         sourceConfig.put("mongodb.user", "debezium");
         sourceConfig.put("mongodb.password", "dbz");
         sourceConfig.put("mongodb.task.id", "1");
+        sourceConfig.put("topic.prefix", "dbserver1");
         sourceConfig.put("database.include.list", "inventory");
-        sourceConfig.put("database.history.pulsar.service.url", 
pulsarServiceUrl);
+        sourceConfig.put("schema.history.internal.pulsar.service.url", 
pulsarServiceUrl);
         sourceConfig.put("topic.namespace", "debezium/mongodb");
-        sourceConfig.put("capture.mode", "oplog");
+        sourceConfig.put("capture.mode", "change_streams_update_full");
+        sourceConfig.put("connector.class", 
"io.debezium.connector.mongodb.MongoDbConnector");
     }
 
     @Override
@@ -69,10 +71,10 @@ public class DebeziumMongoDbSourceTester extends 
SourceTester<DebeziumMongoDbCon
     @Override
     public void prepareInsertEvent() throws Exception {
         this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
-                "mongo -u debezium -p dbz --authenticationDatabase admin 
localhost:27017/inventory "
+                "mongosh -u debezium -p dbz --authenticationDatabase admin 
localhost:27017/inventory "
                         + "--eval 'db.products.find()'");
         this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
-                "mongo -u debezium -p dbz --authenticationDatabase admin 
localhost:27017/inventory "
+                "mongosh -u debezium -p dbz --authenticationDatabase admin 
localhost:27017/inventory "
                         + "--eval 'db.products.insert({ "
                         + "_id : NumberLong(\"110\"),"
                         + "name : \"test-debezium\","
@@ -84,20 +86,20 @@ public class DebeziumMongoDbSourceTester extends 
SourceTester<DebeziumMongoDbCon
     @Override
     public void prepareDeleteEvent() throws Exception {
         this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
-                "mongo -u debezium -p dbz --authenticationDatabase admin 
localhost:27017/inventory "
+                "mongosh -u debezium -p dbz --authenticationDatabase admin 
localhost:27017/inventory "
                         + "--eval 'db.products.find()'");
         this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
-                "mongo -u debezium -p dbz --authenticationDatabase admin 
localhost:27017/inventory "
+                "mongosh -u debezium -p dbz --authenticationDatabase admin 
localhost:27017/inventory "
                         + "--eval 'db.products.deleteOne({name : 
\"test-debezium-update\"})'");
     }
 
     @Override
     public void prepareUpdateEvent() throws Exception {
         this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
-                "mongo -u debezium -p dbz --authenticationDatabase admin 
localhost:27017/inventory "
+                "mongosh -u debezium -p dbz --authenticationDatabase admin 
localhost:27017/inventory "
                         + "--eval 'db.products.find()'");
         this.debeziumMongoDbContainer.execCmd("/bin/bash", "-c",
-                "mongo -u debezium -p dbz --authenticationDatabase admin 
localhost:27017/inventory "
+                "mongosh -u debezium -p dbz --authenticationDatabase admin 
localhost:27017/inventory "
                         + "--eval 'db.products.update({"
                         + "_id : 110},"
                         + "{$set:{name:\"test-debezium-update\", description: 
\"this is update description\"}})'");
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java
index 25a7544f52b..2295a670660 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMsSqlSourceTester.java
@@ -58,11 +58,15 @@ public class DebeziumMsSqlSourceTester extends 
SourceTester<DebeziumMsSqlContain
         sourceConfig.put("database.port", "1433");
         sourceConfig.put("database.user", "sa");
         sourceConfig.put("database.password", 
DebeziumMsSqlContainer.SA_PASSWORD);
-        sourceConfig.put("database.server.name", "mssql");
-        sourceConfig.put("database.dbname", "TestDB");
+        sourceConfig.put("database.names", "TestDB");
+        sourceConfig.put("database.encrypt", "false");
         sourceConfig.put("snapshot.mode", "schema_only");
-        sourceConfig.put("database.history.pulsar.service.url", 
pulsarServiceUrl);
+        sourceConfig.put("schema.history.internal.pulsar.topic", 
"debezium-schema-history-mssql");
+        sourceConfig.put("schema.history.internal.pulsar.service.url", 
pulsarServiceUrl);
+        sourceConfig.put("topic.prefix", "mssql");
         sourceConfig.put("topic.namespace", "debezium/mssql");
+        sourceConfig.put("task.id", "1");
+        sourceConfig.put("connector.class", 
"io.debezium.connector.sqlserver.SqlServerConnector");
     }
 
     @Override
@@ -145,12 +149,12 @@ public class DebeziumMsSqlSourceTester extends 
SourceTester<DebeziumMsSqlContain
 
     @Override
     public String keyContains() {
-        return "mssql.dbo.customers.Key";
+        return "TestDB";
     }
 
     @Override
     public String valueContains() {
-        return "mssql.dbo.customers.Value";
+        return "TestDB";
     }
 
     @Override
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
index 2c457cd2fb9..7f9ccbf5d20 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumMySqlSourceTester.java
@@ -58,15 +58,16 @@ public class DebeziumMySqlSourceTester extends 
SourceTester<DebeziumMySQLContain
         sourceConfig.put("database.user", "debezium");
         sourceConfig.put("database.password", "dbz");
         sourceConfig.put("database.server.id", "184054");
-        sourceConfig.put("database.server.name", "dbserver1");
-        sourceConfig.put("database.whitelist", "inventory");
+        sourceConfig.put("topic.prefix", "dbserver1");
+        sourceConfig.put("database.include.list", "inventory");
         if (!testWithClientBuilder) {
-            sourceConfig.put("database.history.pulsar.service.url", 
pulsarServiceUrl);
+            sourceConfig.put("schema.history.internal.pulsar.service.url", 
pulsarServiceUrl);
         }
         sourceConfig.put("key.converter", converterClassName);
         sourceConfig.put("value.converter", converterClassName);
         sourceConfig.put("topic.namespace", "debezium/mysql-"
                + (converterClassName.endsWith("AvroConverter") ? "avro" : 
"json"));
+        sourceConfig.put("connector.class", 
"io.debezium.connector.mysql.MySqlConnector");
     }
 
     @Override
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
index 41db0a7cc18..39b4361ee8a 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumOracleDbSourceTester.java
@@ -58,13 +58,15 @@ public class DebeziumOracleDbSourceTester extends 
SourceTester<DebeziumOracleDbC
         sourceConfig.put("database.port", "1521");
         sourceConfig.put("database.user", "dbzuser");
         sourceConfig.put("database.password", "dbz");
-        sourceConfig.put("database.server.name", "XE");
+        sourceConfig.put("topic.prefix", "XE");
         sourceConfig.put("database.dbname", "XE");
         sourceConfig.put("snapshot.mode", "schema_only");
 
         sourceConfig.put("schema.include.list", "inv");
-        sourceConfig.put("database.history.pulsar.service.url", 
pulsarServiceUrl);
+        sourceConfig.put("schema.history.internal.pulsar.service.url", 
pulsarServiceUrl);
         sourceConfig.put("topic.namespace", "debezium/oracle");
+
+        sourceConfig.put("connector.class", 
"io.debezium.connector.oracle.OracleConnector");
     }
 
     @Override
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java
index 75b071c0bd8..e23a5f1ad50 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/DebeziumPostgreSqlSourceTester.java
@@ -70,13 +70,13 @@ public class DebeziumPostgreSqlSourceTester extends 
SourceTester<DebeziumPostgre
         sourceConfig.put("database.port", "5432");
         sourceConfig.put("database.user", "postgres");
         sourceConfig.put("database.password", "postgres");
-        sourceConfig.put("database.server.id", "184055");
-        sourceConfig.put("database.server.name", "dbserver1");
+        sourceConfig.put("topic.prefix", "dbserver1");
         sourceConfig.put("database.dbname", "postgres");
-        sourceConfig.put("schema.whitelist", "inventory");
-        sourceConfig.put("table.blacklist", 
"inventory.spatial_ref_sys,inventory.geom");
-        sourceConfig.put("database.history.pulsar.service.url", 
pulsarServiceUrl);
+        sourceConfig.put("schema.include.list", "inventory");
+        sourceConfig.put("table.exclude.list", 
"inventory.spatial_ref_sys,inventory.geom");
+        sourceConfig.put("schema.history.internal.pulsar.service.url", 
pulsarServiceUrl);
         sourceConfig.put("topic.namespace", "debezium/postgresql");
+        sourceConfig.put("connector.class", 
"io.debezium.connector.postgresql.PostgresConnector");
     }
 
     @Override
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
index 50160d94eef..eb229410391 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
@@ -91,7 +91,7 @@ public class PulsarDebeziumSourcesTest extends 
PulsarIOTestBase {
                 + "-" + functionRuntimeType + "-" + randomName(8);
 
         // This is the binlog count that contained in mysql container.
-        final int numMessages = 47;
+        final int numMessages = 52;
 
         @Cleanup
         PulsarClient client = PulsarClient.builder()
@@ -214,7 +214,7 @@ public class PulsarDebeziumSourcesTest extends 
PulsarIOTestBase {
         final String tenant = TopicName.PUBLIC_TENANT;
         final String namespace = TopicName.DEFAULT_NAMESPACE;
         final String outputTopicName = "debe-output-topic-name-" + 
testId.getAndIncrement();
-        final String consumeTopicName = "debezium/mssql/mssql.dbo.customers";
+        final String consumeTopicName = 
"debezium/mssql/mssql.TestDB.dbo.customers";
         final String sourceName = "test-source-debezium-mssql-" + 
functionRuntimeType + "-" + randomName(8);
 
         final int numMessages = 1;
diff --git a/tests/scripts/pre-integ-tests.sh b/tests/scripts/pre-integ-tests.sh
index 9f564a3f5f9..f8ba1bdc701 100755
--- a/tests/scripts/pre-integ-tests.sh
+++ b/tests/scripts/pre-integ-tests.sh
@@ -30,5 +30,5 @@ docker pull apachepulsar/s3mock:latest
 docker pull alpine/socat:latest
 docker pull cassandra:3
 docker pull confluentinc/cp-kafka:4.0.0
-docker pull debezium/example-mysql:0.8
-docker pull mysql:5.7.22
+docker pull debezium/example-mysql:3.0.0.Final
+docker pull mysql:9.1.0

Reply via email to