This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
     new 1899bc1c5 [FLINK-38808][pipeline-connector][iceberg] Support partition transforms in Iceberg sink (#4192)
1899bc1c5 is described below
commit 1899bc1c51080411f146ab029dfd684cfad669bd
Author: fcfangcc <[email protected]>
AuthorDate: Tue Jan 13 20:24:00 2026 +0800
    [FLINK-38808][pipeline-connector][iceberg] Support partition transforms in Iceberg sink (#4192)
---
.../docs/connectors/pipeline-connectors/iceberg.md | 2 +-
.../iceberg/sink/IcebergDataSinkFactory.java | 23 +++-
.../iceberg/sink/IcebergDataSinkOptions.java | 3 +-
.../iceberg/sink/IcebergMetadataApplier.java | 75 +++++++++++--
.../iceberg/sink/IcebergDataSinkFactoryTest.java | 89 +++++++++++++++
.../iceberg/sink/v2/IcebergWriterTest.java | 123 ++++++++++++++++++++-
6 files changed, 300 insertions(+), 15 deletions(-)
diff --git a/docs/content/docs/connectors/pipeline-connectors/iceberg.md b/docs/content/docs/connectors/pipeline-connectors/iceberg.md
index 0d1411471..129d154b0 100644
--- a/docs/content/docs/connectors/pipeline-connectors/iceberg.md
+++ b/docs/content/docs/connectors/pipeline-connectors/iceberg.md
@@ -136,7 +136,7 @@ Pipeline Connector Options
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
-      <td>Partition keys for each partitioned table. Allow setting multiple primary keys for multiTables. Tables are separated by ';', and partition keys are separated by ','. For example, we can set <code>partition.key</code> of two tables using 'testdb.table1:id1,id2;testdb.table2:name'.</td>
+      <td>Partition keys for each partitioned table. Allow setting multiple primary keys for multiTables. Tables are separated by ';', and partition keys are separated by ','. For example, we can set <code>partition.key</code> of two tables using 'testdb.table1:id1,id2;testdb.table2:name'. For partition transforms, we can set <code>partition.key</code> using 'testdb.table1:truncate[10](id);testdb.table2:hour(create_time);testdb.table3:day(create_time);testdb.table4:month(create_time);tes[...]
</tr>
<tr>
<td>catalog.properties.*</td>
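Each transform in the new option syntax maps one-to-one onto Iceberg's PartitionSpec builder. As a minimal, self-contained sketch of that mapping (the schema and column names below are hypothetical; only the stock Iceberg API is assumed), 'truncate[10](id);hour(create_time)' resolves to:

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class PartitionSpecSketch {
    public static void main(String[] args) {
        // Hypothetical schema with the two columns used in the docs example.
        Schema schema =
                new Schema(
                        Types.NestedField.required(1, "id", Types.LongType.get()),
                        Types.NestedField.required(
                                2, "create_time", Types.TimestampType.withoutZone()));
        // 'truncate[10](id)' -> truncate("id", 10); 'hour(create_time)' -> hour("create_time").
        PartitionSpec spec =
                PartitionSpec.builderFor(schema).truncate("id", 10).hour("create_time").build();
        // Prints both derived partition fields, named id_trunc and create_time_hour.
        System.out.println(spec);
    }
}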
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
index d13f8f070..80f1df659 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
@@ -19,6 +19,7 @@ package org.apache.flink.cdc.connectors.iceberg.sink;
import org.apache.flink.cdc.common.configuration.ConfigOption;
import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
@@ -31,8 +32,10 @@ import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.CatalogProperties;
import java.time.ZoneId;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -86,10 +89,28 @@ public class IcebergDataSinkFactory implements DataSinkFactory {
         CompactionOptions compactionOptions =
                 getCompactionStrategy(context.getFactoryConfiguration());
+ Map<TableId, List<String>> partitionMaps = new HashMap<>();
+        String partitionKey =
+                context.getFactoryConfiguration().get(IcebergDataSinkOptions.PARTITION_KEY);
+ if (partitionKey != null && !partitionKey.isEmpty()) {
+ for (String tables : partitionKey.trim().split(";")) {
+ String[] splits = tables.trim().split(":");
+ if (splits.length == 2) {
+ TableId tableId = TableId.parse(splits[0]);
+                    List<String> partitions = Arrays.asList(splits[1].split(","));
+ partitionMaps.put(tableId, partitions);
+ } else {
+ throw new IllegalArgumentException(
+                            IcebergDataSinkOptions.PARTITION_KEY.key()
+                                    + " is malformed, please refer to the documentation");
+ }
+ }
+ }
+
return new IcebergDataSink(
catalogOptions,
tableOptions,
- new HashMap<>(),
+ partitionMaps,
zoneId,
schemaOperatorUid,
compactionOptions);
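The factory change above turns the option string into a per-table list of partition expressions before handing it to the sink. A standalone sketch of the same split (plain String map keys stand in for Flink CDC's TableId here, so the snippet runs without dependencies):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PartitionKeyParseSketch {
    public static void main(String[] args) {
        String partitionKey = "testdb.table1:id1,id2;testdb.table2:hour(create_time)";
        Map<String, List<String>> partitionMaps = new HashMap<>();
        // Tables are separated by ';', table and keys by ':', keys by ','.
        for (String table : partitionKey.trim().split(";")) {
            String[] splits = table.trim().split(":");
            if (splits.length != 2) {
                throw new IllegalArgumentException("partition.key is malformed");
            }
            partitionMaps.put(splits[0], Arrays.asList(splits[1].split(",")));
        }
        // {testdb.table1=[id1, id2], testdb.table2=[hour(create_time)]}
        System.out.println(partitionMaps);
    }
}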
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
index c20486345..517e1c8eb 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java
@@ -51,7 +51,8 @@ public class IcebergDataSinkOptions {
                     .withDescription(
                             "Partition keys for each partitioned table, allow setting multiple primary keys for multiTables. "
                                     + "Tables are separated by ';', and partition keys are separated by ','. "
-                                    + "For example, we can set partition.key of two tables by 'testdb.table1:id1,id2;testdb.table2:name'.");
+                                    + "For example, we can set partition.key of two tables by 'testdb.table1:id1,id2;testdb.table2:name'. "
+                                    + "For partition transforms, we can set partition.key by 'testdb.table1:truncate[10](id);testdb.table2:hour(create_time);testdb.table3:day(create_time);testdb.table4:month(create_time);testdb.table5:year(create_time);testdb.table6:bucket[10](create_time)'.");
@Experimental
public static final ConfigOption<Boolean> SINK_COMPACTION_ENABLED =
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java
index d9d913286..24d01ffd0 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java
@@ -58,11 +58,26 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import static org.apache.flink.cdc.common.utils.Preconditions.checkNotNull;
/** A {@link MetadataApplier} for Apache Iceberg. */
public class IcebergMetadataApplier implements MetadataApplier {
+    private static final Pattern PARTITION_YEAR_PATTERN = Pattern.compile("^year\\((.*)\\)$");
+
+    private static final Pattern PARTITION_MONTH_PATTERN = Pattern.compile("^month\\((.*)\\)$");
+
+    private static final Pattern PARTITION_DAY_PATTERN = Pattern.compile("^day\\((.*)\\)$");
+
+    private static final Pattern PARTITION_HOUR_PATTERN = Pattern.compile("^hour\\((.*)\\)$");
+
+ private static final Pattern PARTITION_BUCKET_PATTERN =
+ Pattern.compile("^bucket\\[(\\d+)]\\((.*)\\)$");
+
+ private static final Pattern PARTITION_TRUNCATE_PATTERN =
+ Pattern.compile("^truncate\\[(\\d+)]\\((.*)\\)$");
     private static final Logger LOG = LoggerFactory.getLogger(IcebergMetadataApplier.class);
@@ -161,13 +176,7 @@ public class IcebergMetadataApplier implements MetadataApplier {
if (partitionMaps.containsKey(event.tableId())) {
partitionColumns = partitionMaps.get(event.tableId());
}
-        PartitionSpec.Builder builder = PartitionSpec.builderFor(icebergSchema);
- for (String name : partitionColumns) {
- // TODO Add more partition transforms, see
- // https://iceberg.apache.org/spec/#partition-transforms.
- builder.identity(name);
- }
- PartitionSpec partitionSpec = builder.build();
+        PartitionSpec partitionSpec = generatePartitionSpec(icebergSchema, partitionColumns);
if (!catalog.tableExists(tableIdentifier)) {
             catalog.createTable(tableIdentifier, icebergSchema, partitionSpec, tableOptions);
LOG.info(
@@ -281,6 +290,58 @@ public class IcebergMetadataApplier implements MetadataApplier {
}
}
+    private PartitionSpec generatePartitionSpec(Schema schema, List<String> partitionColumns) {
+ PartitionSpec.Builder builder = PartitionSpec.builderFor(schema);
+ for (String name : partitionColumns) {
+ Matcher matcherYear = PARTITION_YEAR_PATTERN.matcher(name);
+ if (matcherYear.matches()) {
+ String matchedName = matcherYear.group(1);
+ builder.year(matchedName);
+ continue;
+ }
+
+ Matcher matcherMonth = PARTITION_MONTH_PATTERN.matcher(name);
+ if (matcherMonth.matches()) {
+ String matchedName = matcherMonth.group(1);
+ builder.month(matchedName);
+ continue;
+ }
+
+ Matcher matcherDay = PARTITION_DAY_PATTERN.matcher(name);
+ if (matcherDay.matches()) {
+ String matchedName = matcherDay.group(1);
+ builder.day(matchedName);
+ continue;
+ }
+
+ Matcher matcherHour = PARTITION_HOUR_PATTERN.matcher(name);
+ if (matcherHour.matches()) {
+ String matchedName = matcherHour.group(1);
+ builder.hour(matchedName);
+ continue;
+ }
+
+ Matcher matcherBucket = PARTITION_BUCKET_PATTERN.matcher(name);
+ if (matcherBucket.matches()) {
+ String matchedName = matcherBucket.group(2);
+ int numBuckets = Integer.parseInt(matcherBucket.group(1));
+ builder.bucket(matchedName, numBuckets);
+ continue;
+ }
+
+ Matcher matcherTruncate = PARTITION_TRUNCATE_PATTERN.matcher(name);
+ if (matcherTruncate.matches()) {
+ String matchedName = matcherTruncate.group(2);
+ int width = Integer.parseInt(matcherTruncate.group(1));
+ builder.truncate(matchedName, width);
+ continue;
+ }
+
+ builder.identity(name);
+ }
+ return builder.build();
+ }
+
@Override
public MetadataApplier setAcceptedSchemaEvolutionTypes(
Set<SchemaChangeEventType> schemaEvolutionTypes) {
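Each pattern above captures the transform's argument(s) in regex groups, and any entry matching none of the six patterns falls through to identity partitioning. A standalone sketch of the bucket case, reusing the exact pattern string from the applier:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TransformMatchSketch {
    // Same pattern as PARTITION_BUCKET_PATTERN above.
    private static final Pattern BUCKET = Pattern.compile("^bucket\\[(\\d+)]\\((.*)\\)$");

    public static void main(String[] args) {
        Matcher matcher = BUCKET.matcher("bucket[10](create_time)");
        if (matcher.matches()) {
            int numBuckets = Integer.parseInt(matcher.group(1)); // 10
            String column = matcher.group(2); // "create_time"
            // generatePartitionSpec would call builder.bucket(column, numBuckets) here.
            System.out.println(column + " -> " + numBuckets + " buckets");
        }
    }
}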
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
index 126491b73..dff510a41 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
@@ -18,19 +18,35 @@
package org.apache.flink.cdc.connectors.iceberg.sink;
import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.factories.DataSinkFactory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
+import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.DataSink;
+import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
/** Tests for {@link IcebergDataSinkFactory}. */
public class IcebergDataSinkFactoryTest {
+ @TempDir public static java.nio.file.Path temporaryFolder;
@Test
void testCreateDataSink() {
@@ -92,4 +108,77 @@ public class IcebergDataSinkFactoryTest {
                                 conf, conf, Thread.currentThread().getContextClassLoader()));
Assertions.assertThat(dataSink).isInstanceOf(IcebergDataSink.class);
}
+
+ @Test
+ public void testPartitionOption() {
+ Map<String, String> testcases = new HashMap<>();
+ testcases.put("test.iceberg_partition_table:year(create_time)",
"create_time_year");
+ testcases.put("test.iceberg_partition_table:month(create_time)",
"create_time_month");
+ testcases.put("test.iceberg_partition_table:day(create_time)",
"create_time_day");
+ testcases.put("test.iceberg_partition_table:hour(create_time)",
"create_time_hour");
+ testcases.put("test.iceberg_partition_table:bucket[8](create_time)",
"create_time_bucket");
+ testcases.put("test.iceberg_partition_table:create_time",
"create_time");
+ testcases.put("test.iceberg_partition_table:truncate[8](id)",
"id_trunc");
+
+ for (Map.Entry<String, String> entry : testcases.entrySet()) {
+ runTestPartitionOption(entry.getKey(), entry.getValue());
+ }
+ }
+
+    public void runTestPartitionOption(String partitionKey, String transformColumnName) {
+ Map<String, String> catalogOptions = new HashMap<>();
+ String warehouse =
+                new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
+ catalogOptions.put("type", "hadoop");
+ catalogOptions.put("warehouse", warehouse);
+ catalogOptions.put("cache-enabled", "false");
+ Catalog catalog =
+ CatalogUtil.buildIcebergCatalog(
+ "cdc-iceberg-catalog",
+ catalogOptions,
+ new org.apache.hadoop.conf.Configuration());
+
+ Map<String, String> catalogConf = new HashMap<>();
+ for (Map.Entry<String, String> entry : catalogOptions.entrySet()) {
+            catalogConf.put(
+                    IcebergDataSinkOptions.PREFIX_CATALOG_PROPERTIES + entry.getKey(),
+                    entry.getValue());
+ }
+ IcebergDataSinkFactory sinkFactory = new IcebergDataSinkFactory();
+ Configuration conf = Configuration.fromMap(catalogConf);
+ conf.set(IcebergDataSinkOptions.PARTITION_KEY, partitionKey);
+ DataSink dataSink =
+ sinkFactory.createDataSink(
+                        new FactoryHelper.DefaultContext(
+                                conf, conf, Thread.currentThread().getContextClassLoader()));
+
+ TableId tableId = TableId.parse("test.iceberg_partition_table");
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(
+ tableId,
+ Schema.newBuilder()
+ .physicalColumn(
+ "id",
+ DataTypes.BIGINT().notNull(),
+ "column for id",
+ "AUTO_DECREMENT()")
+ .physicalColumn(
+ "create_time",
+ DataTypes.TIMESTAMP().notNull(),
+ "column for name",
+ null)
+ .primaryKey("id")
+ .build());
+
+ dataSink.getMetadataApplier().applySchemaChange(createTableEvent);
+
+        Table table =
+                catalog.loadTable(
+                        TableIdentifier.of(tableId.getSchemaName(), tableId.getTableName()));
+
+ Assertions.assertThat(table.specs().size()).isEqualTo(1);
+ PartitionSpec spec = table.specs().get(0);
+ Assertions.assertThat(spec.isPartitioned()).isTrue();
+        Assertions.assertThat(spec.rawPartitionType().field(transformColumnName)).isNotNull();
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
index 4a6c1581b..2e28a39d0 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java
@@ -42,11 +42,14 @@ import org.apache.flink.streaming.runtime.operators.sink.committables.CommitRequ
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
@@ -176,7 +179,7 @@ public class IcebergWriterTest {
         Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
                 writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
icebergCommitter.commit(collection);
- List<String> result = fetchTableContent(catalog, tableId);
+ List<String> result = fetchTableContent(catalog, tableId, null);
Assertions.assertThat(result)
.containsExactlyInAnyOrder(
"1, Mark, 10, test, true, 1.0, 1.0, 1.00, 1970-01-10",
@@ -252,7 +255,7 @@ public class IcebergWriterTest {
         collection =
                 writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
icebergCommitter.commit(collection);
- result = fetchTableContent(catalog, tableId);
+ result = fetchTableContent(catalog, tableId, null);
Assertions.assertThat(result)
.containsExactlyInAnyOrder(
"1, Mark, 10, test, true, 1.0, 1.0, 1.00, 1970-01-10,
null",
@@ -343,12 +346,117 @@ public class IcebergWriterTest {
         Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
                 writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
icebergCommitter.commit(collection);
- List<String> result = fetchTableContent(catalog, tableId);
+ List<String> result = fetchTableContent(catalog, tableId, null);
Assertions.assertThat(result)
.containsExactlyInAnyOrder(
"char, varchar, string, false, [1,2,3,4,5,],
[1,2,3,4,5,6,7,8,9,10,], 0.00, 1, -1, 2, -2, 12345, 12345, 123.456, 123456.789,
00:00:12.345, 2003-10-20, 1970-01-01T00:00, 1970-01-01T00:00Z,
1970-01-01T00:00Z");
}
+ @Test
+ public void testWriteWithPartitionTypes() throws Exception {
+        // All expected partition values are for 1971-01-01 12:00:01, measured from the epoch (1970-01-01 00:00:00).
+ Map<String, Expression> testcases = new HashMap<>();
+ testcases.put("year(create_time)",
Expressions.equal(Expressions.year("create_time"), 1));
+ testcases.put(
+ "month(create_time)",
Expressions.equal(Expressions.month("create_time"), 12));
+ testcases.put("day(create_time)",
Expressions.equal(Expressions.day("create_time"), 365));
+ testcases.put(
+ "hour(create_time)",
+ Expressions.equal(Expressions.hour("create_time"), 365 * 24 +
12));
+ testcases.put("bucket[8](id)",
Expressions.equal(Expressions.bucket("id", 8), 1));
+ testcases.put("truncate[8](id)",
Expressions.equal(Expressions.truncate("id", 8), 12344));
+
+ for (Map.Entry<String, Expression> entry : testcases.entrySet()) {
+ runTestPartitionWrite(entry.getKey(), entry.getValue());
+ }
+ }
+
+    private void runTestPartitionWrite(String partitionKey, Expression expression)
+            throws Exception {
+ Map<String, String> catalogOptions = new HashMap<>();
+ String warehouse =
+                new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
+ catalogOptions.put("type", "hadoop");
+ catalogOptions.put("warehouse", warehouse);
+ catalogOptions.put("cache-enabled", "false");
+ Catalog catalog =
+ CatalogUtil.buildIcebergCatalog(
+ "cdc-iceberg-catalog", catalogOptions, new
Configuration());
+ IcebergWriter icebergWriter =
+                new IcebergWriter(catalogOptions, 1, 1, ZoneId.systemDefault());
+
+ TableId tableId = TableId.parse("test.iceberg_table");
+ Map<TableId, List<String>> partitionMaps = new HashMap<>();
+ partitionMaps.put(tableId, List.of(partitionKey));
+
+ IcebergMetadataApplier icebergMetadataApplier =
+                new IcebergMetadataApplier(catalogOptions, new HashMap<>(), partitionMaps);
+ // Create Table.
+ CreateTableEvent createTableEvent =
+ new CreateTableEvent(
+ tableId,
+ Schema.newBuilder()
+ .physicalColumn(
+ "id",
+ DataTypes.BIGINT().notNull(),
+ "column for id",
+ "AUTO_DECREMENT()")
+ .physicalColumn(
+ "create_time",
+ DataTypes.TIMESTAMP().notNull(),
+ "column for name",
+ null)
+ .primaryKey("id")
+ .build());
+ icebergMetadataApplier.applySchemaChange(createTableEvent);
+ icebergWriter.write(createTableEvent, null);
+ BinaryRecordDataGenerator binaryRecordDataGenerator =
+ new BinaryRecordDataGenerator(
+                        createTableEvent.getSchema().getColumnDataTypes().toArray(new DataType[0]));
+ RecordData recordData =
+ binaryRecordDataGenerator.generate(
+ new Object[] {
+ 12345L,
+                            TimestampData.fromTimestamp(Timestamp.valueOf("1971-01-01 12:00:01")),
+ });
+        DataChangeEvent dataChangeEvent = DataChangeEvent.insertEvent(tableId, recordData);
+ icebergWriter.write(dataChangeEvent, null);
+ RecordData recordData2 =
+ binaryRecordDataGenerator.generate(
+ new Object[] {
+ 1L,
+                            TimestampData.fromTimestamp(Timestamp.valueOf("2026-01-12 12:00:01")),
+ });
+        DataChangeEvent dataChangeEvent2 = DataChangeEvent.insertEvent(tableId, recordData2);
+ icebergWriter.write(dataChangeEvent2, null);
+        Collection<WriteResultWrapper> writeResults = icebergWriter.prepareCommit();
+        IcebergCommitter icebergCommitter = new IcebergCommitter(catalogOptions);
+        Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
+                writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
+ icebergCommitter.commit(collection);
+
+ Table table =
+ catalog.loadTable(
+                        TableIdentifier.of(tableId.getSchemaName(), tableId.getTableName()));
+
+ // test filter files
+        CloseableIterable<FileScanTask> tasks = table.newScan().filter(expression).planFiles();
+ int fileCount = 0;
+ for (FileScanTask task : tasks) {
+ fileCount++;
+            Assertions.assertThat(task.file().partition().toString()).contains("PartitionData{");
+ }
+ Assertions.assertThat(fileCount).isEqualTo(1);
+
+ // test filter records
+ List<String> result = fetchTableContent(catalog, tableId, expression);
+ Assertions.assertThat(result.size()).isEqualTo(1);
+ Assertions.assertThat(result).containsExactlyInAnyOrder("12345,
1971-01-01T12:00:01");
+
+ result = fetchTableContent(catalog, tableId, null);
+ Assertions.assertThat(result.size()).isEqualTo(2);
+ }
+
/** Mock CommitRequestImpl. */
    public static class MockCommitRequestImpl<CommT> extends CommitRequestImpl<CommT> {
@@ -360,13 +468,18 @@ public class IcebergWriterTest {
}
}
- private List<String> fetchTableContent(Catalog catalog, TableId tableId) {
+ private List<String> fetchTableContent(
+ Catalog catalog, TableId tableId, Expression expression) {
List<String> results = new ArrayList<>();
Table table =
catalog.loadTable(
                         TableIdentifier.of(tableId.getSchemaName(), tableId.getTableName()));
org.apache.iceberg.Schema schema = table.schema();
-        CloseableIterable<Record> records = IcebergGenerics.read(table).project(schema).build();
+ IcebergGenerics.ScanBuilder scanbuilder = IcebergGenerics.read(table);
+ if (expression != null) {
+ scanbuilder = scanbuilder.where(expression);
+ }
+        CloseableIterable<Record> records = scanbuilder.project(schema).build();
for (Record record : records) {
List<String> fieldValues = new ArrayList<>();
for (Types.NestedField field : schema.columns()) {