This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 641887c859a [fix][io] KCA connectors: fix missing runtime dependencies (#18370)
641887c859a is described below
commit 641887c859ab5b5bb180c74c36e67d6f1adc5fd0
Author: Nicolò Boschi <[email protected]>
AuthorDate: Tue Nov 8 13:41:33 2022 +0100
[fix][io] KCA connectors: fix missing runtime dependencies (#18370)
* [fix][io] KCA connectors: fix missing runtime dependencies
* fix compile
* fix compile
* fix noclassdeffound error
* Fix dep and reduce debezium also
* Fix debezium unit test
* improve object mapper
---
pulsar-functions/runtime-all/pom.xml | 1 +
pulsar-io/debezium/core/pom.xml | 25 +++++++---
.../pulsar/io/debezium/PulsarDatabaseHistory.java | 14 ++++--
pulsar-io/debezium/mongodb/pom.xml | 7 +++
pulsar-io/debezium/mssql/pom.xml | 7 +++
pulsar-io/debezium/mysql/pom.xml | 6 +++
pulsar-io/debezium/oracle/pom.xml | 7 +++
pulsar-io/debezium/postgres/pom.xml | 7 +++
pulsar-io/kafka-connect-adaptor/pom.xml | 58 ++++++++++++++++++----
.../io/kafka/connect/PulsarOffsetBackingStore.java | 15 +++++-
.../tests/integration/io/PulsarIOTestBase.java | 5 ++
.../integration/io/sinks/PulsarSinksTest.java | 5 ++
.../debezium/PulsarDebeziumOracleSourceTest.java | 5 ++
.../debezium/PulsarDebeziumSourcesTest.java | 5 ++
14 files changed, 146 insertions(+), 21 deletions(-)
diff --git a/pulsar-functions/runtime-all/pom.xml b/pulsar-functions/runtime-all/pom.xml
index e53f7aa6c2c..bf0b3d36eb1 100644
--- a/pulsar-functions/runtime-all/pom.xml
+++ b/pulsar-functions/runtime-all/pom.xml
@@ -42,6 +42,7 @@
6. log4j-api
7. log4j-core
8. AVRO
+ 9. protobuf-java
-->
<artifactId>pulsar-functions-runtime-all</artifactId>
diff --git a/pulsar-io/debezium/core/pom.xml b/pulsar-io/debezium/core/pom.xml
index 5b4b7f4deb8..f3701887faf 100644
--- a/pulsar-io/debezium/core/pom.xml
+++ b/pulsar-io/debezium/core/pom.xml
@@ -36,12 +36,7 @@
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-core</artifactId>
<version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>pulsar-client-original</artifactId>
- <version>${project.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
@@ -50,6 +45,17 @@
<version>${debezium.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-kafka-connect-adaptor</artifactId>
@@ -103,6 +109,13 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client-original</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
</project>
diff --git a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
index 9669a6e131b..7ca0d309cf9 100644
--- a/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
+++ b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java
@@ -18,9 +18,9 @@
*/
package org.apache.pulsar.io.debezium;
-import static org.apache.commons.lang.StringUtils.isBlank;
-import static org.apache.pulsar.io.common.IOConfigUtils.loadConfigFromJsonString;
+import static org.apache.commons.lang3.StringUtils.isBlank;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.debezium.annotation.ThreadSafe;
import io.debezium.config.Configuration;
@@ -33,6 +33,7 @@ import io.debezium.relational.history.DatabaseHistoryListener;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@@ -99,6 +100,7 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
DatabaseHistory.NAME,
READER_CONFIG);
+ private final ObjectMapper mapper = new ObjectMapper();
private final DocumentReader reader = DocumentReader.defaultReader();
private String topicName;
private Map<String, Object> readerConfigMap = new HashMap<>();
@@ -120,7 +122,13 @@ public final class PulsarDatabaseHistory extends AbstractDatabaseHistory {
}
this.topicName = config.getString(TOPIC);
try {
- this.readerConfigMap = loadConfigFromJsonString(config.getString(READER_CONFIG));
+ final String configString = config.getString(READER_CONFIG);
+ if (configString == null) {
+ this.readerConfigMap = Collections.emptyMap();
+ } else {
+ this.readerConfigMap = mapper.readValue(configString, Map.class);
+ }
+
} catch (JsonProcessingException exception) {
log.warn("The provided reader configs are invalid, "
+ "will not passing any extra config to the reader builder.", exception);
diff --git a/pulsar-io/debezium/mongodb/pom.xml b/pulsar-io/debezium/mongodb/pom.xml
index 1650fe2a650..e01948d1125 100644
--- a/pulsar-io/debezium/mongodb/pom.xml
+++ b/pulsar-io/debezium/mongodb/pom.xml
@@ -32,6 +32,13 @@
<dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-debezium-core</artifactId>
diff --git a/pulsar-io/debezium/mssql/pom.xml b/pulsar-io/debezium/mssql/pom.xml
index 6f5fb6dd5d8..62da307f4b8 100644
--- a/pulsar-io/debezium/mssql/pom.xml
+++ b/pulsar-io/debezium/mssql/pom.xml
@@ -32,6 +32,13 @@
<dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-debezium-core</artifactId>
diff --git a/pulsar-io/debezium/mysql/pom.xml b/pulsar-io/debezium/mysql/pom.xml
index fa0c7b79535..d3c18122086 100644
--- a/pulsar-io/debezium/mysql/pom.xml
+++ b/pulsar-io/debezium/mysql/pom.xml
@@ -31,6 +31,12 @@
<name>Pulsar IO :: Debezium :: mysql</name>
<dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>${project.groupId}</groupId>
diff --git a/pulsar-io/debezium/oracle/pom.xml b/pulsar-io/debezium/oracle/pom.xml
index 030a72b09b4..3da3b8d9ac2 100644
--- a/pulsar-io/debezium/oracle/pom.xml
+++ b/pulsar-io/debezium/oracle/pom.xml
@@ -32,6 +32,13 @@
<dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-debezium-core</artifactId>
diff --git a/pulsar-io/debezium/postgres/pom.xml b/pulsar-io/debezium/postgres/pom.xml
index 7c8d74a229f..2b9596c9f88 100644
--- a/pulsar-io/debezium/postgres/pom.xml
+++ b/pulsar-io/debezium/postgres/pom.xml
@@ -32,6 +32,13 @@
<dependencies>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-io-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-debezium-core</artifactId>
diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml
index af62fbb6126..19aa1f522d0 100644
--- a/pulsar-io/kafka-connect-adaptor/pom.xml
+++ b/pulsar-io/kafka-connect-adaptor/pom.xml
@@ -36,18 +36,17 @@
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-io-core</artifactId>
<version>${project.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>pulsar-io-common</artifactId>
- <version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>*</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.fasterxml.jackson.dataformat</groupId>
+ <artifactId>jackson-dataformat-yaml</artifactId>
</dependency>
<dependency>
@@ -82,11 +81,28 @@
<version>${kafka-client.version}</version>
</dependency>
+ <!-- pulsar-client is only needed for MessageId conversion (for seeking),
commons-lang3 and Netty buffer manipulation -->
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client-original</artifactId>
<version>${project.version}</version>
- <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-buffer</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
@@ -105,6 +121,9 @@
</exclusions>
</dependency>
+
+
+
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-broker</artifactId>
@@ -134,11 +153,30 @@
<type>test-jar</type>
</dependency>
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>${avro.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.asynchttpclient</groupId>
+ <artifactId>async-http-client</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.typesafe.netty</groupId>
+ <artifactId>netty-reactive-streams</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
+
</project>
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
index ed15ab62ef0..02f315af68f 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java
@@ -20,15 +20,17 @@ package org.apache.pulsar.io.kafka.connect;
import static com.google.common.base.Preconditions.checkArgument;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.commons.lang.StringUtils.isBlank;
-import static org.apache.pulsar.io.common.IOConfigUtils.loadConfigFromJsonString;
+import static org.apache.commons.lang3.StringUtils.isBlank;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -53,6 +55,7 @@ import org.apache.pulsar.client.api.Schema;
@Slf4j
public class PulsarOffsetBackingStore implements OffsetBackingStore {
+ private final ObjectMapper mapper = new ObjectMapper();
private final Map<ByteBuffer, ByteBuffer> data = new ConcurrentHashMap<>();
private PulsarClient client;
private String topic;
@@ -249,4 +252,12 @@ public class PulsarOffsetBackingStore implements OffsetBackingStore {
}
});
}
+
+ private Map<String, Object> loadConfigFromJsonString(String config) throws JsonProcessingException {
+ if (!isBlank(config)) {
+ return mapper.readValue(config, new TypeReference<>() {});
+ } else {
+ return Collections.emptyMap();
+ }
+ }
}
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOTestBase.java
index e33bdd8e54d..92aa6fd0134 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/PulsarIOTestBase.java
@@ -23,10 +23,15 @@ import org.apache.pulsar.tests.integration.io.sinks.PulsarIOSinkRunner;
import org.apache.pulsar.tests.integration.io.sinks.SinkTester;
import org.apache.pulsar.tests.integration.io.sources.PulsarIOSourceRunner;
import org.apache.pulsar.tests.integration.io.sources.SourceTester;
+import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
import org.testcontainers.containers.GenericContainer;
public abstract class PulsarIOTestBase extends PulsarFunctionsTestBase {
+ public PulsarIOTestBase(FunctionRuntimeType functionRuntimeType) {
+ super(functionRuntimeType);
+ }
+
@SuppressWarnings({"unchecked", "rawtypes"})
protected void testSink(SinkTester tester, boolean builtin) throws Exception {
tester.startServiceContainer(pulsarCluster);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java
index 1775b693f18..42630741833 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sinks/PulsarSinksTest.java
@@ -23,12 +23,17 @@ import org.apache.pulsar.tests.integration.io.PulsarIOTestBase;
import org.apache.pulsar.tests.integration.io.RabbitMQSinkTester;
import org.apache.pulsar.tests.integration.io.RabbitMQSourceTester;
import org.apache.pulsar.tests.integration.io.sources.KafkaSourceTester;
+import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
public class PulsarSinksTest extends PulsarIOTestBase {
+ public PulsarSinksTest() {
+ super(FunctionRuntimeType.PROCESS);
+ }
+
@DataProvider(name = "withSchema")
public Object[][] withSchema() {
return new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}};
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumOracleSourceTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumOracleSourceTest.java
index 10e5d6dbdaf..76413d0843c 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumOracleSourceTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumOracleSourceTest.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.tests.integration.containers.DebeziumOracleDbContainer;
import org.apache.pulsar.tests.integration.io.PulsarIOTestBase;
+import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
import org.testng.annotations.Test;
import java.util.concurrent.atomic.AtomicInteger;
@@ -38,6 +39,10 @@ public class PulsarDebeziumOracleSourceTest extends PulsarIOTestBase {
protected final AtomicInteger testId = new AtomicInteger(0);
+ public PulsarDebeziumOracleSourceTest() {
+ super(FunctionRuntimeType.PROCESS);
+ }
+
@Test(groups = "source", timeOut = 1800000)
public void testDebeziumOracleDbSource() throws Exception{
testDebeziumOracleDbConnect("org.apache.kafka.connect.json.JsonConverter", true);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
index e0e48101c99..9da1f10e74a 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/sources/debezium/PulsarDebeziumSourcesTest.java
@@ -33,6 +33,7 @@ import org.apache.pulsar.tests.integration.containers.DebeziumMsSqlContainer;
import org.apache.pulsar.tests.integration.containers.DebeziumMySQLContainer;
import org.apache.pulsar.tests.integration.containers.DebeziumPostgreSqlContainer;
import org.apache.pulsar.tests.integration.io.PulsarIOTestBase;
+import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
import org.testng.annotations.Test;
import lombok.Cleanup;
@@ -43,6 +44,10 @@ public class PulsarDebeziumSourcesTest extends PulsarIOTestBase {
protected final AtomicInteger testId = new AtomicInteger(0);
+ public PulsarDebeziumSourcesTest() {
+ super(FunctionRuntimeType.PROCESS);
+ }
+
@Test(groups = "source")
public void testDebeziumMySqlSourceJson() throws Exception {
testDebeziumMySqlConnect("org.apache.kafka.connect.json.JsonConverter", true, false);