This is an automated email from the ASF dual-hosted git repository.
mehulbatra pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new ae7387e38 [lake/iceberg] Add iceberg it case (#1572)
ae7387e38 is described below
commit ae7387e387168fe97fd2cfb0a493aeb60bfdb5ca
Author: yuxia Luo <[email protected]>
AuthorDate: Fri Aug 22 17:31:15 2025 +0800
[lake/iceberg] Add iceberg it case (#1572)
* add iceberg it case
* address comments
---
fluss-lake/fluss-lake-iceberg/pom.xml | 83 +++
.../testutils/FlinkIcebergTieringTestBase.java} | 554 ++++++++++-----------
.../lake/iceberg/tiering/IcebergTieringITCase.java | 205 ++++++++
.../iceberg/{ => tiering}/IcebergTieringTest.java | 21 +-
.../testutils/FlinkPaimonTieringTestBase.java | 14 -
5 files changed, 568 insertions(+), 309 deletions(-)
diff --git a/fluss-lake/fluss-lake-iceberg/pom.xml b/fluss-lake/fluss-lake-iceberg/pom.xml
index 274de4919..e0443c49b 100644
--- a/fluss-lake/fluss-lake-iceberg/pom.xml
+++ b/fluss-lake/fluss-lake-iceberg/pom.xml
@@ -68,6 +68,8 @@
<artifactId>iceberg-bundled-guava</artifactId>
<version>${iceberg.version}</version>
</dependency>
+
+ <!-- test dependencies -->
<dependency>
<groupId>com.alibaba.fluss</groupId>
<artifactId>fluss-common</artifactId>
@@ -80,6 +82,33 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.alibaba.fluss</groupId>
+ <artifactId>fluss-server</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.alibaba.fluss</groupId>
+ <artifactId>fluss-server</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.alibaba.fluss</groupId>
+ <artifactId>fluss-common</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.alibaba.fluss</groupId>
+ <artifactId>fluss-test-utils</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
@@ -130,6 +159,22 @@
<artifactId>commons-io</artifactId>
<groupId>commons-io</groupId>
</exclusion>
+ <exclusion>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-framework</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-recipes</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ </exclusion>
</exclusions>
</dependency>
@@ -140,6 +185,44 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.alibaba.fluss</groupId>
+ <artifactId>fluss-flink-common</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-runtime</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-base</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-test-utils</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+
</dependencies>
<build>
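Note: the curator/zookeeper exclusions above presumably keep Hadoop's
transitive ZooKeeper stack off the test classpath, where it could clash with
the ZooKeeper that the fluss-server test jar (used by FlussClusterExtension
below) brings in. The effective test classpath can be inspected with standard
Maven:
    mvn -pl fluss-lake/fluss-lake-iceberg dependency:tree -Dscope=test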
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
similarity index 53%
copy from fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
copy to fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
index 8abb04d35..b4abe4ab5 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
@@ -1,12 +1,13 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,7 +16,7 @@
* limitations under the License.
*/
-package com.alibaba.fluss.lake.paimon.testutils;
+package com.alibaba.fluss.lake.iceberg.testutils;
import com.alibaba.fluss.client.Connection;
import com.alibaba.fluss.client.ConnectionFactory;
@@ -43,40 +44,48 @@ import com.alibaba.fluss.types.DataTypes;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.CloseableIterator;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.parquet.Parquet;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.RegisterExtension;
+import java.io.Closeable;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.nio.file.Files;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.SortedSet;
+import java.util.TreeSet;
import static com.alibaba.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
+import static com.alibaba.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
+import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
import static com.alibaba.fluss.testutils.DataTestUtils.row;
import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry;
-import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitUntil;
import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitValue;
+import static org.apache.iceberg.expressions.Expressions.equal;
import static org.assertj.core.api.Assertions.assertThat;
-/** Test base for sync to paimon by Flink. */
-public class FlinkPaimonTieringTestBase {
- protected static final String DEFAULT_DB = "fluss";
+/** Test base for tiering to Iceberg by Flink. */
+public class FlinkIcebergTieringTestBase {
@RegisterExtension
public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
@@ -85,31 +94,31 @@ public class FlinkPaimonTieringTestBase {
.setNumOfTabletServers(3)
.build();
- protected static final String CATALOG_NAME = "testcatalog";
protected StreamExecutionEnvironment execEnv;
protected static Connection conn;
protected static Admin admin;
protected static Configuration clientConf;
protected static String warehousePath;
- protected static Catalog paimonCatalog;
+ protected static Catalog icebergCatalog;
private static Configuration initConfig() {
Configuration conf = new Configuration();
conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
- // not to clean snapshots for test purpose
.set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE);
- conf.setString("datalake.format", "paimon");
- conf.setString("datalake.paimon.metastore", "filesystem");
+
+ // Configure the tiering sink to be Iceberg
+ conf.set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.ICEBERG);
+ conf.setString("datalake.iceberg.type", "hadoop");
try {
warehousePath =
- Files.createTempDirectory("fluss-testing-datalake-tiered")
+ Files.createTempDirectory("fluss-testing-iceberg-tiered")
.resolve("warehouse")
.toString();
} catch (Exception e) {
- throw new FlussRuntimeException("Failed to create warehouse path");
+ throw new FlussRuntimeException("Failed to create Iceberg warehouse path", e);
}
- conf.setString("datalake.paimon.warehouse", warehousePath);
+ conf.setString("datalake.iceberg.warehouse", warehousePath);
return conf;
}
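Note: the three datalake.* settings above are everything needed to point the
embedded cluster at Iceberg; a minimal sketch of the same configuration in
isolation (the warehouse path is hypothetical):
    Configuration conf = new Configuration();
    conf.set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.ICEBERG);
    conf.setString("datalake.iceberg.type", "hadoop");
    conf.setString("datalake.iceberg.warehouse", "/tmp/iceberg-warehouse");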
@@ -118,33 +127,7 @@ public class FlinkPaimonTieringTestBase {
clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
conn = ConnectionFactory.createConnection(clientConf);
admin = conn.getAdmin();
- paimonCatalog = getPaimonCatalog();
- }
-
- @BeforeEach
- public void beforeEach() {
- execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
- execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
- execEnv.setParallelism(2);
- execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
- }
-
- protected JobClient buildTieringJob(StreamExecutionEnvironment execEnv) throws Exception {
- Configuration flussConfig = new Configuration(clientConf);
- flussConfig.set(POLL_TIERING_TABLE_INTERVAL, Duration.ofMillis(500L));
- return LakeTieringJobBuilder.newBuilder(
- execEnv,
- flussConfig,
- Configuration.fromMap(getPaimonCatalogConf()),
- DataLakeFormat.PAIMON.toString())
- .build();
- }
-
- protected static Map<String, String> getPaimonCatalogConf() {
- Map<String, String> paimonConf = new HashMap<>();
- paimonConf.put("metastore", "filesystem");
- paimonConf.put("warehouse", warehousePath);
- return paimonConf;
+ icebergCatalog = getIcebergCatalog();
}
@AfterAll
@@ -157,106 +140,48 @@ public class FlinkPaimonTieringTestBase {
conn.close();
conn = null;
}
- }
-
- protected long createTable(TablePath tablePath, TableDescriptor tableDescriptor)
- throws Exception {
- admin.createTable(tablePath, tableDescriptor, true).get();
- return admin.getTableInfo(tablePath).get().getTableId();
- }
-
- protected void waitUntilSnapshot(long tableId, int bucketNum, long snapshotId) {
- for (int i = 0; i < bucketNum; i++) {
- TableBucket tableBucket = new TableBucket(tableId, i);
- FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket, snapshotId);
- }
- }
-
- protected void writeRows(TablePath tablePath, List<InternalRow> rows, boolean append)
- throws Exception {
- try (Table table = conn.getTable(tablePath)) {
- TableWriter tableWriter;
- if (append) {
- tableWriter = table.newAppend().createWriter();
- } else {
- tableWriter = table.newUpsert().createWriter();
- }
- for (InternalRow row : rows) {
- if (tableWriter instanceof AppendWriter) {
- ((AppendWriter) tableWriter).append(row);
- } else {
- ((UpsertWriter) tableWriter).upsert(row);
- }
- }
- tableWriter.flush();
+ if (icebergCatalog instanceof Closeable) {
+ ((Closeable) icebergCatalog).close();
+ icebergCatalog = null;
}
}
- protected Map<String, List<InternalRow>> writeRowsIntoPartitionedTable(
- TablePath tablePath,
- TableDescriptor tableDescriptor,
- Map<Long, String> partitionNameByIds)
- throws Exception {
- List<InternalRow> rows = new ArrayList<>();
- Map<String, List<InternalRow>> writtenRowsByPartition = new HashMap<>();
- for (String partitionName : partitionNameByIds.values()) {
- List<InternalRow> partitionRows =
- Arrays.asList(
- row(11, "v1", partitionName),
- row(12, "v2", partitionName),
- row(13, "v3", partitionName));
- rows.addAll(partitionRows);
- writtenRowsByPartition.put(partitionName, partitionRows);
- }
-
- writeRows(tablePath, rows, !tableDescriptor.hasPrimaryKey());
- return writtenRowsByPartition;
+ @BeforeEach
+ public void beforeEach() {
+ execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+ execEnv.setParallelism(2);
}
- /**
- * Wait until the default number of partitions is created. Return the map from partition id to
- * partition name.
- */
- public static Map<Long, String> waitUntilPartitions(
- ZooKeeperClient zooKeeperClient, TablePath tablePath) {
- return waitUntilPartitions(
- zooKeeperClient,
- tablePath,
- ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.defaultValue());
+ protected JobClient buildTieringJob(StreamExecutionEnvironment execEnv) throws Exception {
+ Configuration flussConfig = new Configuration(clientConf);
+ flussConfig.set(POLL_TIERING_TABLE_INTERVAL, Duration.ofMillis(500L));
+ return LakeTieringJobBuilder.newBuilder(
+ execEnv,
+ flussConfig,
+ Configuration.fromMap(getIcebergCatalogConf()),
+ DataLakeFormat.ICEBERG.toString())
+ .build();
}
- public static Map<Long, String> waitUntilPartitions(TablePath tablePath) {
- return waitUntilPartitions(
- FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(),
- tablePath,
- ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.defaultValue());
+ protected static Map<String, String> getIcebergCatalogConf() {
+ Map<String, String> icebergConf = new HashMap<>();
+ icebergConf.put("type", "hadoop");
+ icebergConf.put("warehouse", warehousePath);
+ return icebergConf;
}
- /**
- * Wait until the given number of partitions is created. Return the map from partition id to
- * partition name.
- */
- public static Map<Long, String> waitUntilPartitions(
- ZooKeeperClient zooKeeperClient, TablePath tablePath, int expectPartitions) {
- return waitValue(
- () -> {
- Map<Long, String> gotPartitions =
- zooKeeperClient.getPartitionIdAndNames(tablePath);
- return expectPartitions == gotPartitions.size()
- ? Optional.of(gotPartitions)
- : Optional.empty();
- },
- Duration.ofMinutes(1),
- String.format("expect %d table partition has not been created", expectPartitions));
- }
-
- protected static Catalog getPaimonCatalog() {
- Map<String, String> catalogOptions = getPaimonCatalogConf();
- return CatalogFactory.createCatalog(CatalogContext.create(Options.fromMap(catalogOptions)));
+ protected static Catalog getIcebergCatalog() {
+ HadoopCatalog catalog = new HadoopCatalog();
+ catalog.setConf(new org.apache.hadoop.conf.Configuration());
+ Map<String, String> properties = new HashMap<>();
+ properties.put("warehouse", warehousePath);
+ catalog.initialize("hadoop", properties);
+ return catalog;
}
- protected Replica getLeaderReplica(TableBucket tableBucket) {
- return FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tableBucket);
+ protected long createPkTable(TablePath tablePath) throws Exception {
+ return createPkTable(tablePath, 1);
}
protected long createLogTable(TablePath tablePath) throws Exception {
@@ -289,61 +214,6 @@ public class FlinkPaimonTieringTestBase {
return createTable(tablePath, tableBuilder.build());
}
- protected long createFullTypeLogTable(TablePath tablePath, int bucketNum, boolean isPartitioned)
- throws Exception {
- Schema.Builder schemaBuilder =
- Schema.newBuilder()
- .column("f_boolean", DataTypes.BOOLEAN())
- .column("f_byte", DataTypes.TINYINT())
- .column("f_short", DataTypes.SMALLINT())
- .column("f_int", DataTypes.INT())
- .column("f_long", DataTypes.BIGINT())
- .column("f_float", DataTypes.FLOAT())
- .column("f_double", DataTypes.DOUBLE())
- .column("f_string", DataTypes.STRING())
- .column("f_decimal1", DataTypes.DECIMAL(5, 2))
- .column("f_decimal2", DataTypes.DECIMAL(20, 0))
- .column("f_timestamp_ltz1", DataTypes.TIMESTAMP_LTZ(3))
- .column("f_timestamp_ltz2", DataTypes.TIMESTAMP_LTZ(6))
- .column("f_timestamp_ntz1", DataTypes.TIMESTAMP(3))
- .column("f_timestamp_ntz2", DataTypes.TIMESTAMP(6))
- .column("f_binary", DataTypes.BINARY(4));
-
- TableDescriptor.Builder tableBuilder =
- TableDescriptor.builder()
- .distributedBy(bucketNum, "f_int")
- .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
- .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));
-
- if (isPartitioned) {
- schemaBuilder.column("p", DataTypes.STRING());
- tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true);
- tableBuilder.partitionedBy("p");
- tableBuilder.property(
- ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, AutoPartitionTimeUnit.YEAR);
- }
- tableBuilder.schema(schemaBuilder.build());
- return createTable(tablePath, tableBuilder.build());
- }
-
- protected long createPrimaryKeyTable(
- TablePath tablePath, int bucketNum, List<Schema.Column> columns) throws Exception {
- Schema.Builder schemaBuilder =
- Schema.newBuilder().fromColumns(columns).primaryKey(columns.get(0).getName());
-
- TableDescriptor.Builder tableBuilder =
- TableDescriptor.builder()
- .distributedBy(bucketNum)
- .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
- .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));
- tableBuilder.schema(schemaBuilder.build());
- return createTable(tablePath, tableBuilder.build());
- }
-
- protected long createPkTable(TablePath tablePath) throws Exception {
- return createPkTable(tablePath, 1);
- }
-
protected long createPkTable(TablePath tablePath, int bucketNum) throws Exception {
TableDescriptor table1Descriptor =
TableDescriptor.builder()
@@ -360,41 +230,10 @@ public class FlinkPaimonTieringTestBase {
return createTable(tablePath, table1Descriptor);
}
- protected void dropTable(TablePath tablePath) throws Exception {
- admin.dropTable(tablePath, false).get();
- Identifier tableIdentifier = toPaimonIdentifier(tablePath);
- try {
- paimonCatalog.dropTable(tableIdentifier, false);
- } catch (Catalog.TableNotExistException e) {
- // do nothing, table not exists
- }
- }
-
- private Identifier toPaimonIdentifier(TablePath tablePath) {
- return Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName());
- }
-
- protected void assertReplicaStatus(
- TablePath tablePath,
- long tableId,
- int bucketCount,
- boolean isPartitioned,
- Map<TableBucket, Long> expectedLogEndOffset) {
- if (isPartitioned) {
- Map<Long, String> partitionById =
- waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), tablePath);
- for (Long partitionId : partitionById.keySet()) {
- for (int i = 0; i < bucketCount; i++) {
- TableBucket tableBucket = new TableBucket(tableId, partitionId, i);
- assertReplicaStatus(tableBucket, expectedLogEndOffset.get(tableBucket));
- }
- }
- } else {
- for (int i = 0; i < bucketCount; i++) {
- TableBucket tableBucket = new TableBucket(tableId, i);
- assertReplicaStatus(tableBucket, expectedLogEndOffset.get(tableBucket));
- }
- }
+ protected long createTable(TablePath tablePath, TableDescriptor tableDescriptor)
+ throws Exception {
+ admin.createTable(tablePath, tableDescriptor, true).get();
+ return admin.getTableInfo(tablePath).get().getTableId();
}
protected void assertReplicaStatus(TableBucket tb, long expectedLogEndOffset) {
@@ -409,70 +248,219 @@ public class FlinkPaimonTieringTestBase {
});
}
- protected void waitUntilBucketSynced(
- TablePath tablePath, long tableId, int bucketCount, boolean isPartition) {
- if (isPartition) {
- Map<Long, String> partitionById = waitUntilPartitions(tablePath);
- for (Long partitionId : partitionById.keySet()) {
- for (int i = 0; i < bucketCount; i++) {
- TableBucket tableBucket = new TableBucket(tableId, partitionId, i);
- waitUntilBucketSynced(tableBucket);
- }
+ public static Map<Long, String> waitUntilPartitions(TablePath tablePath) {
+ return waitUntilPartitions(
+ FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(),
+ tablePath,
+ ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.defaultValue());
+ }
+
+ /**
+ * Wait until the given number of partitions is created. Return the map from partition id to
+ * partition name.
+ */
+ public static Map<Long, String> waitUntilPartitions(
+ ZooKeeperClient zooKeeperClient, TablePath tablePath, int expectPartitions) {
+ return waitValue(
+ () -> {
+ Map<Long, String> gotPartitions =
+ zooKeeperClient.getPartitionIdAndNames(tablePath);
+ return expectPartitions == gotPartitions.size()
+ ? Optional.of(gotPartitions)
+ : Optional.empty();
+ },
+ Duration.ofMinutes(1),
+ String.format("expect %d table partition has not been created", expectPartitions));
+ }
+
+ protected Replica getLeaderReplica(TableBucket tableBucket) {
+ return FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tableBucket);
+ }
+
+ protected void writeRows(TablePath tablePath, List<InternalRow> rows, boolean append)
+ throws Exception {
+ try (Table table = conn.getTable(tablePath)) {
+ TableWriter tableWriter;
+ if (append) {
+ tableWriter = table.newAppend().createWriter();
+ } else {
+ tableWriter = table.newUpsert().createWriter();
}
- } else {
- for (int i = 0; i < bucketCount; i++) {
- TableBucket tableBucket = new TableBucket(tableId, i);
- waitUntilBucketSynced(tableBucket);
+ for (InternalRow row : rows) {
+ if (tableWriter instanceof AppendWriter) {
+ ((AppendWriter) tableWriter).append(row);
+ } else {
+ ((UpsertWriter) tableWriter).upsert(row);
+ }
}
+ tableWriter.flush();
}
}
- protected void waitUntilBucketSynced(TableBucket tb) {
- waitUntil(
- () -> {
- Replica replica = getLeaderReplica(tb);
- return replica.getLogTablet().getLakeTableSnapshotId() >= 0;
- },
- Duration.ofMinutes(2),
- "bucket " + tb + "not synced");
+ protected void waitUntilSnapshot(long tableId, int bucketNum, long snapshotId) {
+ for (int i = 0; i < bucketNum; i++) {
+ TableBucket tableBucket = new TableBucket(tableId, i);
+ FLUSS_CLUSTER_EXTENSION.waitUntilSnapshotFinished(tableBucket, snapshotId);
+ }
}
- protected void checkDataInPaimonPrimayKeyTable(
+ protected void checkDataInIcebergPrimaryKeyTable(
TablePath tablePath, List<InternalRow> expectedRows) throws Exception {
- Iterator<org.apache.paimon.data.InternalRow> paimonRowIterator =
- getPaimonRowCloseableIterator(tablePath);
- for (InternalRow expectedRow : expectedRows) {
- org.apache.paimon.data.InternalRow row = paimonRowIterator.next();
- assertThat(row.getInt(0)).isEqualTo(expectedRow.getInt(0));
- assertThat(row.getString(1).toString()).isEqualTo(expectedRow.getString(1).toString());
+ try (CloseableIterator<Record> records = getIcebergRows(tablePath)) {
+ for (InternalRow row : expectedRows) {
+ Record record = records.next();
+ assertThat(record.get(0)).isEqualTo(row.getInt(0));
+ assertThat(record.get(1)).isEqualTo(row.getString(1).toString());
+ }
+ assertThat(records.hasNext()).isFalse();
}
}
- protected CloseableIterator<org.apache.paimon.data.InternalRow> getPaimonRowCloseableIterator(
- TablePath tablePath) throws Exception {
- Identifier tableIdentifier =
- Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName());
+ protected void checkDataInIcebergAppendOnlyTable(
+ TablePath tablePath, List<InternalRow> expectedRows, long startingOffset)
+ throws Exception {
+ try (CloseableIterator<Record> records = getIcebergRows(tablePath)) {
+ Iterator<InternalRow> flussRowIterator = expectedRows.iterator();
+ while (records.hasNext()) {
+ Record actualRecord = records.next();
+ InternalRow flussRow = flussRowIterator.next();
+ assertThat(actualRecord.get(0)).isEqualTo(flussRow.getInt(0));
+ assertThat(actualRecord.get(1)).isEqualTo(flussRow.getString(1).toString());
+ // index 2 is __bucket, so the offset column is at index 3
+ assertThat(actualRecord.get(3)).isEqualTo(startingOffset++);
+ }
+ assertThat(flussRowIterator.hasNext()).isFalse();
+ }
+ }
- paimonCatalog = getPaimonCatalog();
+ protected void checkDataInIcebergAppendOnlyPartitionedTable(
+ TablePath tablePath,
+ Map<String, String> partitionSpec,
+ List<InternalRow> expectedRows,
+ long startingOffset)
+ throws Exception {
+ try (CloseableIterator<Record> records = getIcebergRows(tablePath, partitionSpec)) {
+ Iterator<InternalRow> flussRowIterator = expectedRows.iterator();
+ while (records.hasNext()) {
+ Record actualRecord = records.next();
+ InternalRow flussRow = flussRowIterator.next();
+ assertThat(actualRecord.get(0)).isEqualTo(flussRow.getInt(0));
+ assertThat(actualRecord.get(1)).isEqualTo(flussRow.getString(1).toString());
+ assertThat(actualRecord.get(2)).isEqualTo(flussRow.getString(2).toString());
+ // index 3 is __bucket, so the offset column is at index 4
+ assertThat(actualRecord.get(4)).isEqualTo(startingOffset++);
+ }
+ assertThat(flussRowIterator.hasNext()).isFalse();
+ }
+ }
- FileStoreTable table = (FileStoreTable) paimonCatalog.getTable(tableIdentifier);
+ private CloseableIterator<Record> getIcebergRows(TablePath tablePath) {
+ return getIcebergRows(tablePath, Collections.emptyMap());
+ }
+
+ @SuppressWarnings("resource")
+ private CloseableIterator<Record> getIcebergRows(
+ TablePath tablePath, Map<String, String> partitionSpec) {
+ org.apache.iceberg.Table table = icebergCatalog.loadTable(toIceberg(tablePath));
+ // for a primary-key table we don't care about record order,
+ // so use the Iceberg generics read API directly
+ if (!table.schema().identifierFieldIds().isEmpty()) {
+ IcebergGenerics.ScanBuilder scanBuilder =
+ filterByPartition(IcebergGenerics.read(table), partitionSpec);
+ return scanBuilder.build().iterator();
+ } else {
+ // for a log table we want to compare the __offset column,
+ // so sort the data files by __offset using their column stats
+ List<Record> records = new ArrayList<>();
+ int fieldId = table.schema().findField(OFFSET_COLUMN_NAME).fieldId();
+ SortedSet<DataFile> files =
+ new TreeSet<>(
+ (f1, f2) -> {
+ ByteBuffer buffer1 =
+ (ByteBuffer)
+ f1.lowerBounds()
+ .get(fieldId)
+ .order(ByteOrder.LITTLE_ENDIAN)
+ .rewind();
+ long offset1 = buffer1.getLong();
+ ByteBuffer buffer2 =
+ (ByteBuffer)
+ f2.lowerBounds()
+ .get(fieldId)
+ .order(ByteOrder.LITTLE_ENDIAN)
+ .rewind();
+ long offset2 = buffer2.getLong();
+ return Long.compare(offset1, offset2);
+ });
+
+ table.refresh();
+ TableScan tableScan = filterByPartition(table.newScan(), partitionSpec);
+ tableScan
+ .includeColumnStats()
+ .planFiles()
+ .iterator()
+ .forEachRemaining(fileScanTask -> files.add(fileScanTask.file()));
+
+ for (DataFile file : files) {
+ Iterable<Record> iterable =
+ Parquet.read(table.io().newInputFile(file.path().toString()))
+ .project(table.schema())
+ .createReaderFunc(
+ fileSchema ->
+ GenericParquetReaders.buildReader(
+ table.schema(), fileSchema))
+ .build();
+ .build();
+ iterable.forEach(records::add);
+ }
+
+ return CloseableIterator.withClose(records.iterator());
+ }
+ }
+
+ private IcebergGenerics.ScanBuilder filterByPartition(
+ IcebergGenerics.ScanBuilder scanBuilder, Map<String, String> partitionSpec) {
+ for (Map.Entry<String, String> partitionKeyAndValue : partitionSpec.entrySet()) {
+ String partitionCol = partitionKeyAndValue.getKey();
+ String partitionValue = partitionKeyAndValue.getValue();
+ scanBuilder = scanBuilder.where(equal(partitionCol, partitionValue));
+ }
+ return scanBuilder;
+ }
- RecordReader<org.apache.paimon.data.InternalRow> reader =
- table.newRead().createReader(table.newReadBuilder().newScan().plan());
- return reader.toCloseableIterator();
+ private TableScan filterByPartition(TableScan tableScan, Map<String, String> partitionSpec) {
+ for (Map.Entry<String, String> partitionKeyAndValue : partitionSpec.entrySet()) {
+ String partitionCol = partitionKeyAndValue.getKey();
+ String partitionValue = partitionKeyAndValue.getValue();
+ tableScan = tableScan.filter(equal(partitionCol, partitionValue));
+ }
+ return tableScan;
}
- protected void checkSnapshotPropertyInPaimon(
+ protected void checkSnapshotPropertyInIceberg(
TablePath tablePath, Map<String, String> expectedProperties) throws Exception {
- FileStoreTable table =
- (FileStoreTable)
- getPaimonCatalog()
- .getTable(
- Identifier.create(
- tablePath.getDatabaseName(),
- tablePath.getTableName()));
- Snapshot snapshot = table.snapshotManager().latestSnapshot();
- assertThat(snapshot).isNotNull();
- assertThat(snapshot.properties()).isEqualTo(expectedProperties);
+ org.apache.iceberg.Table table = icebergCatalog.loadTable(toIceberg(tablePath));
+ Snapshot snapshot = table.currentSnapshot();
+ assertThat(snapshot.summary()).containsAllEntriesOf(expectedProperties);
+ }
+
+ protected Map<String, List<InternalRow>> writeRowsIntoPartitionedTable(
+ TablePath tablePath,
+ TableDescriptor tableDescriptor,
+ Map<Long, String> partitionNameByIds)
+ throws Exception {
+ List<InternalRow> rows = new ArrayList<>();
+ Map<String, List<InternalRow>> writtenRowsByPartition = new HashMap<>();
+ for (String partitionName : partitionNameByIds.values()) {
+ List<InternalRow> partitionRows =
+ Arrays.asList(
+ row(11, "v1", partitionName),
+ row(12, "v2", partitionName),
+ row(13, "v3", partitionName));
+ rows.addAll(partitionRows);
+ writtenRowsByPartition.put(partitionName, partitionRows);
+ }
+
+ writeRows(tablePath, rows, !tableDescriptor.hasPrimaryKey());
+ return writtenRowsByPartition;
}
}
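Note: the log-table branch of getIcebergRows() above orders data files by the
lower bound of the __offset column, taken from each file's column statistics
and decoded as a little-endian long. A standalone sketch of the same ordering
that uses Iceberg's Conversions helper instead of manual ByteBuffer decoding
(offsetFieldId is assumed to come from
table.schema().findField(OFFSET_COLUMN_NAME).fieldId(), as in the diff):
    import org.apache.iceberg.DataFile;
    import org.apache.iceberg.types.Conversions;
    import org.apache.iceberg.types.Types;

    import java.nio.ByteBuffer;
    import java.util.Comparator;

    class OffsetOrdering {
        // order data files by the smallest __offset they contain,
        // read from the per-file lower-bound column statistics
        static Comparator<DataFile> byMinOffset(int offsetFieldId) {
            return Comparator.comparingLong(
                    file -> {
                        // lowerBounds() maps field id -> bound encoded in
                        // Iceberg's single-value binary form (little-endian)
                        ByteBuffer lower = file.lowerBounds().get(offsetFieldId);
                        Long offset = Conversions.fromByteBuffer(Types.LongType.get(), lower);
                        return offset;
                    });
        }
    }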
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergTieringITCase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergTieringITCase.java
new file mode 100644
index 000000000..0b3e5f2dd
--- /dev/null
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergTieringITCase.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg.tiering;
+
+import com.alibaba.fluss.config.AutoPartitionTimeUnit;
+import com.alibaba.fluss.config.ConfigOptions;
+import com.alibaba.fluss.lake.iceberg.testutils.FlinkIcebergTieringTestBase;
+import com.alibaba.fluss.metadata.Schema;
+import com.alibaba.fluss.metadata.TableBucket;
+import com.alibaba.fluss.metadata.TableDescriptor;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.row.InternalRow;
+import com.alibaba.fluss.types.DataTypes;
+import com.alibaba.fluss.utils.types.Tuple2;
+
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static com.alibaba.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
+import static com.alibaba.fluss.testutils.DataTestUtils.row;
+
+/** IT case for tiering into Iceberg. */
+class IcebergTieringITCase extends FlinkIcebergTieringTestBase {
+
+ protected static final String DEFAULT_DB = "fluss";
+
+ private static StreamExecutionEnvironment execEnv;
+
+ @BeforeAll
+ protected static void beforeAll() {
+ FlinkIcebergTieringTestBase.beforeAll();
+ execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ execEnv.setParallelism(2);
+ execEnv.enableCheckpointing(1000);
+ }
+
+ @Test
+ void testTiering() throws Exception {
+ // create a pk table, write some records, and wait until the snapshot finishes
+ TablePath t1 = TablePath.of(DEFAULT_DB, "pkTable");
+ long t1Id = createPkTable(t1);
+ TableBucket t1Bucket = new TableBucket(t1Id, 0);
+ // write records
+ List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
+ writeRows(t1, rows, false);
+ waitUntilSnapshot(t1Id, 1, 0);
+
+ // then start the tiering job
+ JobClient jobClient = buildTieringJob(execEnv);
+ try {
+ // check the status of the replica after it has synced
+ assertReplicaStatus(t1Bucket, 3);
+
+ checkDataInIcebergPrimaryKeyTable(t1, rows);
+ // check snapshot property in iceberg
+ Map<String, String> properties =
+ new HashMap<String, String>() {
+ {
+ put(
+ FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+ "[{\"bucket_id\":0,\"log_offset\":3}]");
+ }
+ };
+ checkSnapshotPropertyInIceberg(t1, properties);
+
+ // test log table
+ testLogTableTiering();
+
+ // then write more records to the pk table
+ rows = Arrays.asList(row(1, "v111"), row(2, "v222"), row(3, "v333"));
+ writeRows(t1, rows, false);
+
+ // check the status of the replica of t1 after it has synced;
+ // don't check the start offset since we don't update the start
+ // log offset for primary key tables
+ // 3 initial rows + (3 deletes + 3 inserts) = 9
+ assertReplicaStatus(t1Bucket, 9);
+
+ checkDataInIcebergPrimaryKeyTable(t1, rows);
+
+ // then create partitioned table and wait partitions are ready
+ testPartitionedTableTiering();
+ } finally {
+ jobClient.cancel().get();
+ }
+ }
+
+ private Tuple2<Long, TableDescriptor> createPartitionedTable(TablePath partitionedTablePath)
+ throws Exception {
+ TableDescriptor partitionedTableDescriptor =
+ TableDescriptor.builder()
+ .schema(
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column("date", DataTypes.STRING())
+ .build())
+ .partitionedBy("date")
+ .property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true)
+ .property(
+ ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT,
+ AutoPartitionTimeUnit.YEAR)
+ .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+ .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500))
+ .build();
+ return Tuple2.of(
+ createTable(partitionedTablePath, partitionedTableDescriptor),
+ partitionedTableDescriptor);
+ }
+
+ private void testLogTableTiering() throws Exception {
+ // then, create another log table
+ TablePath t2 = TablePath.of(DEFAULT_DB, "logTable");
+ long t2Id = createLogTable(t2);
+ TableBucket t2Bucket = new TableBucket(t2Id, 0);
+ List<InternalRow> flussRows = new ArrayList<>();
+ List<InternalRow> rows;
+ // write records
+ for (int i = 0; i < 10; i++) {
+ rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
+ flussRows.addAll(rows);
+ // write records
+ writeRows(t2, rows, true);
+ }
+ // check the status of the replica after it has synced;
+ // note: we can't update the log start offset for a log table in unaware-bucket mode
+ assertReplicaStatus(t2Bucket, 30);
+
+ // check data in iceberg
+ checkDataInIcebergAppendOnlyTable(t2, flussRows, 0);
+ }
+
+ private void testPartitionedTableTiering() throws Exception {
+ TablePath partitionedTablePath = TablePath.of(DEFAULT_DB, "partitionedTable");
+ Tuple2<Long, TableDescriptor> tableIdAndDescriptor =
+ createPartitionedTable(partitionedTablePath);
+ Map<Long, String> partitionNameByIds = waitUntilPartitions(partitionedTablePath);
+
+ // now, write rows into partitioned table
+ TableDescriptor partitionedTableDescriptor = tableIdAndDescriptor.f1;
+ Map<String, List<InternalRow>> writtenRowsByPartition =
+ writeRowsIntoPartitionedTable(
+ partitionedTablePath, partitionedTableDescriptor, partitionNameByIds);
+ long tableId = tableIdAndDescriptor.f0;
+
+ // wait until synced to iceberg
+ for (Long partitionId : partitionNameByIds.keySet()) {
+ TableBucket tableBucket = new TableBucket(tableId, partitionId, 0);
+ assertReplicaStatus(tableBucket, 3);
+ }
+
+ // now, let's check the data in Iceberg per partition
+ String partitionCol = partitionedTableDescriptor.getPartitionKeys().get(0);
+ for (String partitionName : partitionNameByIds.values()) {
+ checkDataInIcebergAppendOnlyPartitionedTable(
+ partitionedTablePath,
+ Collections.singletonMap(partitionCol, partitionName),
+ writtenRowsByPartition.get(partitionName),
+ 0);
+ }
+
+ Map<String, String> properties =
+ new HashMap<String, String>() {
+ {
+ put(
+ FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+ "["
+ + "{\"partition_id\":0,\"bucket_id\":0,\"partition_name\":\"date=2025\",\"log_offset\":3},"
+ + "{\"partition_id\":1,\"bucket_id\":0,\"partition_name\":\"date=2026\",\"log_offset\":3}"
+ + "]");
+ }
+ };
+
+ checkSnapshotPropertyInIceberg(partitionedTablePath, properties);
+ }
+}
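Note: end to end, the IT case above drives the pipeline assembled by
buildTieringJob() in the test base; roughly, as a sketch (mirrors the code in
this commit, values hypothetical):
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
    env.setParallelism(2);

    // Fluss side: how often to poll for tables that are due for tiering
    Configuration flussConfig = new Configuration(clientConf);
    flussConfig.set(POLL_TIERING_TABLE_INTERVAL, Duration.ofMillis(500L));

    // Iceberg side: a filesystem-backed ("hadoop") catalog
    Map<String, String> icebergConf = new HashMap<>();
    icebergConf.put("type", "hadoop");
    icebergConf.put("warehouse", warehousePath);

    JobClient jobClient =
            LakeTieringJobBuilder.newBuilder(
                            env,
                            flussConfig,
                            Configuration.fromMap(icebergConf),
                            DataLakeFormat.ICEBERG.toString())
                    .build();
    // ... run assertions against the Iceberg tables, then:
    jobClient.cancel().get();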
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergTieringTest.java
similarity index 95%
rename from fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java
rename to fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergTieringTest.java
index 8a666b677..c55b834f7 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergTieringTest.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/tiering/IcebergTieringTest.java
@@ -1,12 +1,13 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,14 +16,10 @@
* limitations under the License.
*/
-package com.alibaba.fluss.lake.iceberg;
+package com.alibaba.fluss.lake.iceberg.tiering;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.lake.committer.LakeCommitter;
-import com.alibaba.fluss.lake.iceberg.tiering.IcebergCatalogProvider;
-import com.alibaba.fluss.lake.iceberg.tiering.IcebergCommittable;
-import com.alibaba.fluss.lake.iceberg.tiering.IcebergLakeTieringFactory;
-import com.alibaba.fluss.lake.iceberg.tiering.IcebergWriteResult;
import com.alibaba.fluss.lake.serializer.SimpleVersionedSerializer;
import com.alibaba.fluss.lake.writer.LakeWriter;
import com.alibaba.fluss.lake.writer.WriterInitContext;
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
index 8abb04d35..3dcfc826d 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
@@ -326,20 +326,6 @@ public class FlinkPaimonTieringTestBase {
return createTable(tablePath, tableBuilder.build());
}
- protected long createPrimaryKeyTable(
- TablePath tablePath, int bucketNum, List<Schema.Column> columns) throws Exception {
- Schema.Builder schemaBuilder =
- Schema.newBuilder().fromColumns(columns).primaryKey(columns.get(0).getName());
-
- TableDescriptor.Builder tableBuilder =
- TableDescriptor.builder()
- .distributedBy(bucketNum)
- .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
- .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));
- tableBuilder.schema(schemaBuilder.build());
- return createTable(tablePath, tableBuilder.build());
- }
-
protected long createPkTable(TablePath tablePath) throws Exception {
return createPkTable(tablePath, 1);
}