This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 184eb8a16 [flink] Support $lake table for iceberg (#1812)
184eb8a16 is described below
commit 184eb8a1667c5925d5f141e2c02441671a4804d1
Author: MehulBatra <[email protected]>
AuthorDate: Wed Oct 15 17:03:40 2025 +0530
[flink] Support $lake table for iceberg (#1812)
---------
Co-authored-by: Mehul Batra <[email protected]>
---
.../apache/fluss/flink/catalog/FlinkCatalog.java | 5 +-
.../fluss/flink/catalog/FlinkTableFactory.java | 15 +++-
.../org/apache/fluss/flink/lake/LakeCatalog.java | 32 +++++++--
.../apache/fluss/flink/lake/LakeTableFactory.java | 84 +++++++++++++++++++---
fluss-lake/fluss-lake-iceberg/pom.xml | 6 ++
.../flink/FlinkUnionReadPrimaryKeyTableITCase.java | 73 +++++++++++++++++++
6 files changed, 193 insertions(+), 22 deletions(-)
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index 6fa779387..3ecb01541 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -135,7 +135,7 @@ public class FlinkCatalog extends AbstractCatalog {
@Override
public Optional<Factory> getFactory() {
- return Optional.of(new FlinkTableFactory());
+ return Optional.of(new FlinkTableFactory(lakeCatalog));
}
@Override
@@ -336,7 +336,8 @@ public class FlinkCatalog extends AbstractCatalog {
// should be pattern like table_name$lake
tableName = tableComponents[0];
} else {
- // be something like table_name$lake$snapshot
+ // pattern is table_name$lake$snapshots
+ // Need to reconstruct: table_name + $snapshots
tableName = String.join("", tableComponents);
}
return lakeCatalog
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
index 9ca8efae7..0b0a59981 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkTableFactory.java
@@ -20,6 +20,7 @@ package org.apache.fluss.flink.catalog;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.flink.FlinkConnectorOptions;
+import org.apache.fluss.flink.lake.LakeCatalog;
import org.apache.fluss.flink.lake.LakeTableFactory;
import org.apache.fluss.flink.sink.FlinkTableSink;
import org.apache.fluss.flink.source.FlinkTableSource;
@@ -68,17 +69,25 @@ import static org.apache.fluss.flink.utils.FlinkConversions.toFlinkOption;
/** Factory to create table source and table sink for Fluss. */
public class FlinkTableFactory implements DynamicTableSourceFactory,
DynamicTableSinkFactory {
+ private final LakeCatalog lakeCatalog;
private volatile LakeTableFactory lakeTableFactory;
+ public FlinkTableFactory(LakeCatalog lakeCatalog) {
+ this.lakeCatalog = lakeCatalog;
+ }
+
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
// check whether should read from datalake
ObjectIdentifier tableIdentifier = context.getObjectIdentifier();
String tableName = tableIdentifier.getObjectName();
if (tableName.contains(LAKE_TABLE_SPLITTER)) {
- tableName = tableName.substring(0,
tableName.indexOf(LAKE_TABLE_SPLITTER));
+ // Extract the lake table name: for "table$lake" -> "table"
+ // for "table$lake$snapshots" -> "table$snapshots"
+ String lakeTableName = tableName.replaceFirst("\\$lake", "");
+
lakeTableFactory = mayInitLakeTableFactory();
- return lakeTableFactory.createDynamicTableSource(context,
tableName);
+ return lakeTableFactory.createDynamicTableSource(context,
lakeTableName);
}
FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
@@ -248,7 +257,7 @@ public class FlinkTableFactory implements DynamicTableSourceFactory, DynamicTabl
if (lakeTableFactory == null) {
synchronized (this) {
if (lakeTableFactory == null) {
- lakeTableFactory = new LakeTableFactory();
+ lakeTableFactory = new LakeTableFactory(lakeCatalog);
}
}
}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java
index 42f5e7008..e1f8096e6 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeCatalog.java
@@ -50,13 +50,31 @@ public class LakeCatalog {
public Catalog getLakeCatalog(Configuration tableOptions) {
DataLakeFormat lakeFormat =
tableOptions.get(ConfigOptions.TABLE_DATALAKE_FORMAT);
- // TODO: Currently, a Fluss cluster only supports a single DataLake
storage.
- // However, in the
- // future, it may support multiple DataLakes. The following code
assumes
- // that a single
- // lakeCatalog is shared across multiple tables, which will no longer
be
- // valid in such
- // cases and should be updated accordingly.
+ if (lakeFormat == null) {
+ throw new IllegalArgumentException(
+ "DataLake format is not specified in table options. "
+ + "Please ensure '"
+ + ConfigOptions.TABLE_DATALAKE_FORMAT.key()
+ + "' is set.");
+ }
+ return LAKE_CATALOG_CACHE.computeIfAbsent(
+ lakeFormat,
+ (dataLakeFormat) -> {
+ if (dataLakeFormat == PAIMON) {
+ return PaimonCatalogFactory.create(catalogName,
tableOptions, classLoader);
+ } else if (dataLakeFormat == ICEBERG) {
+ return IcebergCatalogFactory.create(catalogName,
tableOptions);
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported datalake format: " +
dataLakeFormat);
+ }
+ });
+ }
+
+ public Catalog getLakeCatalog(Configuration tableOptions, DataLakeFormat
lakeFormat) {
+ if (lakeFormat == null) {
+ throw new IllegalArgumentException("DataLake format cannot be
null");
+ }
return LAKE_CATALOG_CACHE.computeIfAbsent(
lakeFormat,
(dataLakeFormat) -> {
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java
index 975d49e95..93120e9e2 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/lake/LakeTableFactory.java
@@ -20,37 +20,101 @@ package org.apache.fluss.flink.lake;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
-import org.apache.paimon.flink.FlinkTableFactory;
+
+import java.util.Map;
/** A factory to create {@link DynamicTableSource} for lake table. */
public class LakeTableFactory {
+ private final LakeCatalog lakeCatalog;
- // now, always assume is paimon, todo need to describe lake storage from
- // to know which lake storage used
- private final org.apache.paimon.flink.FlinkTableFactory
paimonFlinkTableFactory;
-
- public LakeTableFactory() {
- paimonFlinkTableFactory = new FlinkTableFactory();
+ public LakeTableFactory(LakeCatalog lakeCatalog) {
+ this.lakeCatalog = lakeCatalog;
}
public DynamicTableSource createDynamicTableSource(
DynamicTableFactory.Context context, String tableName) {
ObjectIdentifier originIdentifier = context.getObjectIdentifier();
- ObjectIdentifier paimonIdentifier =
+ ObjectIdentifier lakeIdentifier =
ObjectIdentifier.of(
originIdentifier.getCatalogName(),
originIdentifier.getDatabaseName(),
tableName);
+
+ // Determine the lake format from the table options
+ Map<String, String> tableOptions =
context.getCatalogTable().getOptions();
+
+ // If not present, fallback to 'fluss.table.datalake.format' (set by
Fluss)
+ String connector = tableOptions.get("connector");
+ if (connector == null) {
+ connector = tableOptions.get("fluss.table.datalake.format");
+ }
+
+ if (connector == null) {
+ // For Paimon system tables (like table_name$options), the table
options are empty
+ // Default to Paimon for backward compatibility
+ connector = "paimon";
+ }
+
+ // For Iceberg and Paimon, pass the table name as-is to their factory.
+ // Metadata tables will be handled internally by their respective
factories.
DynamicTableFactory.Context newContext =
new FactoryUtil.DefaultDynamicTableContext(
- paimonIdentifier,
+ lakeIdentifier,
context.getCatalogTable(),
context.getEnrichmentOptions(),
context.getConfiguration(),
context.getClassLoader(),
context.isTemporary());
- return paimonFlinkTableFactory.createDynamicTableSource(newContext);
+ // Get the appropriate factory based on connector type
+ DynamicTableSourceFactory factory = getLakeTableFactory(connector,
tableOptions);
+ return factory.createDynamicTableSource(newContext);
+ }
+
+ private DynamicTableSourceFactory getLakeTableFactory(
+ String connector, Map<String, String> tableOptions) {
+ if ("paimon".equalsIgnoreCase(connector)) {
+ return getPaimonFactory();
+ } else if ("iceberg".equalsIgnoreCase(connector)) {
+ return getIcebergFactory(tableOptions);
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported lake connector: "
+ + connector
+ + ". Only 'paimon' and 'iceberg' are supported.");
+ }
+ }
+
+ private DynamicTableSourceFactory getPaimonFactory() {
+ return new org.apache.paimon.flink.FlinkTableFactory();
+ }
+
+ private DynamicTableSourceFactory getIcebergFactory(Map<String, String>
tableOptions) {
+ try {
+ // Get the Iceberg FlinkCatalog instance from LakeCatalog
+ org.apache.fluss.config.Configuration flussConfig =
+
org.apache.fluss.config.Configuration.fromMap(tableOptions);
+
+ // Get catalog with explicit ICEBERG format
+ org.apache.flink.table.catalog.Catalog catalog =
+ lakeCatalog.getLakeCatalog(
+ flussConfig,
org.apache.fluss.metadata.DataLakeFormat.ICEBERG);
+
+ // Create FlinkDynamicTableFactory with the catalog
+ Class<?> icebergFactoryClass =
+
Class.forName("org.apache.iceberg.flink.FlinkDynamicTableFactory");
+ Class<?> flinkCatalogClass =
Class.forName("org.apache.iceberg.flink.FlinkCatalog");
+
+ return (DynamicTableSourceFactory)
+ icebergFactoryClass
+ .getDeclaredConstructor(flinkCatalogClass)
+ .newInstance(catalog);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to create Iceberg table factory. Please ensure
iceberg-flink-runtime is on the classpath.",
+ e);
+ }
}
}
diff --git a/fluss-lake/fluss-lake-iceberg/pom.xml b/fluss-lake/fluss-lake-iceberg/pom.xml
index f1d195029..94bca76b1 100644
--- a/fluss-lake/fluss-lake-iceberg/pom.xml
+++ b/fluss-lake/fluss-lake-iceberg/pom.xml
@@ -242,6 +242,12 @@
<version>${iceberg.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-files</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
index 603947f5b..0ff38190c 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/flink/FlinkUnionReadPrimaryKeyTableITCase.java
@@ -32,9 +32,11 @@ import org.apache.fluss.server.replica.Replica;
import org.apache.fluss.types.DataTypes;
import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -51,6 +53,7 @@ import java.util.Map;
import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsExactOrder;
import static
org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;
import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
/** Test case for union read primary key table. */
public class FlinkUnionReadPrimaryKeyTableITCase extends
FlinkUnionReadTestBase {
@@ -273,6 +276,76 @@ public class FlinkUnionReadPrimaryKeyTableITCase extends FlinkUnionReadTestBase
}
}
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testReadIcebergLakeTableAndSystemTable(boolean isPartitioned) throws
Exception {
+ // first of all, start tiering
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ String tableName = "lake_pk_table_" + (isPartitioned ? "partitioned" :
"non_partitioned");
+
+ TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+ Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+ // create table & write initial data
+ long tableId =
+ preparePKTableFullType(t1, DEFAULT_BUCKET_NUM, isPartitioned,
bucketLogEndOffset);
+
+ // wait until records have been synced to Iceberg
+ waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+ // Test 1: Read Iceberg lake table directly using $lake suffix
+ TableResult lakeTableResult =
+ batchTEnv.executeSql(String.format("select * from %s$lake",
tableName));
+ List<Row> icebergRows =
CollectionUtil.iteratorToList(lakeTableResult.collect());
+
+ // Verify that we can read data from Iceberg via $lake suffix
+ assertThat(icebergRows).isNotEmpty();
+
+ // Note: The expected row count should be based on how many rows were
written
+ // In preparePKTableFullType, we write 2 unique rows (by PK) per
iteration, 2 iterations
+ // Since this is a primary key table, duplicate PKs are deduplicated,
so only 2 unique rows
+ // per partition
+ int expectedUserRowCount = isPartitioned ? 2 *
waitUntilPartitions(t1).size() : 2;
+ assertThat(icebergRows).hasSize(expectedUserRowCount);
+
+ // verify rows have expected number of columns
+ int userColumnCount =
lakeTableResult.getResolvedSchema().getColumnCount();
+ Row firstRow = icebergRows.get(0);
+ assertThat(firstRow.getArity())
+ .as("Iceberg row should have at least user columns")
+ .isGreaterThanOrEqualTo(userColumnCount);
+
+ // Test 2: Read Iceberg system table (snapshots) using $lake$snapshots
suffix
+ TableResult snapshotsResult =
+ batchTEnv.executeSql(String.format("select * from
%s$lake$snapshots", tableName));
+ List<Row> snapshotRows =
CollectionUtil.iteratorToList(snapshotsResult.collect());
+
+ // Verify that we can read snapshots from Iceberg via $lake$snapshots
suffix
+ assertThat(snapshotRows).as("Should have at least one
snapshot").isNotEmpty();
+
+ // Verify snapshot structure based on Iceberg snapshots table schema
+ // Expected columns: committed_at, snapshot_id, parent_id, operation,
manifest_list, summary
+ Row firstSnapshot = snapshotRows.get(0);
+ assertThat(firstSnapshot.getArity()).as("Snapshot row should have 6
columns").isEqualTo(6);
+
+ // Verify committed_at field (index 0) is not null
+ assertThat(firstSnapshot.getField(0)).as("committed_at should not be
null").isNotNull();
+
+ // Verify snapshot_id field (index 1) is not null
+ assertThat(firstSnapshot.getField(1)).as("snapshot_id should not be
null").isNotNull();
+
+ // Verify manifest_list field (index 4) is not null and is a string
path
+ assertThat(firstSnapshot.getField(4))
+ .as("manifest_list should be a non-null path")
+ .isNotNull()
+ .isInstanceOf(String.class);
+
+ // Verify summary field (index 5) contains expected metadata
+ assertThat(firstSnapshot.getField(5)).as("summary should not be
null").isNotNull();
+
+ jobClient.cancel().get();
+ }
+
private void writeFullTypeRow(TablePath tablePath, String partition)
throws Exception {
List<InternalRow> rows =
Collections.singletonList(