This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ab957145c7 [core] Fix dynamic insert into table with partition columns
contain primary key error (#5588)
ab957145c7 is described below
commit ab957145c7d38c9faa74537a600aed430e7faf90
Author: Xiduo You <[email protected]>
AuthorDate: Mon May 12 12:17:14 2025 +0800
[core] Fix dynamic insert into table with partition columns contain primary
key error (#5588)
---
.../paimon/crosspartition/GlobalIndexAssigner.java | 6 ++--
.../paimon/crosspartition/IndexBootstrap.java | 13 ++++----
.../KeyPartPartitionKeyExtractor.java | 5 ++-
.../sink/RowPartitionAllPrimaryKeyExtractor.java} | 37 ++++++++++------------
.../paimon/crosspartition/IndexBootstrapTest.java | 2 +-
.../spark/sql/InsertOverwriteTableTestBase.scala | 23 ++++++++++++++
6 files changed, 52 insertions(+), 34 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
index 4700e73998..5ed13b4fce 100644
---
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/GlobalIndexAssigner.java
@@ -40,7 +40,7 @@ import org.apache.paimon.sort.BinaryExternalSortBuffer;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.PartitionKeyExtractor;
-import org.apache.paimon.table.sink.RowPartitionKeyExtractor;
+import org.apache.paimon.table.sink.RowPartitionAllPrimaryKeyExtractor;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
@@ -129,7 +129,7 @@ public class GlobalIndexAssigner implements Serializable,
Closeable {
CoreOptions coreOptions = table.coreOptions();
this.targetBucketRowNumber = (int)
coreOptions.dynamicBucketTargetRowNum();
- this.extractor = new RowPartitionKeyExtractor(table.schema());
+ this.extractor = new
RowPartitionAllPrimaryKeyExtractor(table.schema());
this.keyPartExtractor = new
KeyPartPartitionKeyExtractor(table.schema());
// state
@@ -149,7 +149,7 @@ public class GlobalIndexAssigner implements Serializable,
Closeable {
path.toString(),
rocksdbOptions,
coreOptions.crossPartitionUpsertIndexTtl());
- RowType keyType = table.schema().logicalTrimmedPrimaryKeysType();
+ RowType keyType = table.schema().logicalPrimaryKeysType();
this.keyIndex =
stateFactory.valueState(
INDEX_NAME,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
index ec3ab25ebd..b8f13594a5 100644
---
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
+++
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/IndexBootstrap.java
@@ -26,7 +26,7 @@ import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.Table;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.DataTableScan;
import org.apache.paimon.table.source.ReadBuilder;
@@ -58,9 +58,9 @@ public class IndexBootstrap implements Serializable {
public static final String BUCKET_FIELD = "_BUCKET";
- private final Table table;
+ private final FileStoreTable table;
- public IndexBootstrap(Table table) {
+ public IndexBootstrap(FileStoreTable table) {
this.table = table;
}
@@ -71,9 +71,11 @@ public class IndexBootstrap implements Serializable {
public RecordReader<InternalRow> bootstrap(int numAssigners, int assignId)
throws IOException {
RowType rowType = table.rowType();
+
List<String> fieldNames = rowType.getFieldNames();
+ // Use `trimmedPrimaryKeys` to reduce data size since we will add partition at the end.
int[] keyProjection =
- table.primaryKeys().stream()
+ table.schema().trimmedPrimaryKeys().stream()
.map(fieldNames::indexOf)
.mapToInt(Integer::intValue)
.toArray();
@@ -136,13 +138,12 @@ public class IndexBootstrap implements Serializable {
}
public static RowType bootstrapType(TableSchema schema) {
- List<String> primaryKeys = schema.primaryKeys();
+ List<String> primaryKeys = schema.trimmedPrimaryKeys();
List<String> partitionKeys = schema.partitionKeys();
List<DataField> bootstrapFields =
new ArrayList<>(
schema.projectedLogicalRowType(
Stream.concat(primaryKeys.stream(),
partitionKeys.stream())
- .distinct()
.collect(Collectors.toList()))
.getFields());
bootstrapFields.add(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/KeyPartPartitionKeyExtractor.java
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/KeyPartPartitionKeyExtractor.java
index 5abfbfffbc..3d404cde15 100644
---
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/KeyPartPartitionKeyExtractor.java
+++
b/paimon-core/src/main/java/org/apache/paimon/crosspartition/KeyPartPartitionKeyExtractor.java
@@ -37,14 +37,13 @@ public class KeyPartPartitionKeyExtractor implements
PartitionKeyExtractor<Inter
private final Projection keyProjection;
public KeyPartPartitionKeyExtractor(TableSchema schema) {
- List<String> primaryKeys = schema.primaryKeys();
List<String> partitionKeys = schema.partitionKeys();
RowType keyPartType =
schema.projectedLogicalRowType(
- Stream.concat(primaryKeys.stream(),
partitionKeys.stream())
+ Stream.concat(schema.trimmedPrimaryKeys().stream(),
partitionKeys.stream())
.collect(Collectors.toList()));
this.partitionProjection = CodeGenUtils.newProjection(keyPartType,
partitionKeys);
- this.keyProjection = CodeGenUtils.newProjection(keyPartType,
primaryKeys);
+ this.keyProjection = CodeGenUtils.newProjection(keyPartType,
schema.primaryKeys());
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/KeyPartPartitionKeyExtractor.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/RowPartitionAllPrimaryKeyExtractor.java
similarity index 54%
copy from
paimon-core/src/main/java/org/apache/paimon/crosspartition/KeyPartPartitionKeyExtractor.java
copy to
paimon-core/src/main/java/org/apache/paimon/table/sink/RowPartitionAllPrimaryKeyExtractor.java
index 5abfbfffbc..3dceaf8253 100644
---
a/paimon-core/src/main/java/org/apache/paimon/crosspartition/KeyPartPartitionKeyExtractor.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/RowPartitionAllPrimaryKeyExtractor.java
@@ -16,35 +16,30 @@
* limitations under the License.
*/
-package org.apache.paimon.crosspartition;
+package org.apache.paimon.table.sink;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.Projection;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.sink.PartitionKeyExtractor;
-import org.apache.paimon.types.RowType;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/** A {@link PartitionKeyExtractor} to {@link InternalRow} with only key and partition fields. */
-public class KeyPartPartitionKeyExtractor implements
PartitionKeyExtractor<InternalRow> {
+/**
+ * A {@link PartitionKeyExtractor} to {@link InternalRow}, the `trimmedPrimaryKey` would return all
+ * primary keys.
+ */
+public class RowPartitionAllPrimaryKeyExtractor implements
PartitionKeyExtractor<InternalRow> {
private final Projection partitionProjection;
- private final Projection keyProjection;
-
- public KeyPartPartitionKeyExtractor(TableSchema schema) {
- List<String> primaryKeys = schema.primaryKeys();
- List<String> partitionKeys = schema.partitionKeys();
- RowType keyPartType =
- schema.projectedLogicalRowType(
- Stream.concat(primaryKeys.stream(),
partitionKeys.stream())
- .collect(Collectors.toList()));
- this.partitionProjection = CodeGenUtils.newProjection(keyPartType,
partitionKeys);
- this.keyProjection = CodeGenUtils.newProjection(keyPartType,
primaryKeys);
+ private final Projection primaryKeyProjection;
+
+ public RowPartitionAllPrimaryKeyExtractor(TableSchema schema) {
+ partitionProjection =
+ CodeGenUtils.newProjection(
+ schema.logicalRowType(),
schema.projection(schema.partitionKeys()));
+ primaryKeyProjection =
+ CodeGenUtils.newProjection(
+ schema.logicalRowType(),
schema.projection(schema.primaryKeys()));
}
@Override
@@ -54,6 +49,6 @@ public class KeyPartPartitionKeyExtractor implements
PartitionKeyExtractor<Inter
@Override
public BinaryRow trimmedPrimaryKey(InternalRow record) {
- return keyProjection.apply(record);
+ return primaryKeyProjection.apply(record);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
index 27fa311ddb..c9a4dacb90 100644
---
a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java
@@ -68,7 +68,7 @@ public class IndexBootstrapTest extends TableTestBase {
row(3, 6, 6, 7),
row(3, 7, 7, 8));
- IndexBootstrap indexBootstrap = new IndexBootstrap(table);
+ IndexBootstrap indexBootstrap = new IndexBootstrap((FileStoreTable)
table);
List<GenericRow> result = new ArrayList<>();
Consumer<InternalRow> consumer =
row -> result.add(GenericRow.of(row.getInt(0), row.getInt(1),
row.getInt(2)));
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
index b7e0b35dd1..e6e17d4848 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/InsertOverwriteTableTestBase.scala
@@ -608,4 +608,27 @@ abstract class InsertOverwriteTableTestBase extends
PaimonSparkTestBase {
}
}
}
+
+ test("Paimon Insert: dynamic insert into table with partition columns
contain primary key") {
+ withSQLConf("spark.sql.shuffle.partitions" -> "10") {
+ withTable("pk_pt") {
+ sql("""
+ |create table pk_pt (c1 int) partitioned by(p1 string, p2 string)
+ |tblproperties('primary-key'='c1, p1')
+ |""".stripMargin)
+
+ sql("insert into table pk_pt partition(p1, p2) values(1, 'a', 'b'),
(1, 'b', 'b')")
+ checkAnswer(
+ sql("select * from pk_pt"),
+ Seq(Row(1, "a", "b"), Row(1, "b", "b"))
+ )
+
+ sql("insert into table pk_pt partition(p1, p2) values(1, 'a', 'b'),
(1, 'c', 'c')")
+ checkAnswer(
+ sql("select * from pk_pt order by c1, p1, p2"),
+ Seq(Row(1, "a", "b"), Row(1, "b", "b"), Row(1, "c", "c"))
+ )
+ }
+ }
+ }
}