This is an automated email from the ASF dual-hosted git repository.

vinish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git


The following commit(s) were added to refs/heads/main by this push:
     new 680cf9c1 Handle timestamp_ntz in delta and iceberg
680cf9c1 is described below

commit 680cf9c170982f1471b88a02cb2ed4021c4b3272
Author: Vinish Reddy <[email protected]>
AuthorDate: Tue Apr 1 14:48:46 2025 -0700

    Handle timestamp_ntz in delta and iceberg
---
 pom.xml                                            |  2 +-
 .../apache/xtable/avro/AvroSchemaConverter.java    | 24 ++++++++-
 .../apache/xtable/delta/DeltaConversionTarget.java | 14 +++--
 .../apache/xtable/delta/DeltaSchemaExtractor.java  | 15 ++++--
 .../xtable/iceberg/IcebergSchemaExtractor.java     |  3 +-
 .../apache/xtable/schema/SparkSchemaExtractor.java |  3 +-
 .../xtable/avro/TestAvroSchemaConverter.java       |  4 +-
 .../xtable/delta/TestDeltaSchemaExtractor.java     | 25 ++++++++-
 .../org/apache/xtable/delta/TestDeltaSync.java     | 61 ++++++++++++++++++++++
 .../xtable/iceberg/TestIcebergSchemaExtractor.java | 12 +++--
 .../xtable/schema/TestSparkSchemaExtractor.java    |  4 +-
 11 files changed, 144 insertions(+), 23 deletions(-)

diff --git a/pom.xml b/pom.xml
index a30a4c98..1a3ed564 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,7 +87,7 @@
         <spotless.version>2.43.0</spotless.version>
         <apache.rat.version>0.16.1</apache.rat.version>
         <google.java.format.version>1.8</google.java.format.version>
