This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 0cba53bb1 [lake/iceberg] Implement IcebergLakeCatalog for PK Tables 
(#1372)
0cba53bb1 is described below

commit 0cba53bb177b7bec50db78e56e1c20450f54a418
Author: MehulBatra <[email protected]>
AuthorDate: Fri Aug 8 14:08:27 2025 +0530

    [lake/iceberg] Implement IcebergLakeCatalog for PK Tables (#1372)
---
 fluss-lake/fluss-lake-iceberg/pom.xml              |   1 -
 .../iceberg/FlussDataTypeToIcebergDataType.java    | 143 +++++++++++
 .../fluss/lake/iceberg/IcebergLakeCatalog.java     | 277 +++++++++++++++++++++
 .../fluss/lake/iceberg/IcebergLakeStorage.java     |   5 +-
 .../fluss/lake/iceberg/IcebergLakeCatalogTest.java | 220 ++++++++++++++++
 5 files changed, 642 insertions(+), 4 deletions(-)

diff --git a/fluss-lake/fluss-lake-iceberg/pom.xml 
b/fluss-lake/fluss-lake-iceberg/pom.xml
index 2869bb799..01918d3aa 100644
--- a/fluss-lake/fluss-lake-iceberg/pom.xml
+++ b/fluss-lake/fluss-lake-iceberg/pom.xml
@@ -38,7 +38,6 @@
             <artifactId>iceberg-core</artifactId>
             <version>${iceberg.version}</version>
         </dependency>
-
         <dependency>
             <groupId>com.alibaba.fluss</groupId>
             <artifactId>fluss-common</artifactId>
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
new file mode 100644
index 000000000..2f265acc4
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
@@ -0,0 +1,143 @@
/*
 * Copyright (c) 2025 Alibaba Group Holding Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.alibaba.fluss.lake.iceberg;

import com.alibaba.fluss.types.ArrayType;
import com.alibaba.fluss.types.BigIntType;
import com.alibaba.fluss.types.BinaryType;
import com.alibaba.fluss.types.BooleanType;
import com.alibaba.fluss.types.BytesType;
import com.alibaba.fluss.types.CharType;
import com.alibaba.fluss.types.DataTypeVisitor;
import com.alibaba.fluss.types.DateType;
import com.alibaba.fluss.types.DecimalType;
import com.alibaba.fluss.types.DoubleType;
import com.alibaba.fluss.types.FloatType;
import com.alibaba.fluss.types.IntType;
import com.alibaba.fluss.types.LocalZonedTimestampType;
import com.alibaba.fluss.types.MapType;
import com.alibaba.fluss.types.RowType;
import com.alibaba.fluss.types.SmallIntType;
import com.alibaba.fluss.types.StringType;
import com.alibaba.fluss.types.TimeType;
import com.alibaba.fluss.types.TimestampType;
import com.alibaba.fluss.types.TinyIntType;

import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

/**
 * Convert from Fluss's data type to Iceberg's data type.
 *
 * <p>Stateless visitor; use the shared {@link #INSTANCE}. Nullability is not handled here — it
 * is the caller's responsibility to pick required/optional when building the Iceberg field.
 * Nested types (array/map/row) are not supported yet and throw
 * {@link UnsupportedOperationException}.
 */
public class FlussDataTypeToIcebergDataType implements DataTypeVisitor<Type> {

    public static final FlussDataTypeToIcebergDataType INSTANCE =
            new FlussDataTypeToIcebergDataType();

    @Override
    public Type visit(CharType charType) {
        // Iceberg has no fixed-length char type; the length constraint is dropped.
        return Types.StringType.get();
    }

    @Override
    public Type visit(StringType stringType) {
        return Types.StringType.get();
    }

    @Override
    public Type visit(BooleanType booleanType) {
        return Types.BooleanType.get();
    }

    @Override
    public Type visit(BinaryType binaryType) {
        // Mapped to variable-length binary; the fixed length of BINARY(n) is dropped.
        return Types.BinaryType.get();
    }

    @Override
    public Type visit(BytesType bytesType) {
        return Types.BinaryType.get();
    }

    @Override
    public Type visit(DecimalType decimalType) {
        return Types.DecimalType.of(decimalType.getPrecision(), decimalType.getScale());
    }

    @Override
    public Type visit(TinyIntType tinyIntType) {
        // Iceberg has no 8-bit integer type; widen to 32-bit int.
        return Types.IntegerType.get();
    }

    @Override
    public Type visit(SmallIntType smallIntType) {
        // Iceberg has no 16-bit integer type; widen to 32-bit int.
        return Types.IntegerType.get();
    }

    @Override
    public Type visit(IntType intType) {
        return Types.IntegerType.get();
    }

    @Override
    public Type visit(BigIntType bigIntType) {
        return Types.LongType.get();
    }

    @Override
    public Type visit(FloatType floatType) {
        return Types.FloatType.get();
    }

    @Override
    public Type visit(DoubleType doubleType) {
        return Types.DoubleType.get();
    }

    @Override
    public Type visit(DateType dateType) {
        return Types.DateType.get();
    }

    @Override
    public Type visit(TimeType timeType) {
        // Iceberg time is microsecond precision; Fluss TIME precision is not carried over.
        return Types.TimeType.get();
    }

    @Override
    public Type visit(TimestampType timestampType) {
        // TIMESTAMP (no time zone) -> Iceberg timestamp without zone.
        return Types.TimestampType.withoutZone();
    }

    @Override
    public Type visit(LocalZonedTimestampType localZonedTimestampType) {
        // TIMESTAMP_LTZ -> Iceberg timestamp with zone (timestamptz).
        return Types.TimestampType.withZone();
    }

    @Override
    public Type visit(ArrayType arrayType) {
        throw new UnsupportedOperationException("Unsupported array type");
    }

    @Override
    public Type visit(MapType mapType) {
        throw new UnsupportedOperationException("Unsupported map type");
    }

    @Override
    public Type visit(RowType rowType) {
        throw new UnsupportedOperationException("Unsupported row type");
    }
}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
new file mode 100644
index 000000000..a61bc1ce3
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
@@ -0,0 +1,277 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.alibaba.fluss.lake.iceberg;

import com.alibaba.fluss.annotation.VisibleForTesting;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.exception.TableAlreadyExistException;
import com.alibaba.fluss.lake.lakestorage.LakeCatalog;
import com.alibaba.fluss.metadata.TableDescriptor;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.utils.IOUtils;

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
import static org.apache.iceberg.CatalogUtil.loadCatalog;

/** An Iceberg implementation of {@link LakeCatalog}. */
public class IcebergLakeCatalog implements LakeCatalog {

    /**
     * Reserved system columns appended after the user columns of every created table. Insertion
     * order matters: it determines the field ids assigned to these columns.
     */
    private static final LinkedHashMap<String, Type> SYSTEM_COLUMNS = new LinkedHashMap<>();

    static {
        // We need __bucket system column to filter out the given bucket
        // for iceberg bucket append only table & primary key table.
        SYSTEM_COLUMNS.put(BUCKET_COLUMN_NAME, Types.IntegerType.get());
        SYSTEM_COLUMNS.put(OFFSET_COLUMN_NAME, Types.LongType.get());
        SYSTEM_COLUMNS.put(TIMESTAMP_COLUMN_NAME, Types.TimestampType.withZone());
    }

    private final Catalog icebergCatalog;

    // prefix added to Fluss properties when mirrored into Iceberg table properties
    private static final String FLUSS_CONF_PREFIX = "fluss.";
    // prefix stripped from keys that are passed through as native Iceberg properties
    private static final String ICEBERG_CONF_PREFIX = "iceberg.";

    public IcebergLakeCatalog(Configuration configuration) {
        this.icebergCatalog = createIcebergCatalog(configuration);
    }

    @VisibleForTesting
    protected Catalog getIcebergCatalog() {
        return icebergCatalog;
    }

    /**
     * Builds the underlying Iceberg catalog from the given configuration.
     *
     * @throws IllegalArgumentException if the mandatory {@code type} option is missing
     */
    private Catalog createIcebergCatalog(Configuration configuration) {
        Map<String, String> icebergProps = configuration.toMap();

        String catalogType = icebergProps.get("type");
        if (catalogType == null) {
            // Message must name the key actually read above ("type"), otherwise users are
            // pointed at a key this code never consults.
            throw new IllegalArgumentException(
                    "Missing required Iceberg catalog type. Set 'type' in the Iceberg "
                            + "configuration (e.g., 'hive', 'hadoop', 'rest', or a catalog "
                            + "implementation class name).");
        }

        String catalogName = icebergProps.getOrDefault("name", "fluss-iceberg-catalog");

        return loadCatalog(
                catalogType,
                catalogName,
                icebergProps,
                null // Optional: pass Hadoop configuration if available
                );
    }

    /**
     * Creates the Iceberg table backing the given Fluss table. Only primary key tables are
     * supported for now. If the target namespace does not exist yet, it is created and the table
     * creation is retried once.
     *
     * @throws TableAlreadyExistException if the Iceberg table already exists
     */
    @Override
    public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
            throws TableAlreadyExistException {
        if (!tableDescriptor.hasPrimaryKey()) {
            throw new UnsupportedOperationException(
                    "Iceberg integration currently supports only primary key tables.");
        }
        // convert Fluss table path to iceberg table
        TableIdentifier icebergId = toIcebergTableIdentifier(tablePath);
        Schema icebergSchema = convertToIcebergSchema(tableDescriptor);
        Catalog.TableBuilder tableBuilder = icebergCatalog.buildTable(icebergId, icebergSchema);

        PartitionSpec partitionSpec = createPartitionSpec(tableDescriptor, icebergSchema);
        SortOrder sortOrder = createSortOrder(icebergSchema);
        tableBuilder.withProperties(buildTableProperties(tableDescriptor));
        tableBuilder.withPartitionSpec(partitionSpec);
        tableBuilder.withSortOrder(sortOrder);
        try {
            createTable(tablePath, tableBuilder);
        } catch (NoSuchNamespaceException e) {
            // namespace missing: create it lazily and retry once
            createDatabase(tablePath.getDatabaseName());
            try {
                createTable(tablePath, tableBuilder);
            } catch (NoSuchNamespaceException t) {
                // shouldn't happen in normal cases; keep the cause for diagnosis
                throw new RuntimeException(
                        String.format(
                                "Fail to create table %s in Iceberg, because "
                                        + "Namespace %s still doesn't exist "
                                        + "although create namespace "
                                        + "successfully, please try again.",
                                tablePath, tablePath.getDatabaseName()),
                        t);
            }
        }
    }

    private TableIdentifier toIcebergTableIdentifier(TablePath tablePath) {
        return TableIdentifier.of(tablePath.getDatabaseName(), tablePath.getTableName());
    }

    /** Commits the table creation, translating Iceberg's exception to Fluss's. */
    private void createTable(TablePath tablePath, Catalog.TableBuilder tableBuilder) {
        try {
            tableBuilder.create();
        } catch (AlreadyExistsException e) {
            throw new TableAlreadyExistException("Table " + tablePath + " already exists.");
        }
    }

    /**
     * Converts the Fluss schema to an Iceberg schema: user columns first (field ids starting at
     * 1), then the reserved system columns, with the primary key columns registered as Iceberg
     * identifier fields.
     *
     * @throws IllegalArgumentException if a user column collides with a system column name
     */
    public Schema convertToIcebergSchema(TableDescriptor tableDescriptor) {
        List<Types.NestedField> fields = new ArrayList<>();
        int fieldId = 1;

        for (com.alibaba.fluss.metadata.Schema.Column column :
                tableDescriptor.getSchema().getColumns()) {
            String colName = column.getName();
            if (SYSTEM_COLUMNS.containsKey(colName)) {
                throw new IllegalArgumentException(
                        "Column '" + colName + "' conflicts with a reserved system column name.");
            }
            Types.NestedField field;
            if (column.getDataType().isNullable()) {
                field =
                        Types.NestedField.optional(
                                fieldId++,
                                colName,
                                column.getDataType()
                                        .accept(FlussDataTypeToIcebergDataType.INSTANCE),
                                column.getComment().orElse(null));
            } else {
                field =
                        Types.NestedField.required(
                                fieldId++,
                                colName,
                                column.getDataType()
                                        .accept(FlussDataTypeToIcebergDataType.INSTANCE),
                                column.getComment().orElse(null));
            }
            fields.add(field);
        }
        // system columns are always required and follow the user columns
        for (Map.Entry<String, Type> systemColumn : SYSTEM_COLUMNS.entrySet()) {
            fields.add(
                    Types.NestedField.required(
                            fieldId++, systemColumn.getKey(), systemColumn.getValue()));
        }

        // set identifier fields from the primary key column positions. Note: we must index
        // fields by the primary key index itself, not by the loop counter — otherwise tables
        // whose primary key is not the leading columns would register the wrong fields.
        int[] primaryKeyIndexes = tableDescriptor.getSchema().getPrimaryKeyIndexes();
        Set<Integer> identifierFieldIds = new HashSet<>();
        for (int primaryKeyIndex : primaryKeyIndexes) {
            identifierFieldIds.add(fields.get(primaryKeyIndex).fieldId());
        }
        return new Schema(fields, identifierFieldIds);
    }

    /**
     * Builds the partition spec: identity partitions for the Fluss partition keys first, then an
     * Iceberg bucket transform on the single bucket key.
     */
    private PartitionSpec createPartitionSpec(
            TableDescriptor tableDescriptor, Schema icebergSchema) {
        // Only PK tables supported for now
        List<String> bucketKeys = tableDescriptor.getBucketKeys();
        int bucketCount =
                tableDescriptor
                        .getTableDistribution()
                        .flatMap(TableDescriptor.TableDistribution::getBucketCount)
                        .orElseThrow(
                                () ->
                                        new IllegalArgumentException(
                                                "Bucket count (bucket.num) must be set"));

        if (bucketKeys.isEmpty()) {
            throw new IllegalArgumentException(
                    "Bucket key must be set for primary key Iceberg tables");
        }
        if (bucketKeys.size() > 1) {
            throw new UnsupportedOperationException(
                    "Only one bucket key is supported for Iceberg at the moment");
        }

        PartitionSpec.Builder builder = PartitionSpec.builderFor(icebergSchema);
        List<String> partitionKeys = tableDescriptor.getPartitionKeys();
        for (String partitionKey : partitionKeys) {
            builder.identity(partitionKey);
        }
        builder.bucket(bucketKeys.get(0), bucketCount);

        return builder.build();
    }

    /**
     * Routes a Fluss table property into Iceberg table properties: keys with the
     * {@code iceberg.} prefix are passed through (prefix stripped) as native Iceberg properties,
     * everything else is namespaced under {@code fluss.}.
     */
    private void setFlussPropertyToIceberg(
            String key, String value, Map<String, String> icebergProperties) {
        if (key.startsWith(ICEBERG_CONF_PREFIX)) {
            icebergProperties.put(key.substring(ICEBERG_CONF_PREFIX.length()), value);
        } else {
            icebergProperties.put(FLUSS_CONF_PREFIX + key, value);
        }
    }

    /** Creates the Iceberg namespace for the database if it does not exist yet. */
    private void createDatabase(String databaseName) {
        Namespace namespace = Namespace.of(databaseName);
        if (icebergCatalog instanceof SupportsNamespaces) {
            SupportsNamespaces supportsNamespaces = (SupportsNamespaces) icebergCatalog;
            if (!supportsNamespaces.namespaceExists(namespace)) {
                supportsNamespaces.createNamespace(namespace);
            }
        } else {
            throw new UnsupportedOperationException(
                    "The underlying Iceberg catalog does not support namespace operations.");
        }
    }

    private SortOrder createSortOrder(Schema icebergSchema) {
        // Sort by __offset system column for deterministic ordering
        SortOrder.Builder builder = SortOrder.builderFor(icebergSchema);
        builder.asc(OFFSET_COLUMN_NAME);
        return builder.build();
    }

    /** Merges MOR defaults with the Fluss table's declared and custom properties. */
    private Map<String, String> buildTableProperties(TableDescriptor tableDescriptor) {
        Map<String, String> icebergProperties = new HashMap<>();

        // MOR table properties for streaming workloads
        icebergProperties.put("write.delete.mode", "merge-on-read");
        icebergProperties.put("write.update.mode", "merge-on-read");
        icebergProperties.put("write.merge.mode", "merge-on-read");

        tableDescriptor
                .getProperties()
                .forEach((k, v) -> setFlussPropertyToIceberg(k, v, icebergProperties));
        tableDescriptor
                .getCustomProperties()
                .forEach((k, v) -> setFlussPropertyToIceberg(k, v, icebergProperties));

        return icebergProperties;
    }

    @Override
    public void close() {
        // Iceberg's Catalog interface does not extend AutoCloseable; guard the cast so
        // catalogs that are not closeable don't fail with ClassCastException here.
        if (icebergCatalog instanceof AutoCloseable) {
            IOUtils.closeQuietly((AutoCloseable) icebergCatalog, "fluss-iceberg-catalog");
        }
    }
}
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeStorage.java
 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeStorage.java
index fab753afe..9a553a3dd 100644
--- 
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeStorage.java
+++ 
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeStorage.java
@@ -18,7 +18,6 @@
 package com.alibaba.fluss.lake.iceberg;
 
 import com.alibaba.fluss.config.Configuration;
-import com.alibaba.fluss.lake.lakestorage.LakeCatalog;
 import com.alibaba.fluss.lake.lakestorage.LakeStorage;
 import com.alibaba.fluss.lake.source.LakeSource;
 import com.alibaba.fluss.lake.writer.LakeTieringFactory;
@@ -39,8 +38,8 @@ public class IcebergLakeStorage implements LakeStorage {
     }
 
    @Override
    public IcebergLakeCatalog createLakeCatalog() {
        // Covariant return type: narrows LakeCatalog to the concrete IcebergLakeCatalog
        // so callers holding an IcebergLakeStorage get the specific type directly.
        return new IcebergLakeCatalog(icebergConfig);
    }
 
     @Override
diff --git 
a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalogTest.java
 
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalogTest.java
new file mode 100644
index 000000000..feeb23a26
--- /dev/null
+++ 
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalogTest.java
@@ -0,0 +1,220 @@
/*
 * Copyright (c) 2025 Alibaba Group Holding Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.alibaba.fluss.lake.iceberg;

import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.metadata.Schema;
import com.alibaba.fluss.metadata.TableDescriptor;
import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.types.DataTypes;

import org.apache.iceberg.PartitionField;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortField;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;

/** Unit test for {@link IcebergLakeCatalog}. */
class IcebergLakeCatalogTest {

    // fresh warehouse directory per test, cleaned up by JUnit
    @TempDir private File tempWarehouseDir;

    private IcebergLakeCatalog flussIcebergCatalog;

    @BeforeEach
    void setupCatalog() {
        // Use Iceberg's InMemoryCatalog so no external services are needed; the 'type' option
        // is passed through to CatalogUtil.loadCatalog, which accepts an implementation class.
        Configuration configuration = new Configuration();
        configuration.setString("warehouse", tempWarehouseDir.toURI().toString());
        configuration.setString("type", "org.apache.iceberg.inmemory.InMemoryCatalog");
        configuration.setString("name", "fluss_test_catalog");

        this.flussIcebergCatalog = new IcebergLakeCatalog(configuration);
    }

    @Test
    void testCreatePrimaryKeyTable() {
        String database = "test_db";
        String tableName = "simple_table";

        Schema flussSchema =
                Schema.newBuilder()
                        .column("id", DataTypes.INT())
                        .column("name", DataTypes.STRING())
                        .withComment("field name")
                        .primaryKey("id")
                        .build();

        TableDescriptor tableDescriptor =
                TableDescriptor.builder().schema(flussSchema).distributedBy(4, "id").build();

        TablePath tablePath = TablePath.of(database, tableName);

        flussIcebergCatalog.createTable(tablePath, tableDescriptor);

        // load the created table back through the raw Iceberg catalog for verification
        TableIdentifier tableId = TableIdentifier.of(database, tableName);
        Table createdTable = flussIcebergCatalog.getIcebergCatalog().loadTable(tableId);

        assertThat(createdTable).isNotNull();
        assertThat(createdTable.name()).isEqualTo("fluss_test_catalog.test_db.simple_table");

        // Expected layout: user columns first (ids 1..2), then the three reserved system
        // columns, with the primary key ("id", field id 1) as the identifier field.
        org.apache.iceberg.Schema expectIcebergSchema =
                new org.apache.iceberg.Schema(
                        Arrays.asList(
                                Types.NestedField.required(1, "id", Types.IntegerType.get()),
                                Types.NestedField.optional(
                                        2, "name", Types.StringType.get(), "field name"),
                                Types.NestedField.required(
                                        3, BUCKET_COLUMN_NAME, Types.IntegerType.get()),
                                Types.NestedField.required(
                                        4, OFFSET_COLUMN_NAME, Types.LongType.get()),
                                Types.NestedField.required(
                                        5, TIMESTAMP_COLUMN_NAME, Types.TimestampType.withZone())),
                        Collections.singleton(1));

        // compare via toString: Iceberg Schema has no structural equals
        assertThat(createdTable.schema().toString()).isEqualTo(expectIcebergSchema.toString());
        // verify partition spec: a single bucket[4] transform on the bucket key "id"
        assertThat(createdTable.spec().fields()).hasSize(1);
        PartitionField partitionField = createdTable.spec().fields().get(0);
        assertThat(partitionField.name()).isEqualTo("id_bucket");
        assertThat(partitionField.transform().toString()).isEqualTo("bucket[4]");
        assertThat(partitionField.sourceId()).isEqualTo(1);
    }

    @Test
    void testCreatePartitionedPrimaryKeyTable() {
        String database = "test_db";
        String tableName = "pk_table";

        Schema flussSchema =
                Schema.newBuilder()
                        .column("shop_id", DataTypes.BIGINT())
                        .column("user_id", DataTypes.BIGINT())
                        .column("num_orders", DataTypes.INT())
                        .column("total_amount", DataTypes.INT().copy(false))
                        .primaryKey("shop_id", "user_id")
                        .build();

        // no explicit bucket key: with a partitioned PK table the bucket key is derived
        // (here "user_id"); properties exercise both the iceberg.-prefixed pass-through
        // and the fluss.-namespaced mirroring
        TableDescriptor tableDescriptor =
                TableDescriptor.builder()
                        .schema(flussSchema)
                        .distributedBy(10)
                        .partitionedBy("shop_id")
                        .property("iceberg.write.format.default", "orc")
                        .property("fluss_k1", "fluss_v1")
                        .build();

        TablePath tablePath = TablePath.of(database, tableName);

        flussIcebergCatalog.createTable(tablePath, tableDescriptor);

        TableIdentifier tableId = TableIdentifier.of(database, tableName);
        Table createdTable = flussIcebergCatalog.getIcebergCatalog().loadTable(tableId);

        // both primary key columns (field ids 1 and 2) become identifier fields
        Set<Integer> identifierFieldIds = new HashSet<>();
        identifierFieldIds.add(1);
        identifierFieldIds.add(2);
        org.apache.iceberg.Schema expectIcebergSchema =
                new org.apache.iceberg.Schema(
                        Arrays.asList(
                                Types.NestedField.required(1, "shop_id", Types.LongType.get()),
                                Types.NestedField.required(2, "user_id", Types.LongType.get()),
                                Types.NestedField.optional(
                                        3, "num_orders", Types.IntegerType.get()),
                                Types.NestedField.required(
                                        4, "total_amount", Types.IntegerType.get()),
                                Types.NestedField.required(
                                        5, BUCKET_COLUMN_NAME, Types.IntegerType.get()),
                                Types.NestedField.required(
                                        6, OFFSET_COLUMN_NAME, Types.LongType.get()),
                                Types.NestedField.required(
                                        7, TIMESTAMP_COLUMN_NAME, Types.TimestampType.withZone())),
                        identifierFieldIds);
        assertThat(createdTable.schema().toString()).isEqualTo(expectIcebergSchema.toString());

        // Verify partition spec
        assertThat(createdTable.spec().fields()).hasSize(2);
        // first should be partitioned by the fluss partition key
        PartitionField partitionField1 = createdTable.spec().fields().get(0);
        assertThat(partitionField1.name()).isEqualTo("shop_id");
        assertThat(partitionField1.transform().toString()).isEqualTo("identity");
        assertThat(partitionField1.sourceId()).isEqualTo(1);

        // the second should be partitioned by primary key
        PartitionField partitionField2 = createdTable.spec().fields().get(1);
        assertThat(partitionField2.name()).isEqualTo("user_id_bucket");
        assertThat(partitionField2.transform().toString()).isEqualTo("bucket[10]");
        assertThat(partitionField2.sourceId()).isEqualTo(2);

        // Verify sort order: ascending on the __offset system column
        assertThat(createdTable.sortOrder().fields()).hasSize(1);
        SortField sortField = createdTable.sortOrder().fields().get(0);
        assertThat(sortField.sourceId())
                .isEqualTo(createdTable.schema().findField(OFFSET_COLUMN_NAME).fieldId());
        assertThat(sortField.direction()).isEqualTo(SortDirection.ASC);

        // Verify table properties: MOR defaults plus the routed user properties —
        // the iceberg. prefix is stripped, other keys gain the fluss. prefix
        assertThat(createdTable.properties())
                .containsEntry("write.merge.mode", "merge-on-read")
                .containsEntry("write.delete.mode", "merge-on-read")
                .containsEntry("write.update.mode", "merge-on-read")
                .containsEntry("fluss.fluss_k1", "fluss_v1")
                .containsEntry("write.format.default", "orc");
    }

    @Test
    void rejectsPrimaryKeyTableWithMultipleBucketKeys() {
        String database = "test_db";
        String tableName = "multi_bucket_pk_table";

        Schema flussSchema =
                Schema.newBuilder()
                        .column("user_id", DataTypes.BIGINT())
                        .column("shop_id", DataTypes.BIGINT())
                        .column("order_id", DataTypes.BIGINT())
                        .column("amount", DataTypes.DOUBLE())
                        .primaryKey("user_id", "shop_id")
                        .build();

        TableDescriptor tableDescriptor =
                TableDescriptor.builder()
                        .schema(flussSchema)
                        .distributedBy(4, "user_id", "shop_id") // Multiple bucket keys
                        .build();

        TablePath tablePath = TablePath.of(database, tableName);

        // only a single bucket key maps onto Iceberg's bucket transform today
        assertThatThrownBy(() -> flussIcebergCatalog.createTable(tablePath, tableDescriptor))
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("Only one bucket key is supported for Iceberg");
    }
}

Reply via email to