This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new dc7f695537 [Fix][Connector-V2] Fix paimon dynamic bucket table in
primary key is not first (#7728)
dc7f695537 is described below
commit dc7f695537fe4942ff31e57039f592c6661b88bc
Author: zhangdonghao <[email protected]>
AuthorDate: Fri Sep 27 09:41:58 2024 +0800
[Fix][Connector-V2] Fix paimon dynamic bucket table in primary key is not
first (#7728)
---
.../seatunnel/paimon/catalog/PaimonCatalog.java | 24 ++
.../seatunnel/paimon/config/PaimonSinkConfig.java | 9 +
.../paimon/exception/PaimonConnectorErrorCode.java | 4 +-
.../seatunnel/paimon/sink/PaimonSinkWriter.java | 4 +
.../paimon/sink/bucket/PaimonBucketAssigner.java | 40 ++--
.../paimon/catalog/PaimonCatalogTest.java | 255 +++++++++++++++++++++
.../paimon/PaimonSinkDynamicBucketIT.java | 72 ++++++
.../fake_to_dynamic_bucket_paimon_case6.conf | 94 ++++++++
.../fake_to_dynamic_bucket_paimon_case7.conf | 82 +++++++
9 files changed, 567 insertions(+), 17 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
index 9e09035e2f..3075358470 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
@@ -31,6 +31,8 @@ import
org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.SchemaUtil;
import org.apache.paimon.catalog.Identifier;
@@ -161,6 +163,8 @@ public class PaimonCatalog implements Catalog, PaimonTable {
throw new TableAlreadyExistException(this.catalogName, tablePath);
} catch (org.apache.paimon.catalog.Catalog.DatabaseNotExistException
e) {
throw new DatabaseNotExistException(this.catalogName,
tablePath.getDatabaseName());
+ } catch (Exception e) {
+ resolveException(e);
}
}
@@ -273,4 +277,24 @@ public class PaimonCatalog implements Catalog, PaimonTable
{
private Identifier toIdentifier(TablePath tablePath) {
return Identifier.create(tablePath.getDatabaseName(),
tablePath.getTableName());
}
+
+ private void resolveException(Exception e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof UnsupportedOperationException) {
+ String message = cause.getMessage();
+ if (message.contains("The type ")
+ && message.contains(" in primary key field ")
+ && message.contains(" is unsupported")) {
+ throw new PaimonConnectorException(
+ PaimonConnectorErrorCode.UNSUPPORTED_PRIMARY_DATATYPE,
message);
+ }
+ } else if (cause instanceof RuntimeException) {
+ String message = cause.getMessage();
+ if (message.contains("Cannot define 'bucket-key' in unaware or
dynamic bucket mode.")) {
+ throw new PaimonConnectorException(
+ PaimonConnectorErrorCode.WRITE_PROPS_BUCKET_KEY_ERROR,
message);
+ }
+ }
+ throw new CatalogException("An unexpected error occurred", e);
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
index b706e622f7..9b358a2e8c 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java
@@ -24,12 +24,14 @@ import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SchemaSaveMode;
import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Getter
+@Slf4j
public class PaimonSinkConfig extends PaimonConfig {
public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
Options.key("schema_save_mode")
@@ -77,5 +79,12 @@ public class PaimonSinkConfig extends PaimonConfig {
this.primaryKeys = stringToList(readonlyConfig.get(PRIMARY_KEYS), ",");
this.partitionKeys = stringToList(readonlyConfig.get(PARTITION_KEYS),
",");
this.writeProps = readonlyConfig.get(WRITE_PROPS);
+ checkConfig();
+ }
+
+ private void checkConfig() {
+ if (this.primaryKeys.isEmpty() &&
"-1".equals(this.writeProps.get("bucket"))) {
+ log.warn("Append only table currently do not support dynamic
bucket");
+ }
}
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
index ed4c80a40d..237c7c2448 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/exception/PaimonConnectorErrorCode.java
@@ -27,7 +27,9 @@ public enum PaimonConnectorErrorCode implements
SeaTunnelErrorCode {
GET_TABLE_FAILED("PAIMON-04", "Get table from database failed"),
AUTHENTICATE_KERBEROS_FAILED("PAIMON-05", "Authenticate kerberos failed"),
LOAD_CATALOG("PAIMON-06", "Load catalog failed"),
- GET_FILED_FAILED("PAIMON-07", "Get field failed");
+ GET_FILED_FAILED("PAIMON-07", "Get field failed"),
+ UNSUPPORTED_PRIMARY_DATATYPE("PAIMON-08", "Paimon primary key datatype is
unsupported"),
+ WRITE_PROPS_BUCKET_KEY_ERROR("PAIMON-09", "Cannot define 'bucket-key' in
dynamic bucket mode");
private final String code;
private final String description;
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
index 18442d05b5..7a3fe6d033 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkWriter.java
@@ -104,6 +104,10 @@ public class PaimonSinkWriter
BucketMode bucketMode = ((FileStoreTable) table).bucketMode();
this.dynamicBucket =
BucketMode.DYNAMIC == bucketMode || BucketMode.GLOBAL_DYNAMIC
== bucketMode;
+ int bucket = ((FileStoreTable) table).coreOptions().bucket();
+ if (bucket == -1 && BucketMode.UNAWARE == bucketMode) {
+ log.warn("Append only table currently do not support dynamic
bucket");
+ }
if (dynamicBucket) {
this.bucketAssigner =
new PaimonBucketAssigner(
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssigner.java
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssigner.java
index 4f5f681fff..16804754fa 100644
---
a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssigner.java
+++
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/bucket/PaimonBucketAssigner.java
@@ -17,10 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.paimon.sink.bucket;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.paimon.codegen.CodeGenUtils;
-import org.apache.paimon.codegen.Projection;
import org.apache.paimon.crosspartition.IndexBootstrap;
+import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.index.SimpleHashBucketAssigner;
import org.apache.paimon.reader.RecordReader;
@@ -29,15 +27,20 @@ import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
public class PaimonBucketAssigner {
private final RowPartitionKeyExtractor extractor;
- private final Projection bucketKeyProjection;
-
private final SimpleHashBucketAssigner simpleHashBucketAssigner;
private final TableSchema schema;
@@ -46,10 +49,6 @@ public class PaimonBucketAssigner {
FileStoreTable fileStoreTable = (FileStoreTable) table;
this.schema = fileStoreTable.schema();
this.extractor = new RowPartitionKeyExtractor(fileStoreTable.schema());
- this.bucketKeyProjection =
- CodeGenUtils.newProjection(
- fileStoreTable.schema().logicalRowType(),
-
fileStoreTable.schema().projection(fileStoreTable.schema().bucketKeys()));
long dynamicBucketTargetRowNum =
((FileStoreTable)
table).coreOptions().dynamicBucketTargetRowNum();
this.simpleHashBucketAssigner =
@@ -59,13 +58,27 @@ public class PaimonBucketAssigner {
private void loadBucketIndex(FileStoreTable fileStoreTable, int
numAssigners, int assignId) {
IndexBootstrap indexBootstrap = new IndexBootstrap(fileStoreTable);
+ List<String> fieldNames = schema.fieldNames();
+ Map<String, Integer> fieldIndexMap =
+ IntStream.range(0, fieldNames.size())
+ .boxed()
+ .collect(Collectors.toMap(fieldNames::get,
Function.identity()));
+ List<DataField> primaryKeys = schema.primaryKeysFields();
try (RecordReader<InternalRow> recordReader =
indexBootstrap.bootstrap(numAssigners, assignId)) {
RecordReaderIterator<InternalRow> readerIterator =
new RecordReaderIterator<>(recordReader);
while (readerIterator.hasNext()) {
InternalRow row = readerIterator.next();
- assign(row);
+ GenericRow binaryRow = new GenericRow(fieldNames.size());
+ for (int i = 0; i < primaryKeys.size(); i++) {
+ String name = primaryKeys.get(i).name();
+ DataType type = primaryKeys.get(i).type();
+ binaryRow.setField(
+ fieldIndexMap.get(name),
+ InternalRow.createFieldGetter(type,
i).getFieldOrNull(row));
+ }
+ assign(binaryRow);
}
} catch (IOException e) {
throw new RuntimeException(e);
@@ -73,12 +86,7 @@ public class PaimonBucketAssigner {
}
public int assign(InternalRow rowData) {
- int hash;
- if (CollectionUtils.isEmpty(this.schema.bucketKeys())) {
- hash = extractor.trimmedPrimaryKey(rowData).hashCode();
- } else {
- hash = bucketKeyProjection.apply(rowData).hashCode();
- }
+ int hash = extractor.trimmedPrimaryKey(rowData).hashCode();
return Math.abs(
this.simpleHashBucketAssigner.assign(this.extractor.partition(rowData), hash));
}
diff --git
a/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogTest.java
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogTest.java
new file mode 100644
index 0000000000..9ebf758439
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-paimon/src/test/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.api.table.type.ArrayType;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.MapType;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+@Slf4j
+public class PaimonCatalogTest {
+
+ private PaimonCatalog paimonCatalog;
+ private TableSchema.Builder schemaBuilder;
+ private final String CATALOG_NAME = "paimon_catalog";
+ private final String DATABASE_NAME = "default";
+ private final String TABLE_NAME = "test_table";
+
+ @BeforeEach
+ public void before() {
+ Map<String, Object> properties = new HashMap<>();
+ properties.put("warehouse", "/tmp/paimon");
+ properties.put("plugin_name", "Paimon");
+ properties.put("database", DATABASE_NAME);
+ properties.put("table", TABLE_NAME);
+ Map<String, String> writeProps = new HashMap<>();
+ writeProps.put("bucket", "-1");
+ writeProps.put("bucket-key", "c_string");
+ properties.put("paimon.table.write-props", writeProps);
+ ReadonlyConfig config = ReadonlyConfig.fromMap(properties);
+ paimonCatalog = new PaimonCatalog(CATALOG_NAME, config);
+ paimonCatalog.open();
+ paimonCatalog.createDatabase(TablePath.of(DATABASE_NAME, TABLE_NAME),
false);
+ this.schemaBuilder =
+ TableSchema.builder()
+ .column(
+ PhysicalColumn.of(
+ "c_map",
+ new MapType<>(BasicType.STRING_TYPE,
BasicType.STRING_TYPE),
+ (Long) null,
+ true,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "c_array",
+ ArrayType.STRING_ARRAY_TYPE,
+ (Long) null,
+ false,
+ null,
+ "c_array"))
+ .column(
+ PhysicalColumn.of(
+ "c_string",
+ BasicType.STRING_TYPE,
+ (Long) null,
+ false,
+ null,
+ "c_string"))
+ .column(
+ PhysicalColumn.of(
+ "c_boolean",
+ BasicType.BOOLEAN_TYPE,
+ (Long) null,
+ false,
+ null,
+ "c_boolean"))
+ .column(
+ PhysicalColumn.of(
+ "c_tinyint",
+ BasicType.INT_TYPE,
+ (Long) null,
+ false,
+ null,
+ "c_tinyint"))
+ .column(
+ PhysicalColumn.of(
+ "c_smallint",
+ BasicType.INT_TYPE,
+ (Long) null,
+ false,
+ null,
+ "c_smallint"))
+ .column(
+ PhysicalColumn.of(
+ "c_int",
+ BasicType.INT_TYPE,
+ (Long) null,
+ false,
+ null,
+ "c_int"))
+ .column(
+ PhysicalColumn.of(
+ "c_bigint",
+ BasicType.LONG_TYPE,
+ (Long) null,
+ false,
+ null,
+ "c_bigint"))
+ .column(
+ PhysicalColumn.of(
+ "c_float",
+ BasicType.FLOAT_TYPE,
+ (Long) null,
+ false,
+ null,
+ "c_float"))
+ .column(
+ PhysicalColumn.of(
+ "c_double",
+ BasicType.DOUBLE_TYPE,
+ (Long) null,
+ false,
+ null,
+ "c_double"))
+ .column(
+ PhysicalColumn.of(
+ "c_decimal",
+ new DecimalType(10, 2),
+ (Long) null,
+ false,
+ null,
+ "c_decimal"))
+ .column(
+ PhysicalColumn.of(
+ "c_bytes",
+ BasicType.BYTE_TYPE,
+ (Long) null,
+ false,
+ null,
+ "c_bytes"))
+ .column(
+ PhysicalColumn.of(
+ "c_date",
+ LocalTimeType.LOCAL_DATE_TYPE,
+ (Long) null,
+ false,
+ null,
+ "c_date"))
+ .column(
+ PhysicalColumn.of(
+ "c_timestamp",
+ LocalTimeType.LOCAL_DATE_TIME_TYPE,
+ (Long) null,
+ false,
+ null,
+ "c_timestamp"));
+ }
+
+ @Test
+ public void primaryDataTypeError() {
+ TableSchema tableSchema =
+ schemaBuilder
+ .primaryKey(
+ PrimaryKey.of("pk", Arrays.asList("c_map",
"c_array", "c_string")))
+ .build();
+ CatalogTable catalogTable =
+ CatalogTable.of(
+ TableIdentifier.of(CATALOG_NAME, DATABASE_NAME,
TABLE_NAME),
+ tableSchema,
+ new HashMap<>(),
+ new ArrayList<>(),
+ "test table");
+ Assertions.assertThrows(
+ PaimonConnectorException.class,
+ () -> {
+ try {
+ paimonCatalog.createTable(
+ TablePath.of("default.default.default"),
catalogTable, true);
+ } catch (Exception e) {
+ Assertions.assertTrue(
+ e.getMessage()
+ .contains(
+ PaimonConnectorErrorCode
+
.UNSUPPORTED_PRIMARY_DATATYPE
+ .getCode()));
+ throw e;
+ }
+ });
+ }
+
+ @Test
+ public void bucketKeyError() {
+ TableSchema tableSchema =
+ schemaBuilder
+ .primaryKey(PrimaryKey.of("pk",
Arrays.asList("c_string", "c_bigint")))
+ .build();
+ CatalogTable catalogTable =
+ CatalogTable.of(
+ TableIdentifier.of(CATALOG_NAME, DATABASE_NAME,
TABLE_NAME),
+ tableSchema,
+ new HashMap<>(),
+ new ArrayList<>(),
+ "test table");
+ Assertions.assertThrows(
+ PaimonConnectorException.class,
+ () -> {
+ try {
+ paimonCatalog.createTable(
+ TablePath.of("default.default.default"),
catalogTable, false);
+ } catch (Exception e) {
+ Assertions.assertTrue(
+ e.getMessage()
+ .contains(
+ PaimonConnectorErrorCode
+
.WRITE_PROPS_BUCKET_KEY_ERROR
+ .getCode()));
+ throw e;
+ }
+ });
+ }
+
+ @AfterEach
+ public void after() {
+ paimonCatalog.dropDatabase(TablePath.of(DATABASE_NAME, TABLE_NAME),
false);
+ paimonCatalog.close();
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
index 8e36e07cd1..adb3d8a6c0 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.e2e.connector.paimon;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.common.utils.FileUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
import
org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalogLoader;
import
org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
import org.apache.seatunnel.core.starter.utils.CompressionUtils;
@@ -35,6 +36,8 @@ import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.crosspartition.IndexBootstrap;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReader;
@@ -343,6 +346,34 @@ public class PaimonSinkDynamicBucketIT extends
TestSuiteBase implements TestReso
});
}
+ @TestTemplate
+ public void primaryFullTypeAndLoadData(TestContainer container)
+ throws IOException, InterruptedException {
+ Container.ExecResult writeResult =
+
container.executeJob("/fake_to_dynamic_bucket_paimon_case6.conf");
+ Assertions.assertEquals(0, writeResult.getExitCode());
+
+ given().ignoreExceptions()
+ .await()
+ .atLeast(100L, TimeUnit.MILLISECONDS)
+ .atMost(60L, TimeUnit.SECONDS)
+ .untilAsserted(
+ () -> {
+
container.executeExtraCommands(containerExtendedFactory);
+ FileStoreTable table =
+ (FileStoreTable) getTable("full_type",
"st_test");
+ List<String> primaryKeys =
table.schema().primaryKeys();
+ Assertions.assertEquals(12, primaryKeys.size());
+ List<PaimonRecordWithFullType> paimonSourceRecords
=
+ loadPaimonDataWithFullType(table);
+ Assertions.assertEquals(6,
paimonSourceRecords.size());
+ });
+ // load full_type.st_test table data and initialize the
PaimonBucketAssigner class
+ Container.ExecResult writeResult1 =
+
container.executeJob("/fake_to_dynamic_bucket_paimon_case7.conf");
+ Assertions.assertEquals(0, writeResult1.getExitCode());
+ }
+
protected final ContainerExtendedFactory containerExtendedFactory =
container -> {
if (isWindows) {
@@ -417,4 +448,45 @@ public class PaimonSinkDynamicBucketIT extends
TestSuiteBase implements TestReso
throw new RuntimeException("table not exist");
}
}
+
+ private List<PaimonRecordWithFullType>
loadPaimonDataWithFullType(FileStoreTable table) {
+ ReadBuilder readBuilder = table.newReadBuilder();
+ TableScan.Plan plan = readBuilder.newScan().plan();
+ TableRead tableRead = readBuilder.newRead();
+ List<PaimonRecordWithFullType> result = new ArrayList<>();
+ try (RecordReader<InternalRow> reader = tableRead.createReader(plan)) {
+ reader.forEachRemaining(
+ row -> {
+ InternalMap internalMap = row.getMap(0);
+ InternalArray keyArray = internalMap.keyArray();
+ InternalArray valueArray = internalMap.valueArray();
+ HashMap<Object, Object> map = new
HashMap<>(internalMap.size());
+ for (int i = 0; i < internalMap.size(); i++) {
+ map.put(keyArray.getString(i),
valueArray.getString(i));
+ }
+ InternalArray internalArray = row.getArray(1);
+ int[] intArray = internalArray.toIntArray();
+ PaimonRecordWithFullType paimonRecordWithFullType =
+ new PaimonRecordWithFullType(
+ map,
+ intArray,
+ row.getString(2),
+ row.getBoolean(3),
+ row.getShort(4),
+ row.getShort(5),
+ row.getInt(6),
+ row.getLong(7),
+ row.getFloat(8),
+ row.getDouble(9),
+ row.getDecimal(10, 30, 8),
+ row.getString(11),
+ row.getInt(12),
+ row.getTimestamp(13, 6));
+ result.add(paimonRecordWithFullType);
+ });
+ } catch (IOException e) {
+ throw new SeaTunnelException(e);
+ }
+ return result;
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case6.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case6.conf
new file mode 100644
index 0000000000..5ab8be50d9
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case6.conf
@@ -0,0 +1,94 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ primaryKey {
+ name = "pk"
+ columnNames =
[c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_decimal,c_bytes,c_date,c_timestamp]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [{"a": "b"}, [101], "c_string", true, 117, 15987, 563873951,
7084913402530365001, 1.21, 1.231, "2924137191386439303744.39292211",
"bWlJWmo=", "2023-04-21", "2023-04-21T23:20:58"]
+ }
+ {
+ kind = INSERT
+ fields = [{"a": "c"}, [102], "c_string1", false, 118, 15988,
563873952, 7084913402530365002, 1.22, 1.232, "2924137191386439303744.39292212",
"bWlJWmo=", "2023-04-22", "2023-04-22T23:20:58"]
+ }
+ {
+ kind = INSERT
+ fields = [{"a": "e"}, [103], "c_string2", true, 119, 15987, 563873953,
7084913402530365003, 1.23, 1.233, "2924137191386439303744.39292213",
"bWlJWmo=", "2023-04-23", "2023-04-23T23:20:58"]
+ }
+ {
+ kind = INSERT
+ fields = [{"a": "f"}, [104], null, false, 118, 15988, 563873951,
7084913402530365004, 1.24, 1.234, "2924137191386439303744.39292214",
"bWlJWmo=", "2023-04-24", "2023-04-24T23:20:58"]
+ }
+ {
+ kind = INSERT
+ fields = [{"a": "b"}, [101], "c_string1", true, 120, 15987, 563873952,
7084913402530365001, 1.21, 1.231, "2924137191386439303744.39292211",
"bWlJWmo=", "2023-04-25", "2023-04-25T23:20:58"]
+ }
+ {
+ kind = UPDATE_BEFORE
+ fields = [{"a": "c"}, [102], "c_string2", false, 116, 15987,
563873953, 7084913402530365002, 1.22, 1.232, "2924137191386439303744.39292212",
"bWlJWmo=", "2023-04-26", "2023-04-26T23:20:58"]
+ }
+ {
+ kind = UPDATE_AFTER
+ fields = [{"a": "e"}, [103], "c_string3", true, 116, 15989, 563873951,
7084913402530365003, 1.23, 1.233, "2924137191386439303744.39292213",
"bWlJWmo=", "2023-04-27", "2023-04-27T23:20:58"]
+ }
+ {
+ kind = DELETE
+ fields = [{"a": "f"}, [104], "c_string4", true, 120, 15987, 563873952,
7084913402530365004, 1.24, 1.234, "2924137191386439303744.39292214",
"bWlJWmo=", "2023-04-28", "2023-04-28T23:20:58"]
+ }
+ ]
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Paimon {
+ warehouse = "file:///tmp/paimon"
+ database = "full_type"
+ table = "st_test"
+ paimon.table.write-props = {
+ bucket = -1
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case7.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case7.conf
new file mode 100644
index 0000000000..04632a6ca7
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case7.conf
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(30, 8)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ primaryKey {
+ name = "pk"
+ columnNames =
[c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_decimal,c_bytes,c_date,c_timestamp]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [{"a": "b"}, [101], "c_string", true, 121, 15987, 563873951,
7084913402530365001, 1.21, 1.231, "2924137191386439303744.39292211",
"bWlJWmo=", "2023-04-21", "2023-04-21T23:20:58"]
+ }
+ {
+ kind = INSERT
+ fields = [{"a": "b"}, [101], "c_string1", true, 122, 15987, 563873952,
7084913402530365001, 1.21, 1.231, "2924137191386439303744.39292211",
"bWlJWmo=", "2023-04-25", "2023-04-25T23:20:58"]
+ }
+ {
+ kind = UPDATE_BEFORE
+ fields = [{"a": "c"}, [102], "c_string2", true, 117, 15987, 563873953,
7084913402530365002, 1.22, 1.232, "2924137191386439303744.39292212",
"bWlJWmo=", "2023-04-26", "2023-04-26T23:20:58"]
+ }
+ {
+ kind = UPDATE_AFTER
+ fields = [{"a": "e"}, [103], "c_string3", false, 117, 15989,
563873951, 7084913402530365003, 1.23, 1.233, "2924137191386439303744.39292213",
"bWlJWmo=", "2023-04-27", "2023-04-27T23:20:58"]
+ }
+ {
+ kind = DELETE
+ fields = [{"a": "e"}, [103], "c_string2", true, 119, 15987, 563873953,
7084913402530365003, 1.23, 1.233, "2924137191386439303744.39292213",
"bWlJWmo=", "2023-04-23", "2023-04-23T23:20:58"]
+ }
+ ]
+ result_table_name = "fake"
+ }
+}
+
+sink {
+ Paimon {
+ warehouse = "/tmp/paimon"
+ database = "full_type"
+ table = "st_test"
+ paimon.table.write-props = {
+ bucket = -1
+ }
+ }
+}