This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 1899bc1c5 [FLINK-38808][pipeline-connector][iceberg] Support partition 
transforms in Iceberg sink (#4192)
1899bc1c5 is described below

commit 1899bc1c51080411f146ab029dfd684cfad669bd
Author: fcfangcc <[email protected]>
AuthorDate: Tue Jan 13 20:24:00 2026 +0800

    [FLINK-38808][pipeline-connector][iceberg] Support partition transforms in 
Iceberg sink (#4192)
---
 .../docs/connectors/pipeline-connectors/iceberg.md |   2 +-
 .../iceberg/sink/IcebergDataSinkFactory.java       |  23 +++-
 .../iceberg/sink/IcebergDataSinkOptions.java       |   3 +-
 .../iceberg/sink/IcebergMetadataApplier.java       |  75 +++++++++++--
 .../iceberg/sink/IcebergDataSinkFactoryTest.java   |  89 +++++++++++++++
 .../iceberg/sink/v2/IcebergWriterTest.java         | 123 ++++++++++++++++++++-
 6 files changed, 300 insertions(+), 15 deletions(-)

diff --git a/docs/content/docs/connectors/pipeline-connectors/iceberg.md 
b/docs/content/docs/connectors/pipeline-connectors/iceberg.md
index 0d1411471..129d154b0 100644
--- a/docs/content/docs/connectors/pipeline-connectors/iceberg.md
+++ b/docs/content/docs/connectors/pipeline-connectors/iceberg.md
@@ -136,7 +136,7 @@ Pipeline Connector Options
       <td>optional</td>
       <td style="word-wrap: break-word;">(none)</td>
       <td>String</td>
-      <td>Partition keys for each partitioned table. Allow setting multiple 
primary keys for multiTables. Tables are separated by ';', and partition keys 
are separated by ','. For example, we can set <code>partition.key</code> of two 
tables using 'testdb.table1:id1,id2;testdb.table2:name'.</td>
+      <td>Partition keys for each partitioned table. Allow setting multiple 
partition keys for multiTables. Tables are separated by ';', and partition keys 
are separated by ','. For example, we can set <code>partition.key</code> of two 
tables using 'testdb.table1:id1,id2;testdb.table2:name'. For partition 
transforms, we can set <code>partition.key</code> using 
'testdb.table1:truncate[10](id);testdb.table2:hour(create_time);testdb.table3:day(create_time);testdb.table4:month(create_time);tes
 [...]
     </tr>
     <tr>
       <td>catalog.properties.*</td>
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
index d13f8f070..80f1df659 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
@@ -19,6 +19,7 @@ package org.apache.flink.cdc.connectors.iceberg.sink;
 
 import org.apache.flink.cdc.common.configuration.ConfigOption;
 import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.factories.DataSinkFactory;
 import org.apache.flink.cdc.common.factories.FactoryHelper;
 import org.apache.flink.cdc.common.pipeline.PipelineOptions;
@@ -31,8 +32,10 @@ import 
org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.CatalogProperties;
 
 import java.time.ZoneId;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
@@ -86,10 +89,28 @@ public class IcebergDataSinkFactory implements 
DataSinkFactory {
         CompactionOptions compactionOptions =
                 getCompactionStrategy(context.getFactoryConfiguration());
 
+        Map<TableId, List<String>> partitionMaps = new HashMap<>();
+        String partitionKey =
+                
context.getFactoryConfiguration().get(IcebergDataSinkOptions.PARTITION_KEY);
+        if (partitionKey != null && !partitionKey.isEmpty()) {
+            for (String tables : partitionKey.trim().split(";")) {
+                String[] splits = tables.trim().split(":");
+                if (splits.length == 2) {
+                    TableId tableId = TableId.parse(splits[0]);
+                    List<String> partitions = 
Arrays.asList(splits[1].split(","));
+                    partitionMaps.put(tableId, partitions);
+                } else {
+                    throw new IllegalArgumentException(
+                            IcebergDataSinkOptions.PARTITION_KEY.key()
+                                    + " is malformed, please refer to the 
documents");
+                }
+            }
+        }
+
         return new IcebergDataSink(
                 catalogOptions,
                 tableOptions,
-                new HashMap<>(),
+                partitionMaps,
                 zoneId,
                 schemaOperatorUid,
                 compactionOptions);
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
index c20486345..517e1c8eb 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
@@ -51,7 +51,8 @@ public class IcebergDataSinkOptions {
                     .withDescription(
                             "Partition keys for each partitioned table, allow 
setting multiple primary keys for multiTables. "
                                     + "Tables are separated by ';', and 
partition keys are separated by ','. "
-                                    + "For example, we can set partition.key 
of two tables by 'testdb.table1:id1,id2;testdb.table2:name'.");
+                                    + "For example, we can set partition.key 
of two tables by 'testdb.table1:id1,id2;testdb.table2:name'."
+                                    + "For partition transforms,  we can set 
partition.key by 
'testdb.table1:truncate[10](id);testdb.table2:hour(create_time);testdb.table3:day(create_time);testdb.table4:month(create_time);testdb.table5:year(create_time);testdb.table6:bucket[10](create_time)'.");
 
     @Experimental
     public static final ConfigOption<Boolean> SINK_COMPACTION_ENABLED =
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java
index d9d913286..24d01ffd0 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java
@@ -58,11 +58,26 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
 
 /** A {@link MetadataApplier} for Apache Iceberg. */
 public class IcebergMetadataApplier implements MetadataApplier {
+    private static final Pattern PARTITION_YEAR_PATTERN = 
Pattern.compile("^year\\((.*)\\)$");
+
+    private static final Pattern PARTITION_MONTH_PATTERN = 
Pattern.compile("^month\\((.*)\\)$");
+
+    private static final Pattern PARTITION_DAY_PATTERN = 
Pattern.compile("^day\\((.*)\\)$");
+
+    private static final Pattern PARTITION_HOUR_PATTERN = 
Pattern.compile("^hour\\((.*)\\)$");
+
+    private static final Pattern PARTITION_BUCKET_PATTERN =
+            Pattern.compile("^bucket\\[(\\d+)]\\((.*)\\)$");
+
+    private static final Pattern PARTITION_TRUNCATE_PATTERN =
+            Pattern.compile("^truncate\\[(\\d+)]\\((.*)\\)$");
 
     private static final Logger LOG = 
LoggerFactory.getLogger(IcebergMetadataApplier.class);
 
@@ -161,13 +176,7 @@ public class IcebergMetadataApplier implements 
MetadataApplier {
             if (partitionMaps.containsKey(event.tableId())) {
                 partitionColumns = partitionMaps.get(event.tableId());
             }
-            PartitionSpec.Builder builder = 
PartitionSpec.builderFor(icebergSchema);
-            for (String name : partitionColumns) {
-                // TODO Add more partition transforms, see
-                // https://iceberg.apache.org/spec/#partition-transforms.
-                builder.identity(name);
-            }
-            PartitionSpec partitionSpec = builder.build();
+            PartitionSpec partitionSpec = generatePartitionSpec(icebergSchema, 
partitionColumns);
             if (!catalog.tableExists(tableIdentifier)) {
                 catalog.createTable(tableIdentifier, icebergSchema, 
partitionSpec, tableOptions);
                 LOG.info(
@@ -281,6 +290,58 @@ public class IcebergMetadataApplier implements 
MetadataApplier {
         }
     }
 
+    private PartitionSpec generatePartitionSpec(Schema schema, List<String> 
partitionColumns) {
+        PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
+        for (String name : partitionColumns) {
+            Matcher matcherYear = PARTITION_YEAR_PATTERN.matcher(name);
+            if (matcherYear.matches()) {
+                String matchedName = matcherYear.group(1);
+                builder.year(matchedName);
+                continue;
+            }
+
+            Matcher matcherMonth = PARTITION_MONTH_PATTERN.matcher(name);
+            if (matcherMonth.matches()) {
+                String matchedName = matcherMonth.group(1);
+                builder.month(matchedName);
+                continue;
+            }
+
+            Matcher matcherDay = PARTITION_DAY_PATTERN.matcher(name);
+            if (matcherDay.matches()) {
+                String matchedName = matcherDay.group(1);
+                builder.day(matchedName);
+                continue;
+            }
+
+            Matcher matcherHour = PARTITION_HOUR_PATTERN.matcher(name);
+            if (matcherHour.matches()) {
+                String matchedName = matcherHour.group(1);
+                builder.hour(matchedName);
+                continue;
+            }
+
+            Matcher matcherBucket = PARTITION_BUCKET_PATTERN.matcher(name);
+            if (matcherBucket.matches()) {
+                String matchedName = matcherBucket.group(2);
+                int numBuckets = Integer.parseInt(matcherBucket.group(1));
+                builder.bucket(matchedName, numBuckets);
+                continue;
+            }
+
+            Matcher matcherTruncate = PARTITION_TRUNCATE_PATTERN.matcher(name);
+            if (matcherTruncate.matches()) {
+                String matchedName = matcherTruncate.group(2);
+                int width = Integer.parseInt(matcherTruncate.group(1));
+                builder.truncate(matchedName, width);
+                continue;
+            }
+
+            builder.identity(name);
+        }
+        return builder.build();
+    }
+
     @Override
     public MetadataApplier setAcceptedSchemaEvolutionTypes(
             Set<SchemaChangeEventType> schemaEvolutionTypes) {
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
index 126491b73..dff510a41 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
@@ -18,19 +18,35 @@
 package org.apache.flink.cdc.connectors.iceberg.sink;
 
 import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.factories.DataSinkFactory;
 import org.apache.flink.cdc.common.factories.FactoryHelper;
+import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.sink.DataSink;
+import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
 import org.apache.flink.table.api.ValidationException;
 
 import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
 
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
 
 /** Tests for {@link IcebergDataSinkFactory}. */
 public class IcebergDataSinkFactoryTest {
+    @TempDir public static java.nio.file.Path temporaryFolder;
 
     @Test
     void testCreateDataSink() {
@@ -92,4 +108,77 @@ public class IcebergDataSinkFactoryTest {
                                 conf, conf, 
Thread.currentThread().getContextClassLoader()));
         Assertions.assertThat(dataSink).isInstanceOf(IcebergDataSink.class);
     }
+
+    @Test
+    public void testPartitionOption() {
+        Map<String, String> testcases = new HashMap<>();
+        testcases.put("test.iceberg_partition_table:year(create_time)", 
"create_time_year");
+        testcases.put("test.iceberg_partition_table:month(create_time)", 
"create_time_month");
+        testcases.put("test.iceberg_partition_table:day(create_time)", 
"create_time_day");
+        testcases.put("test.iceberg_partition_table:hour(create_time)", 
"create_time_hour");
+        testcases.put("test.iceberg_partition_table:bucket[8](create_time)", 
"create_time_bucket");
+        testcases.put("test.iceberg_partition_table:create_time", 
"create_time");
+        testcases.put("test.iceberg_partition_table:truncate[8](id)", 
"id_trunc");
+
+        for (Map.Entry<String, String> entry : testcases.entrySet()) {
+            runTestPartitionOption(entry.getKey(), entry.getValue());
+        }
+    }
+
+    public void runTestPartitionOption(String partitionKey, String 
transformColumnName) {
+        Map<String, String> catalogOptions = new HashMap<>();
+        String warehouse =
+                new File(temporaryFolder.toFile(), 
UUID.randomUUID().toString()).toString();
+        catalogOptions.put("type", "hadoop");
+        catalogOptions.put("warehouse", warehouse);
+        catalogOptions.put("cache-enabled", "false");
+        Catalog catalog =
+                CatalogUtil.buildIcebergCatalog(
+                        "cdc-iceberg-catalog",
+                        catalogOptions,
+                        new org.apache.hadoop.conf.Configuration());
+
+        Map<String, String> catalogConf = new HashMap<>();
+        for (Map.Entry<String, String> entry : catalogOptions.entrySet()) {
+            catalogConf.put(
+                    IcebergDataSinkOptions.PREFIX_CATALOG_PROPERTIES + 
entry.getKey(),
+                    entry.getValue());
+        }
+        IcebergDataSinkFactory sinkFactory = new IcebergDataSinkFactory();
+        Configuration conf = Configuration.fromMap(catalogConf);
+        conf.set(IcebergDataSinkOptions.PARTITION_KEY, partitionKey);
+        DataSink dataSink =
+                sinkFactory.createDataSink(
+                        new FactoryHelper.DefaultContext(
+                                conf, conf, 
Thread.currentThread().getContextClassLoader()));
+
+        TableId tableId = TableId.parse("test.iceberg_partition_table");
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(
+                        tableId,
+                        Schema.newBuilder()
+                                .physicalColumn(
+                                        "id",
+                                        DataTypes.BIGINT().notNull(),
+                                        "column for id",
+                                        "AUTO_DECREMENT()")
+                                .physicalColumn(
+                                        "create_time",
+                                        DataTypes.TIMESTAMP().notNull(),
+                                        "column for name",
+                                        null)
+                                .primaryKey("id")
+                                .build());
+
+        dataSink.getMetadataApplier().applySchemaChange(createTableEvent);
+
+        Table table =
+                catalog.loadTable(
+                        TableIdentifier.of(tableId.getSchemaName(), 
tableId.getTableName()));
+
+        Assertions.assertThat(table.specs().size()).isEqualTo(1);
+        PartitionSpec spec = table.specs().get(0);
+        Assertions.assertThat(spec.isPartitioned()).isTrue();
+        
Assertions.assertThat(spec.rawPartitionType().field(transformColumnName)).isNotNull();
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
index 4a6c1581b..2e28a39d0 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
@@ -42,11 +42,14 @@ import 
org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequ
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.data.IcebergGenerics;
 import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.types.Types;
 import org.assertj.core.api.Assertions;
@@ -176,7 +179,7 @@ public class IcebergWriterTest {
         Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
                 
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
         icebergCommitter.commit(collection);
-        List<String> result = fetchTableContent(catalog, tableId);
+        List<String> result = fetchTableContent(catalog, tableId, null);
         Assertions.assertThat(result)
                 .containsExactlyInAnyOrder(
                         "1, Mark, 10, test, true, 1.0, 1.0, 1.00, 1970-01-10",
@@ -252,7 +255,7 @@ public class IcebergWriterTest {
         collection =
                 
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
         icebergCommitter.commit(collection);
-        result = fetchTableContent(catalog, tableId);
+        result = fetchTableContent(catalog, tableId, null);
         Assertions.assertThat(result)
                 .containsExactlyInAnyOrder(
                         "1, Mark, 10, test, true, 1.0, 1.0, 1.00, 1970-01-10, 
null",
@@ -343,12 +346,117 @@ public class IcebergWriterTest {
         Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
                 
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
         icebergCommitter.commit(collection);
-        List<String> result = fetchTableContent(catalog, tableId);
+        List<String> result = fetchTableContent(catalog, tableId, null);
         Assertions.assertThat(result)
                 .containsExactlyInAnyOrder(
                         "char, varchar, string, false, [1,2,3,4,5,], 
[1,2,3,4,5,6,7,8,9,10,], 0.00, 1, -1, 2, -2, 12345, 12345, 123.456, 123456.789, 
00:00:12.345, 2003-10-20, 1970-01-01T00:00, 1970-01-01T00:00Z, 
1970-01-01T00:00Z");
     }
 
+    @Test
+    public void testWriteWithPartitionTypes() throws Exception {
+        // Expected values are offsets from 1970-01-01 00:00:00 to 1971-01-01 12:00:01.
+        Map<String, Expression> testcases = new HashMap<>();
+        testcases.put("year(create_time)", 
Expressions.equal(Expressions.year("create_time"), 1));
+        testcases.put(
+                "month(create_time)", 
Expressions.equal(Expressions.month("create_time"), 12));
+        testcases.put("day(create_time)", 
Expressions.equal(Expressions.day("create_time"), 365));
+        testcases.put(
+                "hour(create_time)",
+                Expressions.equal(Expressions.hour("create_time"), 365 * 24 + 
12));
+        testcases.put("bucket[8](id)", 
Expressions.equal(Expressions.bucket("id", 8), 1));
+        testcases.put("truncate[8](id)", 
Expressions.equal(Expressions.truncate("id", 8), 12344));
+
+        for (Map.Entry<String, Expression> entry : testcases.entrySet()) {
+            runTestPartitionWrite(entry.getKey(), entry.getValue());
+        }
+    }
+
+    private void runTestPartitionWrite(String partitionKey, Expression 
expression)
+            throws Exception {
+        Map<String, String> catalogOptions = new HashMap<>();
+        String warehouse =
+                new File(temporaryFolder.toFile(), 
UUID.randomUUID().toString()).toString();
+        catalogOptions.put("type", "hadoop");
+        catalogOptions.put("warehouse", warehouse);
+        catalogOptions.put("cache-enabled", "false");
+        Catalog catalog =
+                CatalogUtil.buildIcebergCatalog(
+                        "cdc-iceberg-catalog", catalogOptions, new 
Configuration());
+        IcebergWriter icebergWriter =
+                new IcebergWriter(catalogOptions, 1, 1, 
ZoneId.systemDefault());
+
+        TableId tableId = TableId.parse("test.iceberg_table");
+        Map<TableId, List<String>> partitionMaps = new HashMap<>();
+        partitionMaps.put(tableId, List.of(partitionKey));
+
+        IcebergMetadataApplier icebergMetadataApplier =
+                new IcebergMetadataApplier(catalogOptions, new HashMap<>(), 
partitionMaps);
+        // Create Table.
+        CreateTableEvent createTableEvent =
+                new CreateTableEvent(
+                        tableId,
+                        Schema.newBuilder()
+                                .physicalColumn(
+                                        "id",
+                                        DataTypes.BIGINT().notNull(),
+                                        "column for id",
+                                        "AUTO_DECREMENT()")
+                                .physicalColumn(
+                                        "create_time",
+                                        DataTypes.TIMESTAMP().notNull(),
+                                        "column for name",
+                                        null)
+                                .primaryKey("id")
+                                .build());
+        icebergMetadataApplier.applySchemaChange(createTableEvent);
+        icebergWriter.write(createTableEvent, null);
+        BinaryRecordDataGenerator binaryRecordDataGenerator =
+                new BinaryRecordDataGenerator(
+                        
createTableEvent.getSchema().getColumnDataTypes().toArray(new DataType[0]));
+        RecordData recordData =
+                binaryRecordDataGenerator.generate(
+                        new Object[] {
+                            12345L,
+                            
TimestampData.fromTimestamp(Timestamp.valueOf("1971-01-01 12:00:01")),
+                        });
+        DataChangeEvent dataChangeEvent = DataChangeEvent.insertEvent(tableId, 
recordData);
+        icebergWriter.write(dataChangeEvent, null);
+        RecordData recordData2 =
+                binaryRecordDataGenerator.generate(
+                        new Object[] {
+                            1L,
+                            
TimestampData.fromTimestamp(Timestamp.valueOf("2026-01-12 12:00:01")),
+                        });
+        DataChangeEvent dataChangeEvent2 = 
DataChangeEvent.insertEvent(tableId, recordData2);
+        icebergWriter.write(dataChangeEvent2, null);
+        Collection<WriteResultWrapper> writeResults = 
icebergWriter.prepareCommit();
+        IcebergCommitter icebergCommitter = new 
IcebergCommitter(catalogOptions);
+        Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
+                
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
+        icebergCommitter.commit(collection);
+
+        Table table =
+                catalog.loadTable(
+                        TableIdentifier.of(tableId.getSchemaName(), 
tableId.getTableName()));
+
+        // test filter files
+        CloseableIterable<FileScanTask> tasks = 
table.newScan().filter(expression).planFiles();
+        int fileCount = 0;
+        for (FileScanTask task : tasks) {
+            fileCount++;
+            
Assertions.assertThat(task.file().partition().toString()).contains("PartitionData{");
+        }
+        Assertions.assertThat(fileCount).isEqualTo(1);
+
+        // test filter records
+        List<String> result = fetchTableContent(catalog, tableId, expression);
+        Assertions.assertThat(result.size()).isEqualTo(1);
+        Assertions.assertThat(result).containsExactlyInAnyOrder("12345, 
1971-01-01T12:00:01");
+
+        result = fetchTableContent(catalog, tableId, null);
+        Assertions.assertThat(result.size()).isEqualTo(2);
+    }
+
     /** Mock CommitRequestImpl. */
     public static class MockCommitRequestImpl<CommT> extends 
CommitRequestImpl<CommT> {
 
@@ -360,13 +468,18 @@ public class IcebergWriterTest {
         }
     }
 
-    private List<String> fetchTableContent(Catalog catalog, TableId tableId) {
+    private List<String> fetchTableContent(
+            Catalog catalog, TableId tableId, Expression expression) {
         List<String> results = new ArrayList<>();
         Table table =
                 catalog.loadTable(
                         TableIdentifier.of(tableId.getSchemaName(), 
tableId.getTableName()));
         org.apache.iceberg.Schema schema = table.schema();
-        CloseableIterable<Record> records = 
IcebergGenerics.read(table).project(schema).build();
+        IcebergGenerics.ScanBuilder scanbuilder = IcebergGenerics.read(table);
+        if (expression != null) {
+            scanbuilder = scanbuilder.where(expression);
+        }
+        CloseableIterable<Record> records = 
scanbuilder.project(schema).build();
         for (Record record : records) {
             List<String> fieldValues = new ArrayList<>();
             for (Types.NestedField field : schema.columns()) {

Reply via email to