This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 6b36f90f4d [Improve][Iceberg] Add savemode create table primaryKey testcase (#7641)
6b36f90f4d is described below
commit 6b36f90f4d55a7a35b19383be03e9616c0938d24
Author: hailin0 <[email protected]>
AuthorDate: Thu Sep 12 17:59:39 2024 +0800
[Improve][Iceberg] Add savemode create table primaryKey testcase (#7641)
---
.../seatunnel/iceberg/catalog/IcebergCatalog.java | 19 +++++++++--
.../seatunnel/iceberg/utils/SchemaUtils.java | 39 ++++++++--------------
.../iceberg/catalog/IcebergCatalogTest.java | 7 +++-
3 files changed, 36 insertions(+), 29 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
index fc28001b2c..60591d9893 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalog.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.InfoPreviewResult;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.PreviewResult;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
@@ -36,6 +37,7 @@ import
org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig;
import org.apache.seatunnel.connectors.seatunnel.iceberg.utils.SchemaUtils;
import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
@@ -48,8 +50,11 @@ import lombok.extern.slf4j.Slf4j;
import java.io.Closeable;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
@@ -222,7 +227,8 @@ public class IcebergCatalog implements Catalog {
}
public CatalogTable toCatalogTable(Table icebergTable, TablePath
tablePath) {
- List<Types.NestedField> columns = icebergTable.schema().columns();
+ Schema schema = icebergTable.schema();
+ List<Types.NestedField> columns = schema.columns();
TableSchema.Builder builder = TableSchema.builder();
columns.forEach(
nestedField -> {
@@ -234,12 +240,19 @@ public class IcebergCatalog implements Catalog {
name,
seaTunnelType,
(Long) null,
- true,
+ nestedField.isOptional(),
null,
nestedField.doc());
builder.column(physicalColumn);
});
-
+ Optional.ofNullable(schema.identifierFieldNames())
+ .map(
+ (Function<Set<String>, Object>)
+ names ->
+ builder.primaryKey(
+ PrimaryKey.of(
+
tablePath.getTableName() + "_pk",
+ new
ArrayList<>(names))));
List<String> partitionKeys =
icebergTable.spec().fields().stream()
.map(PartitionField::name)
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
index 54ea5721ac..5047746e9e 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/utils/SchemaUtils.java
@@ -36,7 +36,6 @@ import
org.apache.seatunnel.connectors.seatunnel.iceberg.sink.schema.SchemaChang
import
org.apache.seatunnel.connectors.seatunnel.iceberg.sink.schema.SchemaDeleteColumn;
import
org.apache.seatunnel.connectors.seatunnel.iceberg.sink.schema.SchemaModifyColumn;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
@@ -54,17 +53,17 @@ import com.google.common.annotations.VisibleForTesting;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import static java.util.stream.Collectors.toList;
@@ -161,27 +160,17 @@ public class SchemaUtils {
@NotNull protected static Schema toIcebergSchema(
TableSchema tableSchema, ReadonlyConfig readonlyConfig) {
Types.StructType structType = SchemaUtils.toIcebergType(tableSchema);
- Set<Integer> identifierFieldIds = new HashSet<>();
- if (Objects.nonNull(readonlyConfig)) {
- List<String> pks =
-
SinkConfig.stringToList(readonlyConfig.get(SinkConfig.TABLE_PRIMARY_KEYS), ",");
- if (CollectionUtils.isNotEmpty(pks)) {
- for (String pk : pks) {
- Optional<Integer> pkId =
- structType.fields().stream()
- .filter(nestedField ->
nestedField.name().equals(pk))
- .map(Types.NestedField::fieldId)
- .findFirst();
- if (!pkId.isPresent()) {
- throw new IllegalArgumentException(
- String.format(
- "iceberg table pk:%s not present in
the incoming struct",
- pk));
- }
- identifierFieldIds.add(pkId.get());
- }
- }
- }
+ Set<Integer> identifierFieldIds =
+ readonlyConfig.getOptional(SinkConfig.TABLE_PRIMARY_KEYS)
+ .map(e -> SinkConfig.stringToList(e, ","))
+ .orElseGet(
+ () ->
+
Optional.ofNullable(tableSchema.getPrimaryKey())
+ .map(e -> e.getColumnNames())
+
.orElse(Collections.emptyList()))
+ .stream()
+ .map(f -> structType.field(f).fieldId())
+ .collect(Collectors.toSet());
List<Types.NestedField> fields = new ArrayList<>();
structType
.fields()
@@ -281,7 +270,7 @@ public class SchemaUtils {
Types.NestedField icebergField =
Types.NestedField.of(
idIncrementer.getAndIncrement(),
- true,
+ column.isNullable(),
column.getName(),
IcebergTypeMapper.toIcebergType(column.getDataType(), idIncrementer),
column.getComment());
diff --git a/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java
index c7c240228b..6ec5ae5783 100644
--- a/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java
+++ b/seatunnel-connectors-v2/connector-iceberg/src/test/java/org/apache/seatunnel/connectors/seatunnel/iceberg/catalog/IcebergCatalogTest.java
@@ -20,6 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.iceberg.catalog;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
@@ -152,7 +153,8 @@ class IcebergCatalogTest {
CatalogTable buildAllTypesTable(TableIdentifier tableIdentifier) {
TableSchema.Builder builder = TableSchema.builder();
builder.column(
- PhysicalColumn.of("id", BasicType.INT_TYPE, (Long) null, true,
null, "id comment"));
+ PhysicalColumn.of(
+ "id", BasicType.INT_TYPE, (Long) null, false, null,
"id comment"));
builder.column(
PhysicalColumn.of(
"boolean_col", BasicType.BOOLEAN_TYPE, (Long) null,
true, null, null));
@@ -185,6 +187,9 @@ class IcebergCatalogTest {
PhysicalColumn.of(
"decimal_col", new DecimalType(38, 18), (Long) null,
true, null, null));
builder.column(PhysicalColumn.of("dt_col", STRING_TYPE, (Long) null,
true, null, null));
+ builder.primaryKey(
+ PrimaryKey.of(
+ tableIdentifier.getTableName() + "_pk",
Collections.singletonList("id")));
TableSchema schema = builder.build();
HashMap<String, String> options = new HashMap<>();