This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new dbd7f5cd6 [spark][bugfix] Use pk index for comparator align with SortMergeReader (#2987)
dbd7f5cd6 is described below
commit dbd7f5cd61cc0bdf89a7c85ed5f8987b1f85e131
Author: Yang Zhang <[email protected]>
AuthorDate: Tue Apr 7 16:33:41 2026 +0800
[spark][bugfix] Use pk index for comparator align with SortMergeReader (#2987)
---
.../fluss/spark/read/FlussUpsertPartitionReader.scala | 14 +++++++++-----
.../org/apache/fluss/spark/utils/LogChangesIterator.scala | 10 ++++++----
.../apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala | 13 +++++++++++++
3 files changed, 28 insertions(+), 9 deletions(-)
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala
index e8e56a2a5..73f3fca52 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala
@@ -27,6 +27,7 @@ import org.apache.fluss.record.LogRecord
import org.apache.fluss.row.{encode, InternalRow, KeyValueRow}
import org.apache.fluss.spark.SparkFlussConf
import org.apache.fluss.spark.utils.LogChangesIterator
+import org.apache.fluss.types.{DataField, RowType}
import org.apache.fluss.utils.CloseableIterator
import org.apache.spark.internal.Logging
@@ -65,13 +66,12 @@ class FlussUpsertPartitionReader(
extraProjections ++= projection
pkIndexes.foreach {
pkIndex =>
- projection.find(p => p == pkIndex) match {
- case Some(index) =>
- _pkProjection += index
- case _ =>
+ projection.indexOf(pkIndex) match {
+ case -1 =>
extraProjections += pkIndex
_pkProjection += projection.length + i
i += 1
+ case idx => _pkProjection += idx
}
}
(extraProjections.toArray, _pkProjection.toArray)
@@ -99,9 +99,13 @@ class FlussUpsertPartitionReader(
private def createSortMergeReader(): SortMergeReader = {
// Create key encoder for primary keys
+ val pkIndexes = tableInfo.getSchema.getPrimaryKeyIndexes
+ val pkFields = new java.util.ArrayList[DataField]()
+ pkIndexes.foreach(i => pkFields.add(rowType.getFields.get(i)))
+ val pkRowType = new RowType(pkFields)
val keyEncoder =
encode.KeyEncoder.ofPrimaryKeyEncoder(
- rowType,
+ pkRowType,
tableInfo.getPhysicalPrimaryKeys,
tableInfo.getTableConfig,
tableInfo.isDefaultBucketKey)
diff --git a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/utils/LogChangesIterator.scala b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/utils/LogChangesIterator.scala
index 2c5871c84..15d784013 100644
--- a/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/utils/LogChangesIterator.scala
+++ b/fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/utils/LogChangesIterator.scala
@@ -48,10 +48,15 @@ case class LogChangesIterator(
comparator: Comparator[InternalRow]
) extends CloseableIterator[KeyValueRow] {
+ private val projectRow1 = ProjectedRow.from(pkProjection)
+ private val projectRow2 = ProjectedRow.from(pkProjection)
+
// Sort the records by primary key and then by offset
private val sortedLogRecords = logRecords.sortWith {
case (record1, record2) =>
- val keyComparison = comparator.compare(record1.getRow, record2.getRow)
+ val keyComparison = comparator.compare(
+ projectRow1.replaceRow(record1.getRow),
+ projectRow2.replaceRow(record2.getRow))
if (keyComparison == 0) {
record1.logOffset() < record2.logOffset() // For same key, lower offset comes first
} else {
@@ -63,9 +68,6 @@ case class LogChangesIterator(
sortedLogRecords.head,
CloseableIterator.wrap(sortedLogRecords.tail.toIterator.asJava))
- private val projectRow1 = ProjectedRow.from(pkProjection)
- private val projectRow2 = ProjectedRow.from(pkProjection)
-
private var currentScanRecord: ScanRecord = _
override def hasNext: Boolean = {
diff --git a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala
index 359614cbc..e75367f07 100644
--- a/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala
+++ b/fluss-spark/fluss-spark-ut/src/test/scala/org/apache/fluss/spark/SparkPrimaryKeyTableReadTest.scala
@@ -242,6 +242,19 @@ class SparkPrimaryKeyTableReadTest extends FlussSparkTestBase {
}
}
+ test("Spark Read: primary key table with random project") {
+ withTable("t") {
+ sql(
"CREATE TABLE t (id int, name string, pk int, pk2 string) TBLPROPERTIES('primary.key'='pk,pk2')")
+ checkAnswer(sql("SELECT * FROM t"), Nil)
+ sql("INSERT INTO t VALUES (1, 'a', 10, 'x'), (2, 'b', 20, 'y')")
+ checkAnswer(
+ sql("SELECT * FROM t ORDER BY id"),
+ Row(1, "a", 10, "x") :: Row(2, "b", 20, "y") :: Nil)
checkAnswer(sql("SELECT pk, id FROM t ORDER BY id"), Row(10, 1) :: Row(20, 2) :: Nil)
+ }
+ }
+
private def genInputPartition(
tablePath: TablePath,
partitionName: String): Array[FlussUpsertInputPartition] = {