tisonkun commented on PR #12999: URL: https://github.com/apache/pulsar/pull/12999#issuecomment-1339542460
This will be supported directly by clickhouse-jdbc 0.3.3 (see https://github.com/ClickHouse/clickhouse-jdbc/issues/894): ```java String connString = "jdbc:ch://server1,server2,server3/database" + "?load_balancing_policy=random&health_check_interval=5000&failover=2"; ``` Closing this PR and waiting for a version bump... If you do want to continue this patch, I suggest: ```diff diff --git a/pulsar-io/jdbc/clickhouse/pom.xml b/pulsar-io/jdbc/clickhouse/pom.xml index 82c5983bb2..50092e749d 100644 --- a/pulsar-io/jdbc/clickhouse/pom.xml +++ b/pulsar-io/jdbc/clickhouse/pom.xml @@ -41,7 +41,6 @@ <groupId>ru.yandex.clickhouse</groupId> <artifactId>clickhouse-jdbc</artifactId> <version>${clickhouse-jdbc.version}</version> - <scope>runtime</scope> </dependency> </dependencies> diff --git a/pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java b/pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java index 1dde785292..2437ed108c 100644 --- a/pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java +++ b/pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java @@ -18,8 +18,13 @@ */ package org.apache.pulsar.io.jdbc; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Properties; import org.apache.pulsar.io.core.annotations.Connector; import org.apache.pulsar.io.core.annotations.IOType; +import ru.yandex.clickhouse.BalancedClickhouseDataSource; +import ru.yandex.clickhouse.ClickHouseDriver; @Connector( name = "jdbc-clickhouse", @@ -28,5 +33,9 @@ import org.apache.pulsar.io.core.annotations.IOType; configClass = JdbcSinkConfig.class ) public class ClickHouseJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink { - + @Override + protected Connection createConnection(String jdbcUrl, Properties properties) throws SQLException { + final BalancedClickhouseDataSource ds = new BalancedClickhouseDataSource(jdbcUrl, 
properties); + return ds.getConnection(); + } } diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java index 4586fcebcf..cbfa9b82c9 100644 --- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java +++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java @@ -94,7 +94,7 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> { properties.setProperty("password", password); } - connection = DriverManager.getConnection(jdbcSinkConfig.getJdbcUrl(), properties); + connection = createConnection(jdbcSinkConfig.getJdbcUrl(), properties); connection.setAutoCommit(!jdbcSinkConfig.isUseTransactions()); log.info("Opened jdbc connection: {}, autoCommit: {}", jdbcUrl, connection.getAutoCommit()); @@ -114,6 +114,10 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> { } } + protected Connection createConnection(String jdbcUrl, Properties properties) throws SQLException { + return DriverManager.getConnection(jdbcUrl, properties); + } + private void initStatement() throws Exception { List<String> keyList = getListFromConfig(jdbcSinkConfig.getKey()); List<String> nonKeyList = getListFromConfig(jdbcSinkConfig.getNonKey()); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
