This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 5ba0927198 [core] format table: fix partition key not in the end write error (#6595)
5ba0927198 is described below
commit 5ba0927198eeb9a6ac842f815fc3c7989dd5134a
Author: jerry <[email protected]>
AuthorDate: Thu Nov 13 13:36:59 2025 +0800
[core] format table: fix partition key not in the end write error (#6595)
---
.../paimon/table/format/FormatTableFileWriter.java | 16 ++++-----------
.../paimon/table/format/FormatTableWrite.java | 24 ++++++++++++++--------
.../paimon/spark/table/PaimonFormatTableTest.scala | 18 ++++++++++++++++
3 files changed, 38 insertions(+), 20 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
index 5e6c59c574..31615669b0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
@@ -32,7 +32,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.stream.Collectors;
import static org.apache.paimon.format.FileFormat.fileFormat;
@@ -40,18 +39,16 @@ import static org.apache.paimon.format.FileFormat.fileFormat;
public class FormatTableFileWriter {
private final FileIO fileIO;
- private RowType rowType;
- private RowType partitionType;
+ private RowType writeRowType;
private final FileFormat fileFormat;
private final FileStorePathFactory pathFactory;
protected final Map<BinaryRow, FormatTableRecordWriter> writers;
protected final CoreOptions options;
public FormatTableFileWriter(
- FileIO fileIO, RowType rowType, CoreOptions options, RowType
partitionType) {
+ FileIO fileIO, RowType writeRowType, CoreOptions options, RowType
partitionType) {
this.fileIO = fileIO;
- this.rowType = rowType;
- this.partitionType = partitionType;
+ this.writeRowType = writeRowType;
this.fileFormat = fileFormat(options);
this.writers = new HashMap<>();
this.options = options;
@@ -72,7 +69,7 @@ public class FormatTableFileWriter {
}
public void withWriteType(RowType writeType) {
- this.rowType = writeType;
+ this.writeRowType = writeType;
}
public void write(BinaryRow partition, InternalRow data) throws Exception {
@@ -101,11 +98,6 @@ public class FormatTableFileWriter {
}
private FormatTableRecordWriter createWriter(BinaryRow partition) {
- RowType writeRowType =
- rowType.project(
- rowType.getFieldNames().stream()
- .filter(name ->
!partitionType.getFieldNames().contains(name))
- .collect(Collectors.toList()));
return new FormatTableRecordWriter(
fileIO,
fileFormat,
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
index 363523e6c7..c74bf95e1a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
@@ -33,6 +33,7 @@ import org.apache.paimon.table.sink.FormatTableRowPartitionKeyExtractor;
import org.apache.paimon.table.sink.TableWrite;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.ProjectedRow;
import javax.annotation.Nullable;
@@ -48,6 +49,8 @@ public class FormatTableWrite implements BatchTableWrite {
private final int[] notNullFieldIndex;
private final @Nullable DefaultValueRow defaultValueRow;
+ private final ProjectedRow projectedRow;
+ private final RowType writeRowType;
public FormatTableWrite(
FileIO fileIO,
@@ -56,7 +59,6 @@ public class FormatTableWrite implements BatchTableWrite {
RowType partitionType,
List<String> partitionKeys) {
this.rowType = rowType;
- this.write = new FormatTableFileWriter(fileIO, rowType, options,
partitionType);
this.partitionKeyExtractor =
new FormatTableRowPartitionKeyExtractor(rowType,
partitionKeys);
List<String> notNullColumnNames =
@@ -66,12 +68,13 @@ public class FormatTableWrite implements BatchTableWrite {
.collect(Collectors.toList());
this.notNullFieldIndex = rowType.getFieldIndices(notNullColumnNames);
this.defaultValueRow = DefaultValueRow.create(rowType);
- }
-
- @Override
- public BatchTableWrite withWriteType(RowType writeType) {
- write.withWriteType(writeType);
- return this;
+ this.writeRowType =
+ rowType.project(
+ rowType.getFieldNames().stream()
+ .filter(name ->
!partitionType.getFieldNames().contains(name))
+ .collect(Collectors.toList()));
+ this.projectedRow = ProjectedRow.from(writeRowType, rowType);
+ this.write = new FormatTableFileWriter(fileIO, writeRowType, options,
partitionType);
}
@Override
@@ -91,7 +94,7 @@ public class FormatTableWrite implements BatchTableWrite {
}
row = defaultValueRow == null ? row : defaultValueRow.replaceRow(row);
BinaryRow partition = partitionKeyExtractor.partition(row);
- write.write(partition, row);
+ write.write(partition, projectedRow.replaceRow(row));
}
@Override
@@ -119,6 +122,11 @@ public class FormatTableWrite implements BatchTableWrite {
return this;
}
+ @Override
+ public BatchTableWrite withWriteType(RowType writeType) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public void write(InternalRow row, int bucket) throws Exception {
throw new UnsupportedOperationException();
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
index 96997dec34..048f3111b8 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonFormatTableTest.scala
@@ -118,6 +118,24 @@ class PaimonFormatTableTest extends PaimonSparkTestWithRestCatalogBase {
}
}
+ test("PaimonFormatTable write: partition key in diff position") {
+ val tableName = "paimon_format_test_orc_mode"
+ withTable(tableName) {
+ sql(
+ s"CREATE TABLE $tableName (`ds` bigint, age INT, `ds1` bigint, name
STRING, `ds2` bigint) USING ORC TBLPROPERTIES (" +
+ s"'format-table.implementation'='paimon') PARTITIONED BY (`ds`,
`ds1`, `ds2`)")
+ val table =
+ paimonCatalog.getTable(Identifier.create("test_db",
tableName)).asInstanceOf[FormatTable]
+ val partition = 20250920
+ table.fileIO().mkdirs(new Path(table.location()))
+ spark.sql(s"INSERT INTO $tableName VALUES (5, 11, 12, 'ab', 13), (7,
11, 12, 'Larry', 13)")
+ checkAnswer(
+ spark.sql(s"SELECT ds, age, ds1, name, ds2 FROM $tableName ORDER BY
age"),
+ Row(5, 11, 12, "ab", 13) :: Row(7, 11, 12, "Larry", 13) :: Nil
+ )
+ }
+ }
+
test("PaimonFormatTable: set dynamic options") {
withTable("t") {
sql(s"create table t (id INT, v INT, pt STRING) using csv")