This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 12cc41ae9a [core] Fix row id push down: blob file may not be continuous (#6550)
12cc41ae9a is described below
commit 12cc41ae9af05aa9a8f27636f5dc0485784c7e9b
Author: YeJunHao <[email protected]>
AuthorDate: Fri Nov 7 08:33:51 2025 +0800
[core] Fix row id push down: blob file may not be continuous (#6550)
---
.../paimon/operation/DataEvolutionSplitRead.java | 59 +++++++++++++++-------
.../paimon/operation/DataEvolutionReadTest.java | 21 +++++++-
2 files changed, 61 insertions(+), 19 deletions(-)
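For context: the patch relaxes the row-id continuity check in BlobBunch. When a row-id predicate is pushed down, the blob files selected for a bunch may legitimately skip whole row-id ranges, so a gap between consecutive files is no longer an error; only overlapping files still need to be resolved, by keeping the one with the higher sequence number. The sketch below is a minimal, self-contained model of that rule; the class and method names are hypothetical and this is not the real DataFileMeta or BlobBunch API.

// Illustrative sketch of the continuity rule this patch changes.
// All names are hypothetical; this is not the Paimon API.
final class RowIdContinuitySketch {

    /** Decides how a blob bunch should treat the next file's row-id range. */
    static String accept(
            long expectedNextFirstRowId,  // previous firstRowId + rowCount
            long latestMaxSequenceNumber, // sequence number of the last accepted file
            long firstRowId,              // first row id of the incoming file
            long maxSequenceNumber,       // sequence number of the incoming file
            boolean rowIdPushDown) {      // flag introduced by this commit
        if (firstRowId < expectedNextFirstRowId) { // row-id ranges overlap
            if (rowIdPushDown) {
                return maxSequenceNumber > latestMaxSequenceNumber
                        ? "drop the previously added file, then add this one"
                        : "skip this file";
            }
            if (maxSequenceNumber < latestMaxSequenceNumber) {
                return "skip this file"; // older overlapping data is ignored
            }
            throw new IllegalArgumentException(
                    "Blob file with overlapping row id should have decreasing sequence number.");
        }
        if (firstRowId > expectedNextFirstRowId && !rowIdPushDown) { // gap in row ids
            throw new IllegalArgumentException(
                    "Blob file first row id should be continuous, expect "
                            + expectedNextFirstRowId + " but got " + firstRowId);
        }
        // Contiguous, or a gap that row id push down makes acceptable.
        return "add this file";
    }
}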
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
index c0e54af576..47a247be48 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/DataEvolutionSplitRead.java
@@ -374,6 +374,14 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
@VisibleForTesting
public static List<FieldBunch> splitFieldBunches(
List<DataFileMeta> needMergeFiles, Function<DataFileMeta, Integer> blobFileToFieldId) {
+ return splitFieldBunches(needMergeFiles, blobFileToFieldId, false);
+ }
+
+ @VisibleForTesting
+ public static List<FieldBunch> splitFieldBunches(
+ List<DataFileMeta> needMergeFiles,
+ Function<DataFileMeta, Integer> blobFileToFieldId,
+ boolean rowIdPushDown) {
List<FieldBunch> fieldsFiles = new ArrayList<>();
Map<Integer, BlobBunch> blobBunchMap = new HashMap<>();
long rowCount = -1;
@@ -382,7 +390,8 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
int fieldId = blobFileToFieldId.apply(file);
final long expectedRowCount = rowCount;
blobBunchMap
- .computeIfAbsent(fieldId, key -> new BlobBunch(expectedRowCount))
+ .computeIfAbsent(
+ fieldId, key -> new BlobBunch(expectedRowCount, rowIdPushDown))
.add(file);
} else {
// Normal file, just add it to the current merge split
@@ -426,16 +435,18 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
final List<DataFileMeta> files;
final long expectedRowCount;
+ final boolean rowIdPushDown;
long latestFistRowId = -1;
long expectedNextFirstRowId = -1;
long latestMaxSequenceNumber = -1;
long rowCount;
- BlobBunch(long expectedRowCount) {
+ BlobBunch(long expectedRowCount, boolean rowIdPushDown) {
this.files = new ArrayList<>();
this.rowCount = 0;
this.expectedRowCount = expectedRowCount;
+ this.rowIdPushDown = rowIdPushDown;
}
void add(DataFileMeta file) {
@@ -452,24 +463,36 @@ public class DataEvolutionSplitRead implements SplitRead<InternalRow> {
}
if (!files.isEmpty()) {
long firstRowId = file.firstRowId();
- if (firstRowId < expectedNextFirstRowId) {
+ if (rowIdPushDown) {
+ if (firstRowId < expectedNextFirstRowId) {
+ if (file.maxSequenceNumber() > latestMaxSequenceNumber) {
+ files.remove(files.size() - 1);
+ } else {
+ return;
+ }
+ }
+ } else {
+ if (firstRowId < expectedNextFirstRowId) {
+ checkArgument(
+ file.maxSequenceNumber() < latestMaxSequenceNumber,
+ "Blob file with overlapping row id should have
decreasing sequence number.");
+ return;
+ } else if (firstRowId > expectedNextFirstRowId) {
+ throw new IllegalArgumentException(
+ "Blob file first row id should be continuous,
expect "
+ + expectedNextFirstRowId
+ + " but got "
+ + firstRowId);
+ }
+ }
+ if (!files.isEmpty()) {
checkArgument(
- file.maxSequenceNumber() < latestMaxSequenceNumber,
- "Blob file with overlapping row id should have
decreasing sequence number.");
- return;
- } else if (firstRowId > expectedNextFirstRowId) {
- throw new IllegalArgumentException(
- "Blob file first row id should be continuous,
expect "
- + expectedNextFirstRowId
- + " but got "
- + firstRowId);
+ file.schemaId() == files.get(0).schemaId(),
+ "All files in a blob bunch should have the same
schema id.");
+ checkArgument(
+ file.writeCols().equals(files.get(0).writeCols()),
+ "All files in a blob bunch should have the same
write columns.");
}
- checkArgument(
- file.schemaId() == files.get(0).schemaId(),
- "All files in a blob bunch should have the same schema
id.");
- checkArgument(
- file.writeCols().equals(files.get(0).writeCols()),
- "All files in a blob bunch should have the same write
columns.");
}
files.add(file);
rowCount += file.rowCount();
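With the change above, callers that push a row-id filter into the scan can opt into the relaxed check through the new three-argument splitFieldBunches overload. A hypothetical call site is sketched below; the overload and the field-id mapping come from the patch, while the local variables 'files' and 'blobFieldId' are illustrative placeholders.

// Hypothetical call site; 'files' and 'blobFieldId' are placeholders, not Paimon code.
List<FieldBunch> bunches =
        DataEvolutionSplitRead.splitFieldBunches(
                files,        // List<DataFileMeta> that need to be merged
                blobFieldId,  // Function<DataFileMeta, Integer>: blob file -> field id
                true);        // rowIdPushDown: tolerate gaps between blob files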
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
index 91c6285fa6..6b9dd29b59 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/DataEvolutionReadTest.java
@@ -35,6 +35,7 @@ import java.util.List;
import static org.apache.paimon.operation.DataEvolutionSplitRead.splitFieldBunches;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Tests for {@link BlobBunch}. */
@@ -44,7 +45,7 @@ public class DataEvolutionReadTest {
@BeforeEach
public void setUp() {
- blobBunch = new BlobBunch(Long.MAX_VALUE);
+ blobBunch = new BlobBunch(Long.MAX_VALUE, false);
}
@Test
@@ -319,6 +320,24 @@ public class DataEvolutionReadTest {
writeCols);
}
+ @Test
+ public void testRowIdPushDown() {
+ BlobBunch blobBunch = new BlobBunch(Long.MAX_VALUE, true);
+ DataFileMeta blobEntry1 = createBlobFile("blob1", 0, 100, 1);
+ DataFileMeta blobEntry2 = createBlobFile("blob2", 200, 300, 1);
+ blobBunch.add(blobEntry1);
+ BlobBunch finalBlobBunch = blobBunch;
+ DataFileMeta finalBlobEntry = blobEntry2;
+ assertThatCode(() -> finalBlobBunch.add(finalBlobEntry)).doesNotThrowAnyException();
+
+ blobBunch = new BlobBunch(Long.MAX_VALUE, true);
+ blobEntry1 = createBlobFile("blob1", 0, 100, 1);
+ blobEntry2 = createBlobFile("blob2", 50, 200, 2);
+ blobBunch.add(blobEntry1);
+ blobBunch.add(blobEntry2);
+ assertThat(blobBunch.files).containsExactlyInAnyOrder(blobEntry2);
+ }
+
/** Creates a normal (non-blob) file for testing. */
private DataFileMeta createNormalFile(
String fileName,