This is an automated email from the ASF dual-hosted git repository.

kevinjqliu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 38c78c4497 Flink: Nanosecond gaps in SortKeySerializer and ColumnStatsWatermarkExtractor for v2.0 (#16322)
38c78c4497 is described below

commit 38c78c44973805ec15a792dafbe337452df607ce
Author: Talat UYARER <[email protected]>
AuthorDate: Wed May 13 12:19:31 2026 -0700

    Flink: Nanosecond gaps in SortKeySerializer and ColumnStatsWatermarkExtractor for v2.0 (#16322)
---
 .../flink/sink/shuffle/SortKeySerializer.java      |  2 +
 .../reader/ColumnStatsWatermarkExtractor.java      | 19 +++++++--
 .../flink/sink/TestRowDataPartitionKey.java        | 49 +++++++++++++++++++++-
 .../shuffle/TestSortKeySerializerPrimitives.java   |  3 ++
 .../reader/TestColumnStatsWatermarkExtractor.java  | 19 ++++++++-
 5 files changed, 86 insertions(+), 6 deletions(-)

diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java
index 6f5bb67227..ec099fbfe8 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/SortKeySerializer.java
@@ -159,6 +159,7 @@ class SortKeySerializer extends TypeSerializer<SortKey> {
         case LONG:
         case TIME:
         case TIMESTAMP:
+        case TIMESTAMP_NANO:
           target.writeLong(record.get(i, Long.class));
           break;
         case FLOAT:
@@ -237,6 +238,7 @@ class SortKeySerializer extends TypeSerializer<SortKey> {
         case LONG:
         case TIME:
         case TIMESTAMP:
+        case TIMESTAMP_NANO:
           reuse.set(i, source.readLong());
           break;
         case FLOAT:
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java
index 4bb6f0a98c..34de689c6c 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/source/reader/ColumnStatsWatermarkExtractor.java
@@ -53,13 +53,24 @@ public class ColumnStatsWatermarkExtractor implements SplitWatermarkExtractor, S
     Types.NestedField field = schema.findField(eventTimeFieldName);
     TypeID typeID = field.type().typeId();
     Preconditions.checkArgument(
-        typeID.equals(TypeID.LONG) || typeID.equals(TypeID.TIMESTAMP),
-        "Found %s, expected a LONG or TIMESTAMP column for watermark generation.",
+        typeID.equals(TypeID.LONG)
+            || typeID.equals(TypeID.TIMESTAMP)
+            || typeID.equals(TypeID.TIMESTAMP_NANO),
+        "Found %s, expected a LONG, TIMESTAMP, or TIMESTAMP_NANO column for watermark generation.",
         typeID);
     this.eventTimeFieldId = field.fieldId();
     this.eventTimeFieldName = eventTimeFieldName;
-    // Use the timeUnit only for Long columns.
-    this.timeUnit = typeID.equals(TypeID.LONG) ? timeUnit : TimeUnit.MICROSECONDS;
+    // Use the timeUnit only for Long columns; timestamp columns store fixed-precision longs.
+    switch (typeID) {
+      case LONG:
+        this.timeUnit = timeUnit;
+        break;
+      case TIMESTAMP_NANO:
+        this.timeUnit = TimeUnit.NANOSECONDS;
+        break;
+      default:
+        this.timeUnit = TimeUnit.MICROSECONDS;
+    }
   }
 
   @VisibleForTesting
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestRowDataPartitionKey.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestRowDataPartitionKey.java
index 919fef579a..2e7e6db176 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestRowDataPartitionKey.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/TestRowDataPartitionKey.java
@@ -57,7 +57,11 @@ public class TestRowDataPartitionKey {
           Types.NestedField.required(12, "decimalType2", Types.DecimalType.of(10, 5)),
           Types.NestedField.required(13, "decimalType3", Types.DecimalType.of(38, 19)),
           Types.NestedField.required(14, "floatType", Types.FloatType.get()),
-          Types.NestedField.required(15, "doubleType", Types.DoubleType.get()));
+          Types.NestedField.required(15, "doubleType", Types.DoubleType.get()),
+          Types.NestedField.required(
+              16, "timestampNanoWithoutZone", Types.TimestampNanoType.withoutZone()),
+          Types.NestedField.required(
+              17, "timestampNanoWithZone", Types.TimestampNanoType.withZone()));
 
   private static final List<String> SUPPORTED_PRIMITIVES =
       SCHEMA.asStruct().fields().stream().map(Types.NestedField::name).collect(Collectors.toList());
@@ -248,4 +252,47 @@ public class TestRowDataPartitionKey {
       }
     }
   }
