This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0b1b59827c [core] optimize validate fields functions (#6061)
0b1b59827c is described below
commit 0b1b59827c1ea9c8159dcfad146abf203ddab5a0
Author: kevin <[email protected]>
AuthorDate: Wed Aug 13 16:44:56 2025 +0800
[core] optimize validate fields functions (#6061)
---
.../src/main/java/org/apache/paimon/schema/Schema.java | 16 ++++++++++------
.../src/main/java/org/apache/paimon/types/RowType.java | 7 +++----
.../paimon/flink/action/cdc/CdcActionCommonUtils.java | 7 ++-----
3 files changed, 15 insertions(+), 15 deletions(-)
diff --git a/paimon-api/src/main/java/org/apache/paimon/schema/Schema.java
b/paimon-api/src/main/java/org/apache/paimon/schema/Schema.java
index d4dbbb0117..9842b5db9f 100644
--- a/paimon-api/src/main/java/org/apache/paimon/schema/Schema.java
+++ b/paimon-api/src/main/java/org/apache/paimon/schema/Schema.java
@@ -36,7 +36,6 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -44,6 +43,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
import java.util.stream.Collectors;
/**
@@ -129,7 +129,7 @@ public class Schema {
List<DataField> fields, List<String> primaryKeys, List<String>
partitionKeys) {
List<String> fieldNames =
fields.stream().map(DataField::name).collect(Collectors.toList());
- Set<String> duplicateColumns = duplicate(fieldNames);
+ Set<String> duplicateColumns = duplicateFields(fieldNames);
Preconditions.checkState(
duplicateColumns.isEmpty(),
"Table column %s must not contain duplicate fields. Found: %s",
@@ -138,7 +138,7 @@ public class Schema {
Set<String> allFields = new HashSet<>(fieldNames);
- duplicateColumns = duplicate(partitionKeys);
+ duplicateColumns = duplicateFields(partitionKeys);
Preconditions.checkState(
duplicateColumns.isEmpty(),
"Partition key constraint %s must not contain duplicate
columns. Found: %s",
@@ -153,7 +153,7 @@ public class Schema {
if (primaryKeys.isEmpty()) {
return fields;
}
- duplicateColumns = duplicate(primaryKeys);
+ duplicateColumns = duplicateFields(primaryKeys);
Preconditions.checkState(
duplicateColumns.isEmpty(),
"Primary key constraint %s must not contain duplicate columns.
Found: %s",
@@ -218,9 +218,13 @@ public class Schema {
return partitionKeys;
}
- private static Set<String> duplicate(List<String> names) {
+ public static Set<String> duplicateFields(List<String> names) {
return names.stream()
- .filter(name -> Collections.frequency(names, name) > 1)
+ .collect(Collectors.groupingBy(Function.identity(),
Collectors.counting()))
+ .entrySet()
+ .stream()
+ .filter(e -> e.getValue() > 1)
+ .map(Map.Entry::getKey)
.collect(Collectors.toSet());
}
diff --git a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
index a7f867aeb3..bd1ca10995 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
@@ -19,6 +19,7 @@
package org.apache.paimon.types;
import org.apache.paimon.annotation.Public;
+import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
@@ -272,10 +273,8 @@ public final class RowType extends DataType {
throw new IllegalArgumentException(
"Field names must contain at least one non-whitespace
character.");
}
- final Set<String> duplicates =
- fieldNames.stream()
- .filter(n -> Collections.frequency(fieldNames, n) > 1)
- .collect(Collectors.toSet());
+ final Set<String> duplicates = Schema.duplicateFields(fieldNames);
+
if (!duplicates.isEmpty()) {
throw new IllegalArgumentException(
String.format("Field names must be unique. Found
duplicates: %s", duplicates));
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
index 6a3001319c..79ecb1941e 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java
@@ -33,9 +33,9 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
@@ -249,10 +249,7 @@ public class CdcActionCommonUtils {
}
public static void checkDuplicateFields(String tableName, List<String>
fieldNames) {
- List<String> duplicates =
- fieldNames.stream()
- .filter(name -> Collections.frequency(fieldNames,
name) > 1)
- .collect(Collectors.toList());
+ Set<String> duplicates = Schema.duplicateFields(fieldNames);
checkState(
duplicates.isEmpty(),
"Table %s contains duplicate columns: %s.\n"