This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 97e5ccf0c6f Upgrade ClickHouseIO to use ClickHouse Java Client V2
(#37611)
97e5ccf0c6f is described below
commit 97e5ccf0c6f000eecef8edafb2e17babc417b767
Author: Bentsi Leviav <[email protected]>
AuthorDate: Tue Feb 17 22:01:22 2026 +0200
Upgrade ClickHouseIO to use ClickHouse Java Client V2 (#37611)
* update links
* add jdbc string parser to support backward compatibility
* add tests to jdbc parser
* add tests to check backward compatibility with the new jdbc parser
* bumping up the java client version
* set hardcoded api version due to
https://github.com/testcontainers/testcontainers-java/issues/11212
* update the base clickhouse test to use the recent java client
* refactor ClickHouseIO to use the new java client while maintaining
backward compatibility with previous API
* Adjust tests to use the new java client
* move numbers to CONSTs
* make consts static
* add github issue for better tracking
* add explanation of SharedMergeTree
* another const
* Update CHANGES.md
* correct clickhouse website
* add issue link instead of pr
* add documentation links
* Fix old yandex documentation links
* formatting
* fail on JDBC options and properties mismatch
* add link to CH deduplication strategies
---
CHANGES.md | 3 +-
sdks/java/io/clickhouse/build.gradle | 7 +-
.../beam/sdk/io/clickhouse/ClickHouseIO.java | 393 ++++++++++++++++-----
.../sdk/io/clickhouse/ClickHouseJdbcUrlParser.java | 261 ++++++++++++++
.../apache/beam/sdk/io/clickhouse/TableSchema.java | 8 +-
.../beam/sdk/io/clickhouse/AtomicInsertTest.java | 28 +-
.../beam/sdk/io/clickhouse/BaseClickHouseTest.java | 140 ++++++--
.../ClickHouseIOJdbcBackwardCompatibilityTest.java | 98 +++++
.../ClickHouseIOPropertyMergingTest.java | 211 +++++++++++
.../beam/sdk/io/clickhouse/ClickHouseIOTest.java | 142 ++++----
.../io/clickhouse/ClickHouseJdbcUrlParserTest.java | 341 ++++++++++++++++++
11 files changed, 1442 insertions(+), 190 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index bd24be1989d..1b72f92d278 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -68,6 +68,7 @@
* Add Datadog IO support (Java)
([#37318](https://github.com/apache/beam/issues/37318)).
* Remove Pubsublite IO support, since service will be deprecated in March
2026. ([#37375](https://github.com/apache/beam/issues/37375)).
+* (Java) ClickHouse - migrating from the legacy JDBC driver (v0.6.4) to
ClickHouse Java Client v2 (v0.9.6). See the [class
documentation](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.html)
for migration guide ([#37610](https://github.com/apache/beam/issues/37610)).
## New Features / Improvements
@@ -2357,4 +2358,4 @@ Schema Options, it will be removed in version `2.23.0`.
([BEAM-9704](https://iss
## Highlights
-- For versions 2.19.0 and older release notes are available on [Apache Beam
Blog](https://beam.apache.org/blog/).
\ No newline at end of file
+- For versions 2.19.0 and older release notes are available on [Apache Beam
Blog](https://beam.apache.org/blog/).
diff --git a/sdks/java/io/clickhouse/build.gradle
b/sdks/java/io/clickhouse/build.gradle
index 70f3c1a6387..4923bf32a43 100644
--- a/sdks/java/io/clickhouse/build.gradle
+++ b/sdks/java/io/clickhouse/build.gradle
@@ -31,7 +31,7 @@ applyJavaNature(
)
description = "Apache Beam :: SDKs :: Java :: IO :: ClickHouse"
-ext.summary = "IO to write to ClickHouse (https://clickhouse.yandex)."
+ext.summary = "IO to write to ClickHouse (https://clickhouse.com)."
// Match the output directory for generated code with the package, to be more
tool-friendly
def generatedJavaccSourceDir = "${project.buildDir}/generated/javacc"
@@ -50,7 +50,7 @@ idea {
}
}
-def clickhouse_jdbc_version = "0.6.4"
+def clickhouse_java_client_version = "0.9.6"
dependencies {
javacc "net.java.dev.javacc:javacc:7.0.9"
@@ -59,11 +59,12 @@ dependencies {
implementation library.java.joda_time
implementation library.java.slf4j_api
implementation library.java.vendored_guava_32_1_2_jre
- implementation "com.clickhouse:clickhouse-jdbc:$clickhouse_jdbc_version:all"
+ implementation "com.clickhouse:client-v2:$clickhouse_java_client_version:all"
testImplementation library.java.slf4j_api
testImplementation library.java.junit
testImplementation library.java.hamcrest
testImplementation library.java.testcontainers_clickhouse
+ testImplementation
"com.clickhouse:clickhouse-jdbc:$clickhouse_java_client_version:all"
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration:
"shadow")
}
diff --git
a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
index 52dca7cfa64..fc00a1e420e 100644
---
a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
+++
b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
@@ -17,18 +17,17 @@
*/
package org.apache.beam.sdk.io.clickhouse;
-import com.clickhouse.client.ClickHouseRequest;
+import com.clickhouse.client.api.Client;
+import com.clickhouse.client.api.insert.InsertResponse;
+import com.clickhouse.client.api.query.GenericRecord;
+import com.clickhouse.client.api.query.Records;
import com.clickhouse.data.ClickHouseFormat;
-import com.clickhouse.jdbc.ClickHouseConnection;
-import com.clickhouse.jdbc.ClickHouseDataSource;
-import com.clickhouse.jdbc.ClickHouseStatement;
import com.google.auto.value.AutoValue;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.clickhouse.TableSchema.ColumnType;
@@ -63,15 +62,27 @@ import org.slf4j.LoggerFactory;
*
* <h3>Writing to ClickHouse</h3>
*
- * <p>To write to ClickHouse, use {@link ClickHouseIO#write(String, String)},
which writes elements
- * from input {@link PCollection}. It's required that your ClickHouse cluster
already has table you
- * are going to insert into.
+ * <p>To write to ClickHouse, use {@link ClickHouseIO#write(String, String,
String)}, which writes
+ * elements from input {@link PCollection}. It's required that your ClickHouse
cluster already has
+ * the table you are going to insert into.
*
* <pre>{@code
+ * // New way (recommended):
+ * Properties props = new Properties();
+ * props.setProperty("user", "admin");
+ * props.setProperty("password", "secret");
+ *
+ * pipeline
+ * .apply(...)
+ * .apply(
+ * ClickHouseIO.<POJO>write("http://localhost:8123", "default", "my_table")
+ * .withProperties(props));
+ *
+ * // Old way (deprecated):
* pipeline
* .apply(...)
* .apply(
- * ClickHouseIO.<POJO>write("jdbc:clickhouse:localhost:8123/default",
"my_table"));
+ * ClickHouseIO.<POJO>write("jdbc:clickhouse://localhost:8123/default",
"my_table"));
* }</pre>
*
* <p>Optionally, you can provide connection settings, for instance, specify
insert block size with
@@ -80,14 +91,21 @@ import org.slf4j.LoggerFactory;
*
* <h4>Deduplication</h4>
*
- * Deduplication is performed by ClickHouse if inserting to <a
- *
href="https://clickhouse.yandex/docs/en/single/#data-replication">ReplicatedMergeTree</a>
or <a
- *
href="https://clickhouse.yandex/docs/en/single/#distributed">Distributed</a>
table on top of
- * ReplicatedMergeTree. Without replication, inserting into regular MergeTree
can produce
- * duplicates, if insert fails, and then successfully retries. However, each
block is inserted
- * atomically, and you can configure block size with {@link
Write#withMaxInsertBlockSize(long)}.
+ * <p>Deduplication is performed by ClickHouse if inserting to <a
+ *
href="https://clickhouse.com/docs/engines/table-engines/mergetree-family/replication">ReplicatedMergeTree</a>
+ * or <a
+ *
href="https://clickhouse.com/docs/engines/table-engines/special/distributed">Distributed</a>
+ * table on top of ReplicatedMergeTree. Without replication, inserting into
regular MergeTree can
+ * produce duplicates if an insert fails and is then successfully retried.
However, each block is
+ * inserted atomically, and you can configure block size with {@link
+ * Write#withMaxInsertBlockSize(long)}.
*
- * <p>Deduplication is performed using checksums of inserted blocks.
+ * <p>Deduplication is performed using checksums of inserted blocks. For <a
+ *
href="https://clickhouse.com/docs/engines/table-engines/mergetree-family/shared-merge-tree">SharedMergeTree</a>
+ * tables in ClickHouse Cloud, deduplication behavior is similar to
ReplicatedMergeTree. For more
+ * information about deduplication, please visit the <a
+ *
href="https://clickhouse.com/docs/guides/developer/deduplication">Deduplication
strategies
+ * documentation</a>.
*
* <h4>Mapping between Beam and ClickHouse types</h4>
*
@@ -114,8 +132,10 @@ import org.slf4j.LoggerFactory;
* <tr><td>{@link TableSchema.TypeName#TUPLE}</td> <td>{@link
Schema.TypeName#ROW}</td></tr>
* </table>
*
- * Nullable row columns are supported through Nullable type in ClickHouse. Low
cardinality hint is
- * supported through LowCardinality DataType in ClickHouse.
+ * <p>Nullable row columns are supported through <a
+ *
href="https://clickhouse.com/docs/sql-reference/data-types/nullable">Nullable
type</a> in
+ * ClickHouse. <a
href="https://clickhouse.com/docs/sql-reference/data-types/LowCardinality">Low
+ * cardinality hint</a> is supported through LowCardinality DataType in
ClickHouse.
*
* <p>Nested rows should be unnested using {@link Select#flattenedSchema()}.
Type casting should be
* done using {@link org.apache.beam.sdk.schemas.transforms.Cast} before
{@link ClickHouseIO}.
@@ -130,9 +150,57 @@ public class ClickHouseIO {
public static final Duration DEFAULT_MAX_CUMULATIVE_BACKOFF =
Duration.standardDays(1000);
public static final Duration DEFAULT_INITIAL_BACKOFF =
Duration.standardSeconds(5);
+ /**
+ * Creates a write transform using a JDBC URL format.
+ *
+ * <p><b>Deprecated:</b> Use {@link #write(String, String, String)} instead
with separate URL,
+ * database, and table parameters.
+ *
+ * <p>This method is provided for backward compatibility. It parses the JDBC
URL to extract the
+ * connection URL, database name, and any connection properties specified in
the query string.
+ * Additional properties can be added later using {@link Write#withProperties(Properties)}, but
+ * values that conflict with properties taken from the JDBC URL are rejected.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * // Old way (deprecated):
+ *
ClickHouseIO.write("jdbc:clickhouse://localhost:8123/mydb?user=admin&password=secret",
"table")
+ *
+ * // New way:
+ * ClickHouseIO.write("http://localhost:8123", "mydb", "table")
+ * .withProperties(props)
+ * }</pre>
+ *
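+ * <p><b>Property Precedence:</b> Properties from the JDBC URL are never overridden. {@link
+ * Write#withProperties(Properties)} merges additional properties in, and throws an {@link
+ * IllegalArgumentException} if it sets a key that is already present (from the JDBC URL or an
+ * earlier call) to a different value.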
+ *
+ * @param jdbcUrl JDBC connection URL (e.g.,
jdbc:clickhouse://host:port/database?param=value)
+ * @param table table name
+ * @return a {@link PTransform} writing data to ClickHouse
+ * @deprecated Use {@link #write(String, String, String)} with explicit URL,
database, and table
+ */
+ @Deprecated
public static <T> Write<T> write(String jdbcUrl, String table) {
+ ClickHouseJdbcUrlParser.ParsedJdbcUrl parsed =
ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
return new AutoValue_ClickHouseIO_Write.Builder<T>()
- .jdbcUrl(jdbcUrl)
+ .clickHouseUrl(parsed.getClickHouseUrl())
+ .database(parsed.getDatabase())
+ .table(table)
+ .properties(parsed.getProperties()) // Start with JDBC URL properties
+ .maxInsertBlockSize(DEFAULT_MAX_INSERT_BLOCK_SIZE)
+ .initialBackoff(DEFAULT_INITIAL_BACKOFF)
+ .maxRetries(DEFAULT_MAX_RETRIES)
+ .maxCumulativeBackoff(DEFAULT_MAX_CUMULATIVE_BACKOFF)
+ .build()
+ .withInsertDeduplicate(true)
+ .withInsertDistributedSync(true);
+ }
+
+ public static <T> Write<T> write(String clickHouseUrl, String database,
String table) {
+ return new AutoValue_ClickHouseIO_Write.Builder<T>()
+ .clickHouseUrl(clickHouseUrl)
+ .database(database)
.table(table)
.properties(new Properties())
.maxInsertBlockSize(DEFAULT_MAX_INSERT_BLOCK_SIZE)
@@ -148,7 +216,9 @@ public class ClickHouseIO {
@AutoValue
public abstract static class Write<T> extends PTransform<PCollection<T>,
PDone> {
- public abstract String jdbcUrl();
+ public abstract String clickHouseUrl();
+
+ public abstract String database();
public abstract String table();
@@ -176,7 +246,7 @@ public class ClickHouseIO {
public PDone expand(PCollection<T> input) {
TableSchema tableSchema = tableSchema();
if (tableSchema == null) {
- tableSchema = getTableSchema(jdbcUrl(), table());
+ tableSchema = getTableSchema(clickHouseUrl(), database(), table(),
properties());
}
String sdkVersion = ReleaseInfo.getReleaseInfo().getSdkVersion();
@@ -192,7 +262,8 @@ public class ClickHouseIO {
WriteFn<T> fn =
new AutoValue_ClickHouseIO_WriteFn.Builder<T>()
- .jdbcUrl(jdbcUrl())
+ .clickHouseUrl(clickHouseUrl())
+ .database(database())
.table(table())
.maxInsertBlockSize(maxInsertBlockSize())
.schema(tableSchema)
@@ -212,7 +283,8 @@ public class ClickHouseIO {
*
* @param value number of rows
* @return a {@link PTransform} writing data to ClickHouse
- * @see <a
href="https://clickhouse.yandex/docs/en/single/#max_insert_block_size">ClickHouse
+ * @see <a
+ *
href="https://clickhouse.com/docs/operations/settings/settings#max_insert_block_size">ClickHouse
* documentation</a>
*/
public Write<T> withMaxInsertBlockSize(long value) {
@@ -238,7 +310,8 @@ public class ClickHouseIO {
*
* @param value number of replicas, 0 for disabling, null for server
default
* @return a {@link PTransform} writing data to ClickHouse
- * @see <a
href="https://clickhouse.yandex/docs/en/single/#insert_quorum">ClickHouse
+ * @see <a
+ *
href="https://clickhouse.com/docs/operations/settings/settings#insert_quorum">ClickHouse
* documentation</a>
*/
public Write<T> withInsertQuorum(@Nullable Long value) {
@@ -305,11 +378,56 @@ public class ClickHouseIO {
return toBuilder().tableSchema(tableSchema).build();
}
+ /**
+ * Set connection properties (user, password, etc.).
+ *
+ * <p><b>Important:</b> If using the deprecated JDBC URL-based {@link
#write(String, String)}
+ * method, this will fail if any properties specified here conflict with
properties already
+ * extracted from the JDBC URL. This prevents accidental property
conflicts.
+ *
+ * <p>For the new API {@link #write(String, String, String)}, properties
can be set freely since
+ * there are no URL-embedded properties to conflict with.
+ *
+ * @param properties connection properties
+ * @return a {@link PTransform} writing data to ClickHouse
+ * @throws IllegalArgumentException if properties is null or if any
property conflicts with
+ * existing properties (e.g., from JDBC URL)
+ */
+ public Write<T> withProperties(Properties properties) {
+ if (properties == null) {
+ throw new IllegalArgumentException("Properties cannot be null");
+ }
+
+ // Check for conflicts with existing properties
+ Properties existing = properties();
+ for (String key : properties.stringPropertyNames()) {
+ if (existing.containsKey(key)) {
+ String existingValue = existing.getProperty(key);
+ String newValue = properties.getProperty(key);
+ if (!existingValue.equals(newValue)) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Property conflict: '%s' is already set to '%s' (likely
from JDBC URL), "
+ + "but attempting to set it to '%s'. "
+ + "Please use either JDBC URL properties OR
withProperties(), not both for the same keys.",
+ key, existingValue, newValue));
+ }
+ }
+ }
+
+ // Merge properties: new properties are added to existing ones
+ Properties merged = new Properties();
+ merged.putAll(existing);
+ merged.putAll(properties);
+ return toBuilder().properties(merged).build();
+ }
/** Builder for {@link Write}. */
@AutoValue.Builder
abstract static class Builder<T> {
- public abstract Builder<T> jdbcUrl(String jdbcUrl);
+ public abstract Builder<T> clickHouseUrl(String clickHouseUrl);
+
+ public abstract Builder<T> database(String database);
public abstract Builder<T> table(String table);
@@ -348,7 +466,7 @@ public class ClickHouseIO {
private static final String RETRY_ATTEMPT_LOG =
"Error writing to ClickHouse. Retry attempt[{}]";
- private ClickHouseConnection connection;
+ private Client client;
private FluentBackoff retryBackoff;
private final List<Row> buffer = new ArrayList<>();
private final Distribution batchSize = Metrics.distribution(Write.class,
"batch_size");
@@ -360,7 +478,9 @@ public class ClickHouseIO {
@FieldAccess("filterFields")
final FieldAccessDescriptor fieldAccessDescriptor =
FieldAccessDescriptor.withAllFields();
- public abstract String jdbcUrl();
+ public abstract String clickHouseUrl();
+
+ public abstract String database();
public abstract String table();
@@ -387,9 +507,36 @@ public class ClickHouseIO {
}
@Setup
- public void setup() throws SQLException {
+ public void setup() throws Exception {
+
+ String user = properties().getProperty("user", "default");
+ String password = properties().getProperty("password", "");
+
+ // add the options to the client builder
+ Map<String, String> options =
+ properties().stringPropertyNames().stream()
+ .filter(key -> !key.equals("user") && !key.equals("password"))
+ .collect(Collectors.toMap(key -> key,
properties()::getProperty));
+
+ // Create ClickHouse Java Client
+ Client.Builder clientBuilder =
+ new Client.Builder()
+ .addEndpoint(clickHouseUrl())
+ .setUsername(user)
+ .setPassword(password)
+ .setDefaultDatabase(database())
+ .setOptions(options)
+ .setClientName(
+ String.format("Apache Beam/%s",
ReleaseInfo.getReleaseInfo().getSdkVersion()));
+
+ // Add optional compression if specified in properties
+ String compress = properties().getProperty("compress", "false");
+ if (Boolean.parseBoolean(compress)) {
+ clientBuilder.compressServerResponse(true);
+ clientBuilder.compressClientRequest(true);
+ }
- connection = new ClickHouseDataSource(jdbcUrl(),
properties()).getConnection();
+ client = clientBuilder.build();
retryBackoff =
FluentBackoff.DEFAULT
@@ -400,7 +547,9 @@ public class ClickHouseIO {
@Teardown
public void tearDown() throws Exception {
- connection.close();
+ if (client != null) {
+ client.close();
+ }
}
@StartBundle
@@ -431,25 +580,46 @@ public class ClickHouseIO {
}
batchSize.update(buffer.size());
+
+ // Serialize rows to RowBinary format
+ ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
+
+ // Wrap ByteArrayOutputStream with ClickHouseOutputStream
+ try (com.clickhouse.data.ClickHouseOutputStream outputStream =
+ com.clickhouse.data.ClickHouseOutputStream.of(byteStream)) {
+ for (Row row : buffer) {
+ ClickHouseWriter.writeRow(outputStream, schema(), row);
+ }
+ outputStream.flush();
+ }
+ byte[] data = byteStream.toByteArray();
+
while (true) {
- try (ClickHouseStatement statement = connection.createStatement()) {
- statement
- .unwrap(ClickHouseRequest.class)
- .write()
- .table(table())
- .format(ClickHouseFormat.RowBinary)
- .data(
- out -> {
- for (Row row : buffer) {
- ClickHouseWriter.writeRow(out, schema(), row);
- }
- })
- .executeAndWait(); // query happens in a separate thread
+ try {
+
+ // Perform the insert using ClickHouse Java Client
+ InsertResponse response =
+ client
+ .insert(
+ table(), new java.io.ByteArrayInputStream(data),
ClickHouseFormat.RowBinary)
+ .get();
+
+ if (response != null) {
+ LOG.debug(
+ "Successfully inserted {} rows out of {} into table {}. total
size written {} bytes",
+ response.getWrittenRows(),
+ buffer.size(),
+ table(),
+ response.getWrittenBytes());
+ } else {
+ LOG.debug("Successfully inserted {} rows into table {}",
buffer.size(), table());
+ }
+
buffer.clear();
break;
- } catch (SQLException e) {
+ } catch (Exception e) {
if (!BackOffUtils.next(Sleeper.DEFAULT, backOff)) {
- throw e;
+ throw new RuntimeException("Failed to write to ClickHouse after
retries", e);
} else {
retries.inc();
LOG.warn(RETRY_ATTEMPT_LOG, attempt, e);
@@ -462,7 +632,9 @@ public class ClickHouseIO {
@AutoValue.Builder
abstract static class Builder<T> {
- public abstract Builder<T> jdbcUrl(String jdbcUrl);
+ public abstract Builder<T> clickHouseUrl(String clickHouseUrl);
+
+ public abstract Builder<T> database(String database);
public abstract Builder<T> table(String table);
@@ -491,57 +663,106 @@ public class ClickHouseIO {
String.join(",", l).trim().replaceAll("Tuple\\(",
"Tuple('").replaceAll(",", ",'");
return content;
}
+
/**
- * Returns {@link TableSchema} for a given table.
+ * Returns {@link TableSchema} for a given table using JDBC URL format.
+ *
+ * <p><b>Deprecated:</b> Use {@link #getTableSchema(String, String, String,
Properties)} instead
+ * with separate URL, database, table, and properties parameters.
+ *
+ * <p>This method parses the JDBC URL to extract connection details and
properties. For new code,
+ * use the explicit parameter version for better clarity and control.
*
- * @param jdbcUrl jdbc connection url
+ * <p>Example migration:
+ *
+ * <pre>{@code
+ * // Old way (deprecated):
+ * TableSchema schema = ClickHouseIO.getTableSchema(
+ * "jdbc:clickhouse://localhost:8123/mydb?user=admin", "my_table");
+ *
+ * // New way:
+ * Properties props = new Properties();
+ * props.setProperty("user", "admin");
+ * TableSchema schema = ClickHouseIO.getTableSchema(
+ * "http://localhost:8123", "mydb", "my_table", props);
+ * }</pre>
+ *
+ * @param jdbcUrl JDBC connection URL (e.g.,
jdbc:clickhouse://host:port/database?param=value)
* @param table table name
* @return table schema
+ * @deprecated Use {@link #getTableSchema(String, String, String,
Properties)} with explicit
+ * parameters
*/
+ @Deprecated
public static TableSchema getTableSchema(String jdbcUrl, String table) {
- List<TableSchema.Column> columns = new ArrayList<>();
-
- try (ClickHouseConnection connection = new
ClickHouseDataSource(jdbcUrl).getConnection();
- Statement statement = connection.createStatement()) {
-
- ResultSet rs = null; // try-finally is used because findbugs doesn't
like try-with-resource
- try {
- rs = statement.executeQuery("DESCRIBE TABLE " +
quoteIdentifier(table));
-
- while (rs.next()) {
- String name = rs.getString("name");
- String type = rs.getString("type");
- String defaultTypeStr = rs.getString("default_type");
- String defaultExpression = rs.getString("default_expression");
+ ClickHouseJdbcUrlParser.ParsedJdbcUrl parsed =
ClickHouseJdbcUrlParser.parse(jdbcUrl);
+ return getTableSchema(
+ parsed.getClickHouseUrl(), parsed.getDatabase(), table,
parsed.getProperties());
+ }
- ColumnType columnType = null;
- if (type.toLowerCase().trim().startsWith("tuple(")) {
- String content = tuplePreprocessing(type);
- columnType = ColumnType.parse(content);
- } else {
- columnType = ColumnType.parse(type);
- }
- DefaultType defaultType =
DefaultType.parse(defaultTypeStr).orElse(null);
+ /**
+ * Returns {@link TableSchema} for a given table using ClickHouse Java
Client.
+ *
+ * @param clickHouseUrl ClickHouse connection url
+ * @param database ClickHouse database
+ * @param table table name
+ * @param properties connection properties
+ * @return table schema
+ * @since 2.72.0
+ */
+ public static TableSchema getTableSchema(
+ String clickHouseUrl, String database, String table, Properties
properties) {
+ List<TableSchema.Column> columns = new ArrayList<>();
- Object defaultValue;
- if (DefaultType.DEFAULT.equals(defaultType)
- && !Strings.isNullOrEmpty(defaultExpression)) {
- defaultValue = ColumnType.parseDefaultExpression(columnType,
defaultExpression);
- } else {
- defaultValue = null;
+ try {
+ String user = properties.getProperty("user", "default");
+ String password = properties.getProperty("password", "");
+
+ // Create ClickHouse Java Client
+ Client.Builder clientBuilder =
+ new Client.Builder()
+ .addEndpoint(clickHouseUrl)
+ .setUsername(user)
+ .setPassword(password)
+ .setDefaultDatabase(database)
+ .setClientName(
+ String.format("Apache Beam/%s",
ReleaseInfo.getReleaseInfo().getSdkVersion()));
+
+ try (Client client = clientBuilder.build()) {
+ String query = "DESCRIBE TABLE " + quoteIdentifier(table);
+
+ try (Records records = client.queryRecords(query).get()) {
+ for (GenericRecord record : records) {
+ String name = record.getString("name");
+ String type = record.getString("type");
+ String defaultTypeStr = record.getString("default_type");
+ String defaultExpression = record.getString("default_expression");
+
+ ColumnType columnType;
+ if (type.toLowerCase().trim().startsWith("tuple(")) {
+ String content = tuplePreprocessing(type);
+ columnType = ColumnType.parse(content);
+ } else {
+ columnType = ColumnType.parse(type);
+ }
+ DefaultType defaultType =
DefaultType.parse(defaultTypeStr).orElse(null);
+
+ Object defaultValue;
+ if (DefaultType.DEFAULT.equals(defaultType)
+ && !Strings.isNullOrEmpty(defaultExpression)) {
+ defaultValue = ColumnType.parseDefaultExpression(columnType,
defaultExpression);
+ } else {
+ defaultValue = null;
+ }
+
+ columns.add(TableSchema.Column.of(name, columnType, defaultType,
defaultValue));
}
-
- columns.add(TableSchema.Column.of(name, columnType, defaultType,
defaultValue));
- }
- } finally {
- if (rs != null) {
- rs.close();
}
}
return TableSchema.of(columns.toArray(new TableSchema.Column[0]));
- } catch (SQLException e) {
- throw new RuntimeException(e);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to get table schema for table: " +
table, e);
}
}
diff --git
a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseJdbcUrlParser.java
b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseJdbcUrlParser.java
new file mode 100644
index 00000000000..92cd1eaeacd
--- /dev/null
+++
b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseJdbcUrlParser.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.clickhouse;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.List;
+import java.util.Properties;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+
+/**
+ * Utility class for parsing ClickHouse JDBC URLs and extracting connection
parameters.
+ *
+ * <p>Used for supporting backward compatibility with the deprecated {@link
+ * ClickHouseIO#write(String, String)} method that accepts JDBC URLs. New code
should use {@link
+ * ClickHouseIO#write(String, String, String)} with explicit parameters
instead.
+ *
+ * @deprecated Use {@link ClickHouseIO#write(String, String, String)} with
separate clickHouseUrl,
+ * database, and table parameters instead of JDBC URL format.
+ */
+@Deprecated
+class ClickHouseJdbcUrlParser {
+
+ /**
+ * Represents parsed components of a ClickHouse JDBC URL.
+ *
+ * <p>Contains the extracted HTTP/HTTPS URL, database name, and connection
properties from a JDBC
+ * URL string.
+ *
+ * @deprecated This class supports the deprecated JDBC URL-based API. Use
separate parameters for
+ * clickHouseUrl, database, and properties instead.
+ */
+ @Deprecated
+ static class ParsedJdbcUrl {
+ private final String clickHouseUrl;
+ private final String database;
+ private final Properties properties;
+
+ ParsedJdbcUrl(String clickHouseUrl, String database, Properties
properties) {
+ this.clickHouseUrl = clickHouseUrl;
+ this.database = database;
+ this.properties = properties;
+ }
+
+ public String getClickHouseUrl() {
+ return clickHouseUrl;
+ }
+
+ public String getDatabase() {
+ return database;
+ }
+
+ public Properties getProperties() {
+ return properties;
+ }
+ }
+
+ /**
+ * Parses a ClickHouse JDBC URL into its components.
+ *
+ * <p>Supported formats:
+ *
+ * <ul>
+ * <li>jdbc:clickhouse://host:port/database?param=value
+ * <li>jdbc:clickhouse:http://host:port/database?param=value
+ * <li>jdbc:clickhouse:https://host:port/database?param=value
+ * <li>jdbc:ch://host:port/database?param=value (ClickHouse JDBC driver
shorthand)
+ * </ul>
+ *
+ * @param jdbcUrl the JDBC URL to parse
+ * @return ParsedJdbcUrl containing the HTTP/HTTPS URL, database, and
properties
+ * @throws IllegalArgumentException if the URL format is invalid
+ */
+ static ParsedJdbcUrl parse(String jdbcUrl) {
+ if (Strings.isNullOrEmpty(jdbcUrl)) {
+ throw new IllegalArgumentException("JDBC URL cannot be null or empty");
+ }
+
+ String actualUrl = extractHttpUrl(jdbcUrl);
+
+ try {
+ URI uri = new URI(actualUrl);
+
+ validateScheme(uri.getScheme());
+ String host = validateAndGetHost(uri.getHost(), jdbcUrl);
+ int port = getPortOrDefault(uri.getPort(), uri.getScheme());
+
+ String clickHouseUrl = String.format("%s://%s:%d", uri.getScheme(),
host, port);
+ String database = extractDatabase(uri.getPath());
+ Properties properties = extractProperties(uri.getQuery());
+
+ return new ParsedJdbcUrl(clickHouseUrl, database, properties);
+
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException("Invalid JDBC URL format: " +
jdbcUrl, e);
+ } catch (java.io.UnsupportedEncodingException e) {
+ throw new IllegalArgumentException("Failed to decode URL parameters: " +
jdbcUrl, e);
+ }
+ }
+
+ /**
+ * Extracts and normalizes the HTTP/HTTPS URL from a JDBC URL.
+ *
+ * @param jdbcUrl the JDBC URL to process
+ * @return normalized HTTP/HTTPS URL
+ * @throws IllegalArgumentException if the URL format is invalid
+ */
+ private static String extractHttpUrl(String jdbcUrl) {
+ // Remove jdbc: prefix
+ String urlWithoutJdbc = jdbcUrl;
+ if (jdbcUrl.toLowerCase().startsWith("jdbc:")) {
+ urlWithoutJdbc = jdbcUrl.substring(5);
+ }
+
+ // Handle jdbc:clickhouse: or jdbc:ch: prefix
+ String actualUrl;
+ if (urlWithoutJdbc.toLowerCase().startsWith("clickhouse:")) {
+ actualUrl = urlWithoutJdbc.substring(11);
+ } else if (urlWithoutJdbc.toLowerCase().startsWith("ch:")) {
+ actualUrl = urlWithoutJdbc.substring(3);
+ } else {
+ throw new IllegalArgumentException(
+ "Invalid JDBC URL format. Expected 'jdbc:clickhouse:' or 'jdbc:ch:'
prefix. Got: "
+ + jdbcUrl);
+ }
+
+ // Check if URL already has a scheme and validate it
+ if (actualUrl.toLowerCase().startsWith("http://")
+ || actualUrl.toLowerCase().startsWith("https://")) {
+ return actualUrl;
+ }
+
+ // Check for invalid schemes before prepending http://
+ if (actualUrl.contains("://")) {
+ // Extract the scheme part
+ int schemeEnd = actualUrl.indexOf("://");
+ String scheme = actualUrl.substring(0, schemeEnd).toLowerCase();
+ if (!scheme.equals("http") && !scheme.equals("https")) {
+ throw new IllegalArgumentException(
+ "Invalid scheme in JDBC URL. Expected 'http' or 'https'. Got: " +
scheme);
+ }
+ }
+
+ // If URL doesn't start with http:// or https://, assume http://
+ if (actualUrl.startsWith("//")) {
+ actualUrl = "http:" + actualUrl;
+ } else {
+ actualUrl = "http://" + actualUrl;
+ }
+
+ return actualUrl;
+ }
+
+ /**
+ * Validates the URI scheme.
+ *
+ * @param scheme the scheme to validate
+ * @throws IllegalArgumentException if scheme is invalid
+ */
+ private static void validateScheme(String scheme) {
+ if (scheme == null || (!scheme.equals("http") && !scheme.equals("https")))
{
+ throw new IllegalArgumentException(
+ "Invalid scheme. Expected 'http' or 'https'. Got: " + scheme);
+ }
+ }
+
+ /**
+ * Validates and returns the host from the URI.
+ *
+ * @param host the host to validate
+ * @param jdbcUrl the original JDBC URL (for error reporting)
+ * @return the validated host
+ * @throws IllegalArgumentException if host is invalid
+ */
+ private static String validateAndGetHost(String host, String jdbcUrl) {
+ if (Strings.isNullOrEmpty(host)) {
+ throw new IllegalArgumentException("Host cannot be empty in JDBC URL: "
+ jdbcUrl);
+ }
+ return host;
+ }
+
+ /**
+ * Returns the port or default port based on scheme.
+ *
+ * @param port the port from URI (-1 if not specified)
+ * @param scheme the URI scheme (http or https)
+ * @return the port number
+ */
+ private static int getPortOrDefault(int port, String scheme) {
+ if (port == -1) {
+ return scheme.equals("https") ? 8443 : 8123; // Default ClickHouse ports
+ }
+ return port;
+ }
+
+ /**
+ * Extracts database name from URI path.
+ *
+ * @param path the URI path
+ * @return the database name, or "default" if not specified
+ */
+ private static String extractDatabase(String path) {
+ if (Strings.isNullOrEmpty(path)) {
+ return "default";
+ }
+
+ // Remove leading slash
+ String pathWithoutSlash = path.startsWith("/") ? path.substring(1) : path;
+ return pathWithoutSlash.isEmpty() ? "default" : pathWithoutSlash;
+ }
+
+ /**
+ * Extracts connection properties from URI query string.
+ *
+ * @param query the URI query string
+ * @return Properties object containing the parsed parameters
+ * @throws java.io.UnsupportedEncodingException if URL decoding fails
+ */
+ private static Properties extractProperties(String query)
+ throws java.io.UnsupportedEncodingException {
+ Properties properties = new Properties();
+
+ if (Strings.isNullOrEmpty(query)) {
+ return properties;
+ }
+
+ // Use Guava Splitter instead of String.split()
+ for (String param : Splitter.on('&').split(query)) {
+ // Split key-value pairs, handling parameters without values
+ List<String> parts = Splitter.on('=').limit(2).splitToList(param);
+
+ if (parts.size() == 2) {
+ String key = java.net.URLDecoder.decode(parts.get(0), "UTF-8");
+ String value = java.net.URLDecoder.decode(parts.get(1), "UTF-8");
+ properties.setProperty(key, value);
+ } else if (parts.size() == 1) {
+ // Parameter without value (e.g., ?compress)
+ String key = java.net.URLDecoder.decode(parts.get(0), "UTF-8");
+ properties.setProperty(key, "true");
+ }
+ }
+
+ return properties;
+ }
+}
diff --git
a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java
b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java
index b89a88b3fae..baee77c5f9a 100644
---
a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java
+++
b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java
@@ -29,7 +29,10 @@ import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
import org.checkerframework.checker.nullness.qual.Nullable;
-/** A descriptor for ClickHouse table schema. */
+/**
+ * A descriptor for ClickHouse table schema. To be updated with ClickHouse
table schema API -
+ * https://github.com/apache/beam/issues/37613
+ */
@AutoValue
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
@@ -185,7 +188,8 @@ public abstract class TableSchema implements Serializable {
/**
* An enumeration of possible kinds of default values in ClickHouse.
*
- * @see <a
href="https://clickhouse.yandex/docs/en/single/#default-values">ClickHouse
+ * @see <a
+ *
href="https://clickhouse.com/docs/sql-reference/statements/create/table#default_values">ClickHouse
* documentation</a>
*/
public enum DefaultType {
diff --git
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/AtomicInsertTest.java
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/AtomicInsertTest.java
index 3a881ff0459..73a822b8ec0 100644
---
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/AtomicInsertTest.java
+++
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/AtomicInsertTest.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io.clickhouse;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import java.sql.SQLException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.sdk.Pipeline;
@@ -48,6 +47,7 @@ public class AtomicInsertTest extends BaseClickHouseTest {
private static final int MIN_ATTEMPTS = 2;
private static final int MAX_ATTEMPTS = 20; // should be enough to succeed
at least once
+ static final int TEST_BATCH_SIZE = 100000;
private static boolean shouldAttempt(int i, long count) {
return i < MIN_ATTEMPTS || (count == 0 && i < MAX_ATTEMPTS);
@@ -55,8 +55,7 @@ public class AtomicInsertTest extends BaseClickHouseTest {
/** With sufficient block size, ClickHouse will atomically insert all or
nothing. */
@Test
- public void testAtomicInsert() throws SQLException {
- int size = 100000;
+ public void testAtomicInsert() throws Exception {
int done = 0;
// inserts to such table fail with 60% chance for 1M batch size
@@ -64,16 +63,16 @@ public class AtomicInsertTest extends BaseClickHouseTest {
"CREATE TABLE test_atomic_insert ("
+ " f0 Int64, "
+ " f1 Int64 MATERIALIZED CAST(if((rand() % "
- + size
+ + TEST_BATCH_SIZE
+ ") = 0, '', '1') AS Int64)"
+ ") ENGINE=MergeTree ORDER BY (f0)");
pipeline
// make sure we get one big bundle
- .apply(RangeBundle.of(size))
+ .apply(RangeBundle.of(TEST_BATCH_SIZE))
.apply(
- ClickHouseIO.<Row>write(clickHouse.getJdbcUrl(),
"test_atomic_insert")
- .withMaxInsertBlockSize(size)
+ ClickHouseIO.<Row>write(clickHouseUrl, database,
"test_atomic_insert")
+ .withMaxInsertBlockSize(TEST_BATCH_SIZE)
.withInitialBackoff(Duration.millis(1))
.withMaxRetries(2));
@@ -84,7 +83,7 @@ public class AtomicInsertTest extends BaseClickHouseTest {
}
// each insert is atomic, so we get exactly done * size elements
- assertEquals(((long) done) * size, count);
+ assertEquals(((long) done) * TEST_BATCH_SIZE, count);
assertTrue("insert didn't succeed after " + MAX_ATTEMPTS + " attempts",
count > 0L);
}
@@ -93,25 +92,24 @@ public class AtomicInsertTest extends BaseClickHouseTest {
* replicated tables, it will deduplicate blocks.
*/
@Test
- public void testIdempotentInsert() throws SQLException {
- int size = 100000;
+ public void testIdempotentInsert() throws Exception {
// inserts to such table fail with 60% chance for 1M batch size
executeSql(
"CREATE TABLE test_idempotent_insert ("
+ " f0 Int64, "
+ " f1 Int64 MATERIALIZED CAST(if((rand() % "
- + size
+ + TEST_BATCH_SIZE
+ ") = 0, '', '1') AS Int64)"
+ ")
ENGINE=ReplicatedMergeTree('/clickHouse/tables/0/test_idempotent_insert',
'replica_0') "
+ "ORDER BY (f0)");
pipeline
// make sure we get one big bundle
- .apply(RangeBundle.of(size))
+ .apply(RangeBundle.of(TEST_BATCH_SIZE))
.apply(
- ClickHouseIO.<Row>write(clickHouse.getJdbcUrl(),
"test_idempotent_insert")
- .withMaxInsertBlockSize(size)
+ ClickHouseIO.<Row>write(clickHouseUrl, database,
"test_idempotent_insert")
+ .withMaxInsertBlockSize(TEST_BATCH_SIZE)
.withInitialBackoff(Duration.millis(1))
.withMaxRetries(2));
@@ -122,7 +120,7 @@ public class AtomicInsertTest extends BaseClickHouseTest {
}
// inserts should be deduplicated, so we get exactly `size` elements
- assertEquals(size, count);
+ assertEquals(TEST_BATCH_SIZE, count);
assertTrue("insert didn't succeed after " + MAX_ATTEMPTS + " attempts",
count > 0L);
}
diff --git
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/BaseClickHouseTest.java
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/BaseClickHouseTest.java
index 5f90f3f3184..d3f6c398252 100644
---
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/BaseClickHouseTest.java
+++
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/BaseClickHouseTest.java
@@ -17,11 +17,13 @@
*/
package org.apache.beam.sdk.io.clickhouse;
+import static org.testcontainers.containers.ClickHouseContainer.HTTP_PORT;
+
+import com.clickhouse.client.api.Client;
+import com.clickhouse.client.api.query.GenericRecord;
+import com.clickhouse.client.api.query.Records;
import java.io.IOException;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -41,14 +43,19 @@ import org.testcontainers.containers.Network;
public class BaseClickHouseTest {
public static ClickHouseContainer clickHouse;
+ public static String clickHouseUrl;
+ public static String database;
public static Network network;
public static GenericContainer zookeeper;
+ static final int CLIENT_TIMEOUT = 30;
+
private static final Logger LOG =
LoggerFactory.getLogger(BaseClickHouseTest.class);
- private Connection connection;
+ private Client client;
@BeforeClass
public static void setup() throws IOException, InterruptedException {
+ System.setProperty("api.version", "1.44");
network = Network.newNetwork();
zookeeper =
@@ -72,45 +79,130 @@ public class BaseClickHouseTest {
;
clickHouse.start();
LOG.info("Start Clickhouse");
+ clickHouseUrl = "http://" + clickHouse.getHost() + ":" +
clickHouse.getMappedPort(HTTP_PORT);
+ database = "default";
}
@AfterClass
public static void tearDown() {
- clickHouse.close();
- zookeeper.close();
+    // Each resource may still be null if setup() failed before creating it. The containers are
+    // closed before the shared Network (presumably attached to both in setup()), and the nested
+    // try/finally ensures an exception while closing one resource does not leak the ones after it.
+    try {
+      if (clickHouse != null) {
+        clickHouse.close();
+      }
+    } finally {
+      try {
+        if (zookeeper != null) {
+          zookeeper.close();
+        }
+      } finally {
+        if (network != null) {
+          network.close();
+        }
+      }
+    }
}
@Before
- public void setUp() throws SQLException {
- connection = clickHouse.createConnection("");
+  public void setUp() throws Exception {
+    // A fresh client per test, authenticated with the container's credentials and bound to the
+    // shared test database; after() closes it again.
+    client =
+        new Client.Builder()
+            .addEndpoint(clickHouseUrl)
+            .setUsername(clickHouse.getUsername())
+            .setPassword(clickHouse.getPassword())
+            .setDefaultDatabase(database)
+            .build();
}
@After
public void after() {
- if (connection != null) {
+ // Closes the Client built in setUp(); @After runs after every test.
+ if (client != null) {
try {
- connection.close();
- } catch (SQLException e) {
- // failed to close connection, ignore
+ client.close();
+ } catch (Exception e) {
+ // Best-effort: a close failure is only logged, not rethrown, so it cannot mask the test result.
+ LOG.warn("Failed to close ClickHouse client", e);
} finally {
- connection = null;
+ // Cleared even if close() threw.
+ client = null;
}
}
}
- boolean executeSql(String sql) throws SQLException {
- Statement statement = connection.createStatement();
- return statement.execute(sql);
+  /**
+   * Executes a SQL statement (DDL, DML, etc.) and discards any result.
+   *
+   * <p>The {@code QueryResponse} produced by the client is AutoCloseable and is closed immediately.
+   * Leaving it open would hold its resources (presumably the underlying HTTP connection) for every
+   * statement a test runs.
+   *
+   * @param sql SQL statement to execute
+   * @return true if execution was successful
+   * @throws Exception if execution fails or does not complete within {@code CLIENT_TIMEOUT} seconds
+   */
+  boolean executeSql(String sql) throws Exception {
+    try {
+      client.query(sql).get(CLIENT_TIMEOUT, TimeUnit.SECONDS).close();
+      return true;
+    } catch (Exception e) {
+      LOG.error("Failed to execute SQL: {}", sql, e);
+      throw e;
+    }
+  }
+
+  /**
+   * Runs a query and hands back its rows.
+   *
+   * <p>The returned {@code Records} is AutoCloseable; the helpers in this class close it with
+   * try-with-resources, and other callers should do the same.
+   *
+   * @param sql SQL query to execute
+   * @return Records containing query results
+   * @throws Exception if the query fails or does not finish within {@code CLIENT_TIMEOUT} seconds
+   */
+  Records executeQuery(String sql) throws Exception {
+    Records rows;
+    try {
+      rows = client.queryRecords(sql).get(CLIENT_TIMEOUT, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      LOG.error("Failed to execute query: {}", sql, e);
+      throw e;
+    }
+    return rows;
+  }
+
+  /**
+   * Runs a query and returns the first column of its first row as a {@code long}. Handy for
+   * {@code COUNT} and other single-value queries.
+   *
+   * @param sql SQL query to execute
+   * @return the first column of the first row
+   * @throws IllegalStateException if the query returns no rows
+   * @throws Exception if the query fails
+   */
+  long executeQueryAsLong(String sql) throws Exception {
+    try (Records records = executeQuery(sql)) {
+      java.util.Iterator<GenericRecord> rows = records.iterator();
+      if (!rows.hasNext()) {
+        throw new IllegalStateException("Query returned no results: " + sql);
+      }
+      // GenericRecord column indexes are 1-based.
+      return rows.next().getLong(1);
+    } catch (Exception e) {
+      LOG.error("Failed to execute query as long: {}", sql, e);
+      throw e;
+    }
}
- ResultSet executeQuery(String sql) throws SQLException {
- Statement statement = connection.createStatement();
- return statement.executeQuery(sql);
+  /**
+   * Runs a query and returns the first column of its first row as a {@code String}.
+   *
+   * @param sql SQL query to execute
+   * @return the first column of the first row
+   * @throws IllegalStateException if the query returns no rows
+   * @throws Exception if the query fails
+   */
+  String executeQueryAsString(String sql) throws Exception {
+    try (Records records = executeQuery(sql)) {
+      java.util.Iterator<GenericRecord> rows = records.iterator();
+      if (!rows.hasNext()) {
+        throw new IllegalStateException("Query returned no results: " + sql);
+      }
+      // GenericRecord column indexes are 1-based.
+      return rows.next().getString(1);
+    } catch (Exception e) {
+      LOG.error("Failed to execute query as string: {}", sql, e);
+      throw e;
+    }
}
- long executeQueryAsLong(String sql) throws SQLException {
- ResultSet rs = executeQuery(sql);
- rs.next();
- return rs.getLong(1);
+  /**
+   * Reports whether the ClickHouse server answers a ping.
+   *
+   * @return true if a client exists and its ping succeeds; false otherwise, including when the ping
+   *     throws
+   */
+  boolean isServerAlive() {
+    if (client == null) {
+      return false;
+    }
+    try {
+      return client.ping();
+    } catch (Exception e) {
+      LOG.warn("Server ping failed", e);
+      return false;
+    }
}
}
diff --git
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOJdbcBackwardCompatibilityTest.java
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOJdbcBackwardCompatibilityTest.java
new file mode 100644
index 00000000000..3a4a00421a4
--- /dev/null
+++
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOJdbcBackwardCompatibilityTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.clickhouse;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.io.clickhouse.ClickHouseIO.Write;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Checks that the deprecated JDBC-URL overload {@code ClickHouseIO.write(jdbcUrl, table)} yields the
+ * same {@link Write} configuration as the URL/database/table API. The tests only inspect the built
+ * transform and never run a pipeline.
+ */
+@RunWith(JUnit4.class)
+public class ClickHouseIOJdbcBackwardCompatibilityTest {
+
+  private static final String TABLE = "test_table";
+  private static final String JDBC_URL = "jdbc:clickhouse://localhost:8123/testdb";
+
+  /** Builds a {@link Write} through the deprecated JDBC-URL entry point. */
+  @SuppressWarnings("deprecation")
+  private static Write<?> legacyWrite(String jdbcUrl) {
+    return ClickHouseIO.write(jdbcUrl, TABLE);
+  }
+
+  /** Asserts the HTTP endpoint, database and table that {@link #JDBC_URL} maps to. */
+  private static void assertTarget(Write<?> write) {
+    assertEquals("http://localhost:8123", write.clickHouseUrl());
+    assertEquals("testdb", write.database());
+    assertEquals(TABLE, write.table());
+  }
+
+  @Test
+  public void testDeprecatedWriteMethodWithBasicJdbcUrl() {
+    assertTarget(legacyWrite(JDBC_URL));
+  }
+
+  @Test
+  public void testDeprecatedWriteMethodWithParameters() {
+    Write<?> write = legacyWrite(JDBC_URL + "?user=admin&password=secret");
+
+    assertTarget(write);
+    assertEquals("admin", write.properties().getProperty("user"));
+    assertEquals("secret", write.properties().getProperty("password"));
+  }
+
+  @Test
+  public void testDeprecatedWriteMethodPreservesDefaults() {
+    Write<?> write = legacyWrite(JDBC_URL);
+
+    // Parsing a JDBC URL must leave every builder default untouched.
+    assertEquals(ClickHouseIO.DEFAULT_MAX_INSERT_BLOCK_SIZE, write.maxInsertBlockSize());
+    assertEquals(ClickHouseIO.DEFAULT_MAX_RETRIES, write.maxRetries());
+    assertEquals(ClickHouseIO.DEFAULT_INITIAL_BACKOFF, write.initialBackoff());
+    assertEquals(ClickHouseIO.DEFAULT_MAX_CUMULATIVE_BACKOFF, write.maxCumulativeBackoff());
+    assertTrue(write.insertDeduplicate());
+    assertTrue(write.insertDistributedSync());
+  }
+
+  @Test
+  public void testNewWriteMethodEquivalence() {
+    Write<?> legacy = legacyWrite(JDBC_URL + "?user=admin");
+    Write<?> modern =
+        ClickHouseIO.write("http://localhost:8123", "testdb", TABLE)
+            .withProperties(legacy.properties());
+
+    // Both entry points should describe the same target and credentials.
+    assertEquals(legacy.clickHouseUrl(), modern.clickHouseUrl());
+    assertEquals(legacy.database(), modern.database());
+    assertEquals(legacy.table(), modern.table());
+    assertEquals(
+        legacy.properties().getProperty("user"), modern.properties().getProperty("user"));
+  }
+}
diff --git
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOPropertyMergingTest.java
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOPropertyMergingTest.java
new file mode 100644
index 00000000000..5bd0687f532
--- /dev/null
+++
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOPropertyMergingTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.clickhouse;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Properties;
+import org.apache.beam.sdk.io.clickhouse.ClickHouseIO.Write;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests how {@link Write#withProperties} merges explicit properties with those parsed from a JDBC
+ * URL:
+ *
+ * <ul>
+ *   <li>new keys are added;
+ *   <li>re-supplying an identical value is accepted;
+ *   <li>a different value for an existing key is rejected with {@link IllegalArgumentException}.
+ * </ul>
+ */
+@RunWith(JUnit4.class)
+public class ClickHouseIOPropertyMergingTest {
+
+  private static final String TABLE = "test_table";
+  private static final String JDBC_URL_PREFIX = "jdbc:clickhouse://localhost:8123/testdb?";
+
+  /** Builds a {@link Write} via the deprecated JDBC-URL API, using the given query string. */
+  @SuppressWarnings("deprecation")
+  private static Write<?> jdbcWrite(String query) {
+    return ClickHouseIO.write(JDBC_URL_PREFIX + query, TABLE);
+  }
+
+  /** Creates {@link Properties} from alternating key/value arguments. */
+  private static Properties props(String... keysAndValues) {
+    Properties result = new Properties();
+    for (int i = 0; i + 1 < keysAndValues.length; i += 2) {
+      result.setProperty(keysAndValues[i], keysAndValues[i + 1]);
+    }
+    return result;
+  }
+
+  @Test
+  public void testDeprecatedWriteExtractsPropertiesFromJdbcUrl() {
+    Properties parsed = jdbcWrite("user=admin&password=secret&compress=true").properties();
+
+    assertEquals("admin", parsed.getProperty("user"));
+    assertEquals("secret", parsed.getProperty("password"));
+    assertEquals("true", parsed.getProperty("compress"));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testWithPropertiesConflictThrows() {
+    // The URL already sets "password" to a different value.
+    jdbcWrite("user=admin&password=old_secret").withProperties(props("password", "new_secret"));
+  }
+
+  @Test
+  public void testWithPropertiesNoConflictWhenSameValue() {
+    // Re-supplying the URL's own values is not a conflict.
+    Write<?> write =
+        jdbcWrite("user=admin&password=secret")
+            .withProperties(props("user", "admin", "password", "secret"));
+
+    assertEquals("admin", write.properties().getProperty("user"));
+    assertEquals("secret", write.properties().getProperty("password"));
+  }
+
+  @Test
+  public void testWithPropertiesAddsNewPropertiesWithoutConflict() {
+    Properties merged =
+        jdbcWrite("user=admin")
+            .withProperties(props("socket_timeout", "30000", "compress", "true"))
+            .properties();
+
+    assertEquals("admin", merged.getProperty("user"));
+    assertEquals("30000", merged.getProperty("socket_timeout"));
+    assertEquals("true", merged.getProperty("compress"));
+  }
+
+  @Test
+  public void testNewWriteMethodWithProperties() {
+    Properties merged =
+        ClickHouseIO.write("http://localhost:8123", "testdb", TABLE)
+            .withProperties(props("user", "admin", "password", "secret"))
+            .properties();
+
+    assertEquals("admin", merged.getProperty("user"));
+    assertEquals("secret", merged.getProperty("password"));
+  }
+
+  @Test
+  public void testEmptyPropertiesDoesNotAffectExisting() {
+    Properties merged =
+        jdbcWrite("user=admin&password=secret").withProperties(props()).properties();
+
+    assertEquals("admin", merged.getProperty("user"));
+    assertEquals("secret", merged.getProperty("password"));
+  }
+
+  @Test
+  public void testWithPropertiesConflictHasDetailedMessage() {
+    try {
+      jdbcWrite("compress=false").withProperties(props("compress", "true"));
+      fail("Expected IllegalArgumentException for property conflict");
+    } catch (IllegalArgumentException e) {
+      // The message should name the key, both competing values, and the word "conflict".
+      String message = e.getMessage();
+      for (String fragment : new String[] {"compress", "false", "true", "conflict"}) {
+        assertTrue(message.contains(fragment));
+      }
+    }
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testMultipleWithPropertiesCallsWithConflict() {
+    // The first call only adds a key; the second changes the URL's "password".
+    jdbcWrite("password=original")
+        .withProperties(props("compress", "true"))
+        .withProperties(props("password", "secret2"));
+  }
+
+  @Test
+  public void testMultipleWithPropertiesCallsWithoutConflict() {
+    Properties merged =
+        jdbcWrite("user=admin")
+            .withProperties(props("compress", "true"))
+            .withProperties(props("socket_timeout", "30000"))
+            .properties();
+
+    assertEquals("admin", merged.getProperty("user")); // from the JDBC URL
+    assertEquals("true", merged.getProperty("compress")); // from the first withProperties
+    assertEquals("30000", merged.getProperty("socket_timeout")); // from the second withProperties
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testCannotOverrideJdbcUrlProperties() {
+    // Explicit properties may not override a value the URL already set.
+    jdbcWrite("user=url_user&password=url_pass").withProperties(props("user", "explicit_user"));
+  }
+
+  @Test
+  public void testCanAddPropertiesToJdbcUrlWithoutConflict() {
+    Properties merged =
+        jdbcWrite("user=admin")
+            .withProperties(props("password", "secret", "compress", "true"))
+            .properties();
+
+    assertEquals("admin", merged.getProperty("user"));
+    assertEquals("secret", merged.getProperty("password"));
+    assertEquals("true", merged.getProperty("compress"));
+  }
+}
diff --git
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java
index 8e5dc7ebe38..64f7f86177b 100644
---
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java
+++
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOTest.java
@@ -19,10 +19,14 @@ package org.apache.beam.sdk.io.clickhouse;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
-import java.sql.ResultSet;
+import com.clickhouse.client.api.query.GenericRecord;
+import com.clickhouse.client.api.query.Records;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;
+import java.util.Properties;
import org.apache.beam.sdk.io.clickhouse.TableSchema.ColumnType;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.Schema;
@@ -155,15 +159,17 @@ public class ClickHouseIOTest extends BaseClickHouseTest {
pipeline.run().waitUntilFinish();
- try (ResultSet rs = executeQuery("SELECT * FROM test_named_tuples")) {
- rs.next();
- assertEquals("[tuple, true]", rs.getString("t0"));
+ try (Records records = executeQuery("SELECT * FROM test_named_tuples")) {
+ for (GenericRecord record : records) {
+ assertArrayEquals(new Object[] {"tuple", true}, record.getTuple("t0"));
+ }
}
- try (ResultSet rs = executeQuery("SELECT t0.f0 as f0, t0.f1 as f1 FROM
test_named_tuples")) {
- rs.next();
- assertEquals("tuple", rs.getString("f0"));
- assertEquals("true", rs.getString("f1"));
+ try (Records records = executeQuery("SELECT t0.f0 as f0, t0.f1 as f1 FROM
test_named_tuples")) {
+ for (GenericRecord record : records) {
+ assertEquals("tuple", record.getString("f0"));
+ assertTrue(record.getBoolean("f1"));
+ }
}
}
@@ -202,17 +208,24 @@ public class ClickHouseIOTest extends BaseClickHouseTest {
pipeline.run().waitUntilFinish();
- try (ResultSet rs = executeQuery("SELECT * FROM
test_named_complex_tuples")) {
- rs.next();
- assertEquals("[[test, [10, 20], 1.0.0], mobile]", rs.getString("prop"));
+ try (Records records = executeQuery("SELECT * FROM
test_named_complex_tuples")) {
+ for (GenericRecord record : records) {
+ // Object[] propValue = record.getTuple("prop");
+ // Adjust assertion based on actual output
+ assertArrayEquals(
+ new Object[] {new Object[] {"test", new Object[] {10L, 20L},
"1.0.0"}, "mobile"},
+ record.getTuple("prop"));
+ // assertEquals("(('test',[10,20],'1.0.0'),'mobile')",
propValue);
+ }
}
- try (ResultSet rs =
+ try (Records records =
executeQuery(
"SELECT prop.browser.name as name, prop.browser.size as size FROM
test_named_complex_tuples")) {
- rs.next();
- assertEquals("test", rs.getString("name"));
- assertEquals("[10, 20]", rs.getString("size"));
+ for (GenericRecord record : records) {
+ assertEquals("test", record.getString("name"));
+ assertArrayEquals(new Object[] {10L, 20L}, record.getTuple("size"));
+ }
}
}
@@ -292,29 +305,32 @@ public class ClickHouseIOTest extends BaseClickHouseTest {
pipeline.run().waitUntilFinish();
- try (ResultSet rs = executeQuery("SELECT * FROM test_primitive_types")) {
- rs.next();
-
- assertEquals("2030-10-01", rs.getString("f0"));
- assertEquals("2030-10-09 08:07:06", rs.getString("f1"));
- assertEquals("2.2", rs.getString("f2"));
- assertEquals("3.3", rs.getString("f3"));
- assertEquals("4", rs.getString("f4"));
- assertEquals("5", rs.getString("f5"));
- assertEquals("6", rs.getString("f6"));
- assertEquals("7", rs.getString("f7"));
- assertEquals("eight", rs.getString("f8"));
- assertEquals("9", rs.getString("f9"));
- assertEquals("10", rs.getString("f10"));
- assertEquals("11", rs.getString("f11"));
- assertEquals("12", rs.getString("f12"));
- assertEquals("abc", rs.getString("f13"));
- assertEquals("cde", rs.getString("f14"));
- assertArrayEquals(new byte[] {'q', 'w', 'e'}, rs.getBytes("f15"));
- assertArrayEquals(new byte[] {'a', 's', 'd'}, rs.getBytes("f16"));
- assertArrayEquals(new byte[] {'z', 'x', 'c'}, rs.getBytes("f17"));
- assertEquals("true", rs.getString("f18"));
- assertEquals("lowcardenality", rs.getString("f19"));
+ try (Records records = executeQuery("SELECT * FROM test_primitive_types"))
{
+ for (GenericRecord record : records) {
+ assertEquals("2030-10-01", record.getString("f0"));
+ assertEquals("2030-10-09 08:07:06", record.getString("f1"));
+ assertEquals("2.2", record.getString("f2"));
+ assertEquals("3.3", record.getString("f3"));
+ assertEquals("4", record.getString("f4"));
+ assertEquals("5", record.getString("f5"));
+ assertEquals("6", record.getString("f6"));
+ assertEquals("7", record.getString("f7"));
+ assertEquals("eight", record.getString("f8"));
+ assertEquals("9", record.getString("f9"));
+ assertEquals("10", record.getString("f10"));
+ assertEquals("11", record.getString("f11"));
+ assertEquals("12", record.getString("f12"));
+ assertEquals("abc", record.getString("f13"));
+ assertEquals("cde", record.getString("f14"));
+ assertArrayEquals(
+ new byte[] {'q', 'w', 'e'},
record.getString("f15").getBytes(StandardCharsets.UTF_8));
+ assertArrayEquals(
+ new byte[] {'a', 's', 'd'},
record.getString("f16").getBytes(StandardCharsets.UTF_8));
+ assertArrayEquals(
+ new byte[] {'z', 'x', 'c'},
record.getString("f17").getBytes(StandardCharsets.UTF_8));
+ assertEquals("true", record.getString("f18"));
+ assertEquals("lowcardenality", record.getString("f19"));
+ }
}
}
@@ -388,26 +404,28 @@ public class ClickHouseIOTest extends BaseClickHouseTest {
pipeline.run().waitUntilFinish();
- try (ResultSet rs = executeQuery("SELECT * FROM
test_array_of_primitive_types")) {
- rs.next();
-
- assertEquals("[2030-10-01, 2031-10-01]", rs.getString("f0"));
- assertEquals("[2030-10-09T08:07:06, 2031-10-09T08:07:06]",
rs.getString("f1"));
- // Since comparing float/double values is not precise, we compare the
string representation
- assertEquals("[2.2,3.3]", rs.getString("f2"));
- assertEquals("[3.3,4.4]", rs.getString("f3"));
- assertArrayEquals(new byte[] {4, 5}, (byte[])
rs.getArray("f4").getArray());
- assertArrayEquals(new short[] {5, 6}, (short[])
rs.getArray("f5").getArray());
- assertArrayEquals(new int[] {6, 7}, (int[])
rs.getArray("f6").getArray());
- assertArrayEquals(new long[] {7L, 8L}, (long[])
rs.getArray("f7").getArray());
- assertArrayEquals(new String[] {"eight", "nine"}, (String[])
rs.getArray("f8").getArray());
- assertArrayEquals(new byte[] {9, 10}, (byte[])
rs.getArray("f9").getArray());
- assertArrayEquals(new short[] {10, 11}, (short[])
rs.getArray("f10").getArray());
- assertArrayEquals(new int[] {11, 12}, (int[])
rs.getArray("f11").getArray());
- assertArrayEquals(new long[] {12L, 13L}, (long[])
rs.getArray("f12").getArray());
- assertArrayEquals(new String[] {"abc", "cde"}, (String[])
rs.getArray("f13").getArray());
- assertArrayEquals(new String[] {"cde", "abc"}, (String[])
rs.getArray("f14").getArray());
- assertArrayEquals(new boolean[] {true, false}, (boolean[])
rs.getArray("f15").getArray());
+ try (Records records = executeQuery("SELECT * FROM
test_array_of_primitive_types")) {
+ for (GenericRecord record : records) {
+ // Date/time arrays as strings
+ assertEquals("[2030-10-01, 2031-10-01]", record.getString("f0"));
+ assertEquals("[2030-10-09 08:07:06, 2031-10-09 08:07:06]",
record.getString("f1"));
+ assertEquals("[2.2, 3.3]", record.getString("f2"));
+ assertEquals("[3.3, 4.4]", record.getString("f3"));
+
+ // Use the proper typed array methods
+ assertArrayEquals(new byte[] {4, 5}, record.getByteArray("f4")); //
Int8
+ assertArrayEquals(new short[] {5, 6}, record.getShortArray("f5")); //
Int16
+ assertArrayEquals(new int[] {6, 7}, record.getIntArray("f6")); // Int32
+ assertArrayEquals(new long[] {7L, 8L}, record.getLongArray("f7")); //
Int64
+ assertArrayEquals(new String[] {"eight", "nine"},
record.getStringArray("f8")); // String
+ assertArrayEquals(new short[] {9, 10}, record.getShortArray("f9")); //
UInt8 -> short
+ assertArrayEquals(new int[] {10, 11}, record.getIntArray("f10")); //
UInt16 -> int
+ assertArrayEquals(new long[] {11, 12}, record.getLongArray("f11")); //
UInt32 -> long
+ assertEquals("[12, 13]", record.getString("f12")); // UInt64
+ assertEquals("[abc, cde]", record.getString("f13")); // FixedString
+ assertEquals("[cde, abc]", record.getString("f14")); // FixedString
+ assertArrayEquals(new boolean[] {true, false},
record.getBooleanArray("f15"));
+ }
}
}
@@ -475,6 +493,12 @@ public class ClickHouseIOTest extends BaseClickHouseTest {
}
private <T> ClickHouseIO.Write<T> write(String table) {
- return ClickHouseIO.<T>write(clickHouse.getJdbcUrl(),
table).withMaxRetries(0);
+ Properties properties = new Properties(); // container credentials, passed explicitly
+ properties.setProperty("user", clickHouse.getUsername());
+ properties.setProperty("password", clickHouse.getPassword());
+
+ return ClickHouseIO.<T>write(clickHouseUrl, database, table) // endpoint + db, no JDBC URL
+ .withProperties(properties)
+ .withMaxRetries(0); // no retries in tests
}
}
diff --git
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseJdbcUrlParserTest.java
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseJdbcUrlParserTest.java
new file mode 100644
index 00000000000..4b994522d9b
--- /dev/null
+++
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseJdbcUrlParserTest.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.clickhouse;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Properties;
+import org.apache.beam.sdk.io.clickhouse.ClickHouseJdbcUrlParser.ParsedJdbcUrl;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link ClickHouseJdbcUrlParser}, which turns a legacy JDBC-style ClickHouse URL into
+ * an HTTP(S) endpoint ({@code getClickHouseUrl}), a database name ({@code getDatabase}) and
+ * connection {@link Properties} taken from the query string ({@code getProperties}).
+ *
+ * <p>Covers default ports (8123 for http, 8443 for https) and the {@code "default"} database,
+ * explicit {@code http:}/{@code https:} schemes, the {@code jdbc:ch:} shorthand, URL-decoded and
+ * value-less query parameters, and rejection of invalid input with {@link
+ * IllegalArgumentException}.
+ */
+@RunWith(JUnit4.class)
+public class ClickHouseJdbcUrlParserTest {
+
+ @Test
+ public void testBasicJdbcUrl() {
+ String jdbcUrl = "jdbc:clickhouse://localhost:8123/default";
+ ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+ assertEquals("http://localhost:8123", parsed.getClickHouseUrl()); // no scheme -> http
+ assertEquals("default", parsed.getDatabase());
+ assertTrue(parsed.getProperties().isEmpty()); // no query string
+ }
+
+ @Test
+ public void testJdbcUrlWithCustomDatabase() {
+ String jdbcUrl = "jdbc:clickhouse://localhost:8123/mydb";
+ ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+ assertEquals("http://localhost:8123", parsed.getClickHouseUrl());
+ assertEquals("mydb", parsed.getDatabase());
+ assertTrue(parsed.getProperties().isEmpty());
+ }
+
+ @Test
+ public void testJdbcUrlWithoutPort() {
+ String jdbcUrl = "jdbc:clickhouse://localhost/mydb";
+ ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+ assertEquals("http://localhost:8123", parsed.getClickHouseUrl()); // default http port
+ assertEquals("mydb", parsed.getDatabase());
+ }
+
+ @Test
+ public void testJdbcUrlWithoutDatabase() {
+ String jdbcUrl = "jdbc:clickhouse://localhost:8123";
+ ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+ assertEquals("http://localhost:8123", parsed.getClickHouseUrl());
+ assertEquals("default", parsed.getDatabase()); // fallback database
+ }
+
+ @Test
+ public void testJdbcUrlWithTrailingSlash() {
+ String jdbcUrl = "jdbc:clickhouse://localhost:8123/";
+ ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+ assertEquals("http://localhost:8123", parsed.getClickHouseUrl());
+ assertEquals("default", parsed.getDatabase()); // empty path -> "default"
+ }
+
+ @Test
+ public void testJdbcUrlWithHttpPrefix() {
+ String jdbcUrl = "jdbc:clickhouse:http://localhost:8123/mydb";
+ ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+ assertEquals("http://localhost:8123", parsed.getClickHouseUrl());
+ assertEquals("mydb", parsed.getDatabase());
+ }
+
+ @Test
+ public void testJdbcUrlWithHttpsPrefix() {
+ String jdbcUrl = "jdbc:clickhouse:https://localhost:8443/mydb";
+ ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+ assertEquals("https://localhost:8443", parsed.getClickHouseUrl());
+ assertEquals("mydb", parsed.getDatabase());
+ }
+
+ @Test
+ public void testJdbcUrlWithHttpsWithoutPort() {
+ String jdbcUrl = "jdbc:clickhouse:https://localhost/mydb";
+ ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+ assertEquals("https://localhost:8443", parsed.getClickHouseUrl()); // default https port
+ assertEquals("mydb", parsed.getDatabase());
+ }
+
+ @Test
+ public void testJdbcUrlWithSingleParameter() {
+ String jdbcUrl = "jdbc:clickhouse://localhost:8123/mydb?user=admin";
+ ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+ assertEquals("http://localhost:8123", parsed.getClickHouseUrl());
+ assertEquals("mydb", parsed.getDatabase());
+ assertEquals("admin", parsed.getProperties().getProperty("user"));
+ }
+
+ @Test
+ public void testJdbcUrlWithMultipleParameters() {
+ String jdbcUrl =
+
"jdbc:clickhouse://localhost:8123/mydb?user=admin&password=secret&compress=true";
+ ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+ assertEquals("http://localhost:8123", parsed.getClickHouseUrl());
+ assertEquals("mydb", parsed.getDatabase());
+
+ Properties props = parsed.getProperties();
+ assertEquals("admin", props.getProperty("user"));
+ assertEquals("secret", props.getProperty("password"));
+ assertEquals("true", props.getProperty("compress"));
+ }
+
+ @Test
+ public void testJdbcUrlWithUrlEncodedParameters() {
+ String jdbcUrl =
"jdbc:clickhouse://localhost:8123/mydb?user=my%20user&password=p%40ssw0rd";
+ ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+ Properties props = parsed.getProperties();
+ assertEquals("my user", props.getProperty("user")); // %20 -> space
+ assertEquals("p@ssw0rd", props.getProperty("password")); // %40 -> '@'
+ }
+
+ @Test
+ public void testJdbcUrlWithParameterWithoutValue() {
+ String jdbcUrl = "jdbc:clickhouse://localhost:8123/mydb?compress";
+ ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+ assertEquals("true", parsed.getProperties().getProperty("compress")); // bare key -> "true"
+ }
+
+ @Test
+ public void testJdbcUrlShorthandCh() {
+ String jdbcUrl = "jdbc:ch://localhost:8123/mydb"; // "jdbc:ch:" shorthand
+ ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+ assertEquals("http://localhost:8123", parsed.getClickHouseUrl());
+ assertEquals("mydb", parsed.getDatabase());
+ }
+
+ @Test
+ public void testJdbcUrlWithRemoteHost() {
+ String jdbcUrl =
"jdbc:clickhouse://clickhouse.example.com:9000/production";
+ ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+ assertEquals("http://clickhouse.example.com:9000",
parsed.getClickHouseUrl()); // explicit port kept as-is
+ assertEquals("production", parsed.getDatabase());
+ }
+
+ @Test
+ public void testJdbcUrlWithIpAddress() {
+ String jdbcUrl = "jdbc:clickhouse://192.168.1.100:8123/mydb";
+ ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+ assertEquals("http://192.168.1.100:8123", parsed.getClickHouseUrl());
+ assertEquals("mydb", parsed.getDatabase());
+ }
+
+ @Test
+ public void testJdbcUrlWithComplexQueryString() {
+ String jdbcUrl =
+ "jdbc:clickhouse://localhost:8123/mydb?"
+ + "user=admin&password=secret&"
+ + "socket_timeout=30000&"
+ + "connection_timeout=10000&"
+ + "compress=true";
+ ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+ Properties props = parsed.getProperties();
+ assertEquals("admin", props.getProperty("user"));
+ assertEquals("secret", props.getProperty("password"));
+ assertEquals("30000", props.getProperty("socket_timeout"));
+ assertEquals("10000", props.getProperty("connection_timeout"));
+ assertEquals("true", props.getProperty("compress"));
+ }
+
+ @Test
+ public void testJdbcUrlCaseInsensitivePrefix() {
+ String jdbcUrl = "JDBC:CLICKHOUSE://localhost:8123/mydb";
+ ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+ assertEquals("http://localhost:8123", parsed.getClickHouseUrl()); // prefix case ignored
+ assertEquals("mydb", parsed.getDatabase());
+ }
+
+ @Test
+ public void testJdbcUrlWithDatabaseContainingUnderscore() {
+ String jdbcUrl = "jdbc:clickhouse://localhost:8123/my_database_name";
+ ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+ assertEquals("my_database_name", parsed.getDatabase());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNullJdbcUrl() {
+ ClickHouseJdbcUrlParser.parse(null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testEmptyJdbcUrl() {
+ ClickHouseJdbcUrlParser.parse("");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidJdbcPrefix() {
+ ClickHouseJdbcUrlParser.parse("jdbc:mysql://localhost:3306/mydb"); // not a ClickHouse URL
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidSchemeFtp() { // non-HTTP(S) scheme
+ ClickHouseJdbcUrlParser.parse("jdbc:clickhouse:ftp://localhost:8123/mydb");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidSchemeGopher() { // non-HTTP(S) scheme
+
ClickHouseJdbcUrlParser.parse("jdbc:clickhouse:gopher://localhost:8123/mydb");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidSchemeFile() { // non-HTTP(S) scheme
+
ClickHouseJdbcUrlParser.parse("jdbc:clickhouse:file://localhost:8123/mydb");
+ }
+
+ @Test // NOTE(review): same input/assertions as testJdbcUrlWithHttpPrefix
+ public void testValidHttpSchemeExplicit() {
+ String jdbcUrl = "jdbc:clickhouse:http://localhost:8123/mydb";
+ ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+ assertEquals("http://localhost:8123", parsed.getClickHouseUrl());
+ assertEquals("mydb", parsed.getDatabase());
+ }
+
+ @Test // NOTE(review): same input/assertions as testJdbcUrlWithHttpsPrefix
+ public void testValidHttpsSchemeExplicit() {
+ String jdbcUrl = "jdbc:clickhouse:https://localhost:8443/mydb";
+ ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+ assertEquals("https://localhost:8443", parsed.getClickHouseUrl());
+ assertEquals("mydb", parsed.getDatabase());
+ }
+
+ @Test // NOTE(review): subsumed by testJdbcUrlWithCustomDatabase
+ public void testImplicitHttpScheme() {
+ String jdbcUrl = "jdbc:clickhouse://localhost:8123/mydb";
+ ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+ assertEquals("http://localhost:8123", parsed.getClickHouseUrl());
+ assertEquals("mydb", parsed.getDatabase());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testMissingHost() {
+ ClickHouseJdbcUrlParser.parse("jdbc:clickhouse://:8123/mydb"); // empty host
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testMalformedUrl() {
+
ClickHouseJdbcUrlParser.parse("jdbc:clickhouse://localhost:invalid_port/mydb"); // non-numeric port
+ }
+
+ @Test
+ public void testJdbcUrlWithoutJdbcPrefix() {
+ // The "jdbc:" prefix is optional; a bare "clickhouse://" URL parses the same way.
+ String jdbcUrl = "clickhouse://localhost:8123/mydb";
+ ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+ assertEquals("http://localhost:8123", parsed.getClickHouseUrl());
+ assertEquals("mydb", parsed.getDatabase());
+ }
+
+ @Test
+ public void testBackwardCompatibilityScenario() {
+ // A legacy-style JDBC URL combining host, port, database and several properties.
+ String legacyJdbcUrl =
+ "jdbc:clickhouse://prod-clickhouse.internal:8123/analytics?"
+ + "user=analytics_user&"
+ + "password=secure123&"
+ + "compress=true&"
+ + "socket_timeout=60000";
+
+ ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(legacyJdbcUrl);
+
+ assertEquals("http://prod-clickhouse.internal:8123",
parsed.getClickHouseUrl());
+ assertEquals("analytics", parsed.getDatabase());
+
+ Properties props = parsed.getProperties();
+ assertEquals("analytics_user", props.getProperty("user"));
+ assertEquals("secure123", props.getProperty("password"));
+ assertEquals("true", props.getProperty("compress"));
+ assertEquals("60000", props.getProperty("socket_timeout"));
+ }
+
+ @Test
+ public void testJdbcUrlWithMultipleSlashesInPath() {
+ // Edge case: doubled slash between the port and the database name.
+ String jdbcUrl = "jdbc:clickhouse://localhost:8123//mydb";
+ ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+ // The path is NOT normalized: host/port parse fine, but the database keeps a slash.
+ assertEquals("http://localhost:8123", parsed.getClickHouseUrl());
+ assertEquals("/mydb", parsed.getDatabase()); // NOTE(review): pins questionable behavior
+ }
+
+ @Test
+ public void testJdbcUrlWithQueryButNoDatabase() {
+ String jdbcUrl = "jdbc:clickhouse://localhost:8123?user=admin";
+ ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+ assertEquals("http://localhost:8123", parsed.getClickHouseUrl());
+ assertEquals("default", parsed.getDatabase()); // no path before '?'
+ assertEquals("admin", parsed.getProperties().getProperty("user"));
+ }
+
+ @Test
+ public void testJdbcUrlWithEmptyQueryParameter() {
+ String jdbcUrl =
"jdbc:clickhouse://localhost:8123/mydb?user=&password=secret";
+ ParsedJdbcUrl parsed = ClickHouseJdbcUrlParser.parse(jdbcUrl);
+
+ Properties props = parsed.getProperties();
+ assertEquals("", props.getProperty("user")); // "user=" -> "", unlike a bare key
+ assertEquals("secret", props.getProperty("password"));
+ }
+}