This is an automated email from the ASF dual-hosted git repository.

ahmedabualsaud pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 286e29cb7a5 [Managed Iceberg] Support partitioning time types (year, month, day, hour) (#32939)
286e29cb7a5 is described below

commit 286e29cb7a57c7a716de080167bfaf729251fd45
Author: Ahmed Abualsaud <[email protected]>
AuthorDate: Tue Dec 17 16:33:50 2024 -0500

    [Managed Iceberg] Support partitioning time types (year, month, day, hour) (#32939)
---
 .../IO_Iceberg_Integration_Tests.json              |   2 +-
 CHANGES.md                                         |   1 +
 .../beam/sdk/io/iceberg/RecordWriterManager.java   |  79 +++++++-
 .../beam/sdk/io/iceberg/SerializableDataFile.java  |   5 +-
 .../apache/beam/sdk/io/iceberg/IcebergIOIT.java    |   2 +-
 .../IcebergWriteSchemaTransformProviderTest.java   |  98 +++++++++
 .../sdk/io/iceberg/RecordWriterManagerTest.java    | 224 ++++++++++++++++++++-
 7 files changed, 402 insertions(+), 9 deletions(-)
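
For context: this change lets the Managed Iceberg sink write to tables whose
PartitionSpec uses Iceberg's time-based partition transforms. A minimal sketch
of such a spec (the schema, field names, and ids below are illustrative, not
taken from this commit):

    import org.apache.iceberg.PartitionSpec;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.types.Types;

    public class TimePartitionSpecSketch {
      public static void main(String[] args) {
        // Illustrative schema: one string column and one timestamp column.
        Schema schema =
            new Schema(
                Types.NestedField.required(1, "name", Types.StringType.get()),
                Types.NestedField.required(2, "created_at", Types.TimestampType.withoutZone()));

        // The sink now supports specs built with year/month/day/hour transforms.
        PartitionSpec spec =
            PartitionSpec.builderFor(schema)
                .hour("created_at") // or .year(...), .month(...), .day(...)
                .build();

        System.out.println(spec);
      }
    }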

diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index bbdc3a3910e..2160d3c6800 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
 {
     "comment": "Modify this file in a trivial way to cause this test suite to 
run",
-    "modification": 3
+    "modification": 5
 }
diff --git a/CHANGES.md b/CHANGES.md
index deaa8bfcd47..7707e252961 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -65,6 +65,7 @@
 
 * gcs-connector config options can be set via GcsOptions (Java) ([#32769](https://github.com/apache/beam/pull/32769)).
 * Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* [Managed Iceberg] Support partitioning by time (year, month, day, hour) for types `date`, `time`, `timestamp`, and `timestamp(tz)` ([#32939](https://github.com/apache/beam/pull/32939))
 * Upgraded the default version of Hadoop dependencies to 3.4.1. Hadoop 2.10.2 is still supported (Java) ([#33011](https://github.com/apache/beam/issues/33011)).
 * [BigQueryIO] Create managed BigLake tables dynamically ([#33125](https://github.com/apache/beam/pull/33125))
 
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
index 255fce9ece4..4c21a0175ab 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
@@ -21,6 +21,11 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
 
 import java.io.IOException;
+import java.time.LocalDateTime;
+import java.time.YearMonth;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -31,6 +36,7 @@ import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;
@@ -38,14 +44,20 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -90,6 +102,7 @@ class RecordWriterManager implements AutoCloseable {
     final Cache<PartitionKey, RecordWriter> writers;
     private final List<SerializableDataFile> dataFiles = Lists.newArrayList();
     @VisibleForTesting final Map<PartitionKey, Integer> writerCounts = Maps.newHashMap();
+    private final Map<String, PartitionField> partitionFieldMap = Maps.newHashMap();
     private final List<Exception> exceptions = Lists.newArrayList();
 
     DestinationState(IcebergDestination icebergDestination, Table table) {
@@ -98,6 +111,9 @@ class RecordWriterManager implements AutoCloseable {
       this.spec = table.spec();
       this.partitionKey = new PartitionKey(spec, schema);
       this.table = table;
+      for (PartitionField partitionField : spec.fields()) {
+        partitionFieldMap.put(partitionField.name(), partitionField);
+      }
 
       // build a cache of RecordWriters.
       // writers will expire after 1 min of idle time.
@@ -123,7 +139,9 @@ class RecordWriterManager implements AutoCloseable {
                       throw rethrow;
                     }
                     openWriters--;
-                    dataFiles.add(SerializableDataFile.from(recordWriter.getDataFile(), pk));
+                    String partitionPath = getPartitionDataPath(pk.toPath(), partitionFieldMap);
+                    dataFiles.add(
+                        SerializableDataFile.from(recordWriter.getDataFile(), partitionPath));
                   })
               .build();
     }
@@ -136,7 +154,7 @@ class RecordWriterManager implements AutoCloseable {
      * can't create a new writer, the {@link Record} is rejected and {@code false} is returned.
      */
     boolean write(Record record) {
-      partitionKey.partition(record);
+      partitionKey.partition(getPartitionableRecord(record));
 
       if (!writers.asMap().containsKey(partitionKey) && openWriters >= maxNumWriters) {
         return false;
@@ -185,8 +203,65 @@ class RecordWriterManager implements AutoCloseable {
             e);
       }
     }
+
+    /**
+     * Resolves an input {@link Record}'s partition values and returns another {@link Record} that
+     * can be applied to the destination's {@link PartitionSpec}.
+     */
+    private Record getPartitionableRecord(Record record) {
+      if (spec.isUnpartitioned()) {
+        return record;
+      }
+      Record output = GenericRecord.create(schema);
+      for (PartitionField partitionField : spec.fields()) {
+        Transform<?, ?> transform = partitionField.transform();
+        Types.NestedField field = schema.findField(partitionField.sourceId());
+        String name = field.name();
+        Object value = record.getField(name);
+        @Nullable Literal<Object> literal = Literal.of(value.toString()).to(field.type());
+        if (literal == null || transform.isVoid() || transform.isIdentity()) {
+          output.setField(name, value);
+        } else {
+          output.setField(name, literal.value());
+        }
+      }
+      return output;
+    }
   }
 
+  /**
+   * Returns an equivalent partition path that is made up of partition data. Needed to reconstruct a
+   * {@link DataFile}.
+   */
+  @VisibleForTesting
+  static String getPartitionDataPath(
+      String partitionPath, Map<String, PartitionField> partitionFieldMap) {
+    if (partitionPath.isEmpty() || partitionFieldMap.isEmpty()) {
+      return partitionPath;
+    }
+    List<String> resolved = new ArrayList<>();
+    for (String partition : Splitter.on('/').splitToList(partitionPath)) {
+      List<String> nameAndValue = Splitter.on('=').splitToList(partition);
+      String name = nameAndValue.get(0);
+      String value = nameAndValue.get(1);
+      String transformName =
+          Preconditions.checkArgumentNotNull(partitionFieldMap.get(name)).transform().toString();
+      if (Transforms.month().toString().equals(transformName)) {
+        int month = YearMonth.parse(value).getMonthValue();
+        value = String.valueOf(month);
+      } else if (Transforms.hour().toString().equals(transformName)) {
+        long hour = ChronoUnit.HOURS.between(EPOCH, LocalDateTime.parse(value, HOUR_FORMATTER));
+        value = String.valueOf(hour);
+      }
+      resolved.add(name + "=" + value);
+    }
+    return String.join("/", resolved);
+  }
+
+  private static final DateTimeFormatter HOUR_FORMATTER =
+      DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
+  private static final LocalDateTime EPOCH = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC);
+
   private final Catalog catalog;
   private final String filePrefix;
   private final long maxFileSize;
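
As a concrete illustration of the resolution above, here is a standalone sketch
(not part of the commit) of the hour-transform arithmetic performed by
getPartitionDataPath, using the same formatter and epoch constants:

    import java.time.LocalDateTime;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;
    import java.time.temporal.ChronoUnit;

    public class HourPathSketch {
      public static void main(String[] args) {
        DateTimeFormatter hourFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd-HH");
        LocalDateTime epoch = LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC);

        // PartitionKey.toPath() renders an hour partition in human-readable form,
        // e.g. "h_datetime_hour=2024-10-08-13"; Iceberg's partition data stores the
        // same value as hours since the Unix epoch instead.
        String humanReadable = "2024-10-08-13";
        long hoursSinceEpoch =
            ChronoUnit.HOURS.between(epoch, LocalDateTime.parse(humanReadable, hourFormatter));
        System.out.println("h_datetime_hour=" + hoursSinceEpoch);
      }
    }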
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
index 59b45616200..eef2b154d24 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SerializableDataFile.java
@@ -116,13 +116,14 @@ abstract class SerializableDataFile {
   * Create a {@link SerializableDataFile} from a {@link DataFile} and its associated {@link
    * PartitionKey}.
    */
-  static SerializableDataFile from(DataFile f, PartitionKey key) {
+  static SerializableDataFile from(DataFile f, String partitionPath) {
+
     return SerializableDataFile.builder()
         .setPath(f.path().toString())
         .setFileFormat(f.format().toString())
         .setRecordCount(f.recordCount())
         .setFileSizeInBytes(f.fileSizeInBytes())
-        .setPartitionPath(key.toPath())
+        .setPartitionPath(partitionPath)
         .setPartitionSpecId(f.specId())
         .setKeyMetadata(f.keyMetadata())
         .setSplitOffsets(f.splitOffsets())
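
The signature change above exists because PartitionKey.toPath() renders
human-readable values (e.g. "2024-10" for a month partition), while rebuilding
a DataFile needs raw partition data, so callers now resolve the path first. A
sketch of the round trip (a hypothetical helper; it assumes package access to
org.apache.beam.sdk.io.iceberg, mirroring the tests below):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.PartitionField;
    import org.apache.iceberg.PartitionKey;
    import org.apache.iceberg.PartitionSpec;

    class DataFileRoundTripSketch {
      static DataFile roundTrip(DataFile file, PartitionKey key, PartitionSpec spec) {
        Map<String, PartitionField> fieldsByName = new HashMap<>();
        for (PartitionField field : spec.fields()) {
          fieldsByName.put(field.name(), field);
        }
        // Resolve the human-readable path into partition data, then rebuild the DataFile.
        String partitionPath =
            RecordWriterManager.getPartitionDataPath(key.toPath(), fieldsByName);
        return SerializableDataFile.from(file, partitionPath)
            .createDataFile(ImmutableMap.of(spec.specId(), spec));
      }
    }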
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
index c79b0a55005..a060bc16d6c 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
@@ -354,7 +354,7 @@ public class IcebergIOIT implements Serializable {
     PartitionSpec partitionSpec =
         PartitionSpec.builderFor(ICEBERG_SCHEMA)
             .identity("bool")
-            .identity("modulo_5")
+            .hour("datetime")
             .truncate("str", "value_x".length())
             .build();
     Table table =
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
index 47dc9aa425d..9834547c474 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
@@ -23,14 +23,19 @@ import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.managed.Managed;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestStream;
@@ -49,12 +54,16 @@ import org.apache.beam.sdk.values.ValueInSingleWindow;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.util.DateTimeUtil;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.hamcrest.Matchers;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.ClassRule;
@@ -360,4 +369,93 @@ public class IcebergWriteSchemaTransformProviderTest {
       return null;
     }
   }
+
+  @Test
+  public void testWritePartitionedData() {
+    Schema schema =
+        Schema.builder()
+            .addStringField("str")
+            .addInt32Field("int")
+            .addLogicalTypeField("y_date", SqlTypes.DATE)
+            .addLogicalTypeField("y_datetime", SqlTypes.DATETIME)
+            .addDateTimeField("y_datetime_tz")
+            .addLogicalTypeField("m_date", SqlTypes.DATE)
+            .addLogicalTypeField("m_datetime", SqlTypes.DATETIME)
+            .addDateTimeField("m_datetime_tz")
+            .addLogicalTypeField("d_date", SqlTypes.DATE)
+            .addLogicalTypeField("d_datetime", SqlTypes.DATETIME)
+            .addDateTimeField("d_datetime_tz")
+            .addLogicalTypeField("h_datetime", SqlTypes.DATETIME)
+            .addDateTimeField("h_datetime_tz")
+            .build();
+    org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(schema);
+    PartitionSpec spec =
+        PartitionSpec.builderFor(icebergSchema)
+            .identity("str")
+            .bucket("int", 5)
+            .year("y_date")
+            .year("y_datetime")
+            .year("y_datetime_tz")
+            .month("m_date")
+            .month("m_datetime")
+            .month("m_datetime_tz")
+            .day("d_date")
+            .day("d_datetime")
+            .day("d_datetime_tz")
+            .hour("h_datetime")
+            .hour("h_datetime_tz")
+            .build();
+    String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);
+
+    warehouse.createTable(TableIdentifier.parse(identifier), icebergSchema, spec);
+    Map<String, Object> config =
+        ImmutableMap.of(
+            "table",
+            identifier,
+            "catalog_properties",
+            ImmutableMap.of("type", "hadoop", "warehouse", 
warehouse.location));
+
+    List<Row> rows = new ArrayList<>();
+    for (int i = 0; i < 30; i++) {
+      long millis = i * 100_00_000_000L;
+      LocalDate localDate = DateTimeUtil.dateFromDays(i * 100);
+      LocalDateTime localDateTime = DateTimeUtil.timestampFromMicros(millis * 1000);
+      DateTime dateTime = new DateTime(millis).withZone(DateTimeZone.forOffsetHoursMinutes(3, 25));
+      Row row =
+          Row.withSchema(schema)
+              .addValues(
+                  "str_" + i,
+                  i,
+                  localDate,
+                  localDateTime,
+                  dateTime,
+                  localDate,
+                  localDateTime,
+                  dateTime,
+                  localDate,
+                  localDateTime,
+                  dateTime,
+                  localDateTime,
+                  dateTime)
+              .build();
+      rows.add(row);
+    }
+
+    PCollection<Row> result =
+        testPipeline
+            .apply("Records To Add", Create.of(rows))
+            .setRowSchema(schema)
+            .apply(Managed.write(Managed.ICEBERG).withConfig(config))
+            .get(SNAPSHOTS_TAG);
+
+    PAssert.that(result)
+        .satisfies(new VerifyOutputs(Collections.singletonList(identifier), "append"));
+    testPipeline.run().waitUntilFinish();
+
+    Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions());
+    PCollection<Row> readRows =
+        p.apply(Managed.read(Managed.ICEBERG).withConfig(config)).getSinglePCollection();
+    PAssert.that(readRows).containsInAnyOrder(rows);
+    p.run();
+  }
 }
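
For reference, the essentials of the Managed Iceberg write exercised by the new
test, as a minimal sketch (it assumes a pre-created table partitioned with time
transforms; the table name and warehouse path are hypothetical):

    import java.util.Map;
    import org.apache.beam.sdk.managed.Managed;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;
    import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

    class ManagedIcebergWriteSketch {
      static void write(PCollection<Row> rows) {
        Map<String, Object> config =
            ImmutableMap.of(
                "table", "default.events",
                "catalog_properties",
                ImmutableMap.of("type", "hadoop", "warehouse", "/tmp/warehouse"));
        // The sink honors the table's existing PartitionSpec, including the
        // newly supported year/month/day/hour transforms.
        rows.apply(Managed.write(Managed.ICEBERG).withConfig(config));
      }
    }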
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
index 2bce390e099..5168f71fef9 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java
@@ -27,9 +27,14 @@ import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -39,6 +44,7 @@ import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionKey;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
@@ -46,6 +52,8 @@ import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.hadoop.HadoopCatalog;
 import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -85,9 +93,14 @@ public class RecordWriterManagerTest {
 
   private WindowedValue<IcebergDestination> getWindowedDestination(
       String tableName, @Nullable PartitionSpec partitionSpec) {
+    return getWindowedDestination(tableName, ICEBERG_SCHEMA, partitionSpec);
+  }
+
+  private WindowedValue<IcebergDestination> getWindowedDestination(
+      String tableName, org.apache.iceberg.Schema schema, @Nullable PartitionSpec partitionSpec) {
     TableIdentifier tableIdentifier = TableIdentifier.of("default", tableName);
 
-    warehouse.createTable(tableIdentifier, ICEBERG_SCHEMA, partitionSpec);
+    warehouse.createTable(tableIdentifier, schema, partitionSpec);
 
     IcebergDestination icebergDestination =
         IcebergDestination.builder()
@@ -314,8 +327,15 @@ public class RecordWriterManagerTest {
     DataFile datafile = writer.getDataFile();
     assertEquals(2L, datafile.recordCount());
 
+    Map<String, PartitionField> partitionFieldMap = new HashMap<>();
+    for (PartitionField partitionField : PARTITION_SPEC.fields()) {
+      partitionFieldMap.put(partitionField.name(), partitionField);
+    }
+
+    String partitionPath =
+        RecordWriterManager.getPartitionDataPath(partitionKey.toPath(), partitionFieldMap);
     DataFile roundTripDataFile =
-        SerializableDataFile.from(datafile, partitionKey)
+        SerializableDataFile.from(datafile, partitionPath)
             .createDataFile(ImmutableMap.of(PARTITION_SPEC.specId(), PARTITION_SPEC));
 
     checkDataFileEquality(datafile, roundTripDataFile);
@@ -347,8 +367,14 @@ public class RecordWriterManagerTest {
     writer.close();
 
     // fetch data file and its serializable version
+    Map<String, PartitionField> partitionFieldMap = new HashMap<>();
+    for (PartitionField partitionField : PARTITION_SPEC.fields()) {
+      partitionFieldMap.put(partitionField.name(), partitionField);
+    }
+    String partitionPath =
+        RecordWriterManager.getPartitionDataPath(partitionKey.toPath(), partitionFieldMap);
     DataFile datafile = writer.getDataFile();
-    SerializableDataFile serializableDataFile = SerializableDataFile.from(datafile, partitionKey);
+    SerializableDataFile serializableDataFile = SerializableDataFile.from(datafile, partitionPath);
 
     assertEquals(2L, datafile.recordCount());
     assertEquals(serializableDataFile.getPartitionSpecId(), datafile.specId());
@@ -415,6 +441,198 @@ public class RecordWriterManagerTest {
     }
   }
 
+  @Test
+  public void testIdentityPartitioning() throws IOException {
+    Schema primitiveTypeSchema =
+        Schema.builder()
+            .addBooleanField("bool")
+            .addInt32Field("int")
+            .addInt64Field("long")
+            .addFloatField("float")
+            .addDoubleField("double")
+            .addStringField("str")
+            .build();
+
+    Row row =
+        Row.withSchema(primitiveTypeSchema).addValues(true, 1, 1L, 1.23f, 4.56, "str").build();
+    org.apache.iceberg.Schema icebergSchema =
+        IcebergUtils.beamSchemaToIcebergSchema(primitiveTypeSchema);
+    PartitionSpec spec =
+        PartitionSpec.builderFor(icebergSchema)
+            .identity("bool")
+            .identity("int")
+            .identity("long")
+            .identity("float")
+            .identity("double")
+            .identity("str")
+            .build();
+    WindowedValue<IcebergDestination> dest =
+        getWindowedDestination("identity_partitioning", icebergSchema, spec);
+
+    RecordWriterManager writer =
+        new RecordWriterManager(catalog, "test_prefix", Long.MAX_VALUE, Integer.MAX_VALUE);
+    writer.write(dest, row);
+    writer.close();
+    List<SerializableDataFile> files = writer.getSerializableDataFiles().get(dest);
+    assertEquals(1, files.size());
+    SerializableDataFile dataFile = files.get(0);
+    assertEquals(1, dataFile.getRecordCount());
+    // build this string: bool=true/int=1/long=1/float=1.23/double=4.56/str=str
+    List<String> expectedPartitions = new ArrayList<>();
+    for (Schema.Field field : primitiveTypeSchema.getFields()) {
+      Object val = row.getValue(field.getName());
+      expectedPartitions.add(field.getName() + "=" + val);
+    }
+    String expectedPartitionPath = String.join("/", expectedPartitions);
+    assertEquals(expectedPartitionPath, dataFile.getPartitionPath());
+    assertThat(dataFile.getPath(), containsString(expectedPartitionPath));
+  }
+
+  @Test
+  public void testBucketPartitioning() throws IOException {
+    Schema bucketSchema =
+        Schema.builder()
+            .addInt32Field("int")
+            .addInt64Field("long")
+            .addStringField("str")
+            .addLogicalTypeField("date", SqlTypes.DATE)
+            .addLogicalTypeField("time", SqlTypes.TIME)
+            .addLogicalTypeField("datetime", SqlTypes.DATETIME)
+            .addDateTimeField("datetime_tz")
+            .build();
+
+    String timestamp = "2024-10-08T13:18:20.053";
+    LocalDateTime localDateTime = LocalDateTime.parse(timestamp);
+
+    Row row =
+        Row.withSchema(bucketSchema)
+            .addValues(
+                1,
+                1L,
+                "str",
+                localDateTime.toLocalDate(),
+                localDateTime.toLocalTime(),
+                localDateTime,
+                DateTime.parse(timestamp))
+            .build();
+    org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(bucketSchema);
+    PartitionSpec spec =
+        PartitionSpec.builderFor(icebergSchema)
+            .bucket("int", 2)
+            .bucket("long", 2)
+            .bucket("str", 2)
+            .bucket("date", 2)
+            .bucket("time", 2)
+            .bucket("datetime", 2)
+            .bucket("datetime_tz", 2)
+            .build();
+    WindowedValue<IcebergDestination> dest =
+        getWindowedDestination("bucket_partitioning", icebergSchema, spec);
+
+    RecordWriterManager writer =
+        new RecordWriterManager(catalog, "test_prefix", Long.MAX_VALUE, Integer.MAX_VALUE);
+    writer.write(dest, row);
+    writer.close();
+    List<SerializableDataFile> files = writer.getSerializableDataFiles().get(dest);
+    assertEquals(1, files.size());
+    SerializableDataFile dataFile = files.get(0);
+    assertEquals(1, dataFile.getRecordCount());
+    for (Schema.Field field : bucketSchema.getFields()) {
+      String expectedPartition = field.getName() + "_bucket";
+      assertThat(dataFile.getPartitionPath(), containsString(expectedPartition));
+      assertThat(dataFile.getPath(), containsString(expectedPartition));
+    }
+  }
+
+  @Test
+  public void testTimePartitioning() throws IOException {
+    Schema timePartitioningSchema =
+        Schema.builder()
+            .addLogicalTypeField("y_date", SqlTypes.DATE)
+            .addLogicalTypeField("y_datetime", SqlTypes.DATETIME)
+            .addDateTimeField("y_datetime_tz")
+            .addLogicalTypeField("m_date", SqlTypes.DATE)
+            .addLogicalTypeField("m_datetime", SqlTypes.DATETIME)
+            .addDateTimeField("m_datetime_tz")
+            .addLogicalTypeField("d_date", SqlTypes.DATE)
+            .addLogicalTypeField("d_datetime", SqlTypes.DATETIME)
+            .addDateTimeField("d_datetime_tz")
+            .addLogicalTypeField("h_datetime", SqlTypes.DATETIME)
+            .addDateTimeField("h_datetime_tz")
+            .build();
+    org.apache.iceberg.Schema icebergSchema =
+        IcebergUtils.beamSchemaToIcebergSchema(timePartitioningSchema);
+    PartitionSpec spec =
+        PartitionSpec.builderFor(icebergSchema)
+            .year("y_date")
+            .year("y_datetime")
+            .year("y_datetime_tz")
+            .month("m_date")
+            .month("m_datetime")
+            .month("m_datetime_tz")
+            .day("d_date")
+            .day("d_datetime")
+            .day("d_datetime_tz")
+            .hour("h_datetime")
+            .hour("h_datetime_tz")
+            .build();
+
+    WindowedValue<IcebergDestination> dest =
+        getWindowedDestination("time_partitioning", icebergSchema, spec);
+
+    String timestamp = "2024-10-08T13:18:20.053";
+    LocalDateTime localDateTime = LocalDateTime.parse(timestamp);
+    LocalDate localDate = localDateTime.toLocalDate();
+    String timestamptz = "2024-10-08T13:18:20.053+03:27";
+    DateTime dateTime = DateTime.parse(timestamptz);
+
+    Row row =
+        Row.withSchema(timePartitioningSchema)
+            .addValues(localDate, localDateTime, dateTime) // year
+            .addValues(localDate, localDateTime, dateTime) // month
+            .addValues(localDate, localDateTime, dateTime) // day
+            .addValues(localDateTime, dateTime) // hour
+            .build();
+
+    // write some rows
+    RecordWriterManager writer =
+        new RecordWriterManager(catalog, "test_prefix", Long.MAX_VALUE, Integer.MAX_VALUE);
+    writer.write(dest, row);
+    writer.close();
+    List<SerializableDataFile> files = writer.getSerializableDataFiles().get(dest);
+    assertEquals(1, files.size());
+    SerializableDataFile serializableDataFile = files.get(0);
+    assertEquals(1, serializableDataFile.getRecordCount());
+
+    int year = localDateTime.getYear();
+    int month = localDateTime.getMonthValue();
+    int day = localDateTime.getDayOfMonth();
+    int hour = localDateTime.getHour();
+    List<String> expectedPartitions = new ArrayList<>();
+    for (Schema.Field field : timePartitioningSchema.getFields()) {
+      String name = field.getName();
+      String expected = "";
+      if (name.startsWith("y_")) {
+        expected = String.format("%s_year=%s", name, year);
+      } else if (name.startsWith("m_")) {
+        expected = String.format("%s_month=%s-%02d", name, year, month);
+      } else if (name.startsWith("d_")) {
+        expected = String.format("%s_day=%s-%02d-%02d", name, year, month, 
day);
+      } else if (name.startsWith("h_")) {
+        if (name.contains("tz")) {
+          hour = dateTime.withZone(DateTimeZone.UTC).getHourOfDay();
+        }
+        expected = String.format("%s_hour=%s-%02d-%02d-%02d", name, year, 
month, day, hour);
+      }
+      expectedPartitions.add(expected);
+    }
+    String expectedPartition = String.join("/", expectedPartitions);
+    DataFile dataFile =
+        serializableDataFile.createDataFile(
+            catalog.loadTable(dest.getValue().getTableIdentifier()).specs());
+    assertThat(dataFile.path().toString(), containsString(expectedPartition));
+  }
+
   @Rule public ExpectedException thrown = ExpectedException.none();
 
   @Test
