This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new ac1569c0e [lake/iceberg] Support Log Table in IcebergLakeCatalog 
(#1508)
ac1569c0e is described below

commit ac1569c0ed0959a160440a73268f76c9c37eecec
Author: SeungMin <[email protected]>
AuthorDate: Wed Aug 13 10:45:09 2025 +0900

    [lake/iceberg] Support Log Table in IcebergLakeCatalog (#1508)
---
 .../fluss/lake/iceberg/IcebergLakeCatalog.java     |  77 +++++----
 .../fluss/lake/iceberg/IcebergLakeCatalogTest.java | 178 +++++++++++++++++++++
 2 files changed, 227 insertions(+), 28 deletions(-)

diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
index a61bc1ce3..2ec64e6d0 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
@@ -50,7 +50,7 @@ import static 
com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
 import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
 import static org.apache.iceberg.CatalogUtil.loadCatalog;
 
-/** A Iceberg implementation of {@link LakeCatalog}. */
+/** An Iceberg implementation of {@link LakeCatalog}. */
 public class IcebergLakeCatalog implements LakeCatalog {
 
     private static final LinkedHashMap<String, Type> SYSTEM_COLUMNS = new 
LinkedHashMap<>();
@@ -101,18 +101,16 @@ public class IcebergLakeCatalog implements LakeCatalog {
     @Override
     public void createTable(TablePath tablePath, TableDescriptor 
tableDescriptor)
             throws TableAlreadyExistException {
-        if (!tableDescriptor.hasPrimaryKey()) {
-            throw new UnsupportedOperationException(
-                    "Iceberg integration currently supports only primary key 
tables.");
-        }
         // convert Fluss table path to iceberg table
+        boolean isPkTable = tableDescriptor.hasPrimaryKey();
         TableIdentifier icebergId = toIcebergTableIdentifier(tablePath);
-        Schema icebergSchema = convertToIcebergSchema(tableDescriptor);
+        Schema icebergSchema = convertToIcebergSchema(tableDescriptor, 
isPkTable);
         Catalog.TableBuilder tableBuilder = 
icebergCatalog.buildTable(icebergId, icebergSchema);
 
-        PartitionSpec partitionSpec = createPartitionSpec(tableDescriptor, 
icebergSchema);
+        PartitionSpec partitionSpec =
+                createPartitionSpec(tableDescriptor, icebergSchema, isPkTable);
         SortOrder sortOrder = createSortOrder(icebergSchema);
-        tableBuilder.withProperties(buildTableProperties(tableDescriptor));
+        tableBuilder.withProperties(buildTableProperties(tableDescriptor, 
isPkTable));
         tableBuilder.withPartitionSpec(partitionSpec);
         tableBuilder.withSortOrder(sortOrder);
         try {
@@ -145,10 +143,11 @@ public class IcebergLakeCatalog implements LakeCatalog {
         }
     }
 
-    public Schema convertToIcebergSchema(TableDescriptor tableDescriptor) {
+    public Schema convertToIcebergSchema(TableDescriptor tableDescriptor, 
boolean isPkTable) {
         List<Types.NestedField> fields = new ArrayList<>();
-        int fieldId = 1;
+        int fieldId = 0;
 
+        // general columns
         for (com.alibaba.fluss.metadata.Schema.Column column :
                 tableDescriptor.getSchema().getColumns()) {
             String colName = column.getName();
@@ -176,24 +175,29 @@ public class IcebergLakeCatalog implements LakeCatalog {
             }
             fields.add(field);
         }
+
+        // system columns
         for (Map.Entry<String, Type> systemColumn : SYSTEM_COLUMNS.entrySet()) 
{
             fields.add(
                     Types.NestedField.required(
                             fieldId++, systemColumn.getKey(), 
systemColumn.getValue()));
         }
 
-        // set identifier fields
-        int[] primaryKeyIndexes = 
tableDescriptor.getSchema().getPrimaryKeyIndexes();
-        Set<Integer> identifierFieldIds = new HashSet<>();
-        for (int i = 0; i < primaryKeyIndexes.length; i++) {
-            identifierFieldIds.add(fields.get(i).fieldId());
+        if (isPkTable) {
+            // set identifier fields
+            int[] primaryKeyIndexes = 
tableDescriptor.getSchema().getPrimaryKeyIndexes();
+            Set<Integer> identifierFieldIds = new HashSet<>();
+            for (int pkIdx : primaryKeyIndexes) {
+                identifierFieldIds.add(fields.get(pkIdx).fieldId());
+            }
+            return new Schema(fields, identifierFieldIds);
+        } else {
+            return new Schema(fields);
         }
-        return new Schema(fields, identifierFieldIds);
     }
 
     private PartitionSpec createPartitionSpec(
-            TableDescriptor tableDescriptor, Schema icebergSchema) {
-        // Only PK tables supported for now
+            TableDescriptor tableDescriptor, Schema icebergSchema, boolean 
isPkTable) {
         List<String> bucketKeys = tableDescriptor.getBucketKeys();
         int bucketCount =
                 tableDescriptor
@@ -204,21 +208,35 @@ public class IcebergLakeCatalog implements LakeCatalog {
                                         new IllegalArgumentException(
                                                 "Bucket count (bucket.num) 
must be set"));
 
-        if (bucketKeys.isEmpty()) {
-            throw new IllegalArgumentException(
-                    "Bucket key must be set for primary key Iceberg tables");
-        }
+        // Only support one bucket key for now
         if (bucketKeys.size() > 1) {
             throw new UnsupportedOperationException(
                     "Only one bucket key is supported for Iceberg at the 
moment");
         }
 
+        // pk table must have bucket key
+        if (bucketKeys.isEmpty() && isPkTable) {
+            throw new IllegalArgumentException(
+                    "Bucket key must be set for primary key Iceberg tables");
+        }
+
         PartitionSpec.Builder builder = 
PartitionSpec.builderFor(icebergSchema);
         List<String> partitionKeys = tableDescriptor.getPartitionKeys();
+        // always set identity partition with partition key
         for (String partitionKey : partitionKeys) {
             builder.identity(partitionKey);
         }
-        builder.bucket(bucketKeys.get(0), bucketCount);
+
+        if (isPkTable) {
+            builder.bucket(bucketKeys.get(0), bucketCount);
+        } else {
+            // if there is no bucket keys, use identity(__bucket)
+            if (bucketKeys.isEmpty()) {
+                builder.identity(BUCKET_COLUMN_NAME);
+            } else {
+                builder.bucket(bucketKeys.get(0), bucketCount);
+            }
+        }
 
         return builder.build();
     }
@@ -252,13 +270,16 @@ public class IcebergLakeCatalog implements LakeCatalog {
         return builder.build();
     }
 
-    private Map<String, String> buildTableProperties(TableDescriptor 
tableDescriptor) {
+    private Map<String, String> buildTableProperties(
+            TableDescriptor tableDescriptor, boolean isPkTable) {
         Map<String, String> icebergProperties = new HashMap<>();
 
-        // MOR table properties for streaming workloads
-        icebergProperties.put("write.delete.mode", "merge-on-read");
-        icebergProperties.put("write.update.mode", "merge-on-read");
-        icebergProperties.put("write.merge.mode", "merge-on-read");
+        if (isPkTable) {
+            // MOR table properties for streaming workloads
+            icebergProperties.put("write.delete.mode", "merge-on-read");
+            icebergProperties.put("write.update.mode", "merge-on-read");
+            icebergProperties.put("write.merge.mode", "merge-on-read");
+        }
 
         tableDescriptor
                 .getProperties()
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalogTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalogTest.java
index feeb23a26..5ce41175c 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalogTest.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalogTest.java
@@ -61,6 +61,37 @@ class IcebergLakeCatalogTest {
         this.flussIcebergCatalog = new IcebergLakeCatalog(configuration);
     }
 
+    /** Verify property prefix rewriting. */
+    @Test
+    void testPropertyPrefixRewriting() {
+        String database = "test_db";
+        String tableName = "test_table";
+
+        Schema flussSchema = Schema.newBuilder().column("id", 
DataTypes.BIGINT()).build();
+
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(flussSchema)
+                        .distributedBy(3)
+                        .property("iceberg.commit.retry.num-retries", "5")
+                        .property("table.datalake.freshness", "30s")
+                        .build();
+
+        TablePath tablePath = TablePath.of(database, tableName);
+        flussIcebergCatalog.createTable(tablePath, tableDescriptor);
+
+        Table created =
+                flussIcebergCatalog
+                        .getIcebergCatalog()
+                        .loadTable(TableIdentifier.of(database, tableName));
+
+        // Verify property prefix rewriting
+        
assertThat(created.properties()).containsEntry("commit.retry.num-retries", "5");
+        
assertThat(created.properties()).containsEntry("fluss.table.datalake.freshness",
 "30s");
+        assertThat(created.properties())
+                .doesNotContainKeys("iceberg.commit.retry.num-retries", 
"table.datalake.freshness");
+    }
+
     @Test
     void testCreatePrimaryKeyTable() {
         String database = "test_db";
@@ -217,4 +248,151 @@ class IcebergLakeCatalogTest {
                 .isInstanceOf(UnsupportedOperationException.class)
                 .hasMessageContaining("Only one bucket key is supported for 
Iceberg");
     }
+
+    @Test
+    void testCreateLogTable() {
+        String database = "test_db";
+        String tableName = "log_table";
+
+        Schema flussSchema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.BIGINT())
+                        .column("name", DataTypes.STRING())
+                        .column("amount", DataTypes.INT())
+                        .column("address", DataTypes.STRING())
+                        .build();
+
+        TableDescriptor td =
+                TableDescriptor.builder()
+                        .schema(flussSchema)
+                        .distributedBy(3) // no bucket key
+                        .build();
+
+        TablePath tablePath = TablePath.of(database, tableName);
+        flussIcebergCatalog.createTable(tablePath, td);
+
+        TableIdentifier tableId = TableIdentifier.of(database, tableName);
+        Table createdTable = 
flussIcebergCatalog.getIcebergCatalog().loadTable(tableId);
+
+        org.apache.iceberg.Schema expectIcebergSchema =
+                new org.apache.iceberg.Schema(
+                        Arrays.asList(
+                                Types.NestedField.optional(1, "id", 
Types.LongType.get()),
+                                Types.NestedField.optional(2, "name", 
Types.StringType.get()),
+                                Types.NestedField.optional(3, "amount", 
Types.IntegerType.get()),
+                                Types.NestedField.optional(4, "address", 
Types.StringType.get()),
+                                Types.NestedField.required(
+                                        5, BUCKET_COLUMN_NAME, 
Types.IntegerType.get()),
+                                Types.NestedField.required(
+                                        6, OFFSET_COLUMN_NAME, 
Types.LongType.get()),
+                                Types.NestedField.required(
+                                        7, TIMESTAMP_COLUMN_NAME, 
Types.TimestampType.withZone())));
+
+        // Verify iceberg table schema
+        
assertThat(createdTable.schema().toString()).isEqualTo(expectIcebergSchema.toString());
+
+        // Verify partition field and transform
+        assertThat(createdTable.spec().fields()).hasSize(1);
+        PartitionField partitionField = createdTable.spec().fields().get(0);
+        assertThat(partitionField.name()).isEqualTo(BUCKET_COLUMN_NAME);
+        
assertThat(partitionField.transform().toString()).isEqualTo("identity");
+
+        // Verify sort field and order
+        assertThat(createdTable.sortOrder().fields()).hasSize(1);
+        SortField sortField = createdTable.sortOrder().fields().get(0);
+        assertThat(sortField.sourceId())
+                
.isEqualTo(createdTable.schema().findField(OFFSET_COLUMN_NAME).fieldId());
+        assertThat(sortField.direction()).isEqualTo(SortDirection.ASC);
+    }
+
+    @Test
+    void testCreatePartitionedLogTable() {
+        String database = "test_db";
+        String tableName = "partitioned_log_table";
+
+        Schema flussSchema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.BIGINT())
+                        .column("name", DataTypes.STRING())
+                        .column("amount", DataTypes.INT())
+                        .column("order_type", DataTypes.STRING())
+                        .build();
+
+        TableDescriptor td =
+                TableDescriptor.builder()
+                        .schema(flussSchema)
+                        .distributedBy(3)
+                        .partitionedBy("order_type")
+                        .build();
+
+        TablePath path = TablePath.of(database, tableName);
+        flussIcebergCatalog.createTable(path, td);
+
+        Table createdTable =
+                flussIcebergCatalog
+                        .getIcebergCatalog()
+                        .loadTable(TableIdentifier.of(database, tableName));
+
+        org.apache.iceberg.Schema expectIcebergSchema =
+                new org.apache.iceberg.Schema(
+                        Arrays.asList(
+                                Types.NestedField.optional(1, "id", 
Types.LongType.get()),
+                                Types.NestedField.optional(2, "name", 
Types.StringType.get()),
+                                Types.NestedField.optional(3, "amount", 
Types.IntegerType.get()),
+                                Types.NestedField.optional(4, "order_type", 
Types.StringType.get()),
+                                Types.NestedField.required(
+                                        5, BUCKET_COLUMN_NAME, 
Types.IntegerType.get()),
+                                Types.NestedField.required(
+                                        6, OFFSET_COLUMN_NAME, 
Types.LongType.get()),
+                                Types.NestedField.required(
+                                        7, TIMESTAMP_COLUMN_NAME, 
Types.TimestampType.withZone())));
+
+        // Verify iceberg table schema
+        
assertThat(createdTable.schema().toString()).isEqualTo(expectIcebergSchema.toString());
+
+        // Verify partition field and transform
+        assertThat(createdTable.spec().fields()).hasSize(2);
+        PartitionField firstPartitionField = 
createdTable.spec().fields().get(0);
+        assertThat(firstPartitionField.name()).isEqualTo("order_type");
+        
assertThat(firstPartitionField.transform().toString()).isEqualTo("identity");
+
+        PartitionField secondPartitionField = 
createdTable.spec().fields().get(1);
+        assertThat(secondPartitionField.name()).isEqualTo(BUCKET_COLUMN_NAME);
+        
assertThat(secondPartitionField.transform().toString()).isEqualTo("identity");
+
+        // Verify sort field and order
+        assertThat(createdTable.sortOrder().fields()).hasSize(1);
+        SortField sortField = createdTable.sortOrder().fields().get(0);
+        assertThat(sortField.sourceId())
+                
.isEqualTo(createdTable.schema().findField(OFFSET_COLUMN_NAME).fieldId());
+        assertThat(sortField.direction()).isEqualTo(SortDirection.ASC);
+    }
+
+    @Test
+    void rejectsLogTableWithMultipleBucketKeys() {
+        String database = "test_db";
+        String tableName = "multi_bucket_log_table";
+
+        Schema flussSchema =
+                Schema.newBuilder()
+                        .column("id", DataTypes.BIGINT())
+                        .column("name", DataTypes.STRING())
+                        .column("amount", DataTypes.INT())
+                        .column("user_type", DataTypes.STRING())
+                        .column("order_type", DataTypes.STRING())
+                        .build();
+
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(flussSchema)
+                        .distributedBy(3, "user_type", "order_type")
+                        .build();
+
+        TablePath tablePath = TablePath.of(database, tableName);
+
+        // Do not allow multiple bucket keys for log table
+        assertThatThrownBy(() -> flussIcebergCatalog.createTable(tablePath, 
tableDescriptor))
+                .isInstanceOf(UnsupportedOperationException.class)
+                .hasMessageContaining("Only one bucket key is supported for 
Iceberg");
+    }
 }

Reply via email to