This is an automated email from the ASF dual-hosted git repository.
Abacn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 378e10de4ea ClickHouseIO: Add DateTime64 support for sub-second
timestamp precision (#38510)
378e10de4ea is described below
commit 378e10de4eaa26d7a9eec3e13573a1ec794acaa3
Author: Elia Liu <[email protected]>
AuthorDate: Tue Jun 16 02:34:01 2026 +1000
ClickHouseIO: Add DateTime64 support for sub-second timestamp precision
(#38510)
ClickHouseIO previously only recognized second-precision DateTime, so
pipelines emitting sub-second timestamps could not write to DateTime64
columns.
This adds TypeName.DATETIME64 with a validated precision (0-9) and
ColumnType.dateTime64(precision), plus a no-arg factory that defaults
to precision 3. The parser accepts DateTime64[(precision[, 'timezone'])],
including a bare DateTime64 and the type nested inside Nullable(...) and
Array(...); the timezone argument is accepted but not stored.
Beam field-type mapping is Joda DATETIME for precision 0-3,
SqlTypes.TIMESTAMP for 4-6, and NanosInstant for 7-9. The writer
encodes the value as a little-endian int64 of
epoch_seconds * 10^precision + sub_second_units, flooring toward negative
infinity so pre-epoch timestamps match ClickHouse.
Tests cover the parser, schema mapping, the encoder (Joda and java.time
inputs, negative and overflow cases, null input), and ClickHouse
testcontainer round-trips for precisions 3, 6 and 9 plus nullable cases.
Closes #38466
---
CHANGES.md | 1 +
.../beam/sdk/io/clickhouse/ClickHouseIO.java | 3 +
.../beam/sdk/io/clickhouse/ClickHouseWriter.java | 40 ++++++
.../apache/beam/sdk/io/clickhouse/TableSchema.java | 53 +++++++-
.../clickhouse/src/main/javacc/ColumnTypeParser.jj | 27 ++++
.../beam/sdk/io/clickhouse/ClickHouseIOIT.java | 119 +++++++++++++++++
.../sdk/io/clickhouse/ClickHouseWriterTest.java | 141 +++++++++++++++++++++
.../beam/sdk/io/clickhouse/TableSchemaTest.java | 102 +++++++++++++++
8 files changed, 485 insertions(+), 1 deletion(-)
diff --git a/CHANGES.md b/CHANGES.md
index c70fef0cf8e..eb24a71d0bf 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -67,6 +67,7 @@
* Support for reading from Delta Lake added (Java)
([#38551](https://github.com/apache/beam/issues/38551)).
* Support for X source added (Java/Python)
([#X](https://github.com/apache/beam/issues/X)).
+* ClickHouseIO: support writing `DateTime64(precision[, 'timezone'])` columns
with sub-second precision (Java)
([#38466](https://github.com/apache/beam/issues/38466)).
## New Features / Improvements
diff --git
a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
index a8875407b43..6798e2f2bd7 100644
---
a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
+++
b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
@@ -38,6 +38,8 @@ import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
+import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
@@ -137,6 +139,7 @@ import org.slf4j.LoggerFactory;
* <tr><td>{@link TableSchema.TypeName#UINT64}</td> <td>{@link
Schema.TypeName#INT64}</td></tr>
* <tr><td>{@link TableSchema.TypeName#DATE}</td> <td>{@link
Schema.TypeName#DATETIME}</td></tr>
* <tr><td>{@link TableSchema.TypeName#DATETIME}</td> <td>{@link
Schema.TypeName#DATETIME}</td></tr>
+ * <tr><td>{@link TableSchema.TypeName#DATETIME64}</td> <td>{@link
Schema.TypeName#DATETIME} (precision ≤ 3), {@link SqlTypes#TIMESTAMP}
(4–6), or {@link NanosInstant} (≥ 7)</td></tr>
* <tr><td>{@link TableSchema.TypeName#ARRAY}</td> <td>{@link
Schema.TypeName#ARRAY}</td></tr>
* <tr><td>{@link TableSchema.TypeName#ENUM8}</td> <td>{@link
Schema.TypeName#STRING}</td></tr>
* <tr><td>{@link TableSchema.TypeName#ENUM16}</td> <td>{@link
Schema.TypeName#STRING}</td></tr>
diff --git
a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java
b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java
index 73735f56864..4d9f072e598 100644
---
a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java
+++
b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java
@@ -39,6 +39,39 @@ import org.joda.time.ReadableInstant;
public class ClickHouseWriter {
private static final Instant EPOCH_INSTANT = new Instant(0L);
+ // 10^0 through 10^9 inclusive — precision is validated in [0, 9] by
ColumnType.dateTime64.
+ private static final long[] POW10 = {
+ 1L, 10L, 100L, 1_000L, 10_000L, 100_000L, 1_000_000L, 10_000_000L,
100_000_000L, 1_000_000_000L
+ };
+
+ /**
+ * Encodes a timestamp into ClickHouse's {@code DateTime64(precision)}
representation: a signed
+ * 64-bit integer counting ticks of size 10<sup>-precision</sup> seconds
since the Unix epoch.
+ *
+ * <p>Accepts either a Joda {@link ReadableInstant} (millisecond precision)
or a {@link
+ * java.time.Instant} (nanosecond precision). Sub-tick fractions are
rounded toward negative
+ * infinity (floored), matching ClickHouse's own encoding for negative timestamps.
+ */
+ static long encodeDateTime64(Object value, int precision) {
+ long epochSecond;
+ int nanoOfSecond;
+ if (value instanceof java.time.Instant) {
+ java.time.Instant inst = (java.time.Instant) value;
+ epochSecond = inst.getEpochSecond();
+ nanoOfSecond = inst.getNano();
+ } else if (value instanceof ReadableInstant) {
+ long millis = ((ReadableInstant) value).getMillis();
+ epochSecond = Math.floorDiv(millis, 1000L);
+ nanoOfSecond = (int) Math.floorMod(millis, 1000L) * 1_000_000;
+ } else {
+ throw new IllegalArgumentException(
+ "DateTime64 requires a Joda ReadableInstant or java.time.Instant,
got "
+ + (value == null ? "null" : value.getClass().getName()));
+ }
+ long subSecondTicks = nanoOfSecond / POW10[9 - precision];
+ return Math.addExact(Math.multiplyExact(epochSecond, POW10[precision]),
subSecondTicks);
+ }
+
@SuppressWarnings("unchecked")
static void writeNullableValue(ClickHouseOutputStream stream, ColumnType
columnType, Object value)
throws IOException {
@@ -138,6 +171,13 @@ public class ClickHouseWriter {
BinaryStreamUtils.writeUnsignedInt32(stream, epochSeconds);
break;
+ case DATETIME64:
+ int precision =
+ Preconditions.checkNotNull(
+ columnType.precision(), "DateTime64 column is missing
precision");
+ BinaryStreamUtils.writeInt64(stream, encodeDateTime64(value,
precision));
+ break;
+
case ARRAY:
List<Object> values = (List<Object>) value;
BinaryStreamUtils.writeVarInt(stream, values.size());
diff --git
a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java
b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java
index baee77c5f9a..1b9fdffd4c8 100644
---
a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java
+++
b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java
@@ -27,6 +27,8 @@ import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
+import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.checkerframework.checker.nullness.qual.Nullable;
/**
@@ -39,6 +41,9 @@ import org.checkerframework.checker.nullness.qual.Nullable;
})
public abstract class TableSchema implements Serializable {
+ private static final Schema.FieldType NANOS_INSTANT_TYPE =
+ Schema.FieldType.logicalType(new NanosInstant());
+
public abstract List<Column> columns();
public static TableSchema of(Column... columns) {
@@ -76,6 +81,22 @@ public abstract class TableSchema implements Serializable {
case DATETIME:
return Schema.FieldType.DATETIME;
+ case DATETIME64:
+ // Pick the narrowest Beam field type that still round-trips the
requested precision:
+ // ≤ 3 (milliseconds) → Joda DATETIME, keeping existing
pipelines unchanged.
+ // 4–6 (down to microseconds) → SqlTypes.TIMESTAMP (MicrosInstant) —
interoperable
+ // with BigQueryIO, Avro and Beam SQL.
+ // ≥ 7 (sub-microsecond) → NanosInstant, the only built-in type
that preserves
+ // full nanosecond precision through
Row construction.
+ int p = columnType.precision();
+ if (p <= 3) {
+ return Schema.FieldType.DATETIME;
+ } else if (p <= 6) {
+ return Schema.FieldType.logicalType(SqlTypes.TIMESTAMP);
+ } else {
+ return NANOS_INSTANT_TYPE;
+ }
+
case STRING:
return Schema.FieldType.STRING;
@@ -163,6 +184,7 @@ public abstract class TableSchema implements Serializable {
// Primitive types
DATE,
DATETIME,
+ DATETIME64,
ENUM8,
ENUM16,
FIXEDSTRING,
@@ -238,6 +260,9 @@ public abstract class TableSchema implements Serializable {
public abstract @Nullable Map<String, ColumnType> tupleTypes();
+ /** Sub-second precision (0–9) of {@code DateTime64}. {@code null} for
other types. */
+ public abstract @Nullable Integer precision();
+
public ColumnType withNullable(boolean nullable) {
return toBuilder().nullable(nullable).build();
}
@@ -258,6 +283,26 @@ public abstract class TableSchema implements Serializable {
.build();
}
+ /** Default {@code DateTime64} precision in ClickHouse. */
+ public static final int DEFAULT_DATETIME64_PRECISION = 3;
+
+ /** Returns a {@code DateTime64} type with ClickHouse's default precision
of 3. */
+ public static ColumnType dateTime64() {
+ return dateTime64(DEFAULT_DATETIME64_PRECISION);
+ }
+
+ public static ColumnType dateTime64(int precision) {
+ if (precision < 0 || precision > 9) {
+ throw new IllegalArgumentException(
+ "DateTime64 precision must be in [0, 9], got " + precision);
+ }
+ return ColumnType.builder()
+ .typeName(TypeName.DATETIME64)
+ .nullable(false)
+ .precision(precision)
+ .build();
+ }
+
public static ColumnType enum8(Map<String, Integer> enumValues) {
return ColumnType.builder()
.typeName(TypeName.ENUM8)
@@ -296,13 +341,17 @@ public abstract class TableSchema implements Serializable
{
*
* @param str string representation of ClickHouse type
* @return type of ClickHouse column
+ * @throws IllegalArgumentException if {@code str} is not a valid
ClickHouse column type
*/
public static ColumnType parse(String str) {
try {
return new
org.apache.beam.sdk.io.clickhouse.impl.parser.ColumnTypeParser(
new StringReader(str))
.parse();
- } catch (org.apache.beam.sdk.io.clickhouse.impl.parser.ParseException e)
{
+ } catch (org.apache.beam.sdk.io.clickhouse.impl.parser.ParseException
+ | org.apache.beam.sdk.io.clickhouse.impl.parser.TokenMgrError
+ | IllegalArgumentException e) {
+ // Funnel lexical, syntactic and validation failures into one error
surface.
throw new IllegalArgumentException("failed to parse", e);
}
}
@@ -367,6 +416,8 @@ public abstract class TableSchema implements Serializable {
public abstract Builder tupleTypes(Map<String, ColumnType>
tupleElements);
+ public abstract Builder precision(@Nullable Integer precision);
+
public abstract ColumnType build();
}
}
diff --git a/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj
b/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj
index 5bb9ba4171a..53ad991b67c 100644
--- a/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj
+++ b/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj
@@ -79,6 +79,7 @@ TOKEN :
{
< ARRAY : "ARRAY" >
| < DATE : "DATE" >
+ | < DATETIME64 : "DATETIME64" >
| < DATETIME : "DATETIME" >
| < ENUM8 : "ENUM8" >
| < ENUM16 : "ENUM16" >
@@ -206,6 +207,7 @@ private ColumnType primitive() :
{
TypeName type;
String size;
+ ColumnType ct;
}
{
(
@@ -214,10 +216,35 @@ private ColumnType primitive() :
(<FIXEDSTRING> <LPAREN> ( size = integer() ) <RPAREN>) {
return ColumnType.fixedString(Integer.valueOf(size));
}
+ |
+ (ct = dateTime64()) { return ct; }
)
}
+private ColumnType dateTime64() :
+{
+ String precision = null;
+}
+{
+ (
+ <DATETIME64>
+ (
+ <LPAREN> ( precision = integer() )
+ // The timezone is display-only metadata; accept the syntax and
ignore the value.
+ ( <COMMA> string() )?
+ <RPAREN>
+ )?
+ )
+ {
+ // Bare DateTime64 defaults to precision 3, matching ClickHouse.
+ if (precision == null) {
+ return ColumnType.dateTime64();
+ }
+ return ColumnType.dateTime64(Integer.parseInt(precision));
+ }
+}
+
private ColumnType nullable() :
{
ColumnType ct;
diff --git
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOIT.java
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOIT.java
index 8ce412c5f88..da435f20684 100644
---
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOIT.java
+++
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOIT.java
@@ -32,6 +32,8 @@ import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
+import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.util.ReleaseInfo;
@@ -49,6 +51,18 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class ClickHouseIOIT extends BaseClickHouseTest {
+ private static final long MICROS_PER_SECOND = 1_000_000L;
+ private static final long NANOS_PER_SECOND = 1_000_000_000L;
+
+ // Shared DateTime64 test instant 2026-05-15T12:34:56Z; its .789012345
sub-second component
+ // exercises every precision bucket.
+ private static final long TEST_EPOCH_SECONDS = 1_778_848_496L;
+ // Nano-of-second; the trailing 345 is not micro-aligned.
+ private static final long TEST_NANOS_OF_SECOND = 789_012_345L;
+ // The same sub-second component truncated to whole microseconds.
+ private static final long TEST_MICROS_OF_SECOND = 789_012L;
+ private static final long TEST_MICRO_ALIGNED_NANOS_OF_SECOND =
TEST_MICROS_OF_SECOND * 1_000L;
+
@Rule public TestPipeline pipeline = TestPipeline.create();
@Test
@@ -480,6 +494,111 @@ public class ClickHouseIOIT extends BaseClickHouseTest {
assertEquals(12L, sum1);
}
+ @Test
+ public void testDateTime64Millis() throws Exception {
+ Schema schema = Schema.of(Schema.Field.of("ts", FieldType.DATETIME));
+ DateTime ts = new DateTime(2026, 5, 15, 12, 34, 56, 789, DateTimeZone.UTC);
+ Row row = Row.withSchema(schema).addValue(ts).build();
+
+ executeSql("CREATE TABLE test_datetime64_ms (ts DateTime64(3, 'UTC'))
ENGINE=Log");
+
+
pipeline.apply(Create.of(row).withRowSchema(schema)).apply(write("test_datetime64_ms"));
+ pipeline.run().waitUntilFinish();
+
+ // toUnixTimestamp64Milli returns the underlying tick count, which is the
most stable thing to
+ // assert across CH versions (string formatting may include trailing zeros
depending on
+ // version).
+ long ticks = executeQueryAsLong("SELECT toUnixTimestamp64Milli(ts) FROM
test_datetime64_ms");
+ assertEquals(ts.getMillis(), ticks);
+ }
+
+ @Test
+ public void testDateTime64Micros() throws Exception {
+ Schema schema = Schema.of(Schema.Field.of("ts",
FieldType.logicalType(SqlTypes.TIMESTAMP)));
+ // Micro-aligned nanos, so MicrosInstant accepts the value.
+ java.time.Instant ts =
+ java.time.Instant.ofEpochSecond(TEST_EPOCH_SECONDS,
TEST_MICRO_ALIGNED_NANOS_OF_SECOND);
+ Row row = Row.withSchema(schema).addValue(ts).build();
+
+ executeSql("CREATE TABLE test_datetime64_us (ts DateTime64(6))
ENGINE=Log");
+
+
pipeline.apply(Create.of(row).withRowSchema(schema)).apply(write("test_datetime64_us"));
+ pipeline.run().waitUntilFinish();
+
+ long ticks = executeQueryAsLong("SELECT toUnixTimestamp64Micro(ts) FROM
test_datetime64_us");
+ assertEquals(TEST_EPOCH_SECONDS * MICROS_PER_SECOND +
TEST_MICROS_OF_SECOND, ticks);
+ }
+
+ @Test
+ public void testDateTime64Nanos() throws Exception {
+ // DateTime64(9) must preserve full nanosecond precision. Use NanosInstant
directly
+ // because SqlTypes.TIMESTAMP (MicrosInstant) rejects non-micro-aligned
nanos like the
+ // trailing 345.
+ Schema schema = Schema.of(Schema.Field.of("ts", FieldType.logicalType(new
NanosInstant())));
+ java.time.Instant ts =
+ java.time.Instant.ofEpochSecond(TEST_EPOCH_SECONDS,
TEST_NANOS_OF_SECOND);
+ Row row = Row.withSchema(schema).addValue(ts).build();
+
+ executeSql("CREATE TABLE test_datetime64_ns (ts DateTime64(9))
ENGINE=Log");
+
+
pipeline.apply(Create.of(row).withRowSchema(schema)).apply(write("test_datetime64_ns"));
+ pipeline.run().waitUntilFinish();
+
+ long ticks = executeQueryAsLong("SELECT toUnixTimestamp64Nano(ts) FROM
test_datetime64_ns");
+ assertEquals(TEST_EPOCH_SECONDS * NANOS_PER_SECOND + TEST_NANOS_OF_SECOND,
ticks);
+ }
+
+ @Test
+ public void testNullableDateTime64() throws Exception {
+ Schema schema =
+ Schema.of(Schema.Field.nullable("ts",
FieldType.logicalType(SqlTypes.TIMESTAMP)));
+ java.time.Instant ts =
+ java.time.Instant.ofEpochSecond(TEST_EPOCH_SECONDS,
TEST_MICRO_ALIGNED_NANOS_OF_SECOND);
+ Row row1 = Row.withSchema(schema).addValue(ts).build();
+ Row row2 = Row.withSchema(schema).addValue(null).build();
+
+ executeSql("CREATE TABLE test_nullable_datetime64 (ts
Nullable(DateTime64(6))) ENGINE=Log");
+
+ pipeline
+ .apply(Create.of(row1, row2).withRowSchema(schema))
+ .apply(write("test_nullable_datetime64"));
+ pipeline.run().waitUntilFinish();
+
+ long total = executeQueryAsLong("SELECT COUNT(*) FROM
test_nullable_datetime64");
+ long nonNull = executeQueryAsLong("SELECT COUNT(ts) FROM
test_nullable_datetime64");
+ assertEquals(2L, total);
+ assertEquals(1L, nonNull);
+ }
+
+ @Test
+ public void testNullableDateTime64Nanos() throws Exception {
+ // Nullable columns take the writeNullableValue path; verify it preserves
full nanosecond
+ // precision alongside an actual null.
+ Schema schema =
+ Schema.of(Schema.Field.nullable("ts", FieldType.logicalType(new
NanosInstant())));
+ java.time.Instant ts =
+ java.time.Instant.ofEpochSecond(TEST_EPOCH_SECONDS,
TEST_NANOS_OF_SECOND);
+ Row row1 = Row.withSchema(schema).addValue(ts).build();
+ Row row2 = Row.withSchema(schema).addValue(null).build();
+
+ executeSql("CREATE TABLE test_nullable_datetime64_ns (ts
Nullable(DateTime64(9))) ENGINE=Log");
+
+ pipeline
+ .apply(Create.of(row1, row2).withRowSchema(schema))
+ .apply(write("test_nullable_datetime64_ns"));
+ pipeline.run().waitUntilFinish();
+
+ long total = executeQueryAsLong("SELECT COUNT(*) FROM
test_nullable_datetime64_ns");
+ long nonNull = executeQueryAsLong("SELECT COUNT(ts) FROM
test_nullable_datetime64_ns");
+ long ticks =
+ executeQueryAsLong(
+ "SELECT toUnixTimestamp64Nano(ts) FROM test_nullable_datetime64_ns"
+ + " WHERE ts IS NOT NULL");
+ assertEquals(2L, total);
+ assertEquals(1L, nonNull);
+ assertEquals(TEST_EPOCH_SECONDS * NANOS_PER_SECOND + TEST_NANOS_OF_SECOND,
ticks);
+ }
+
@Test
public void testUserAgentInQueryLog() throws Exception {
Schema schema = Schema.of(Schema.Field.of("f0", FieldType.INT64));
diff --git
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriterTest.java
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriterTest.java
new file mode 100644
index 00000000000..89f5c8b7c85
--- /dev/null
+++
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriterTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.clickhouse;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link ClickHouseWriter}. */
+@RunWith(JUnit4.class)
+public class ClickHouseWriterTest {
+
+ private static final long MICROS_PER_SECOND = 1_000_000L;
+ private static final long NANOS_PER_SECOND = 1_000_000_000L;
+
+ // Shared test instant 2026-05-15T12:34:56Z; its .789012345 sub-second
component exercises
+ // every precision bucket.
+ private static final long TEST_EPOCH_SECONDS = 1_778_848_496L;
+ // Nano-of-second; the trailing 345 is not micro-aligned.
+ private static final long TEST_NANOS_OF_SECOND = 789_012_345L;
+ // The same sub-second component truncated to whole microseconds.
+ private static final long TEST_MICROS_OF_SECOND = 789_012L;
+ private static final long TEST_MICRO_ALIGNED_NANOS_OF_SECOND =
TEST_MICROS_OF_SECOND * 1_000L;
+
+ // Long.MAX_VALUE nanoseconds past the epoch:
2262-04-11T23:47:16.854775807Z, the last
+ // instant representable in DateTime64(9).
+ private static final long MAX_NANOS_EPOCH_SECONDS = 9_223_372_036L;
+ private static final long MAX_NANOS_NANO_OF_SECOND = 854_775_807L;
+
+ @Test
+ public void encodeDateTime64MillisFromJoda() {
+ DateTime jodaTs = new DateTime(2026, 5, 15, 12, 34, 56, 789,
DateTimeZone.UTC);
+ long expectedMillis = jodaTs.getMillis();
+ assertEquals(expectedMillis,
ClickHouseWriter.encodeDateTime64(jodaTs.toInstant(), 3));
+ }
+
+ @Test
+ public void encodeDateTime64MicrosFromJavaInstant() {
+ java.time.Instant ts =
+ java.time.Instant.ofEpochSecond(TEST_EPOCH_SECONDS,
TEST_MICRO_ALIGNED_NANOS_OF_SECOND);
+ long expectedMicros = TEST_EPOCH_SECONDS * MICROS_PER_SECOND +
TEST_MICROS_OF_SECOND;
+ assertEquals(expectedMicros, ClickHouseWriter.encodeDateTime64(ts, 6));
+ }
+
+ @Test
+ public void encodeDateTime64NanosFromJavaInstant() {
+ // The non-micro-aligned trailing 345 must survive the encoding.
+ java.time.Instant ts =
+ java.time.Instant.ofEpochSecond(TEST_EPOCH_SECONDS,
TEST_NANOS_OF_SECOND);
+ long expectedNanos = TEST_EPOCH_SECONDS * NANOS_PER_SECOND +
TEST_NANOS_OF_SECOND;
+ assertEquals(expectedNanos, ClickHouseWriter.encodeDateTime64(ts, 9));
+ }
+
+ @Test
+ public void encodeDateTime64Precision7TruncatesBelow100Nanos() {
+ // Precision 7 means 100 ns ticks: .789012345 becomes 7890123 ticks,
dropping the final 45.
+ java.time.Instant ts =
+ java.time.Instant.ofEpochSecond(TEST_EPOCH_SECONDS,
TEST_NANOS_OF_SECOND);
+ long expected = TEST_EPOCH_SECONDS * 10_000_000L + 7_890_123L;
+ assertEquals(expected, ClickHouseWriter.encodeDateTime64(ts, 7));
+ }
+
+ @Test
+ public void encodeDateTime64NanosTruncatesSubNanoFromJoda() {
+ // Joda only carries ms precision, so encoding into nanos scales by 10^6
with no loss.
+ DateTime jodaTs = new DateTime(2030, 1, 1, 0, 0, 0, 123, DateTimeZone.UTC);
+ long expected = jodaTs.getMillis() * 1_000_000L;
+ assertEquals(expected,
ClickHouseWriter.encodeDateTime64(jodaTs.toInstant(), 9));
+ }
+
+ @Test
+ public void encodeDateTime64HandlesNegativeMillisWithFloorDivision() {
+ // -1ms floors to (-1s, +999ms), which encoded at precision 3 must be exactly -1.
+ org.joda.time.Instant jodaTs = new org.joda.time.Instant(-1L);
+ assertEquals(-1L, ClickHouseWriter.encodeDateTime64(jodaTs, 3));
+ }
+
+ @Test
+ public void encodeDateTime64ZeroPrecisionRoundsTowardEpochSeconds() {
+ java.time.Instant ts = java.time.Instant.ofEpochSecond(42L, 999_999_999L);
+ // Precision 0 means whole-second ticks; sub-second component is truncated.
+ assertEquals(42L, ClickHouseWriter.encodeDateTime64(ts, 0));
+ }
+
+ @Test
+ public void encodeDateTime64NanosMaxRepresentableInstant() {
+ java.time.Instant ts =
+ java.time.Instant.ofEpochSecond(MAX_NANOS_EPOCH_SECONDS,
MAX_NANOS_NANO_OF_SECOND);
+ assertEquals(Long.MAX_VALUE, ClickHouseWriter.encodeDateTime64(ts, 9));
+ }
+
+ @Test(expected = ArithmeticException.class)
+ public void encodeDateTime64NanosOverflowsPastYear2262() {
+ // Math.multiplyExact must fail loudly instead of silently wrapping around.
+ java.time.Instant ts =
java.time.Instant.ofEpochSecond(MAX_NANOS_EPOCH_SECONDS + 1, 0L);
+ ClickHouseWriter.encodeDateTime64(ts, 9);
+ }
+
+ @Test(expected = ArithmeticException.class)
+ public void encodeDateTime64NanosOverflowsOneNanoPastMax() {
+ // One nanosecond past the last representable tick overflows in
Math.addExact.
+ java.time.Instant ts =
+ java.time.Instant.ofEpochSecond(MAX_NANOS_EPOCH_SECONDS,
MAX_NANOS_NANO_OF_SECOND + 1);
+ ClickHouseWriter.encodeDateTime64(ts, 9);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void encodeDateTime64RejectsUnsupportedValue() {
+ ClickHouseWriter.encodeDateTime64("not-a-timestamp", 3);
+ }
+
+ @Test
+ public void encodeDateTime64RejectsNull() {
+ IllegalArgumentException e =
+ assertThrows(
+ IllegalArgumentException.class, () ->
ClickHouseWriter.encodeDateTime64(null, 3));
+ assertEquals(
+ "DateTime64 requires a Joda ReadableInstant or java.time.Instant, got
null",
+ e.getMessage());
+ }
+}
diff --git
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java
index f560d6268af..2ce9c27d02b 100644
---
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java
+++
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java
@@ -18,6 +18,7 @@
package org.apache.beam.sdk.io.clickhouse;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
import java.util.HashMap;
import java.util.Map;
@@ -39,6 +40,60 @@ public class TableSchemaTest {
assertEquals(ColumnType.DATETIME, ColumnType.parse("DateTime"));
}
+ @Test
+ public void testParseDateTime64Millis() {
+ assertEquals(ColumnType.dateTime64(3), ColumnType.parse("DateTime64(3)"));
+ }
+
+ @Test
+ public void testParseDateTime64MicrosWithTimezone() {
+ // The timezone argument is display-only metadata; the parser accepts and
ignores it.
+ assertEquals(ColumnType.dateTime64(6), ColumnType.parse("DateTime64(6,
'UTC')"));
+ }
+
+ @Test
+ public void testParseBareDateTime64DefaultsToPrecision3() {
+ assertEquals(ColumnType.dateTime64(3), ColumnType.parse("DateTime64"));
+ }
+
+ @Test
+ public void testParseDateTime64Nanos() {
+ assertEquals(ColumnType.dateTime64(9), ColumnType.parse("DateTime64(9)"));
+ }
+
+ @Test
+ public void testParseNullableDateTime64() {
+ assertEquals(
+ ColumnType.dateTime64(6).withNullable(true),
ColumnType.parse("Nullable(DateTime64(6))"));
+ }
+
+ @Test
+ public void testParseArrayOfDateTime64() {
+ assertEquals(
+ ColumnType.array(ColumnType.dateTime64(3)),
ColumnType.parse("Array(DateTime64(3))"));
+ }
+
+ @Test
+ public void testParseDateTime64OutOfRangePrecisionFailsToParse() {
+ IllegalArgumentException e =
+ assertThrows(IllegalArgumentException.class, () ->
ColumnType.parse("DateTime64(10)"));
+ assertEquals("failed to parse", e.getMessage());
+ }
+
+ @Test
+ public void testParseDateTime64NegativePrecisionFailsToParse() {
+ IllegalArgumentException e =
+ assertThrows(IllegalArgumentException.class, () ->
ColumnType.parse("DateTime64(-1)"));
+ assertEquals("failed to parse", e.getMessage());
+ }
+
+ @Test
+ public void testParseDateTime64GarbagePrecisionFailsToParse() {
+ IllegalArgumentException e =
+ assertThrows(IllegalArgumentException.class, () ->
ColumnType.parse("DateTime64(abc)"));
+ assertEquals("failed to parse", e.getMessage());
+ }
+
@Test
public void testParseFloat32() {
assertEquals(ColumnType.FLOAT32, ColumnType.parse("Float32"));
@@ -198,6 +253,53 @@ public class TableSchemaTest {
assertEquals(expected, TableSchema.getEquivalentSchema(tableSchema));
}
+ @Test
+ public void testEquivalentSchemaDateTime64Millis() {
+ // Precision ≤ 3 keeps the legacy Joda-backed DATETIME so that existing
pipelines using
+ // millisecond timestamps continue to work without code changes.
+ TableSchema tableSchema = TableSchema.of(TableSchema.Column.of("ts",
ColumnType.dateTime64(3)));
+ Schema expected = Schema.of(Schema.Field.of("ts",
Schema.FieldType.DATETIME));
+ assertEquals(expected, TableSchema.getEquivalentSchema(tableSchema));
+ }
+
+ @Test
+ public void testEquivalentSchemaDateTime64Micros() {
+ // Precision 4–6 maps to SqlTypes.TIMESTAMP (MicrosInstant) —
interoperable with
+ // BigQueryIO and Beam SQL, sufficient for microsecond ticks.
+ TableSchema tableSchema = TableSchema.of(TableSchema.Column.of("ts",
ColumnType.dateTime64(6)));
+ Schema expected =
+ Schema.of(
+ Schema.Field.of(
+ "ts",
+ Schema.FieldType.logicalType(
+
org.apache.beam.sdk.schemas.logicaltypes.SqlTypes.TIMESTAMP)));
+ assertEquals(expected, TableSchema.getEquivalentSchema(tableSchema));
+ }
+
+ @Test
+ public void testEquivalentSchemaDateTime64Nanos() {
+ // Precision 7–9 needs nanosecond precision; MicrosInstant rejects
non-micro-aligned
+ // nanos, so the mapping must use NanosInstant.
+ TableSchema tableSchema = TableSchema.of(TableSchema.Column.of("ts",
ColumnType.dateTime64(9)));
+ Schema expected =
+ Schema.of(
+ Schema.Field.of(
+ "ts",
+ Schema.FieldType.logicalType(
+ new
org.apache.beam.sdk.schemas.logicaltypes.NanosInstant())));
+ assertEquals(expected, TableSchema.getEquivalentSchema(tableSchema));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDateTime64RejectsNegativePrecision() {
+ ColumnType.dateTime64(-1);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testDateTime64RejectsPrecisionAboveNine() {
+ ColumnType.dateTime64(10);
+ }
+
@Test
public void testParseTupleSingle() {
Map<String, ColumnType> m1 = new HashMap<>();