-        <delta.standalone.version>0.5.0</delta.standalone.version>
+        <delta.standalone.version>3.3.0</delta.standalone.version>
         <delta.hive.version>3.0.0</delta.hive.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <target.dir.pattern>**/target/**</target.dir.pattern>
diff --git a/xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java b/xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java
index 9f40d29e..9b4c3eac 100644
--- a/xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java
+++ b/xtable-core/src/main/java/org/apache/xtable/avro/AvroSchemaConverter.java
@@ -160,11 +160,15 @@ public class AvroSchemaConverter {
           metadata.put(
               InternalSchema.MetadataKey.TIMESTAMP_PRECISION, 
InternalSchema.MetadataValue.MICROS);
         } else if (logicalType instanceof LogicalTypes.LocalTimestampMillis) {
-          newDataType = InternalType.TIMESTAMP_NTZ;
+          // TODO: https://github.com/apache/incubator-xtable/issues/672
+          // Hudi 0.x writes INT64 in parquet, TimestampNTZType support added in 1.x
+          newDataType = InternalType.LONG;
           metadata.put(
               InternalSchema.MetadataKey.TIMESTAMP_PRECISION, 
InternalSchema.MetadataValue.MILLIS);
         } else if (logicalType instanceof LogicalTypes.LocalTimestampMicros) {
-          newDataType = InternalType.TIMESTAMP_NTZ;
+          // TODO: https://github.com/apache/incubator-xtable/issues/672
+          // Hudi 0.x writes INT64 in parquet, TimestampNTZType support added in 1.x
+          newDataType = InternalType.LONG;
           metadata.put(
               InternalSchema.MetadataKey.TIMESTAMP_PRECISION, 
InternalSchema.MetadataValue.MICROS);
         } else {
@@ -350,6 +354,22 @@ public class AvroSchemaConverter {
       case INT:
         return finalizeSchema(Schema.create(Schema.Type.INT), internalSchema);
       case LONG:
+        if (internalSchema.getMetadata() != null
+            && internalSchema
+                .getMetadata()
+                .containsKey(InternalSchema.MetadataKey.TIMESTAMP_PRECISION)) {
+          if 
(internalSchema.getMetadata().get(InternalSchema.MetadataKey.TIMESTAMP_PRECISION)
+              == InternalSchema.MetadataValue.MILLIS) {
+            return finalizeSchema(
+                
LogicalTypes.localTimestampMillis().addToSchema(Schema.create(Schema.Type.LONG)),
+                internalSchema);
+          }
+          {
+            return finalizeSchema(
+                
LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG)),
+                internalSchema);
+          }
+        }
         return finalizeSchema(Schema.create(Schema.Type.LONG), internalSchema);
       case STRING:
         return finalizeSchema(Schema.create(Schema.Type.STRING), 
internalSchema);
diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
index b487e2cb..30d62921 100644
--- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaConversionTarget.java
@@ -73,9 +73,9 @@ import org.apache.xtable.spi.sync.ConversionTarget;
 
 @Log4j2
 public class DeltaConversionTarget implements ConversionTarget {
-  private static final String MIN_READER_VERSION = String.valueOf(1);
+  private static final int MIN_READER_VERSION = 1;
   // gets access to generated columns.
-  private static final String MIN_WRITER_VERSION = String.valueOf(4);
+  private static final int MIN_WRITER_VERSION = 4;
 
   private DeltaLog deltaLog;
   private DeltaSchemaExtractor schemaExtractor;
@@ -329,8 +329,14 @@ public class DeltaConversionTarget implements ConversionTarget {
 
     private Map<String, String> getConfigurationsForDeltaSync() {
       Map<String, String> configMap = new HashMap<>();
-      configMap.put(DeltaConfigs.MIN_READER_VERSION().key(), 
MIN_READER_VERSION);
-      configMap.put(DeltaConfigs.MIN_WRITER_VERSION().key(), 
MIN_WRITER_VERSION);
+      configMap.put(
+          DeltaConfigs.MIN_READER_VERSION().key(),
+          String.valueOf(
+              Math.max(deltaLog.snapshot().protocol().minReaderVersion(), 
MIN_READER_VERSION)));
+      configMap.put(
+          DeltaConfigs.MIN_WRITER_VERSION().key(),
+          String.valueOf(
+              Math.max(deltaLog.snapshot().protocol().minWriterVersion(), 
MIN_WRITER_VERSION)));
       configMap.put(TableSyncMetadata.XTABLE_METADATA, metadata.toJson());
       // Sets retention for the Delta Log, does not impact underlying files in the table
       configMap.put(
diff --git a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
index d1303e84..1376f884 100644
--- a/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/delta/DeltaSchemaExtractor.java
@@ -56,6 +56,11 @@ import org.apache.xtable.schema.SchemaUtils;
 public class DeltaSchemaExtractor {
   private static final String DELTA_COLUMN_MAPPING_ID = 
"delta.columnMapping.id";
   private static final DeltaSchemaExtractor INSTANCE = new 
DeltaSchemaExtractor();
+  // Timestamps in Delta are microsecond precision by default
+  private static final Map<InternalSchema.MetadataKey, Object>
+      DEFAULT_TIMESTAMP_PRECISION_METADATA =
+          Collections.singletonMap(
+              InternalSchema.MetadataKey.TIMESTAMP_PRECISION, 
InternalSchema.MetadataValue.MICROS);
 
   public static DeltaSchemaExtractor getInstance() {
     return INSTANCE;
@@ -110,11 +115,11 @@ public class DeltaSchemaExtractor {
         break;
       case "timestamp":
         type = InternalType.TIMESTAMP;
-        // Timestamps in Delta are microsecond precision by default
-        metadata =
-            Collections.singletonMap(
-                InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
-                InternalSchema.MetadataValue.MICROS);
+        metadata = DEFAULT_TIMESTAMP_PRECISION_METADATA;
+        break;
+      case "timestamp_ntz":
+        type = InternalType.TIMESTAMP_NTZ;
+        metadata = DEFAULT_TIMESTAMP_PRECISION_METADATA;
         break;
       case "struct":
         StructType structType = (StructType) dataType;
diff --git a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java
index 4cf825d7..4366bc02 100644
--- a/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergSchemaExtractor.java
@@ -173,7 +173,6 @@ public class IcebergSchemaExtractor {
       case INT:
         return Types.IntegerType.get();
       case LONG:
-      case TIMESTAMP_NTZ: // TODO - revisit this
         return Types.LongType.get();
       case BYTES:
         return Types.BinaryType.get();
@@ -189,6 +188,8 @@ public class IcebergSchemaExtractor {
         return Types.DateType.get();
       case TIMESTAMP:
         return Types.TimestampType.withZone();
+      case TIMESTAMP_NTZ:
+        return Types.TimestampType.withoutZone();
       case DOUBLE:
         return Types.DoubleType.get();
       case DECIMAL:
diff --git a/xtable-core/src/main/java/org/apache/xtable/schema/SparkSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/schema/SparkSchemaExtractor.java
index cda64941..8db9f610 100644
--- a/xtable-core/src/main/java/org/apache/xtable/schema/SparkSchemaExtractor.java
+++ b/xtable-core/src/main/java/org/apache/xtable/schema/SparkSchemaExtractor.java
@@ -62,7 +62,6 @@ public class SparkSchemaExtractor {
       case INT:
         return DataTypes.IntegerType;
       case LONG:
-      case TIMESTAMP_NTZ:
         return DataTypes.LongType;
       case BYTES:
       case FIXED:
@@ -76,6 +75,8 @@ public class SparkSchemaExtractor {
         return DataTypes.DateType;
       case TIMESTAMP:
         return DataTypes.TimestampType;
+      case TIMESTAMP_NTZ:
+        return DataTypes.TimestampNTZType;
       case DOUBLE:
         return DataTypes.DoubleType;
       case DECIMAL:
diff --git a/xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java b/xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java
index 0b6823a1..4e299198 100644
--- a/xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java
+++ b/xtable-core/src/test/java/org/apache/xtable/avro/TestAvroSchemaConverter.java
@@ -576,7 +576,7 @@ public class TestAvroSchemaConverter {
                         .schema(
                             InternalSchema.builder()
                                 .name("long")
-                                .dataType(InternalType.TIMESTAMP_NTZ)
+                                .dataType(InternalType.LONG)
                                 .isNullable(false)
                                 .metadata(millisMetadata)
                                 .build())
@@ -586,7 +586,7 @@ public class TestAvroSchemaConverter {
                         .schema(
                             InternalSchema.builder()
                                 .name("long")
-                                .dataType(InternalType.TIMESTAMP_NTZ)
+                                .dataType(InternalType.LONG)
                                 .isNullable(false)
                                 .metadata(microsMetadata)
                                 .build())
diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
index 9c235a19..81ab34d2 100644
--- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSchemaExtractor.java
@@ -322,13 +322,36 @@ public class TestDeltaSchemaExtractor {
                                 .metadata(metadata)
                                 .build())
                         
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+                        .build(),
+                    InternalField.builder()
+                        .name("requiredTimestampNtz")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("timestamp_ntz")
+                                .dataType(InternalType.TIMESTAMP_NTZ)
+                                .isNullable(false)
+                                .metadata(metadata)
+                                .build())
+                        .build(),
+                    InternalField.builder()
+                        .name("optionalTimestampNtz")
+                        .schema(
+                            InternalSchema.builder()
+                                .name("timestamp_ntz")
+                                .dataType(InternalType.TIMESTAMP_NTZ)
+                                .isNullable(true)
+                                .metadata(metadata)
+                                .build())
+                        
.defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
                         .build()))
             .build();
 
     StructType structRepresentationTimestamp =
         new StructType()
             .add("requiredTimestamp", DataTypes.TimestampType, false)
-            .add("optionalTimestamp", DataTypes.TimestampType, true);
+            .add("optionalTimestamp", DataTypes.TimestampType, true)
+            .add("requiredTimestampNtz", DataTypes.TimestampNTZType, false)
+            .add("optionalTimestampNtz", DataTypes.TimestampNTZType, true);
 
     Assertions.assertEquals(
         internalSchemaTimestamp,
diff --git a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
index 7e921efe..fc8a42c2 100644
--- a/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
+++ b/xtable-core/src/test/java/org/apache/xtable/delta/TestDeltaSync.java
@@ -53,6 +53,7 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.types.DataTypes;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -94,6 +95,7 @@ import org.apache.xtable.model.stat.Range;
 import org.apache.xtable.model.storage.DataLayoutStrategy;
 import org.apache.xtable.model.storage.FileFormat;
 import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.InternalFile;
 import org.apache.xtable.model.storage.PartitionFileGroup;
 import org.apache.xtable.model.storage.TableFormat;
 import org.apache.xtable.schema.SchemaFieldFinder;
@@ -431,6 +433,39 @@ public class TestDeltaSync {
     assertFalse(unmappedTargetId.isPresent());
   }
 
+  @Test
+  public void testTimestampNtz() {
+    InternalSchema schema1 = getInternalSchemaWithTimestampNtz();
+    List<InternalField> fields2 = new ArrayList<>(schema1.getFields());
+    fields2.add(
+        InternalField.builder()
+            .name("float_field")
+            .schema(
+                InternalSchema.builder()
+                    .name("float")
+                    .dataType(InternalType.FLOAT)
+                    .isNullable(true)
+                    .build())
+            .build());
+    InternalSchema schema2 = 
getInternalSchema().toBuilder().fields(fields2).build();
+    InternalTable table1 = getInternalTable(tableName, basePath, schema1, 
null, LAST_COMMIT_TIME);
+    InternalTable table2 = getInternalTable(tableName, basePath, schema2, 
null, LAST_COMMIT_TIME);
+
+    InternalDataFile dataFile1 = getDataFile(1, Collections.emptyList(), 
basePath);
+    InternalDataFile dataFile2 = getDataFile(2, Collections.emptyList(), 
basePath);
+    InternalDataFile dataFile3 = getDataFile(3, Collections.emptyList(), 
basePath);
+
+    InternalSnapshot snapshot1 = buildSnapshot(table1, "0", dataFile1, 
dataFile2);
+    InternalSnapshot snapshot2 = buildSnapshot(table2, "1", dataFile2, 
dataFile3);
+
+    TableFormatSync.getInstance()
+        .syncSnapshot(Collections.singletonList(conversionTarget), snapshot1);
+    validateDeltaTableUsingSpark(basePath, new 
HashSet<>(Arrays.asList(dataFile1, dataFile2)));
+    TableFormatSync.getInstance()
+        .syncSnapshot(Collections.singletonList(conversionTarget), snapshot2);
+    validateDeltaTableUsingSpark(basePath, new 
HashSet<>(Arrays.asList(dataFile2, dataFile3)));
+  }
+
   private static Stream<Arguments> timestampPartitionTestingArgs() {
     return Stream.of(
         Arguments.of(PartitionTransformType.YEAR),
@@ -472,6 +507,13 @@ public class TestDeltaSync {
         internalDataFiles.size(), count, "Number of files from DeltaScan don't 
match expectation");
   }
 
+  private void validateDeltaTableUsingSpark(
+      Path basePath, Set<InternalDataFile> internalDataFiles) {
+    Dataset<Row> dataset = 
sparkSession.read().format("delta").load(basePath.toString());
+    long countFromFiles = 
internalDataFiles.stream().mapToLong(InternalFile::getRecordCount).sum();
+    Assertions.assertEquals(countFromFiles, dataset.count());
+  }
+
   private InternalSnapshot buildSnapshot(
       InternalTable table, String sourceIdentifier, InternalDataFile... 
dataFiles) {
     return InternalSnapshot.builder()
@@ -563,6 +605,25 @@ public class TestDeltaSync {
         .build();
   }
 
+  private InternalSchema getInternalSchemaWithTimestampNtz() {
+    Map<InternalSchema.MetadataKey, Object> timestampMetadata = new 
HashMap<>();
+    timestampMetadata.put(
+        InternalSchema.MetadataKey.TIMESTAMP_PRECISION, 
InternalSchema.MetadataValue.MICROS);
+    List<InternalField> fields = new 
ArrayList<>(getInternalSchema().getFields());
+    fields.add(
+        InternalField.builder()
+            .name("timestamp_ntz_field")
+            .schema(
+                InternalSchema.builder()
+                    .name("time_ntz")
+                    .dataType(InternalType.TIMESTAMP_NTZ)
+                    .isNullable(true)
+                    .metadata(timestampMetadata)
+                    .build())
+            .build());
+    return getInternalSchema().toBuilder().fields(fields).build();
+  }
+
   private static SparkSession buildSparkSession() {
     SparkConf sparkConf =
         new SparkConf()
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java
index 28776541..824e2285 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/TestIcebergSchemaExtractor.java
@@ -501,14 +501,18 @@ public class TestIcebergSchemaExtractor {
                 1, "requiredTimestampMillis", Types.TimestampType.withZone()),
             Types.NestedField.optional(
                 2, "optionalTimestampMillis", Types.TimestampType.withZone()),
-            Types.NestedField.required(3, "requiredTimestampNtzMillis", 
Types.LongType.get()),
-            Types.NestedField.optional(4, "optionalTimestampNtzMillis", 
Types.LongType.get()),
+            Types.NestedField.required(
+                3, "requiredTimestampNtzMillis", 
Types.TimestampType.withoutZone()),
+            Types.NestedField.optional(
+                4, "optionalTimestampNtzMillis", 
Types.TimestampType.withoutZone()),
             Types.NestedField.required(
                 5, "requiredTimestampMicros", Types.TimestampType.withZone()),
             Types.NestedField.optional(
                 6, "optionalTimestampMicros", Types.TimestampType.withZone()),
-            Types.NestedField.required(7, "requiredTimestampNtzMicros", 
Types.LongType.get()),
-            Types.NestedField.optional(8, "optionalTimestampNtzMicros", 
Types.LongType.get()));
+            Types.NestedField.required(
+                7, "requiredTimestampNtzMicros", 
Types.TimestampType.withoutZone()),
+            Types.NestedField.optional(
+                8, "optionalTimestampNtzMicros", 
Types.TimestampType.withoutZone()));
     
assertTrue(expectedTargetSchema.sameSchema(SCHEMA_EXTRACTOR.toIceberg(irSchema)));
 
     Schema sourceSchema =
diff --git a/xtable-core/src/test/java/org/apache/xtable/schema/TestSparkSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/schema/TestSparkSchemaExtractor.java
index 59385f05..1a5d71b1 100644
--- a/xtable-core/src/test/java/org/apache/xtable/schema/TestSparkSchemaExtractor.java
+++ b/xtable-core/src/test/java/org/apache/xtable/schema/TestSparkSchemaExtractor.java
@@ -385,8 +385,8 @@ public class TestSparkSchemaExtractor {
 
     StructType structRepresentationTimestampNtz =
         new StructType()
-            .add("requiredTimestampNtz", DataTypes.LongType, false)
-            .add("optionalTimestampNtz", DataTypes.LongType, true);
+            .add("requiredTimestampNtz", DataTypes.TimestampNTZType, false)
+            .add("optionalTimestampNtz", DataTypes.TimestampNTZType, true);
 
     Assertions.assertEquals(
         structRepresentationTimestamp,

Reply via email to