This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ab957145c7 [core] Fix dynamic insert into table with partition columns contain primary key error (#5588)
ab957145c7 is described below

commit ab957145c7d38c9faa74537a600aed430e7faf90
Author: Xiduo You <[email protected]>
AuthorDate: Mon May 12 12:17:14 2025 +0800

    [core] Fix dynamic insert into table with partition columns contain primary key error (#5588)
---
 .../paimon/crosspartition/GlobalIndexAssigner.java |  6 ++--
 .../paimon/crosspartition/IndexBootstrap.java      | 13 ++++----
 .../KeyPartPartitionKeyExtractor.java              |  5 ++-
 .../sink/RowPartitionAllPrimaryKeyExtractor.java}  | 37 ++++++++++------------
 .../paimon/crosspartition/IndexBootstrapTest.java  |  2 +-
 .../spark/sql/InsertOverwriteTableTestBase.scala   | 23 ++++++++++++++
 6 files changed, 52 insertions(+), 34 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
index 4700e73998..5ed13b4fce 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
@@ -40,7 +40,7 @@ import org.apache.paimon.sort.BinaryExternalSortBuffer;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.PartitionKeyExtractor;
-import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
+import org.apache.paimon.table.sink.RowPartitionAllPrimaryKeyExtractor;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
@@ -129,7 +129,7 @@ public class GlobalIndexAssigner implements Serializable, 
Closeable {
 
         CoreOptions coreOptions = table.coreOptions();
         this.targetBucketRowNumber = (int) 
coreOptions.dynamicBucketTargetRowNum();
-        this.extractor = new RowPartitionKeyExtractor(table.schema());
+        this.extractor = new 
RowPartitionAllPrimaryKeyExtractor(table.schema());
         this.keyPartExtractor = new 
KeyPartPartitionKeyExtractor(table.schema());
 
         // state
@@ -149,7 +149,7 @@ public class GlobalIndexAssigner implements Serializable, 
Closeable {
                         path.toString(),
                         rocksdbOptions,
                         coreOptions.crossPartitionUpsertIndexTtl());
-        RowType keyType = table.schema().logicalTrimmedPrimaryKeysType();
+        RowType keyType = table.schema().logicalPrimaryKeysType();
         this.keyIndex =
                 stateFactory.valueState(
                         INDEX_NAME,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
index ec3ab25ebd..b8f13594a5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
@@ -26,7 +26,7 @@ import org.apache.paimon.data.JoinedRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.Table;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.DataTableScan;
 import org.apache.paimon.table.source.ReadBuilder;
@@ -58,9 +58,9 @@ public class IndexBootstrap implements Serializable {
 
     public static final String BUCKET_FIELD = "_BUCKET";
 
-    private final Table table;
+    private final FileStoreTable table;
 
-    public IndexBootstrap(Table table) {
+    public IndexBootstrap(FileStoreTable table) {
         this.table = table;
     }
 
@@ -71,9 +71,11 @@ public class IndexBootstrap implements Serializable {
 
     public RecordReader<InternalRow> bootstrap(int numAssigners, int assignId) 
throws IOException {
         RowType rowType = table.rowType();
+
         List<String> fieldNames = rowType.getFieldNames();
+        // Use `trimmedPrimaryKeys` to reduce data size since we will add partition at the end.
         int[] keyProjection =
-                table.primaryKeys().stream()
+                table.schema().trimmedPrimaryKeys().stream()
                         .map(fieldNames::indexOf)
                         .mapToInt(Integer::intValue)
                         .toArray();
@@ -136,13 +138,12 @@ public class IndexBootstrap implements Serializable {
     }
 
     public static RowType bootstrapType(TableSchema schema) {
-        List<String> primaryKeys = schema.primaryKeys();
+        List<String> primaryKeys = schema.trimmedPrimaryKeys();
         List<String> partitionKeys = schema.partitionKeys();
         List<DataField> bootstrapFields =
                 new ArrayList<>(
                         schema.projectedLogicalRowType(
                                         Stream.concat(primaryKeys.stream(), 
partitionKeys.stream())
-                                                .distinct()
                                                 .collect(Collectors.toList()))
                                 .getFields());
         bootstrapFields.add(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/KeyPartPartitionKeyExtractor.java
 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/KeyPartPartitionKeyExtractor.java
index 5abfbfffbc..3d404cde15 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/KeyPartPartitionKeyExtractor.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/KeyPartPartitionKeyExtractor.java
@@ -37,14 +37,13 @@ public class KeyPartPartitionKeyExtractor implements 
PartitionKeyExtractor<Inter
     private final Projection keyProjection;
 
     public KeyPartPartitionKeyExtractor(TableSchema schema) {
-        List<String> primaryKeys = schema.primaryKeys();
         List<String> partitionKeys = schema.partitionKeys();
         RowType keyPartType =
                 schema.projectedLogicalRowType(
-                        Stream.concat(primaryKeys.stream(), 
partitionKeys.stream())
+                        Stream.concat(schema.trimmedPrimaryKeys().stream(), 
partitionKeys.stream())
                                 .collect(Collectors.toList()));
         this.partitionProjection = CodeGenUtils.newProjection(keyPartType, 
partitionKeys);
-        this.keyProjection = CodeGenUtils.newProjection(keyPartType, 
primaryKeys);
+        this.keyProjection = CodeGenUtils.newProjection(keyPartType, 
schema.primaryKeys());
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/KeyPartPartitionKeyExtractor.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/RowPartitionAllPrimaryKeyExtractor.java
similarity index 54%
copy from 
paimon-core/src/main/java/org/apache/paimon/crosspartition/KeyPartPartitionKeyExtractor.java
copy to 
paimon-core/src/main/java/org/apache/paimon/table/sink/RowPartitionAllPrimaryKeyExtractor.java
index 5abfbfffbc..3dceaf8253 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/KeyPartPartitionKeyExtractor.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/RowPartitionAllPrimaryKeyExtractor.java
@@ -16,35 +16,30 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.crosspartition;
+package org.apache.paimon.table.sink;
 
 import org.apache.paimon.codegen.CodeGenUtils;
 import org.apache.paimon.codegen.Projection;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.sink.PartitionKeyExtractor;
-import org.apache.paimon.types.RowType;
 
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/** A {@link PartitionKeyExtractor} to {@link InternalRow} with only key and 
partition fields. */
-public class KeyPartPartitionKeyExtractor implements 
PartitionKeyExtractor<InternalRow> {
+/**
+ * A {@link PartitionKeyExtractor} to {@link InternalRow}, the `trimmedPrimaryKey` would return all
+ * primary keys.
+ */
+public class RowPartitionAllPrimaryKeyExtractor implements 
PartitionKeyExtractor<InternalRow> {
 
     private final Projection partitionProjection;
-    private final Projection keyProjection;
-
-    public KeyPartPartitionKeyExtractor(TableSchema schema) {
-        List<String> primaryKeys = schema.primaryKeys();
-        List<String> partitionKeys = schema.partitionKeys();
-        RowType keyPartType =
-                schema.projectedLogicalRowType(
-                        Stream.concat(primaryKeys.stream(), 
partitionKeys.stream())
-                                .collect(Collectors.toList()));
-        this.partitionProjection = CodeGenUtils.newProjection(keyPartType, 
partitionKeys);
-        this.keyProjection = CodeGenUtils.newProjection(keyPartType, 
primaryKeys);
+    private final Projection primaryKeyProjection;
+
+    public RowPartitionAllPrimaryKeyExtractor(TableSchema schema) {
+        partitionProjection =
+                CodeGenUtils.newProjection(
+                        schema.logicalRowType(), 
schema.projection(schema.partitionKeys()));
+        primaryKeyProjection =
+                CodeGenUtils.newProjection(
+                        schema.logicalRowType(), 
schema.projection(schema.primaryKeys()));
     }
 
     @Override
@@ -54,6 +49,6 @@ public class KeyPartPartitionKeyExtractor implements 
PartitionKeyExtractor<Inter
 
     @Override
     public BinaryRow trimmedPrimaryKey(InternalRow record) {
-        return keyProjection.apply(record);
+        return primaryKeyProjection.apply(record);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
index 27fa311ddb..c9a4dacb90 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
@@ -68,7 +68,7 @@ public class IndexBootstrapTest extends TableTestBase {
                 row(3, 6, 6, 7),
                 row(3, 7, 7, 8));
 
-        IndexBootstrap indexBootstrap = new IndexBootstrap(table);
+        IndexBootstrap indexBootstrap = new IndexBootstrap((FileStoreTable) 
table);
         List<GenericRow> result = new ArrayList<>();
         Consumer<InternalRow> consumer =
                 row -> result.add(GenericRow.of(row.getInt(0), row.getInt(1), 
row.getInt(2)));
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
index b7e0b35dd1..e6e17d4848 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
@@ -608,4 +608,27 @@ abstract class InsertOverwriteTableTestBase extends 
PaimonSparkTestBase {
       }
     }
   }
+
+  test("Paimon Insert: dynamic insert into table with partition columns 
contain primary key") {
+    withSQLConf("spark.sql.shuffle.partitions" -> "10") {
+      withTable("pk_pt") {
+        sql("""
+              |create table pk_pt (c1 int) partitioned by(p1 string, p2 string)
+              |tblproperties('primary-key'='c1, p1')
+              |""".stripMargin)
+
+        sql("insert into table pk_pt partition(p1, p2) values(1, 'a', 'b'), 
(1, 'b', 'b')")
+        checkAnswer(
+          sql("select * from pk_pt"),
+          Seq(Row(1, "a", "b"), Row(1, "b", "b"))
+        )
+
+        sql("insert into table pk_pt partition(p1, p2) values(1, 'a', 'b'), 
(1, 'c', 'c')")
+        checkAnswer(
+          sql("select * from pk_pt order by c1, p1, p2"),
+          Seq(Row(1, "a", "b"), Row(1, "b", "b"), Row(1, "c", "c"))
+        )
+      }
+    }
+  }
 }

Reply via email to