This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new ac1569c0e [lake/iceberg] Support Log Table in IcebergLakeCatalog
(#1508)
ac1569c0e is described below
commit ac1569c0ed0959a160440a73268f76c9c37eecec
Author: SeungMin <[email protected]>
AuthorDate: Wed Aug 13 10:45:09 2025 +0900
[lake/iceberg] Support Log Table in IcebergLakeCatalog (#1508)
---
.../fluss/lake/iceberg/IcebergLakeCatalog.java | 77 +++++----
.../fluss/lake/iceberg/IcebergLakeCatalogTest.java | 178 +++++++++++++++++++++
2 files changed, 227 insertions(+), 28 deletions(-)
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
index a61bc1ce3..2ec64e6d0 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
@@ -50,7 +50,7 @@ import static
com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
import static org.apache.iceberg.CatalogUtil.loadCatalog;
-/** A Iceberg implementation of {@link LakeCatalog}. */
+/** An Iceberg implementation of {@link LakeCatalog}. */
public class IcebergLakeCatalog implements LakeCatalog {
private static final LinkedHashMap<String, Type> SYSTEM_COLUMNS = new
LinkedHashMap<>();
@@ -101,18 +101,16 @@ public class IcebergLakeCatalog implements LakeCatalog {
@Override
public void createTable(TablePath tablePath, TableDescriptor
tableDescriptor)
throws TableAlreadyExistException {
- if (!tableDescriptor.hasPrimaryKey()) {
- throw new UnsupportedOperationException(
- "Iceberg integration currently supports only primary key
tables.");
- }
// convert Fluss table path to iceberg table
+ boolean isPkTable = tableDescriptor.hasPrimaryKey();
TableIdentifier icebergId = toIcebergTableIdentifier(tablePath);
- Schema icebergSchema = convertToIcebergSchema(tableDescriptor);
+ Schema icebergSchema = convertToIcebergSchema(tableDescriptor,
isPkTable);
Catalog.TableBuilder tableBuilder =
icebergCatalog.buildTable(icebergId, icebergSchema);
- PartitionSpec partitionSpec = createPartitionSpec(tableDescriptor,
icebergSchema);
+ PartitionSpec partitionSpec =
+ createPartitionSpec(tableDescriptor, icebergSchema, isPkTable);
SortOrder sortOrder = createSortOrder(icebergSchema);
- tableBuilder.withProperties(buildTableProperties(tableDescriptor));
+ tableBuilder.withProperties(buildTableProperties(tableDescriptor,
isPkTable));
tableBuilder.withPartitionSpec(partitionSpec);
tableBuilder.withSortOrder(sortOrder);
try {
@@ -145,10 +143,11 @@ public class IcebergLakeCatalog implements LakeCatalog {
}
}
- public Schema convertToIcebergSchema(TableDescriptor tableDescriptor) {
+ public Schema convertToIcebergSchema(TableDescriptor tableDescriptor,
boolean isPkTable) {
List<Types.NestedField> fields = new ArrayList<>();
- int fieldId = 1;
+ int fieldId = 0;
+ // general columns
for (com.alibaba.fluss.metadata.Schema.Column column :
tableDescriptor.getSchema().getColumns()) {
String colName = column.getName();
@@ -176,24 +175,29 @@ public class IcebergLakeCatalog implements LakeCatalog {
}
fields.add(field);
}
+
+ // system columns
for (Map.Entry<String, Type> systemColumn : SYSTEM_COLUMNS.entrySet())
{
fields.add(
Types.NestedField.required(
fieldId++, systemColumn.getKey(),
systemColumn.getValue()));
}
- // set identifier fields
- int[] primaryKeyIndexes =
tableDescriptor.getSchema().getPrimaryKeyIndexes();
- Set<Integer> identifierFieldIds = new HashSet<>();
- for (int i = 0; i < primaryKeyIndexes.length; i++) {
- identifierFieldIds.add(fields.get(i).fieldId());
+ if (isPkTable) {
+ // set identifier fields
+ int[] primaryKeyIndexes =
tableDescriptor.getSchema().getPrimaryKeyIndexes();
+ Set<Integer> identifierFieldIds = new HashSet<>();
+ for (int pkIdx : primaryKeyIndexes) {
+ identifierFieldIds.add(fields.get(pkIdx).fieldId());
+ }
+ return new Schema(fields, identifierFieldIds);
+ } else {
+ return new Schema(fields);
}
- return new Schema(fields, identifierFieldIds);
}
private PartitionSpec createPartitionSpec(
- TableDescriptor tableDescriptor, Schema icebergSchema) {
- // Only PK tables supported for now
+ TableDescriptor tableDescriptor, Schema icebergSchema, boolean
isPkTable) {
List<String> bucketKeys = tableDescriptor.getBucketKeys();
int bucketCount =
tableDescriptor
@@ -204,21 +208,35 @@ public class IcebergLakeCatalog implements LakeCatalog {
new IllegalArgumentException(
"Bucket count (bucket.num)
must be set"));
- if (bucketKeys.isEmpty()) {
- throw new IllegalArgumentException(
- "Bucket key must be set for primary key Iceberg tables");
- }
+ // Only support one bucket key for now
if (bucketKeys.size() > 1) {
throw new UnsupportedOperationException(
"Only one bucket key is supported for Iceberg at the
moment");
}
+ // pk table must have bucket key
+ if (bucketKeys.isEmpty() && isPkTable) {
+ throw new IllegalArgumentException(
+ "Bucket key must be set for primary key Iceberg tables");
+ }
+
PartitionSpec.Builder builder =
PartitionSpec.builderFor(icebergSchema);
List<String> partitionKeys = tableDescriptor.getPartitionKeys();
+        // always add an identity partition for each partition key
for (String partitionKey : partitionKeys) {
builder.identity(partitionKey);
}
- builder.bucket(bucketKeys.get(0), bucketCount);
+
+ if (isPkTable) {
+ builder.bucket(bucketKeys.get(0), bucketCount);
+ } else {
+            // if there are no bucket keys, use identity(__bucket)
+ if (bucketKeys.isEmpty()) {
+ builder.identity(BUCKET_COLUMN_NAME);
+ } else {
+ builder.bucket(bucketKeys.get(0), bucketCount);
+ }
+ }
return builder.build();
}
@@ -252,13 +270,16 @@ public class IcebergLakeCatalog implements LakeCatalog {
return builder.build();
}
- private Map<String, String> buildTableProperties(TableDescriptor
tableDescriptor) {
+ private Map<String, String> buildTableProperties(
+ TableDescriptor tableDescriptor, boolean isPkTable) {
Map<String, String> icebergProperties = new HashMap<>();
- // MOR table properties for streaming workloads
- icebergProperties.put("write.delete.mode", "merge-on-read");
- icebergProperties.put("write.update.mode", "merge-on-read");
- icebergProperties.put("write.merge.mode", "merge-on-read");
+ if (isPkTable) {
+ // MOR table properties for streaming workloads
+ icebergProperties.put("write.delete.mode", "merge-on-read");
+ icebergProperties.put("write.update.mode", "merge-on-read");
+ icebergProperties.put("write.merge.mode", "merge-on-read");
+ }
tableDescriptor
.getProperties()
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalogTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalogTest.java
index feeb23a26..5ce41175c 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalogTest.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalogTest.java
@@ -61,6 +61,37 @@ class IcebergLakeCatalogTest {
this.flussIcebergCatalog = new IcebergLakeCatalog(configuration);
}
+ /** Verify property prefix rewriting. */
+ @Test
+ void testPropertyPrefixRewriting() {
+ String database = "test_db";
+ String tableName = "test_table";
+
+ Schema flussSchema = Schema.newBuilder().column("id",
DataTypes.BIGINT()).build();
+
+ TableDescriptor tableDescriptor =
+ TableDescriptor.builder()
+ .schema(flussSchema)
+ .distributedBy(3)
+ .property("iceberg.commit.retry.num-retries", "5")
+ .property("table.datalake.freshness", "30s")
+ .build();
+
+ TablePath tablePath = TablePath.of(database, tableName);
+ flussIcebergCatalog.createTable(tablePath, tableDescriptor);
+
+ Table created =
+ flussIcebergCatalog
+ .getIcebergCatalog()
+ .loadTable(TableIdentifier.of(database, tableName));
+
+ // Verify property prefix rewriting
+
assertThat(created.properties()).containsEntry("commit.retry.num-retries", "5");
+
assertThat(created.properties()).containsEntry("fluss.table.datalake.freshness",
"30s");
+ assertThat(created.properties())
+ .doesNotContainKeys("iceberg.commit.retry.num-retries",
"table.datalake.freshness");
+ }
+
@Test
void testCreatePrimaryKeyTable() {
String database = "test_db";
@@ -217,4 +248,151 @@ class IcebergLakeCatalogTest {
.isInstanceOf(UnsupportedOperationException.class)
.hasMessageContaining("Only one bucket key is supported for
Iceberg");
}
+
+ @Test
+ void testCreateLogTable() {
+ String database = "test_db";
+ String tableName = "log_table";
+
+ Schema flussSchema =
+ Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("name", DataTypes.STRING())
+ .column("amount", DataTypes.INT())
+ .column("address", DataTypes.STRING())
+ .build();
+
+ TableDescriptor td =
+ TableDescriptor.builder()
+ .schema(flussSchema)
+ .distributedBy(3) // no bucket key
+ .build();
+
+ TablePath tablePath = TablePath.of(database, tableName);
+ flussIcebergCatalog.createTable(tablePath, td);
+
+ TableIdentifier tableId = TableIdentifier.of(database, tableName);
+ Table createdTable =
flussIcebergCatalog.getIcebergCatalog().loadTable(tableId);
+
+ org.apache.iceberg.Schema expectIcebergSchema =
+ new org.apache.iceberg.Schema(
+ Arrays.asList(
+ Types.NestedField.optional(1, "id",
Types.LongType.get()),
+ Types.NestedField.optional(2, "name",
Types.StringType.get()),
+ Types.NestedField.optional(3, "amount",
Types.IntegerType.get()),
+ Types.NestedField.optional(4, "address",
Types.StringType.get()),
+ Types.NestedField.required(
+ 5, BUCKET_COLUMN_NAME,
Types.IntegerType.get()),
+ Types.NestedField.required(
+ 6, OFFSET_COLUMN_NAME,
Types.LongType.get()),
+ Types.NestedField.required(
+ 7, TIMESTAMP_COLUMN_NAME,
Types.TimestampType.withZone())));
+
+ // Verify iceberg table schema
+
assertThat(createdTable.schema().toString()).isEqualTo(expectIcebergSchema.toString());
+
+ // Verify partition field and transform
+ assertThat(createdTable.spec().fields()).hasSize(1);
+ PartitionField partitionField = createdTable.spec().fields().get(0);
+ assertThat(partitionField.name()).isEqualTo(BUCKET_COLUMN_NAME);
+
assertThat(partitionField.transform().toString()).isEqualTo("identity");
+
+ // Verify sort field and order
+ assertThat(createdTable.sortOrder().fields()).hasSize(1);
+ SortField sortField = createdTable.sortOrder().fields().get(0);
+ assertThat(sortField.sourceId())
+
.isEqualTo(createdTable.schema().findField(OFFSET_COLUMN_NAME).fieldId());
+ assertThat(sortField.direction()).isEqualTo(SortDirection.ASC);
+ }
+
+ @Test
+ void testCreatePartitionedLogTable() {
+ String database = "test_db";
+ String tableName = "partitioned_log_table";
+
+ Schema flussSchema =
+ Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("name", DataTypes.STRING())
+ .column("amount", DataTypes.INT())
+ .column("order_type", DataTypes.STRING())
+ .build();
+
+ TableDescriptor td =
+ TableDescriptor.builder()
+ .schema(flussSchema)
+ .distributedBy(3)
+ .partitionedBy("order_type")
+ .build();
+
+ TablePath path = TablePath.of(database, tableName);
+ flussIcebergCatalog.createTable(path, td);
+
+ Table createdTable =
+ flussIcebergCatalog
+ .getIcebergCatalog()
+ .loadTable(TableIdentifier.of(database, tableName));
+
+ org.apache.iceberg.Schema expectIcebergSchema =
+ new org.apache.iceberg.Schema(
+ Arrays.asList(
+ Types.NestedField.optional(1, "id",
Types.LongType.get()),
+ Types.NestedField.optional(2, "name",
Types.StringType.get()),
+ Types.NestedField.optional(3, "amount",
Types.IntegerType.get()),
+ Types.NestedField.optional(4, "order_type",
Types.StringType.get()),
+ Types.NestedField.required(
+ 5, BUCKET_COLUMN_NAME,
Types.IntegerType.get()),
+ Types.NestedField.required(
+ 6, OFFSET_COLUMN_NAME,
Types.LongType.get()),
+ Types.NestedField.required(
+ 7, TIMESTAMP_COLUMN_NAME,
Types.TimestampType.withZone())));
+
+ // Verify iceberg table schema
+
assertThat(createdTable.schema().toString()).isEqualTo(expectIcebergSchema.toString());
+
+ // Verify partition field and transform
+ assertThat(createdTable.spec().fields()).hasSize(2);
+ PartitionField firstPartitionField =
createdTable.spec().fields().get(0);
+ assertThat(firstPartitionField.name()).isEqualTo("order_type");
+
assertThat(firstPartitionField.transform().toString()).isEqualTo("identity");
+
+ PartitionField secondPartitionField =
createdTable.spec().fields().get(1);
+ assertThat(secondPartitionField.name()).isEqualTo(BUCKET_COLUMN_NAME);
+
assertThat(secondPartitionField.transform().toString()).isEqualTo("identity");
+
+ // Verify sort field and order
+ assertThat(createdTable.sortOrder().fields()).hasSize(1);
+ SortField sortField = createdTable.sortOrder().fields().get(0);
+ assertThat(sortField.sourceId())
+
.isEqualTo(createdTable.schema().findField(OFFSET_COLUMN_NAME).fieldId());
+ assertThat(sortField.direction()).isEqualTo(SortDirection.ASC);
+ }
+
+ @Test
+ void rejectsLogTableWithMultipleBucketKeys() {
+ String database = "test_db";
+ String tableName = "multi_bucket_log_table";
+
+ Schema flussSchema =
+ Schema.newBuilder()
+ .column("id", DataTypes.BIGINT())
+ .column("name", DataTypes.STRING())
+ .column("amount", DataTypes.INT())
+ .column("user_type", DataTypes.STRING())
+ .column("order_type", DataTypes.STRING())
+ .build();
+
+ TableDescriptor tableDescriptor =
+ TableDescriptor.builder()
+ .schema(flussSchema)
+ .distributedBy(3, "user_type", "order_type")
+ .build();
+
+ TablePath tablePath = TablePath.of(database, tableName);
+
+ // Do not allow multiple bucket keys for log table
+ assertThatThrownBy(() -> flussIcebergCatalog.createTable(tablePath,
tableDescriptor))
+ .isInstanceOf(UnsupportedOperationException.class)
+ .hasMessageContaining("Only one bucket key is supported for
Iceberg");
+ }
}