This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new dc7f695537 [Fix][Connector-V2] Fix paimon dynamic bucket table when 
primary key is not first (#7728)
dc7f695537 is described below

commit dc7f695537fe4942ff31e57039f592c6661b88bc
Author: zhangdonghao <[email protected]>
AuthorDate: Fri Sep 27 09:41:58 2024 +0800

    [Fix][Connector-V2] Fix paimon dynamic bucket table when primary key is not 
first (#7728)
---
 .../seatunnel/paimon/catalog/PaimonCatalog.java    |  24 ++
 .../seatunnel/paimon/config/PaimonSinkConfig.java  |   9 +
 .../paimon/exception/PaimonConnectorErrorCode.java |   4 +-
 .../seatunnel/paimon/sink/PaimonSinkWriter.java    |   4 +
 .../paimon/sink/bucket/PaimonBucketAssigner.java   |  40 ++--
 .../paimon/catalog/PaimonCatalogTest.java          | 255 +++++++++++++++++++++
 .../paimon/PaimonSinkDynamicBucketIT.java          |  72 ++++++
 .../fake_to_dynamic_bucket_paimon_case6.conf       |  94 ++++++++
 .../fake_to_dynamic_bucket_paimon_case7.conf       |  82 +++++++
 9 files changed, 567 insertions(+), 17 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
index 9e09035e2f..3075358470 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
@@ -31,6 +31,8 @@ import 
org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
 import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.paimon.utils.SchemaUtil;
 
 import org.apache.paimon.catalog.Identifier;
@@ -161,6 +163,8 @@ public class PaimonCatalog implements Catalog, PaimonTable {
             throw new TableAlreadyExistException(this.catalogName, tablePath);
         } catch (org.apache.paimon.catalog.Catalog.DatabaseNotExistException 
e) {
             throw new DatabaseNotExistException(this.catalogName, 
tablePath.getDatabaseName());
+        } catch (Exception e) {
+            resolveException(e);
         }
     }
 
@@ -273,4 +277,24 @@ public class PaimonCatalog implements Catalog, PaimonTable 
{
     private Identifier toIdentifier(TablePath tablePath) {
         return Identifier.create(tablePath.getDatabaseName(), 
tablePath.getTableName());
     }
+
+    private void resolveException(Exception e) {
+        Throwable cause = e.getCause();
+        if (cause instanceof UnsupportedOperationException) {
+            String message = cause.getMessage();
+            if (message.contains("The type ")
+                    && message.contains(" in primary key field ")
+                    && message.contains(" is unsupported")) {
+                throw new PaimonConnectorException(
+                        PaimonConnectorErrorCode.UNSUPPORTED_PRIMARY_DATATYPE, 
message);
+            }
+        } else if (cause instanceof RuntimeException) {
+            String message = cause.getMessage();
+            if (message.contains("Cannot define 'bucket-key' in unaware or 
dynamic bucket mode.")) {
+                throw new PaimonConnectorException(
+                        PaimonConnectorErrorCode.WRITE_PROPS_BUCKET_KEY_ERROR, 
message);
+            }
+        }
+        throw new CatalogException("An unexpected error occurred", e);
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
index b706e622f7..9b358a2e8c 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
@@ -24,12 +24,14 @@ import org.apache.seatunnel.api.sink.DataSaveMode;
 import org.apache.seatunnel.api.sink.SchemaSaveMode;
 
 import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 @Getter
+@Slf4j
 public class PaimonSinkConfig extends PaimonConfig {
     public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
             Options.key("schema_save_mode")
@@ -77,5 +79,12 @@ public class PaimonSinkConfig extends PaimonConfig {
         this.primaryKeys = stringToList(readonlyConfig.get(PRIMARY_KEYS), ",");
         this.partitionKeys = stringToList(readonlyConfig.get(PARTITION_KEYS), 
",");
         this.writeProps = readonlyConfig.get(WRITE_PROPS);
+        checkConfig();
+    }
+
+    private void checkConfig() {
+        if (this.primaryKeys.isEmpty() && 
"-1".equals(this.writeProps.get("bucket"))) {
+            log.warn("Append only table currently do not support dynamic 
bucket");
+        }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
index ed4c80a40d..237c7c2448 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
@@ -27,7 +27,9 @@ public enum PaimonConnectorErrorCode implements 
SeaTunnelErrorCode {
     GET_TABLE_FAILED("PAIMON-04", "Get table from database failed"),
     AUTHENTICATE_KERBEROS_FAILED("PAIMON-05", "Authenticate kerberos failed"),
     LOAD_CATALOG("PAIMON-06", "Load catalog failed"),
-    GET_FILED_FAILED("PAIMON-07", "Get field failed");
+    GET_FILED_FAILED("PAIMON-07", "Get field failed"),
+    UNSUPPORTED_PRIMARY_DATATYPE("PAIMON-08", "Paimon primary key datatype is 
unsupported"),
+    WRITE_PROPS_BUCKET_KEY_ERROR("PAIMON-09", "Cannot define 'bucket-key' in 
dynamic bucket mode");
 
     private final String code;
     private final String description;
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
index 18442d05b5..7a3fe6d033 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
@@ -104,6 +104,10 @@ public class PaimonSinkWriter
         BucketMode bucketMode = ((FileStoreTable) table).bucketMode();
         this.dynamicBucket =
                 BucketMode.DYNAMIC == bucketMode || BucketMode.GLOBAL_DYNAMIC 
== bucketMode;
+        int bucket = ((FileStoreTable) table).coreOptions().bucket();
+        if (bucket == -1 && BucketMode.UNAWARE == bucketMode) {
+            log.warn("Append only table currently do not support dynamic 
bucket");
+        }
         if (dynamicBucket) {
             this.bucketAssigner =
                     new PaimonBucketAssigner(
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssigner.java
 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssigner.java
index 4f5f681fff..16804754fa 100644
--- 
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssigner.java
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssigner.java
@@ -17,10 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket;
 
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.paimon.codegen.CodeGenUtils;
-import org.apache.paimon.codegen.Projection;
 import org.apache.paimon.crosspartition.IndexBootstrap;
+import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.index.SimpleHashBucketAssigner;
 import org.apache.paimon.reader.RecordReader;
@@ -29,15 +27,20 @@ import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 public class PaimonBucketAssigner {
 
     private final RowPartitionKeyExtractor extractor;
 
-    private final Projection bucketKeyProjection;
-
     private final SimpleHashBucketAssigner simpleHashBucketAssigner;
 
     private final TableSchema schema;
@@ -46,10 +49,6 @@ public class PaimonBucketAssigner {
         FileStoreTable fileStoreTable = (FileStoreTable) table;
         this.schema = fileStoreTable.schema();
         this.extractor = new RowPartitionKeyExtractor(fileStoreTable.schema());
-        this.bucketKeyProjection =
-                CodeGenUtils.newProjection(
-                        fileStoreTable.schema().logicalRowType(),
-                        
fileStoreTable.schema().projection(fileStoreTable.schema().bucketKeys()));
         long dynamicBucketTargetRowNum =
                 ((FileStoreTable) 
table).coreOptions().dynamicBucketTargetRowNum();
         this.simpleHashBucketAssigner =
@@ -59,13 +58,27 @@ public class PaimonBucketAssigner {
 
     private void loadBucketIndex(FileStoreTable fileStoreTable, int 
numAssigners, int assignId) {
         IndexBootstrap indexBootstrap = new IndexBootstrap(fileStoreTable);
+        List<String> fieldNames = schema.fieldNames();
+        Map<String, Integer> fieldIndexMap =
+                IntStream.range(0, fieldNames.size())
+                        .boxed()
+                        .collect(Collectors.toMap(fieldNames::get, 
Function.identity()));
+        List<DataField> primaryKeys = schema.primaryKeysFields();
         try (RecordReader<InternalRow> recordReader =
                 indexBootstrap.bootstrap(numAssigners, assignId)) {
             RecordReaderIterator<InternalRow> readerIterator =
                     new RecordReaderIterator<>(recordReader);
             while (readerIterator.hasNext()) {
                 InternalRow row = readerIterator.next();
-                assign(row);
+                GenericRow binaryRow = new GenericRow(fieldNames.size());
+                for (int i = 0; i < primaryKeys.size(); i++) {
+                    String name = primaryKeys.get(i).name();
+                    DataType type = primaryKeys.get(i).type();
+                    binaryRow.setField(
+                            fieldIndexMap.get(name),
+                            InternalRow.createFieldGetter(type, 
i).getFieldOrNull(row));
+                }
+                assign(binaryRow);
             }
         } catch (IOException e) {
             throw new RuntimeException(e);
@@ -73,12 +86,7 @@ public class PaimonBucketAssigner {
     }
 
     public int assign(InternalRow rowData) {
-        int hash;
-        if (CollectionUtils.isEmpty(this.schema.bucketKeys())) {
-            hash = extractor.trimmedPrimaryKey(rowData).hashCode();
-        } else {
-            hash = bucketKeyProjection.apply(rowData).hashCode();
-        }
+        int hash = extractor.trimmedPrimaryKey(rowData).hashCode();
         return Math.abs(
                 
this.simpleHashBucketAssigner.assign(this.extractor.partition(rowData), hash));
     }
diff --git 
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogTest.java
 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogTest.java
new file mode 100644
index 0000000000..9ebf758439
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+@Slf4j
+public class PaimonCatalogTest {
+
+    private PaimonCatalog paimonCatalog;
+    private TableSchema.Builder schemaBuilder;
+    private final String CATALOG_NAME = "paimon_catalog";
+    private final String DATABASE_NAME = "default";
+    private final String TABLE_NAME = "test_table";
+
+    @BeforeEach
+    public void before() {
+        Map<String, Object> properties = new HashMap<>();
+        properties.put("warehouse", "/tmp/paimon");
+        properties.put("plugin_name", "Paimon");
+        properties.put("database", DATABASE_NAME);
+        properties.put("table", TABLE_NAME);
+        Map<String, String> writeProps = new HashMap<>();
+        writeProps.put("bucket", "-1");
+        writeProps.put("bucket-key", "c_string");
+        properties.put("paimon.table.write-props", writeProps);
+        ReadonlyConfig config = ReadonlyConfig.fromMap(properties);
+        paimonCatalog = new PaimonCatalog(CATALOG_NAME, config);
+        paimonCatalog.open();
+        paimonCatalog.createDatabase(TablePath.of(DATABASE_NAME, TABLE_NAME), 
false);
+        this.schemaBuilder =
+                TableSchema.builder()
+                        .column(
+                                PhysicalColumn.of(
+                                        "c_map",
+                                        new MapType<>(BasicType.STRING_TYPE, 
BasicType.STRING_TYPE),
+                                        (Long) null,
+                                        true,
+                                        null,
+                                        null))
+                        .column(
+                                PhysicalColumn.of(
+                                        "c_array",
+                                        ArrayType.STRING_ARRAY_TYPE,
+                                        (Long) null,
+                                        false,
+                                        null,
+                                        "c_array"))
+                        .column(
+                                PhysicalColumn.of(
+                                        "c_string",
+                                        BasicType.STRING_TYPE,
+                                        (Long) null,
+                                        false,
+                                        null,
+                                        "c_string"))
+                        .column(
+                                PhysicalColumn.of(
+                                        "c_boolean",
+                                        BasicType.BOOLEAN_TYPE,
+                                        (Long) null,
+                                        false,
+                                        null,
+                                        "c_boolean"))
+                        .column(
+                                PhysicalColumn.of(
+                                        "c_tinyint",
+                                        BasicType.INT_TYPE,
+                                        (Long) null,
+                                        false,
+                                        null,
+                                        "c_tinyint"))
+                        .column(
+                                PhysicalColumn.of(
+                                        "c_smallint",
+                                        BasicType.INT_TYPE,
+                                        (Long) null,
+                                        false,
+                                        null,
+                                        "c_smallint"))
+                        .column(
+                                PhysicalColumn.of(
+                                        "c_int",
+                                        BasicType.INT_TYPE,
+                                        (Long) null,
+                                        false,
+                                        null,
+                                        "c_int"))
+                        .column(
+                                PhysicalColumn.of(
+                                        "c_bigint",
+                                        BasicType.LONG_TYPE,
+                                        (Long) null,
+                                        false,
+                                        null,
+                                        "c_bigint"))
+                        .column(
+                                PhysicalColumn.of(
+                                        "c_float",
+                                        BasicType.FLOAT_TYPE,
+                                        (Long) null,
+                                        false,
+                                        null,
+                                        "c_float"))
+                        .column(
+                                PhysicalColumn.of(
+                                        "c_double",
+                                        BasicType.DOUBLE_TYPE,
+                                        (Long) null,
+                                        false,
+                                        null,
+                                        "c_double"))
+                        .column(
+                                PhysicalColumn.of(
+                                        "c_decimal",
+                                        new DecimalType(10, 2),
+                                        (Long) null,
+                                        false,
+                                        null,
+                                        "c_decimal"))
+                        .column(
+                                PhysicalColumn.of(
+                                        "c_bytes",
+                                        BasicType.BYTE_TYPE,
+                                        (Long) null,
+                                        false,
+                                        null,
+                                        "c_bytes"))
+                        .column(
+                                PhysicalColumn.of(
+                                        "c_date",
+                                        LocalTimeType.LOCAL_DATE_TYPE,
+                                        (Long) null,
+                                        false,
+                                        null,
+                                        "c_date"))
+                        .column(
+                                PhysicalColumn.of(
+                                        "c_timestamp",
+                                        LocalTimeType.LOCAL_DATE_TIME_TYPE,
+                                        (Long) null,
+                                        false,
+                                        null,
+                                        "c_timestamp"));
+    }
+
+    @Test
+    public void primaryDataTypeError() {
+        TableSchema tableSchema =
+                schemaBuilder
+                        .primaryKey(
+                                PrimaryKey.of("pk", Arrays.asList("c_map", 
"c_array", "c_string")))
+                        .build();
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        TableIdentifier.of(CATALOG_NAME, DATABASE_NAME, 
TABLE_NAME),
+                        tableSchema,
+                        new HashMap<>(),
+                        new ArrayList<>(),
+                        "test table");
+        Assertions.assertThrows(
+                PaimonConnectorException.class,
+                () -> {
+                    try {
+                        paimonCatalog.createTable(
+                                TablePath.of("default.default.default"), 
catalogTable, true);
+                    } catch (Exception e) {
+                        Assertions.assertTrue(
+                                e.getMessage()
+                                        .contains(
+                                                PaimonConnectorErrorCode
+                                                        
.UNSUPPORTED_PRIMARY_DATATYPE
+                                                        .getCode()));
+                        throw e;
+                    }
+                });
+    }
+
+    @Test
+    public void bucketKeyError() {
+        TableSchema tableSchema =
+                schemaBuilder
+                        .primaryKey(PrimaryKey.of("pk", 
Arrays.asList("c_string", "c_bigint")))
+                        .build();
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        TableIdentifier.of(CATALOG_NAME, DATABASE_NAME, 
TABLE_NAME),
+                        tableSchema,
+                        new HashMap<>(),
+                        new ArrayList<>(),
+                        "test table");
+        Assertions.assertThrows(
+                PaimonConnectorException.class,
+                () -> {
+                    try {
+                        paimonCatalog.createTable(
+                                TablePath.of("default.default.default"), 
catalogTable, false);
+                    } catch (Exception e) {
+                        Assertions.assertTrue(
+                                e.getMessage()
+                                        .contains(
+                                                PaimonConnectorErrorCode
+                                                        
.WRITE_PROPS_BUCKET_KEY_ERROR
+                                                        .getCode()));
+                        throw e;
+                    }
+                });
+    }
+
+    @AfterEach
+    public void after() {
+        paimonCatalog.dropDatabase(TablePath.of(DATABASE_NAME, TABLE_NAME), 
false);
+        paimonCatalog.close();
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
index 8e36e07cd1..adb3d8a6c0 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.e2e.connector.paimon;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.common.utils.FileUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalogLoader;
 import 
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
 import org.apache.seatunnel.core.starter.utils.CompressionUtils;
@@ -35,6 +36,8 @@ import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.crosspartition.IndexBootstrap;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.reader.RecordReader;
@@ -343,6 +346,34 @@ public class PaimonSinkDynamicBucketIT extends 
TestSuiteBase implements TestReso
                         });
     }
 
+    @TestTemplate
+    public void primaryFullTypeAndLoadData(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult writeResult =
+                
container.executeJob("/fake_to_dynamic_bucket_paimon_case6.conf");
+        Assertions.assertEquals(0, writeResult.getExitCode());
+
+        given().ignoreExceptions()
+                .await()
+                .atLeast(100L, TimeUnit.MILLISECONDS)
+                .atMost(60L, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () -> {
+                            
container.executeExtraCommands(containerExtendedFactory);
+                            FileStoreTable table =
+                                    (FileStoreTable) getTable("full_type", 
"st_test");
+                            List<String> primaryKeys = 
table.schema().primaryKeys();
+                            Assertions.assertEquals(12, primaryKeys.size());
+                            List<PaimonRecordWithFullType> paimonSourceRecords 
=
+                                    loadPaimonDataWithFullType(table);
+                            Assertions.assertEquals(6, 
paimonSourceRecords.size());
+                        });
+        // load full_type.st_test table data and initialize the 
PaimonBucketAssigner class
+        Container.ExecResult writeResult1 =
+                
container.executeJob("/fake_to_dynamic_bucket_paimon_case7.conf");
+        Assertions.assertEquals(0, writeResult1.getExitCode());
+    }
+
     protected final ContainerExtendedFactory containerExtendedFactory =
             container -> {
                 if (isWindows) {
@@ -417,4 +448,45 @@ public class PaimonSinkDynamicBucketIT extends 
TestSuiteBase implements TestReso
             throw new RuntimeException("table not exist");
         }
     }
+
+    private List<PaimonRecordWithFullType> 
loadPaimonDataWithFullType(FileStoreTable table) {
+        ReadBuilder readBuilder = table.newReadBuilder();
+        TableScan.Plan plan = readBuilder.newScan().plan();
+        TableRead tableRead = readBuilder.newRead();
+        List<PaimonRecordWithFullType> result = new ArrayList<>();
+        try (RecordReader<InternalRow> reader = tableRead.createReader(plan)) {
+            reader.forEachRemaining(
+                    row -> {
+                        InternalMap internalMap = row.getMap(0);
+                        InternalArray keyArray = internalMap.keyArray();
+                        InternalArray valueArray = internalMap.valueArray();
+                        HashMap<Object, Object> map = new 
HashMap<>(internalMap.size());
+                        for (int i = 0; i < internalMap.size(); i++) {
+                            map.put(keyArray.getString(i), 
valueArray.getString(i));
+                        }
+                        InternalArray internalArray = row.getArray(1);
+                        int[] intArray = internalArray.toIntArray();
+                        PaimonRecordWithFullType paimonRecordWithFullType =
+                                new PaimonRecordWithFullType(
+                                        map,
+                                        intArray,
+                                        row.getString(2),
+                                        row.getBoolean(3),
+                                        row.getShort(4),
+                                        row.getShort(5),
+                                        row.getInt(6),
+                                        row.getLong(7),
+                                        row.getFloat(8),
+                                        row.getDouble(9),
+                                        row.getDecimal(10, 30, 8),
+                                        row.getString(11),
+                                        row.getInt(12),
+                                        row.getTimestamp(13, 6));
+                        result.add(paimonRecordWithFullType);
+                    });
+        } catch (IOException e) {
+            throw new SeaTunnelException(e);
+        }
+        return result;
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case6.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case6.conf
new file mode 100644
index 0000000000..5ab8be50d9
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case6.conf
@@ -0,0 +1,94 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+      primaryKey {
+        name = "pk"
+        columnNames = [c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_decimal,c_bytes,c_date,c_timestamp]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [{"a": "b"}, [101], "c_string", true, 117, 15987, 563873951, 7084913402530365001, 1.21, 1.231, "2924137191386439303744.39292211", "bWlJWmo=", "2023-04-21", "2023-04-21T23:20:58"]
+      }
+      {
+        kind = INSERT
+        fields = [{"a": "c"}, [102], "c_string1", false, 118, 15988, 563873952, 7084913402530365002, 1.22, 1.232, "2924137191386439303744.39292212", "bWlJWmo=", "2023-04-22", "2023-04-22T23:20:58"]
+      }
+      {
+        kind = INSERT
+        fields = [{"a": "e"}, [103], "c_string2", true, 119, 15987, 563873953, 7084913402530365003, 1.23, 1.233, "2924137191386439303744.39292213", "bWlJWmo=", "2023-04-23", "2023-04-23T23:20:58"]
+      }
+      {
+        kind = INSERT
+        fields = [{"a": "f"}, [104], null, false, 118, 15988, 563873951, 7084913402530365004, 1.24, 1.234, "2924137191386439303744.39292214", "bWlJWmo=", "2023-04-24", "2023-04-24T23:20:58"]
+      }
+      {
+        kind = INSERT
+        fields = [{"a": "b"}, [101], "c_string1", true, 120, 15987, 563873952, 7084913402530365001, 1.21, 1.231, "2924137191386439303744.39292211", "bWlJWmo=", "2023-04-25", "2023-04-25T23:20:58"]
+      }
+      {
+        kind = UPDATE_BEFORE
+        fields = [{"a": "c"}, [102], "c_string2", false, 116, 15987, 563873953, 7084913402530365002, 1.22, 1.232, "2924137191386439303744.39292212", "bWlJWmo=", "2023-04-26", "2023-04-26T23:20:58"]
+      }
+      {
+        kind = UPDATE_AFTER
+        fields = [{"a": "e"}, [103], "c_string3", true, 116, 15989, 563873951, 7084913402530365003, 1.23, 1.233, "2924137191386439303744.39292213", "bWlJWmo=", "2023-04-27", "2023-04-27T23:20:58"]
+      }
+      {
+        kind = DELETE
+        fields = [{"a": "f"}, [104], "c_string4", true, 120, 15987, 563873952, 7084913402530365004, 1.24, 1.234, "2924137191386439303744.39292214", "bWlJWmo=", "2023-04-28", "2023-04-28T23:20:58"]
+      }
+    ]
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  Paimon {
+    warehouse = "file:///tmp/paimon"
+    database = "full_type"
+    table = "st_test"
+    paimon.table.write-props = {
+       bucket = -1
+    }
+  }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case7.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case7.conf
new file mode 100644
index 0000000000..04632a6ca7
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case7.conf
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+      primaryKey {
+        name = "pk"
+        columnNames = [c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_decimal,c_bytes,c_date,c_timestamp]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [{"a": "b"}, [101], "c_string", true, 121, 15987, 563873951, 7084913402530365001, 1.21, 1.231, "2924137191386439303744.39292211", "bWlJWmo=", "2023-04-21", "2023-04-21T23:20:58"]
+      }
+      {
+        kind = INSERT
+        fields = [{"a": "b"}, [101], "c_string1", true, 122, 15987, 563873952, 7084913402530365001, 1.21, 1.231, "2924137191386439303744.39292211", "bWlJWmo=", "2023-04-25", "2023-04-25T23:20:58"]
+      }
+      {
+        kind = UPDATE_BEFORE
+        fields = [{"a": "c"}, [102], "c_string2", true, 117, 15987, 563873953, 7084913402530365002, 1.22, 1.232, "2924137191386439303744.39292212", "bWlJWmo=", "2023-04-26", "2023-04-26T23:20:58"]
+      }
+      {
+        kind = UPDATE_AFTER
+        fields = [{"a": "e"}, [103], "c_string3", false, 117, 15989, 563873951, 7084913402530365003, 1.23, 1.233, "2924137191386439303744.39292213", "bWlJWmo=", "2023-04-27", "2023-04-27T23:20:58"]
+      }
+      {
+        kind = DELETE
+        fields = [{"a": "e"}, [103], "c_string2", true, 119, 15987, 563873953, 7084913402530365003, 1.23, 1.233, "2924137191386439303744.39292213", "bWlJWmo=", "2023-04-23", "2023-04-23T23:20:58"]
+      }
+    ]
+    result_table_name = "fake"
+  }
+}
+
+sink {
+  Paimon {
+    warehouse = "/tmp/paimon"
+    database = "full_type"
+    table = "st_test"
+    paimon.table.write-props = {
+      bucket = -1
+    }
+  }
+}


Reply via email to