+
+  @Test
+  public void testTimestampNanoPartitionTransforms() {
+    RowType rowType = FlinkSchemaUtil.convert(SCHEMA);
+    RowDataWrapper rowWrapper = new RowDataWrapper(rowType, SCHEMA.asStruct());
+    InternalRecordWrapper recordWrapper = new InternalRecordWrapper(SCHEMA.asStruct());
+
+    List<Record> records = RandomGenericData.generate(SCHEMA, 10, 1995);
+    List<RowData> rows = Lists.newArrayList(RandomRowData.convert(SCHEMA, records));
+
+    String[] columns = {"timestampNanoWithoutZone", "timestampNanoWithZone"};
+    for (String column : columns) {
+      List<PartitionSpec> specs =
+          Lists.newArrayList(
+              PartitionSpec.builderFor(SCHEMA).identity(column).build(),
+              PartitionSpec.builderFor(SCHEMA).year(column).build(),
+              PartitionSpec.builderFor(SCHEMA).month(column).build(),
+              PartitionSpec.builderFor(SCHEMA).day(column).build(),
+              PartitionSpec.builderFor(SCHEMA).hour(column).build(),
+              PartitionSpec.builderFor(SCHEMA).bucket(column, 16).build());
+
+      for (PartitionSpec spec : specs) {
+        Class<?>[] javaClasses = spec.javaClasses();
+        PartitionKey pk = new PartitionKey(spec, SCHEMA);
+        PartitionKey expectedPK = new PartitionKey(spec, SCHEMA);
+
+        for (int j = 0; j < rows.size(); j++) {
+          pk.partition(rowWrapper.wrap(rows.get(j)));
+          expectedPK.partition(recordWrapper.wrap(records.get(j)));
+
+          assertThat(pk.size()).isEqualTo(1);
+          assertThat(pk.get(0, javaClasses[0]))
+              .as(
+                  "Partition with column "
+                      + column
+                      + " and spec "
+                      + spec
+                      + " should match Iceberg-side computation")
+              .isEqualTo(expectedPK.get(0, javaClasses[0]));
+        }
+      }
+    }
+  }
 }
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeySerializerPrimitives.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeySerializerPrimitives.java
index ac2e2784e6..44791094de 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeySerializerPrimitives.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestSortKeySerializerPrimitives.java
@@ -54,6 +54,9 @@ public class TestSortKeySerializerPrimitives extends TestSortKeySerializerBase {
         .sortBy(Expressions.bucket("uuid_field", 16), SortDirection.ASC, NullOrder.NULLS_FIRST)
         .sortBy(Expressions.hour("ts_with_zone_field"), SortDirection.ASC, NullOrder.NULLS_FIRST)
         .sortBy(Expressions.day("ts_without_zone_field"), SortDirection.ASC, NullOrder.NULLS_FIRST)
+        .asc("ts_ns_with_zone_field")
+        .sortBy(
+            Expressions.hour("ts_ns_without_zone_field"), SortDirection.ASC, NullOrder.NULLS_FIRST)
         // can not test HeapByteBuffer due to equality test inside SerializerTestBase
         // .sortBy(Expressions.truncate("binary_field", 2), SortDirection.ASC,
         // NullOrder.NULLS_FIRST)
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java
index 761e562227..191dfe088c 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/source/reader/TestColumnStatsWatermarkExtractor.java
@@ -61,6 +61,13 @@ public class TestColumnStatsWatermarkExtractor {
           required(3, "long_column", Types.LongType.get()),
           required(4, "string_column", Types.StringType.get()));
 
+  // Separate schema for nanosecond columns: TIMESTAMP_NANO requires table format v3, which the
+  // HadoopTableExtension above does not provision. Tested via constructor preconditions only.
+  private static final Schema NANO_SCHEMA =
+      new Schema(
+          required(1, "timestamp_ns_column", Types.TimestampNanoType.withoutZone()),
+          required(2, "timestamptz_ns_column", Types.TimestampNanoType.withZone()));
+
   private static final List<List<Record>> TEST_RECORDS =
       ImmutableList.of(
           RandomGenericData.generate(SCHEMA, 3, 2L), RandomGenericData.generate(SCHEMA, 3, 19L));
@@ -147,7 +154,17 @@ public class TestColumnStatsWatermarkExtractor {
     assertThatThrownBy(() -> new ColumnStatsWatermarkExtractor(SCHEMA, columnName, null))
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessageContaining(
-            "Found STRING, expected a LONG or TIMESTAMP column for watermark generation.");
+            "Found STRING, expected a LONG, TIMESTAMP, or TIMESTAMP_NANO column for watermark generation.");
+  }
+
+  @TestTemplate
+  public void testTimestampNanoAccepted() {
+    // Run the precondition check exactly once across the parameterized matrix.
+    assumeThat(columnName).isEqualTo("timestamp_column");
+
+    // Both flavours of TIMESTAMP_NANO must be accepted by the extractor's precondition check.
+    new ColumnStatsWatermarkExtractor(NANO_SCHEMA, "timestamp_ns_column", null);
+    new ColumnStatsWatermarkExtractor(NANO_SCHEMA, "timestamptz_ns_column", null);
   }
 
   @TestTemplate

Reply via email to