This is an automated email from the ASF dual-hosted git repository.

Abacn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 378e10de4ea ClickHouseIO: Add DateTime64 support for sub-second 
timestamp precision (#38510)
378e10de4ea is described below

commit 378e10de4eaa26d7a9eec3e13573a1ec794acaa3
Author: Elia Liu <[email protected]>
AuthorDate: Tue Jun 16 02:34:01 2026 +1000

    ClickHouseIO: Add DateTime64 support for sub-second timestamp precision 
(#38510)
    
    ClickHouseIO previously only recognized second-precision DateTime, so
    pipelines emitting sub-second timestamps could not write to DateTime64
    columns.
    
    This adds TypeName.DATETIME64 with a validated precision (0-9) and
    ColumnType.dateTime64(precision), plus a no-arg factory that defaults
    to precision 3. The parser accepts DateTime64[(precision[, 'timezone'])],
    including a bare DateTime64 and the type nested inside Nullable(...) and
    Array(...); the timezone argument is accepted but not stored.
    
    Beam field-type mapping is Joda DATETIME for precision up to 3,
    SqlTypes.TIMESTAMP for 4-6, and NanosInstant for 7 and above. The writer
    encodes the value as a little-endian int64 of
    epoch_seconds * 10^precision + sub_second_units, using floor division so
    negative timestamps match ClickHouse.
    
    Tests cover the parser, schema mapping, the encoder (Joda and java.time
    inputs, negative and overflow cases, null input), and ClickHouse
    testcontainer round-trips for precisions 3, 6 and 9 plus nullable cases.
    
    Closes #38466
---
 CHANGES.md                                         |   1 +
 .../beam/sdk/io/clickhouse/ClickHouseIO.java       |   3 +
 .../beam/sdk/io/clickhouse/ClickHouseWriter.java   |  40 ++++++
 .../apache/beam/sdk/io/clickhouse/TableSchema.java |  53 +++++++-
 .../clickhouse/src/main/javacc/ColumnTypeParser.jj |  27 ++++
 .../beam/sdk/io/clickhouse/ClickHouseIOIT.java     | 119 +++++++++++++++++
 .../sdk/io/clickhouse/ClickHouseWriterTest.java    | 141 +++++++++++++++++++++
 .../beam/sdk/io/clickhouse/TableSchemaTest.java    | 102 +++++++++++++++
 8 files changed, 485 insertions(+), 1 deletion(-)

diff --git a/CHANGES.md b/CHANGES.md
index c70fef0cf8e..eb24a71d0bf 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -67,6 +67,7 @@
 
 * Support for reading from Delta Lake added (Java) 
([#38551](https://github.com/apache/beam/issues/38551)).
 * Support for X source added (Java/Python) 
([#X](https://github.com/apache/beam/issues/X)).
+* ClickHouseIO: support writing `DateTime64(precision[, 'timezone'])` columns 
with sub-second precision (Java) 
([#38466](https://github.com/apache/beam/issues/38466)).
 
 ## New Features / Improvements
 
diff --git 
a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
 
b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
index a8875407b43..6798e2f2bd7 100644
--- 
a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
+++ 
b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIO.java
@@ -38,6 +38,8 @@ import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
+import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.schemas.transforms.Select;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -137,6 +139,7 @@ import org.slf4j.LoggerFactory;
  * <tr><td>{@link TableSchema.TypeName#UINT64}</td> <td>{@link 
Schema.TypeName#INT64}</td></tr>
  * <tr><td>{@link TableSchema.TypeName#DATE}</td> <td>{@link 
Schema.TypeName#DATETIME}</td></tr>
  * <tr><td>{@link TableSchema.TypeName#DATETIME}</td> <td>{@link 
Schema.TypeName#DATETIME}</td></tr>
+ * <tr><td>{@link TableSchema.TypeName#DATETIME64}</td> <td>{@link 
Schema.TypeName#DATETIME} (precision &le; 3), {@link SqlTypes#TIMESTAMP} 
(4&ndash;6), or {@link NanosInstant} (&ge; 7)</td></tr>
  * <tr><td>{@link TableSchema.TypeName#ARRAY}</td> <td>{@link 
Schema.TypeName#ARRAY}</td></tr>
  * <tr><td>{@link TableSchema.TypeName#ENUM8}</td> <td>{@link 
Schema.TypeName#STRING}</td></tr>
  * <tr><td>{@link TableSchema.TypeName#ENUM16}</td> <td>{@link 
Schema.TypeName#STRING}</td></tr>
diff --git 
a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java
 
b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java
index 73735f56864..4d9f072e598 100644
--- 
a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java
+++ 
b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriter.java
@@ -39,6 +39,39 @@ import org.joda.time.ReadableInstant;
 public class ClickHouseWriter {
   private static final Instant EPOCH_INSTANT = new Instant(0L);
 
+  // 10^0 through 10^9 inclusive — precision is validated in [0, 9] by 
ColumnType.dateTime64.
+  private static final long[] POW10 = {
+    1L, 10L, 100L, 1_000L, 10_000L, 100_000L, 1_000_000L, 10_000_000L, 
100_000_000L, 1_000_000_000L
+  };
+
+  /**
+   * Encodes a timestamp into ClickHouse's {@code DateTime64(precision)} 
representation: a signed
+   * 64-bit integer counting ticks of size 10<sup>-precision</sup> seconds 
since the Unix epoch.
+   *
+   * <p>Accepts either a Joda {@link ReadableInstant} (millisecond precision) 
or a {@link
+   * java.time.Instant} (nanosecond precision). Sub-tick fractions are 
truncated toward negative
+   * infinity, matching ClickHouse's own encoding for negative timestamps.
+   */
+  static long encodeDateTime64(Object value, int precision) {
+    long epochSecond;
+    int nanoOfSecond;
+    if (value instanceof java.time.Instant) {
+      java.time.Instant inst = (java.time.Instant) value;
+      epochSecond = inst.getEpochSecond();
+      nanoOfSecond = inst.getNano();
+    } else if (value instanceof ReadableInstant) {
+      long millis = ((ReadableInstant) value).getMillis();
+      epochSecond = Math.floorDiv(millis, 1000L);
+      nanoOfSecond = (int) Math.floorMod(millis, 1000L) * 1_000_000;
+    } else {
+      throw new IllegalArgumentException(
+          "DateTime64 requires a Joda ReadableInstant or java.time.Instant, 
got "
+              + (value == null ? "null" : value.getClass().getName()));
+    }
+    long subSecondTicks = nanoOfSecond / POW10[9 - precision];
+    return Math.addExact(Math.multiplyExact(epochSecond, POW10[precision]), 
subSecondTicks);
+  }
+
   @SuppressWarnings("unchecked")
   static void writeNullableValue(ClickHouseOutputStream stream, ColumnType 
columnType, Object value)
       throws IOException {
@@ -138,6 +171,13 @@ public class ClickHouseWriter {
         BinaryStreamUtils.writeUnsignedInt32(stream, epochSeconds);
         break;
 
+      case DATETIME64:
+        int precision =
+            Preconditions.checkNotNull(
+                columnType.precision(), "DateTime64 column is missing 
precision");
+        BinaryStreamUtils.writeInt64(stream, encodeDateTime64(value, 
precision));
+        break;
+
       case ARRAY:
         List<Object> values = (List<Object>) value;
         BinaryStreamUtils.writeVarInt(stream, values.size());
diff --git 
a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java
 
b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java
index baee77c5f9a..1b9fdffd4c8 100644
--- 
a/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java
+++ 
b/sdks/java/io/clickhouse/src/main/java/org/apache/beam/sdk/io/clickhouse/TableSchema.java
@@ -27,6 +27,8 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
+import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 /**
@@ -39,6 +41,9 @@ import org.checkerframework.checker.nullness.qual.Nullable;
 })
 public abstract class TableSchema implements Serializable {
 
+  private static final Schema.FieldType NANOS_INSTANT_TYPE =
+      Schema.FieldType.logicalType(new NanosInstant());
+
   public abstract List<Column> columns();
 
   public static TableSchema of(Column... columns) {
@@ -76,6 +81,22 @@ public abstract class TableSchema implements Serializable {
       case DATETIME:
         return Schema.FieldType.DATETIME;
 
+      case DATETIME64:
+        // Pick the narrowest Beam logical type that still round-trips the 
requested precision:
+        //   ≤ 3 (milliseconds)         → Joda DATETIME, keeping existing 
pipelines unchanged.
+        //   4–6 (down to microseconds) → SqlTypes.TIMESTAMP (MicrosInstant) — 
interoperable
+        //                                with BigQueryIO, Avro and Beam SQL.
+        //   ≥ 7 (sub-microsecond)      → NanosInstant, the only built-in type 
that preserves
+        //                                full nanosecond precision through 
Row construction.
+        int p = columnType.precision();
+        if (p <= 3) {
+          return Schema.FieldType.DATETIME;
+        } else if (p <= 6) {
+          return Schema.FieldType.logicalType(SqlTypes.TIMESTAMP);
+        } else {
+          return NANOS_INSTANT_TYPE;
+        }
+
       case STRING:
         return Schema.FieldType.STRING;
 
@@ -163,6 +184,7 @@ public abstract class TableSchema implements Serializable {
     // Primitive types
     DATE,
     DATETIME,
+    DATETIME64,
     ENUM8,
     ENUM16,
     FIXEDSTRING,
@@ -238,6 +260,9 @@ public abstract class TableSchema implements Serializable {
 
     public abstract @Nullable Map<String, ColumnType> tupleTypes();
 
+    /** Sub-second precision (0–9) of {@code DateTime64}. {@code null} for 
other types. */
+    public abstract @Nullable Integer precision();
+
     public ColumnType withNullable(boolean nullable) {
       return toBuilder().nullable(nullable).build();
     }
@@ -258,6 +283,26 @@ public abstract class TableSchema implements Serializable {
           .build();
     }
 
+    /** Default {@code DateTime64} precision in ClickHouse. */
+    public static final int DEFAULT_DATETIME64_PRECISION = 3;
+
+    /** Returns a {@code DateTime64} type with ClickHouse's default precision 
of 3. */
+    public static ColumnType dateTime64() {
+      return dateTime64(DEFAULT_DATETIME64_PRECISION);
+    }
+
+    public static ColumnType dateTime64(int precision) {
+      if (precision < 0 || precision > 9) {
+        throw new IllegalArgumentException(
+            "DateTime64 precision must be in [0, 9], got " + precision);
+      }
+      return ColumnType.builder()
+          .typeName(TypeName.DATETIME64)
+          .nullable(false)
+          .precision(precision)
+          .build();
+    }
+
     public static ColumnType enum8(Map<String, Integer> enumValues) {
       return ColumnType.builder()
           .typeName(TypeName.ENUM8)
@@ -296,13 +341,17 @@ public abstract class TableSchema implements Serializable 
{
      *
      * @param str string representation of ClickHouse type
      * @return type of ClickHouse column
+     * @throws IllegalArgumentException if {@code str} is not a valid 
ClickHouse column type
      */
     public static ColumnType parse(String str) {
       try {
         return new 
org.apache.beam.sdk.io.clickhouse.impl.parser.ColumnTypeParser(
                 new StringReader(str))
             .parse();
-      } catch (org.apache.beam.sdk.io.clickhouse.impl.parser.ParseException e) 
{
+      } catch (org.apache.beam.sdk.io.clickhouse.impl.parser.ParseException
+          | org.apache.beam.sdk.io.clickhouse.impl.parser.TokenMgrError
+          | IllegalArgumentException e) {
+        // Funnel lexical, syntactic and validation failures into one error 
surface.
         throw new IllegalArgumentException("failed to parse", e);
       }
     }
@@ -367,6 +416,8 @@ public abstract class TableSchema implements Serializable {
 
       public abstract Builder tupleTypes(Map<String, ColumnType> 
tupleElements);
 
+      public abstract Builder precision(@Nullable Integer precision);
+
       public abstract ColumnType build();
     }
   }
diff --git a/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj 
b/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj
index 5bb9ba4171a..53ad991b67c 100644
--- a/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj
+++ b/sdks/java/io/clickhouse/src/main/javacc/ColumnTypeParser.jj
@@ -79,6 +79,7 @@ TOKEN :
 {
     < ARRAY          : "ARRAY" >
   | < DATE           : "DATE" >
+  | < DATETIME64     : "DATETIME64" >
   | < DATETIME       : "DATETIME" >
   | < ENUM8          : "ENUM8" >
   | < ENUM16         : "ENUM16" >
@@ -206,6 +207,7 @@ private ColumnType primitive() :
 {
     TypeName type;
     String size;
+    ColumnType ct;
 }
 {
     (
@@ -214,10 +216,35 @@ private ColumnType primitive() :
       (<FIXEDSTRING> <LPAREN> ( size = integer() ) <RPAREN>) {
         return ColumnType.fixedString(Integer.valueOf(size));
       }
+    |
+      (ct = dateTime64()) { return ct; }
     )
 
 }
 
+private ColumnType dateTime64() :
+{
+    String precision = null;
+}
+{
+    (
+        <DATETIME64>
+        (
+            <LPAREN> ( precision = integer() )
+            // The timezone is display-only metadata; accept the syntax and 
ignore the value.
+            ( <COMMA> string() )?
+            <RPAREN>
+        )?
+    )
+    {
+        // Bare DateTime64 defaults to precision 3, matching ClickHouse.
+        if (precision == null) {
+            return ColumnType.dateTime64();
+        }
+        return ColumnType.dateTime64(Integer.parseInt(precision));
+    }
+}
+
 private ColumnType nullable() :
 {
     ColumnType ct;
diff --git 
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOIT.java
 
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOIT.java
index 8ce412c5f88..da435f20684 100644
--- 
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOIT.java
+++ 
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseIOIT.java
@@ -32,6 +32,8 @@ import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.schemas.logicaltypes.FixedBytes;
+import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.util.ReleaseInfo;
@@ -49,6 +51,18 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class ClickHouseIOIT extends BaseClickHouseTest {
 
+  private static final long MICROS_PER_SECOND = 1_000_000L;
+  private static final long NANOS_PER_SECOND = 1_000_000_000L;
+
+  // Shared DateTime64 test instant 2026-05-15T12:34:56Z; its .789012345 
sub-second component
+  // exercises every precision bucket.
+  private static final long TEST_EPOCH_SECONDS = 1_778_848_496L;
+  // Nano-of-second; the trailing 345 is not micro-aligned.
+  private static final long TEST_NANOS_OF_SECOND = 789_012_345L;
+  // The same sub-second component truncated to whole microseconds.
+  private static final long TEST_MICROS_OF_SECOND = 789_012L;
+  private static final long TEST_MICRO_ALIGNED_NANOS_OF_SECOND = 
TEST_MICROS_OF_SECOND * 1_000L;
+
   @Rule public TestPipeline pipeline = TestPipeline.create();
 
   @Test
@@ -480,6 +494,111 @@ public class ClickHouseIOIT extends BaseClickHouseTest {
     assertEquals(12L, sum1);
   }
 
+  @Test
+  public void testDateTime64Millis() throws Exception {
+    Schema schema = Schema.of(Schema.Field.of("ts", FieldType.DATETIME));
+    DateTime ts = new DateTime(2026, 5, 15, 12, 34, 56, 789, DateTimeZone.UTC);
+    Row row = Row.withSchema(schema).addValue(ts).build();
+
+    executeSql("CREATE TABLE test_datetime64_ms (ts DateTime64(3, 'UTC')) 
ENGINE=Log");
+
+    
pipeline.apply(Create.of(row).withRowSchema(schema)).apply(write("test_datetime64_ms"));
+    pipeline.run().waitUntilFinish();
+
+    // toUnixTimestamp64Milli returns the underlying tick count, which is the 
most stable thing to
+    // assert across CH versions (string formatting may include trailing zeros 
depending on
+    // version).
+    long ticks = executeQueryAsLong("SELECT toUnixTimestamp64Milli(ts) FROM 
test_datetime64_ms");
+    assertEquals(ts.getMillis(), ticks);
+  }
+
+  @Test
+  public void testDateTime64Micros() throws Exception {
+    Schema schema = Schema.of(Schema.Field.of("ts", 
FieldType.logicalType(SqlTypes.TIMESTAMP)));
+    // Micro-aligned nanos, so MicrosInstant accepts the value.
+    java.time.Instant ts =
+        java.time.Instant.ofEpochSecond(TEST_EPOCH_SECONDS, 
TEST_MICRO_ALIGNED_NANOS_OF_SECOND);
+    Row row = Row.withSchema(schema).addValue(ts).build();
+
+    executeSql("CREATE TABLE test_datetime64_us (ts DateTime64(6)) 
ENGINE=Log");
+
+    
pipeline.apply(Create.of(row).withRowSchema(schema)).apply(write("test_datetime64_us"));
+    pipeline.run().waitUntilFinish();
+
+    long ticks = executeQueryAsLong("SELECT toUnixTimestamp64Micro(ts) FROM 
test_datetime64_us");
+    assertEquals(TEST_EPOCH_SECONDS * MICROS_PER_SECOND + 
TEST_MICROS_OF_SECOND, ticks);
+  }
+
+  @Test
+  public void testDateTime64Nanos() throws Exception {
+    // DateTime64(9) must preserve full nanosecond precision. Use NanosInstant 
directly
+    // because SqlTypes.TIMESTAMP (MicrosInstant) rejects non-micro-aligned 
nanos like the
+    // trailing 345.
+    Schema schema = Schema.of(Schema.Field.of("ts", FieldType.logicalType(new 
NanosInstant())));
+    java.time.Instant ts =
+        java.time.Instant.ofEpochSecond(TEST_EPOCH_SECONDS, 
TEST_NANOS_OF_SECOND);
+    Row row = Row.withSchema(schema).addValue(ts).build();
+
+    executeSql("CREATE TABLE test_datetime64_ns (ts DateTime64(9)) 
ENGINE=Log");
+
+    
pipeline.apply(Create.of(row).withRowSchema(schema)).apply(write("test_datetime64_ns"));
+    pipeline.run().waitUntilFinish();
+
+    long ticks = executeQueryAsLong("SELECT toUnixTimestamp64Nano(ts) FROM 
test_datetime64_ns");
+    assertEquals(TEST_EPOCH_SECONDS * NANOS_PER_SECOND + TEST_NANOS_OF_SECOND, 
ticks);
+  }
+
+  @Test
+  public void testNullableDateTime64() throws Exception {
+    Schema schema =
+        Schema.of(Schema.Field.nullable("ts", 
FieldType.logicalType(SqlTypes.TIMESTAMP)));
+    java.time.Instant ts =
+        java.time.Instant.ofEpochSecond(TEST_EPOCH_SECONDS, 
TEST_MICRO_ALIGNED_NANOS_OF_SECOND);
+    Row row1 = Row.withSchema(schema).addValue(ts).build();
+    Row row2 = Row.withSchema(schema).addValue(null).build();
+
+    executeSql("CREATE TABLE test_nullable_datetime64 (ts 
Nullable(DateTime64(6))) ENGINE=Log");
+
+    pipeline
+        .apply(Create.of(row1, row2).withRowSchema(schema))
+        .apply(write("test_nullable_datetime64"));
+    pipeline.run().waitUntilFinish();
+
+    long total = executeQueryAsLong("SELECT COUNT(*) FROM 
test_nullable_datetime64");
+    long nonNull = executeQueryAsLong("SELECT COUNT(ts) FROM 
test_nullable_datetime64");
+    assertEquals(2L, total);
+    assertEquals(1L, nonNull);
+  }
+
+  @Test
+  public void testNullableDateTime64Nanos() throws Exception {
+    // Nullable columns take the writeNullableValue path; verify it preserves 
full nanosecond
+    // precision alongside an actual null.
+    Schema schema =
+        Schema.of(Schema.Field.nullable("ts", FieldType.logicalType(new 
NanosInstant())));
+    java.time.Instant ts =
+        java.time.Instant.ofEpochSecond(TEST_EPOCH_SECONDS, 
TEST_NANOS_OF_SECOND);
+    Row row1 = Row.withSchema(schema).addValue(ts).build();
+    Row row2 = Row.withSchema(schema).addValue(null).build();
+
+    executeSql("CREATE TABLE test_nullable_datetime64_ns (ts 
Nullable(DateTime64(9))) ENGINE=Log");
+
+    pipeline
+        .apply(Create.of(row1, row2).withRowSchema(schema))
+        .apply(write("test_nullable_datetime64_ns"));
+    pipeline.run().waitUntilFinish();
+
+    long total = executeQueryAsLong("SELECT COUNT(*) FROM 
test_nullable_datetime64_ns");
+    long nonNull = executeQueryAsLong("SELECT COUNT(ts) FROM 
test_nullable_datetime64_ns");
+    long ticks =
+        executeQueryAsLong(
+            "SELECT toUnixTimestamp64Nano(ts) FROM test_nullable_datetime64_ns"
+                + " WHERE ts IS NOT NULL");
+    assertEquals(2L, total);
+    assertEquals(1L, nonNull);
+    assertEquals(TEST_EPOCH_SECONDS * NANOS_PER_SECOND + TEST_NANOS_OF_SECOND, 
ticks);
+  }
+
   @Test
   public void testUserAgentInQueryLog() throws Exception {
     Schema schema = Schema.of(Schema.Field.of("f0", FieldType.INT64));
diff --git 
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriterTest.java
 
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriterTest.java
new file mode 100644
index 00000000000..89f5c8b7c85
--- /dev/null
+++ 
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/ClickHouseWriterTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.clickhouse;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link ClickHouseWriter}. */
+@RunWith(JUnit4.class)
+public class ClickHouseWriterTest {
+
+  private static final long MICROS_PER_SECOND = 1_000_000L;
+  private static final long NANOS_PER_SECOND = 1_000_000_000L;
+
+  // Shared test instant 2026-05-15T12:34:56Z; its .789012345 sub-second 
component exercises
+  // every precision bucket.
+  private static final long TEST_EPOCH_SECONDS = 1_778_848_496L;
+  // Nano-of-second; the trailing 345 is not micro-aligned.
+  private static final long TEST_NANOS_OF_SECOND = 789_012_345L;
+  // The same sub-second component truncated to whole microseconds.
+  private static final long TEST_MICROS_OF_SECOND = 789_012L;
+  private static final long TEST_MICRO_ALIGNED_NANOS_OF_SECOND = 
TEST_MICROS_OF_SECOND * 1_000L;
+
+  // Long.MAX_VALUE nanoseconds past the epoch: 
2262-04-11T23:47:16.854775807Z, the last
+  // instant representable in DateTime64(9).
+  private static final long MAX_NANOS_EPOCH_SECONDS = 9_223_372_036L;
+  private static final long MAX_NANOS_NANO_OF_SECOND = 854_775_807L;
+
+  @Test
+  public void encodeDateTime64MillisFromJoda() {
+    DateTime jodaTs = new DateTime(2026, 5, 15, 12, 34, 56, 789, 
DateTimeZone.UTC);
+    long expectedMillis = jodaTs.getMillis();
+    assertEquals(expectedMillis, 
ClickHouseWriter.encodeDateTime64(jodaTs.toInstant(), 3));
+  }
+
+  @Test
+  public void encodeDateTime64MicrosFromJavaInstant() {
+    java.time.Instant ts =
+        java.time.Instant.ofEpochSecond(TEST_EPOCH_SECONDS, 
TEST_MICRO_ALIGNED_NANOS_OF_SECOND);
+    long expectedMicros = TEST_EPOCH_SECONDS * MICROS_PER_SECOND + 
TEST_MICROS_OF_SECOND;
+    assertEquals(expectedMicros, ClickHouseWriter.encodeDateTime64(ts, 6));
+  }
+
+  @Test
+  public void encodeDateTime64NanosFromJavaInstant() {
+    // The non-micro-aligned trailing 345 must survive the encoding.
+    java.time.Instant ts =
+        java.time.Instant.ofEpochSecond(TEST_EPOCH_SECONDS, 
TEST_NANOS_OF_SECOND);
+    long expectedNanos = TEST_EPOCH_SECONDS * NANOS_PER_SECOND + 
TEST_NANOS_OF_SECOND;
+    assertEquals(expectedNanos, ClickHouseWriter.encodeDateTime64(ts, 9));
+  }
+
+  @Test
+  public void encodeDateTime64Precision7TruncatesBelow100Nanos() {
+    // Precision 7 means 100 ns ticks: .789012345 becomes 7890123 ticks, 
dropping the final 45.
+    java.time.Instant ts =
+        java.time.Instant.ofEpochSecond(TEST_EPOCH_SECONDS, 
TEST_NANOS_OF_SECOND);
+    long expected = TEST_EPOCH_SECONDS * 10_000_000L + 7_890_123L;
+    assertEquals(expected, ClickHouseWriter.encodeDateTime64(ts, 7));
+  }
+
+  @Test
+  public void encodeDateTime64NanosTruncatesSubNanoFromJoda() {
+    // Joda only carries ms precision, so encoding into nanos shifts left by 6 
with no loss.
+    DateTime jodaTs = new DateTime(2030, 1, 1, 0, 0, 0, 123, DateTimeZone.UTC);
+    long expected = jodaTs.getMillis() * 1_000_000L;
+    assertEquals(expected, 
ClickHouseWriter.encodeDateTime64(jodaTs.toInstant(), 9));
+  }
+
+  @Test
+  public void encodeDateTime64HandlesNegativeMillisWithFloorDivision() {
+    // -1ms maps to (-1s, +999ms), encoded at precision 3 should be exactly -1.
+    org.joda.time.Instant jodaTs = new org.joda.time.Instant(-1L);
+    assertEquals(-1L, ClickHouseWriter.encodeDateTime64(jodaTs, 3));
+  }
+
+  @Test
+  public void encodeDateTime64ZeroPrecisionRoundsTowardEpochSeconds() {
+    java.time.Instant ts = java.time.Instant.ofEpochSecond(42L, 999_999_999L);
+    // Precision 0 means whole-second ticks; sub-second component is truncated.
+    assertEquals(42L, ClickHouseWriter.encodeDateTime64(ts, 0));
+  }
+
+  @Test
+  public void encodeDateTime64NanosMaxRepresentableInstant() {
+    java.time.Instant ts =
+        java.time.Instant.ofEpochSecond(MAX_NANOS_EPOCH_SECONDS, 
MAX_NANOS_NANO_OF_SECOND);
+    assertEquals(Long.MAX_VALUE, ClickHouseWriter.encodeDateTime64(ts, 9));
+  }
+
+  @Test(expected = ArithmeticException.class)
+  public void encodeDateTime64NanosOverflowsPastYear2262() {
+    // Math.multiplyExact must fail loudly instead of silently wrapping around.
+    java.time.Instant ts = 
java.time.Instant.ofEpochSecond(MAX_NANOS_EPOCH_SECONDS + 1, 0L);
+    ClickHouseWriter.encodeDateTime64(ts, 9);
+  }
+
+  @Test(expected = ArithmeticException.class)
+  public void encodeDateTime64NanosOverflowsOneNanoPastMax() {
+    // One nanosecond past the last representable tick overflows in 
Math.addExact.
+    java.time.Instant ts =
+        java.time.Instant.ofEpochSecond(MAX_NANOS_EPOCH_SECONDS, 
MAX_NANOS_NANO_OF_SECOND + 1);
+    ClickHouseWriter.encodeDateTime64(ts, 9);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void encodeDateTime64RejectsUnsupportedValue() {
+    ClickHouseWriter.encodeDateTime64("not-a-timestamp", 3);
+  }
+
+  @Test
+  public void encodeDateTime64RejectsNull() {
+    IllegalArgumentException e =
+        assertThrows(
+            IllegalArgumentException.class, () -> 
ClickHouseWriter.encodeDateTime64(null, 3));
+    assertEquals(
+        "DateTime64 requires a Joda ReadableInstant or java.time.Instant, got 
null",
+        e.getMessage());
+  }
+}
diff --git 
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java
 
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java
index f560d6268af..2ce9c27d02b 100644
--- 
a/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java
+++ 
b/sdks/java/io/clickhouse/src/test/java/org/apache/beam/sdk/io/clickhouse/TableSchemaTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io.clickhouse;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -39,6 +40,60 @@ public class TableSchemaTest {
     assertEquals(ColumnType.DATETIME, ColumnType.parse("DateTime"));
   }
 
+  @Test
+  public void testParseDateTime64Millis() {
+    assertEquals(ColumnType.dateTime64(3), ColumnType.parse("DateTime64(3)"));
+  }
+
+  @Test
+  public void testParseDateTime64MicrosWithTimezone() {
+    // The timezone argument is display-only metadata; the parser accepts and 
ignores it.
+    assertEquals(ColumnType.dateTime64(6), ColumnType.parse("DateTime64(6, 
'UTC')"));
+  }
+
+  @Test
+  public void testParseBareDateTime64DefaultsToPrecision3() {
+    assertEquals(ColumnType.dateTime64(3), ColumnType.parse("DateTime64"));
+  }
+
+  @Test
+  public void testParseDateTime64Nanos() {
+    assertEquals(ColumnType.dateTime64(9), ColumnType.parse("DateTime64(9)"));
+  }
+
+  @Test
+  public void testParseNullableDateTime64() {
+    assertEquals(
+        ColumnType.dateTime64(6).withNullable(true), 
ColumnType.parse("Nullable(DateTime64(6))"));
+  }
+
+  @Test
+  public void testParseArrayOfDateTime64() {
+    assertEquals(
+        ColumnType.array(ColumnType.dateTime64(3)), 
ColumnType.parse("Array(DateTime64(3))"));
+  }
+
+  @Test
+  public void testParseDateTime64OutOfRangePrecisionFailsToParse() {
+    IllegalArgumentException e =
+        assertThrows(IllegalArgumentException.class, () -> 
ColumnType.parse("DateTime64(10)"));
+    assertEquals("failed to parse", e.getMessage());
+  }
+
+  @Test
+  public void testParseDateTime64NegativePrecisionFailsToParse() {
+    IllegalArgumentException e =
+        assertThrows(IllegalArgumentException.class, () -> 
ColumnType.parse("DateTime64(-1)"));
+    assertEquals("failed to parse", e.getMessage());
+  }
+
+  @Test
+  public void testParseDateTime64GarbagePrecisionFailsToParse() {
+    IllegalArgumentException e =
+        assertThrows(IllegalArgumentException.class, () -> 
ColumnType.parse("DateTime64(abc)"));
+    assertEquals("failed to parse", e.getMessage());
+  }
+
   @Test
   public void testParseFloat32() {
     assertEquals(ColumnType.FLOAT32, ColumnType.parse("Float32"));
@@ -198,6 +253,53 @@ public class TableSchemaTest {
     assertEquals(expected, TableSchema.getEquivalentSchema(tableSchema));
   }
 
+  @Test
+  public void testEquivalentSchemaDateTime64Millis() {
+    // Precision ≤ 3 keeps the legacy Joda-backed DATETIME so that existing 
pipelines using
+    // millisecond timestamps continue to work without code changes.
+    TableSchema tableSchema = TableSchema.of(TableSchema.Column.of("ts", 
ColumnType.dateTime64(3)));
+    Schema expected = Schema.of(Schema.Field.of("ts", 
Schema.FieldType.DATETIME));
+    assertEquals(expected, TableSchema.getEquivalentSchema(tableSchema));
+  }
+
+  @Test
+  public void testEquivalentSchemaDateTime64Micros() {
+    // Precision 4–6 maps to SqlTypes.TIMESTAMP (MicrosInstant) — 
interoperable with
+    // BigQueryIO and Beam SQL, sufficient for microsecond ticks.
+    TableSchema tableSchema = TableSchema.of(TableSchema.Column.of("ts", 
ColumnType.dateTime64(6)));
+    Schema expected =
+        Schema.of(
+            Schema.Field.of(
+                "ts",
+                Schema.FieldType.logicalType(
+                    
org.apache.beam.sdk.schemas.logicaltypes.SqlTypes.TIMESTAMP)));
+    assertEquals(expected, TableSchema.getEquivalentSchema(tableSchema));
+  }
+
+  @Test
+  public void testEquivalentSchemaDateTime64Nanos() {
+    // Precision 7–9 needs nanosecond precision; MicrosInstant rejects 
non-micro-aligned
+    // nanos, so the mapping must use NanosInstant.
+    TableSchema tableSchema = TableSchema.of(TableSchema.Column.of("ts", 
ColumnType.dateTime64(9)));
+    Schema expected =
+        Schema.of(
+            Schema.Field.of(
+                "ts",
+                Schema.FieldType.logicalType(
+                    new 
org.apache.beam.sdk.schemas.logicaltypes.NanosInstant())));
+    assertEquals(expected, TableSchema.getEquivalentSchema(tableSchema));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testDateTime64RejectsNegativePrecision() {
+    ColumnType.dateTime64(-1);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testDateTime64RejectsPrecisionAboveNine() {
+    ColumnType.dateTime64(10);
+  }
+
   @Test
   public void testParseTupleSingle() {
     Map<String, ColumnType> m1 = new HashMap<>();

Reply via email to