This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 0cba53bb1 [lake/iceberg] Implement IcebergLakeCatalog for PK Tables
(#1372)
0cba53bb1 is described below
commit 0cba53bb177b7bec50db78e56e1c20450f54a418
Author: MehulBatra <[email protected]>
AuthorDate: Fri Aug 8 14:08:27 2025 +0530
[lake/iceberg] Implement IcebergLakeCatalog for PK Tables (#1372)
---
fluss-lake/fluss-lake-iceberg/pom.xml | 1 -
.../iceberg/FlussDataTypeToIcebergDataType.java | 143 +++++++++++
.../fluss/lake/iceberg/IcebergLakeCatalog.java | 277 +++++++++++++++++++++
.../fluss/lake/iceberg/IcebergLakeStorage.java | 5 +-
.../fluss/lake/iceberg/IcebergLakeCatalogTest.java | 220 ++++++++++++++++
5 files changed, 642 insertions(+), 4 deletions(-)
diff --git a/fluss-lake/fluss-lake-iceberg/pom.xml
b/fluss-lake/fluss-lake-iceberg/pom.xml
index 2869bb799..01918d3aa 100644
--- a/fluss-lake/fluss-lake-iceberg/pom.xml
+++ b/fluss-lake/fluss-lake-iceberg/pom.xml
@@ -38,7 +38,6 @@
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
</dependency>
-
<dependency>
<groupId>com.alibaba.fluss</groupId>
<artifactId>fluss-common</artifactId>
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
new file mode 100644
index 000000000..2f265acc4
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/FlussDataTypeToIcebergDataType.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg;
+
+import com.alibaba.fluss.types.ArrayType;
+import com.alibaba.fluss.types.BigIntType;
+import com.alibaba.fluss.types.BinaryType;
+import com.alibaba.fluss.types.BooleanType;
+import com.alibaba.fluss.types.BytesType;
+import com.alibaba.fluss.types.CharType;
+import com.alibaba.fluss.types.DataTypeVisitor;
+import com.alibaba.fluss.types.DateType;
+import com.alibaba.fluss.types.DecimalType;
+import com.alibaba.fluss.types.DoubleType;
+import com.alibaba.fluss.types.FloatType;
+import com.alibaba.fluss.types.IntType;
+import com.alibaba.fluss.types.LocalZonedTimestampType;
+import com.alibaba.fluss.types.MapType;
+import com.alibaba.fluss.types.RowType;
+import com.alibaba.fluss.types.SmallIntType;
+import com.alibaba.fluss.types.StringType;
+import com.alibaba.fluss.types.TimeType;
+import com.alibaba.fluss.types.TimestampType;
+import com.alibaba.fluss.types.TinyIntType;
+
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+/** Convert from Fluss's data type to Iceberg's data type. */
+public class FlussDataTypeToIcebergDataType implements DataTypeVisitor<Type> {
+
+ public static final FlussDataTypeToIcebergDataType INSTANCE =
+ new FlussDataTypeToIcebergDataType();
+
+ @Override
+ public Type visit(CharType charType) {
+ return Types.StringType.get();
+ }
+
+ @Override
+ public Type visit(StringType stringType) {
+ return Types.StringType.get();
+ }
+
+ @Override
+ public Type visit(BooleanType booleanType) {
+ return Types.BooleanType.get();
+ }
+
+ @Override
+ public Type visit(BinaryType binaryType) {
+ return Types.BinaryType.get();
+ }
+
+ @Override
+ public Type visit(BytesType bytesType) {
+ return Types.BinaryType.get();
+ }
+
+ @Override
+ public Type visit(DecimalType decimalType) {
+ return Types.DecimalType.of(decimalType.getPrecision(),
decimalType.getScale());
+ }
+
+ @Override
+ public Type visit(TinyIntType tinyIntType) {
+ return Types.IntegerType.get();
+ }
+
+ @Override
+ public Type visit(SmallIntType smallIntType) {
+ return Types.IntegerType.get();
+ }
+
+ @Override
+ public Type visit(IntType intType) {
+ return Types.IntegerType.get();
+ }
+
+ @Override
+ public Type visit(BigIntType bigIntType) {
+ return Types.LongType.get();
+ }
+
+ @Override
+ public Type visit(FloatType floatType) {
+ return Types.FloatType.get();
+ }
+
+ @Override
+ public Type visit(DoubleType doubleType) {
+ return Types.DoubleType.get();
+ }
+
+ @Override
+ public Type visit(DateType dateType) {
+ return Types.DateType.get();
+ }
+
+ @Override
+ public Type visit(TimeType timeType) {
+ return Types.TimeType.get();
+ }
+
+ @Override
+ public Type visit(TimestampType timestampType) {
+ return Types.TimestampType.withoutZone();
+ }
+
+ @Override
+ public Type visit(LocalZonedTimestampType localZonedTimestampType) {
+ return Types.TimestampType.withZone();
+ }
+
+ @Override
+ public Type visit(ArrayType arrayType) {
+ throw new UnsupportedOperationException("Unsupported array type");
+ }
+
+ @Override
+ public Type visit(MapType mapType) {
+ throw new UnsupportedOperationException("Unsupported map type");
+ }
+
+ @Override
+ public Type visit(RowType rowType) {
+ throw new UnsupportedOperationException("Unsupported row type");
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
new file mode 100644
index 000000000..a61bc1ce3
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalog.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg;
+
+import com.alibaba.fluss.annotation.VisibleForTesting;
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.exception.TableAlreadyExistException;
+import com.alibaba.fluss.lake.lakestorage.LakeCatalog;
+import com.alibaba.fluss.metadata.TableDescriptor;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.utils.IOUtils;
+
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
+import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+import static org.apache.iceberg.CatalogUtil.loadCatalog;
+
+/** A Iceberg implementation of {@link LakeCatalog}. */
+public class IcebergLakeCatalog implements LakeCatalog {
+
+ private static final LinkedHashMap<String, Type> SYSTEM_COLUMNS = new
LinkedHashMap<>();
+
+ static {
+ // We need __bucket system column to filter out the given bucket
+ // for iceberg bucket append only table & primary key table.
+ SYSTEM_COLUMNS.put(BUCKET_COLUMN_NAME, Types.IntegerType.get());
+ SYSTEM_COLUMNS.put(OFFSET_COLUMN_NAME, Types.LongType.get());
+ SYSTEM_COLUMNS.put(TIMESTAMP_COLUMN_NAME,
Types.TimestampType.withZone());
+ }
+
+ private final Catalog icebergCatalog;
+
+ // for fluss config
+ private static final String FLUSS_CONF_PREFIX = "fluss.";
+ // for iceberg config
+ private static final String ICEBERG_CONF_PREFIX = "iceberg.";
+
+ public IcebergLakeCatalog(Configuration configuration) {
+ this.icebergCatalog = createIcebergCatalog(configuration);
+ }
+
+ @VisibleForTesting
+ protected Catalog getIcebergCatalog() {
+ return icebergCatalog;
+ }
+
+ private Catalog createIcebergCatalog(Configuration configuration) {
+ Map<String, String> icebergProps = configuration.toMap();
+
+ String catalogType = icebergProps.get("type");
+ if (catalogType == null) {
+ throw new IllegalArgumentException(
+ "Missing required Iceberg catalog type. Set
'iceberg.catalog.type' in your configuration (e.g., 'hive', 'hadoop', or
'rest').");
+ }
+
+ String catalogName = icebergProps.getOrDefault("name",
"fluss-iceberg-catalog");
+
+ return loadCatalog(
+ catalogType,
+ catalogName,
+ icebergProps,
+ null // Optional: pass Hadoop configuration if available
+ );
+ }
+
+ @Override
+ public void createTable(TablePath tablePath, TableDescriptor
tableDescriptor)
+ throws TableAlreadyExistException {
+ if (!tableDescriptor.hasPrimaryKey()) {
+ throw new UnsupportedOperationException(
+ "Iceberg integration currently supports only primary key
tables.");
+ }
+ // convert Fluss table path to iceberg table
+ TableIdentifier icebergId = toIcebergTableIdentifier(tablePath);
+ Schema icebergSchema = convertToIcebergSchema(tableDescriptor);
+ Catalog.TableBuilder tableBuilder =
icebergCatalog.buildTable(icebergId, icebergSchema);
+
+ PartitionSpec partitionSpec = createPartitionSpec(tableDescriptor,
icebergSchema);
+ SortOrder sortOrder = createSortOrder(icebergSchema);
+ tableBuilder.withProperties(buildTableProperties(tableDescriptor));
+ tableBuilder.withPartitionSpec(partitionSpec);
+ tableBuilder.withSortOrder(sortOrder);
+ try {
+ createTable(tablePath, tableBuilder);
+ } catch (NoSuchNamespaceException e) {
+ createDatabase(tablePath.getDatabaseName());
+ try {
+ createTable(tablePath, tableBuilder);
+ } catch (NoSuchNamespaceException t) {
+ // shouldn't happen in normal cases
+ throw new RuntimeException(
+ String.format(
+ "Fail to create table %s in Iceberg, because "
+ + "Namespace %s still doesn't exist
although create namespace "
+ + "successfully, please try again.",
+ tablePath, tablePath.getDatabaseName()));
+ }
+ }
+ }
+
+ private TableIdentifier toIcebergTableIdentifier(TablePath tablePath) {
+ return TableIdentifier.of(tablePath.getDatabaseName(),
tablePath.getTableName());
+ }
+
+ private void createTable(TablePath tablePath, Catalog.TableBuilder
tableBuilder) {
+ try {
+ tableBuilder.create();
+ } catch (AlreadyExistsException e) {
+ throw new TableAlreadyExistException("Table " + tablePath + "
already exists.");
+ }
+ }
+
+ public Schema convertToIcebergSchema(TableDescriptor tableDescriptor) {
+ List<Types.NestedField> fields = new ArrayList<>();
+ int fieldId = 1;
+
+ for (com.alibaba.fluss.metadata.Schema.Column column :
+ tableDescriptor.getSchema().getColumns()) {
+ String colName = column.getName();
+ if (SYSTEM_COLUMNS.containsKey(colName)) {
+ throw new IllegalArgumentException(
+ "Column '" + colName + "' conflicts with a reserved
system column name.");
+ }
+ Types.NestedField field;
+ if (column.getDataType().isNullable()) {
+ field =
+ Types.NestedField.optional(
+ fieldId++,
+ colName,
+ column.getDataType()
+
.accept(FlussDataTypeToIcebergDataType.INSTANCE),
+ column.getComment().orElse(null));
+ } else {
+ field =
+ Types.NestedField.required(
+ fieldId++,
+ colName,
+ column.getDataType()
+
.accept(FlussDataTypeToIcebergDataType.INSTANCE),
+ column.getComment().orElse(null));
+ }
+ fields.add(field);
+ }
+ for (Map.Entry<String, Type> systemColumn : SYSTEM_COLUMNS.entrySet())
{
+ fields.add(
+ Types.NestedField.required(
+ fieldId++, systemColumn.getKey(),
systemColumn.getValue()));
+ }
+
+ // set identifier fields
+ int[] primaryKeyIndexes =
tableDescriptor.getSchema().getPrimaryKeyIndexes();
+ Set<Integer> identifierFieldIds = new HashSet<>();
+ for (int i = 0; i < primaryKeyIndexes.length; i++) {
+ identifierFieldIds.add(fields.get(i).fieldId());
+ }
+ return new Schema(fields, identifierFieldIds);
+ }
+
+ private PartitionSpec createPartitionSpec(
+ TableDescriptor tableDescriptor, Schema icebergSchema) {
+ // Only PK tables supported for now
+ List<String> bucketKeys = tableDescriptor.getBucketKeys();
+ int bucketCount =
+ tableDescriptor
+ .getTableDistribution()
+
.flatMap(TableDescriptor.TableDistribution::getBucketCount)
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ "Bucket count (bucket.num)
must be set"));
+
+ if (bucketKeys.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Bucket key must be set for primary key Iceberg tables");
+ }
+ if (bucketKeys.size() > 1) {
+ throw new UnsupportedOperationException(
+ "Only one bucket key is supported for Iceberg at the
moment");
+ }
+
+ PartitionSpec.Builder builder =
PartitionSpec.builderFor(icebergSchema);
+ List<String> partitionKeys = tableDescriptor.getPartitionKeys();
+ for (String partitionKey : partitionKeys) {
+ builder.identity(partitionKey);
+ }
+ builder.bucket(bucketKeys.get(0), bucketCount);
+
+ return builder.build();
+ }
+
+ private void setFlussPropertyToIceberg(
+ String key, String value, Map<String, String> icebergProperties) {
+ if (key.startsWith(ICEBERG_CONF_PREFIX)) {
+ icebergProperties.put(key.substring(ICEBERG_CONF_PREFIX.length()),
value);
+ } else {
+ icebergProperties.put(FLUSS_CONF_PREFIX + key, value);
+ }
+ }
+
+ private void createDatabase(String databaseName) {
+ Namespace namespace = Namespace.of(databaseName);
+ if (icebergCatalog instanceof SupportsNamespaces) {
+ SupportsNamespaces supportsNamespaces = (SupportsNamespaces)
icebergCatalog;
+ if (!supportsNamespaces.namespaceExists(namespace)) {
+ supportsNamespaces.createNamespace(namespace);
+ }
+ } else {
+ throw new UnsupportedOperationException(
+ "The underlying Iceberg catalog does not support namespace
operations.");
+ }
+ }
+
+ private SortOrder createSortOrder(Schema icebergSchema) {
+ // Sort by __offset system column for deterministic ordering
+ SortOrder.Builder builder = SortOrder.builderFor(icebergSchema);
+ builder.asc(OFFSET_COLUMN_NAME);
+ return builder.build();
+ }
+
+ private Map<String, String> buildTableProperties(TableDescriptor
tableDescriptor) {
+ Map<String, String> icebergProperties = new HashMap<>();
+
+ // MOR table properties for streaming workloads
+ icebergProperties.put("write.delete.mode", "merge-on-read");
+ icebergProperties.put("write.update.mode", "merge-on-read");
+ icebergProperties.put("write.merge.mode", "merge-on-read");
+
+ tableDescriptor
+ .getProperties()
+ .forEach((k, v) -> setFlussPropertyToIceberg(k, v,
icebergProperties));
+ tableDescriptor
+ .getCustomProperties()
+ .forEach((k, v) -> setFlussPropertyToIceberg(k, v,
icebergProperties));
+
+ return icebergProperties;
+ }
+
+ @Override
+ public void close() {
+ IOUtils.closeQuietly((AutoCloseable) icebergCatalog,
"fluss-iceberg-catalog");
+ }
+}
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeStorage.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeStorage.java
index fab753afe..9a553a3dd 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeStorage.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/com/alibaba/fluss/lake/iceberg/IcebergLakeStorage.java
@@ -18,7 +18,6 @@
package com.alibaba.fluss.lake.iceberg;
import com.alibaba.fluss.config.Configuration;
-import com.alibaba.fluss.lake.lakestorage.LakeCatalog;
import com.alibaba.fluss.lake.lakestorage.LakeStorage;
import com.alibaba.fluss.lake.source.LakeSource;
import com.alibaba.fluss.lake.writer.LakeTieringFactory;
@@ -39,8 +38,8 @@ public class IcebergLakeStorage implements LakeStorage {
}
@Override
- public LakeCatalog createLakeCatalog() {
- throw new UnsupportedOperationException("Not implemented");
+ public IcebergLakeCatalog createLakeCatalog() {
+ return new IcebergLakeCatalog(icebergConfig);
}
@Override
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalogTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalogTest.java
new file mode 100644
index 000000000..feeb23a26
--- /dev/null
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/com/alibaba/fluss/lake/iceberg/IcebergLakeCatalogTest.java
@@ -0,0 +1,220 @@
+/*
+ * Copyright (c) 2025 Alibaba Group Holding Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.lake.iceberg;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.metadata.Schema;
+import com.alibaba.fluss.metadata.TableDescriptor;
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.types.DataTypes;
+
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.SortDirection;
+import org.apache.iceberg.SortField;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
+import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+/** Unit test for {@link IcebergLakeCatalog}. */
/** Unit test for {@link IcebergLakeCatalog}. */
class IcebergLakeCatalogTest {

    @TempDir private File tempWarehouseDir;

    private IcebergLakeCatalog flussIcebergCatalog;

    @BeforeEach
    void setupCatalog() {
        // Back the catalog with Iceberg's InMemoryCatalog so no external services are needed.
        Configuration configuration = new Configuration();
        configuration.setString("warehouse", tempWarehouseDir.toURI().toString());
        configuration.setString("type", "org.apache.iceberg.inmemory.InMemoryCatalog");
        configuration.setString("name", "fluss_test_catalog");

        this.flussIcebergCatalog = new IcebergLakeCatalog(configuration);
    }

    @Test
    void testCreatePrimaryKeyTable() {
        String database = "test_db";
        String tableName = "simple_table";

        Schema flussSchema =
                Schema.newBuilder()
                        .column("id", DataTypes.INT())
                        .column("name", DataTypes.STRING())
                        .withComment("field name")
                        .primaryKey("id")
                        .build();

        TableDescriptor tableDescriptor =
                TableDescriptor.builder().schema(flussSchema).distributedBy(4, "id").build();

        TablePath tablePath = TablePath.of(database, tableName);

        flussIcebergCatalog.createTable(tablePath, tableDescriptor);

        TableIdentifier tableId = TableIdentifier.of(database, tableName);
        Table createdTable = flussIcebergCatalog.getIcebergCatalog().loadTable(tableId);

        assertThat(createdTable).isNotNull();
        assertThat(createdTable.name()).isEqualTo("fluss_test_catalog.test_db.simple_table");

        // Expected schema: user columns first (pk "id" required), then the three Fluss
        // system columns (__bucket, __offset, __timestamp), all required; field id 1 ("id")
        // is the identifier field.
        org.apache.iceberg.Schema expectIcebergSchema =
                new org.apache.iceberg.Schema(
                        Arrays.asList(
                                Types.NestedField.required(1, "id", Types.IntegerType.get()),
                                Types.NestedField.optional(
                                        2, "name", Types.StringType.get(), "field name"),
                                Types.NestedField.required(
                                        3, BUCKET_COLUMN_NAME, Types.IntegerType.get()),
                                Types.NestedField.required(
                                        4, OFFSET_COLUMN_NAME, Types.LongType.get()),
                                Types.NestedField.required(
                                        5, TIMESTAMP_COLUMN_NAME, Types.TimestampType.withZone())),
                        Collections.singleton(1));

        // Schema has no equals(); compare the canonical string forms instead.
        assertThat(createdTable.schema().toString()).isEqualTo(expectIcebergSchema.toString());
        // verify partition spec: a single bucket[4] transform on the bucket key "id"
        assertThat(createdTable.spec().fields()).hasSize(1);
        PartitionField partitionField = createdTable.spec().fields().get(0);
        assertThat(partitionField.name()).isEqualTo("id_bucket");
        assertThat(partitionField.transform().toString()).isEqualTo("bucket[4]");
        assertThat(partitionField.sourceId()).isEqualTo(1);
    }

    @Test
    void testCreatePartitionedPrimaryKeyTable() {
        String database = "test_db";
        String tableName = "pk_table";

        Schema flussSchema =
                Schema.newBuilder()
                        .column("shop_id", DataTypes.BIGINT())
                        .column("user_id", DataTypes.BIGINT())
                        .column("num_orders", DataTypes.INT())
                        .column("total_amount", DataTypes.INT().copy(false))
                        .primaryKey("shop_id", "user_id")
                        .build();

        // No explicit bucket key: for a pk table the bucket key defaults to the primary key
        // columns minus the partition key, i.e. "user_id" here.
        TableDescriptor tableDescriptor =
                TableDescriptor.builder()
                        .schema(flussSchema)
                        .distributedBy(10)
                        .partitionedBy("shop_id")
                        .property("iceberg.write.format.default", "orc")
                        .property("fluss_k1", "fluss_v1")
                        .build();

        TablePath tablePath = TablePath.of(database, tableName);

        flussIcebergCatalog.createTable(tablePath, tableDescriptor);

        TableIdentifier tableId = TableIdentifier.of(database, tableName);
        Table createdTable = flussIcebergCatalog.getIcebergCatalog().loadTable(tableId);

        // Both primary key columns (field ids 1 and 2) become identifier fields.
        Set<Integer> identifierFieldIds = new HashSet<>();
        identifierFieldIds.add(1);
        identifierFieldIds.add(2);
        org.apache.iceberg.Schema expectIcebergSchema =
                new org.apache.iceberg.Schema(
                        Arrays.asList(
                                Types.NestedField.required(1, "shop_id", Types.LongType.get()),
                                Types.NestedField.required(2, "user_id", Types.LongType.get()),
                                Types.NestedField.optional(
                                        3, "num_orders", Types.IntegerType.get()),
                                Types.NestedField.required(
                                        4, "total_amount", Types.IntegerType.get()),
                                Types.NestedField.required(
                                        5, BUCKET_COLUMN_NAME, Types.IntegerType.get()),
                                Types.NestedField.required(
                                        6, OFFSET_COLUMN_NAME, Types.LongType.get()),
                                Types.NestedField.required(
                                        7, TIMESTAMP_COLUMN_NAME, Types.TimestampType.withZone())),
                        identifierFieldIds);
        assertThat(createdTable.schema().toString()).isEqualTo(expectIcebergSchema.toString());

        // Verify partition spec
        assertThat(createdTable.spec().fields()).hasSize(2);
        // first should be partitioned by the fluss partition key
        PartitionField partitionField1 = createdTable.spec().fields().get(0);
        assertThat(partitionField1.name()).isEqualTo("shop_id");
        assertThat(partitionField1.transform().toString()).isEqualTo("identity");
        assertThat(partitionField1.sourceId()).isEqualTo(1);

        // the second should be partitioned by primary key
        PartitionField partitionField2 = createdTable.spec().fields().get(1);
        assertThat(partitionField2.name()).isEqualTo("user_id_bucket");
        assertThat(partitionField2.transform().toString()).isEqualTo("bucket[10]");
        assertThat(partitionField2.sourceId()).isEqualTo(2);

        // Verify sort order: ascending on the __offset system column
        assertThat(createdTable.sortOrder().fields()).hasSize(1);
        SortField sortField = createdTable.sortOrder().fields().get(0);
        assertThat(sortField.sourceId())
                .isEqualTo(createdTable.schema().findField(OFFSET_COLUMN_NAME).fieldId());
        assertThat(sortField.direction()).isEqualTo(SortDirection.ASC);

        // Verify table properties: MOR defaults, fluss.-prefixed passthrough properties,
        // and iceberg.-prefixed properties unwrapped to native Iceberg keys.
        assertThat(createdTable.properties())
                .containsEntry("write.merge.mode", "merge-on-read")
                .containsEntry("write.delete.mode", "merge-on-read")
                .containsEntry("write.update.mode", "merge-on-read")
                .containsEntry("fluss.fluss_k1", "fluss_v1")
                .containsEntry("write.format.default", "orc");
    }

    @Test
    void rejectsPrimaryKeyTableWithMultipleBucketKeys() {
        String database = "test_db";
        String tableName = "multi_bucket_pk_table";

        Schema flussSchema =
                Schema.newBuilder()
                        .column("user_id", DataTypes.BIGINT())
                        .column("shop_id", DataTypes.BIGINT())
                        .column("order_id", DataTypes.BIGINT())
                        .column("amount", DataTypes.DOUBLE())
                        .primaryKey("user_id", "shop_id")
                        .build();

        TableDescriptor tableDescriptor =
                TableDescriptor.builder()
                        .schema(flussSchema)
                        .distributedBy(4, "user_id", "shop_id") // Multiple bucket keys
                        .build();

        TablePath tablePath = TablePath.of(database, tableName);

        // Iceberg's partition spec only gets a single bucket transform, so more than one
        // bucket key must be rejected.
        assertThatThrownBy(() -> flussIcebergCatalog.createTable(tablePath, tableDescriptor))
                .isInstanceOf(UnsupportedOperationException.class)
                .hasMessageContaining("Only one bucket key is supported for Iceberg");
    }
}