This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6b36f90f4d [Improve][Iceberg] Add savemode create table primaryKey testcase (#7641)
6b36f90f4d is described below

commit 6b36f90f4d55a7a35b19383be03e9616c0938d24
Author: hailin0 <[email protected]>
AuthorDate: Thu Sep 12 17:59:39 2024 +0800

    [Improve][Iceberg] Add savemode create table primaryKey testcase (#7641)
---
 .../seatunnel/iceberg/catalog/IcebergCatalog.java  | 19 +++++++++--
 .../seatunnel/iceberg/utils/SchemaUtils.java       | 39 ++++++++--------------
 .../iceberg/catalog/IcebergCatalogTest.java        |  7 +++-
 3 files changed, 36 insertions(+), 29 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
index fc28001b2c..60591d9893 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.InfoPreviewResult;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
 import org.apache.seatunnel.api.table.catalog.PreviewResult;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
@@ -36,6 +37,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig;
 import org.apache.seatunnel.connectors.seatunnel.iceberg.utils.SchemaUtils;
 
 import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Namespace;
@@ -48,8 +50,11 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -222,7 +227,8 @@ public class IcebergCatalog implements Catalog {
     }
 
     public CatalogTable toCatalogTable(Table icebergTable, TablePath 
tablePath) {
-        List<Types.NestedField> columns = icebergTable.schema().columns();
+        Schema schema = icebergTable.schema();
+        List<Types.NestedField> columns = schema.columns();
         TableSchema.Builder builder = TableSchema.builder();
         columns.forEach(
                 nestedField -> {
@@ -234,12 +240,19 @@ public class IcebergCatalog implements Catalog {
                                     name,
                                     seaTunnelType,
                                     (Long) null,
-                                    true,
+                                    nestedField.isOptional(),
                                     null,
                                     nestedField.doc());
                     builder.column(physicalColumn);
                 });
-
+        Optional.ofNullable(schema.identifierFieldNames())
+                .map(
+                        (Function<Set<String>, Object>)
+                                names ->
+                                        builder.primaryKey(
+                                                PrimaryKey.of(
+                                                        
tablePath.getTableName() + "_pk",
+                                                        new 
ArrayList<>(names))));
         List<String> partitionKeys =
                 icebergTable.spec().fields().stream()
                         .map(PartitionField::name)
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
index 54ea5721ac..5047746e9e 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
@@ -36,7 +36,6 @@ import 
org.apache.seatunnel.connectors.seatunnel.iceberg.sink.schema.SchemaChang
 import 
org.apache.seatunnel.connectors.seatunnel.iceberg.sink.schema.SchemaDeleteColumn;
 import 
org.apache.seatunnel.connectors.seatunnel.iceberg.sink.schema.SchemaModifyColumn;
 
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
@@ -54,17 +53,17 @@ import com.google.common.annotations.VisibleForTesting;
 import lombok.extern.slf4j.Slf4j;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 import static java.util.stream.Collectors.toList;
 
@@ -161,27 +160,17 @@ public class SchemaUtils {
     @NotNull protected static Schema toIcebergSchema(
             TableSchema tableSchema, ReadonlyConfig readonlyConfig) {
         Types.StructType structType = SchemaUtils.toIcebergType(tableSchema);
-        Set<Integer> identifierFieldIds = new HashSet<>();
-        if (Objects.nonNull(readonlyConfig)) {
-            List<String> pks =
-                    
SinkConfig.stringToList(readonlyConfig.get(SinkConfig.TABLE_PRIMARY_KEYS), ",");
-            if (CollectionUtils.isNotEmpty(pks)) {
-                for (String pk : pks) {
-                    Optional<Integer> pkId =
-                            structType.fields().stream()
-                                    .filter(nestedField -> 
nestedField.name().equals(pk))
-                                    .map(Types.NestedField::fieldId)
-                                    .findFirst();
-                    if (!pkId.isPresent()) {
-                        throw new IllegalArgumentException(
-                                String.format(
-                                        "iceberg table pk:%s not present in 
the incoming struct",
-                                        pk));
-                    }
-                    identifierFieldIds.add(pkId.get());
-                }
-            }
-        }
+        Set<Integer> identifierFieldIds =
+                readonlyConfig.getOptional(SinkConfig.TABLE_PRIMARY_KEYS)
+                        .map(e -> SinkConfig.stringToList(e, ","))
+                        .orElseGet(
+                                () ->
+                                        
Optional.ofNullable(tableSchema.getPrimaryKey())
+                                                .map(e -> e.getColumnNames())
+                                                
.orElse(Collections.emptyList()))
+                        .stream()
+                        .map(f -> structType.field(f).fieldId())
+                        .collect(Collectors.toSet());
         List<Types.NestedField> fields = new ArrayList<>();
         structType
                 .fields()
@@ -281,7 +270,7 @@ public class SchemaUtils {
             Types.NestedField icebergField =
                     Types.NestedField.of(
                             idIncrementer.getAndIncrement(),
-                            true,
+                            column.isNullable(),
                             column.getName(),
                             
IcebergTypeMapper.toIcebergType(column.getDataType(), idIncrementer),
                             column.getComment());
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java
index c7c240228b..6ec5ae5783 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java
@@ -20,6 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.iceberg.catalog;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
 import org.apache.seatunnel.api.table.catalog.TableIdentifier;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.catalog.TableSchema;
@@ -152,7 +153,8 @@ class IcebergCatalogTest {
     CatalogTable buildAllTypesTable(TableIdentifier tableIdentifier) {
         TableSchema.Builder builder = TableSchema.builder();
         builder.column(
-                PhysicalColumn.of("id", BasicType.INT_TYPE, (Long) null, true, 
null, "id comment"));
+                PhysicalColumn.of(
+                        "id", BasicType.INT_TYPE, (Long) null, false, null, 
"id comment"));
         builder.column(
                 PhysicalColumn.of(
                         "boolean_col", BasicType.BOOLEAN_TYPE, (Long) null, 
true, null, null));
@@ -185,6 +187,9 @@ class IcebergCatalogTest {
                 PhysicalColumn.of(
                         "decimal_col", new DecimalType(38, 18), (Long) null, 
true, null, null));
         builder.column(PhysicalColumn.of("dt_col", STRING_TYPE, (Long) null, 
true, null, null));
+        builder.primaryKey(
+                PrimaryKey.of(
+                        tableIdentifier.getTableName() + "_pk", 
Collections.singletonList("id")));
 
         TableSchema schema = builder.build();
         HashMap<String, String> options = new HashMap<>();

Reply via email to