This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 73cf2aaaa8 [core] Incremental query should return empty result for
some corner cases (#5339)
73cf2aaaa8 is described below
commit 73cf2aaaa82f48bc51a6465798b4ef42044ab8dd
Author: yuzelin <[email protected]>
AuthorDate: Tue Mar 25 13:27:09 2025 +0800
[core] Incremental query should return empty result for some corner cases
(#5339)
---
.../paimon/table/source/AbstractDataTableScan.java | 27 ++++++--
.../snapshot/IncrementalDeltaStartingScanner.java | 38 +++---------
.../snapshot/IncrementalDiffStartingScanner.java | 18 +++---
.../apache/paimon/table/IncrementalTableTest.java | 72 +++++++++++++++++++++-
.../IncrementalDeltaStartingScannerTest.java | 19 ++----
5 files changed, 113 insertions(+), 61 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 37cd0e378f..17a5b161b0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -257,13 +257,28 @@ abstract class AbstractDataTableScan implements DataTableScan {
incrementalBetween.getLeft(),
incrementalBetween.getRight()));
}
+ checkArgument(
+ endId >= startId,
+ "Ending snapshotId should >= starting snapshotId %s.",
+ endId,
+ startId);
+
+ if (snapshotManager.earliestSnapshot() == null) {
+ LOG.warn("There is currently no snapshot. Waiting for
snapshot generation.");
+ return new EmptyResultStartingScanner(snapshotManager);
+ }
+
+ if (startId == endId) {
+ return new EmptyResultStartingScanner(snapshotManager);
+ }
+
CoreOptions.IncrementalBetweenScanMode scanMode =
options.incrementalBetweenScanMode();
return scanMode == DIFF
? IncrementalDiffStartingScanner.betweenSnapshotIds(
startId, endId, snapshotManager)
- : new IncrementalDeltaStartingScanner(
- snapshotManager, startId, endId,
toSnapshotScanMode(scanMode));
+ : IncrementalDeltaStartingScanner.betweenSnapshotIds(
+ startId, endId, snapshotManager,
toSnapshotScanMode(scanMode));
}
} else if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP)) {
Pair<Long, Long> incrementalBetween =
options.incrementalBetweenTimestamp();
@@ -277,11 +292,13 @@ abstract class AbstractDataTableScan implements DataTableScan {
long startTimestamp = incrementalBetween.getLeft();
long endTimestamp = incrementalBetween.getRight();
checkArgument(
- endTimestamp > startTimestamp,
- "Ending timestamp %s should be larger than starting
timestamp %s.",
+ endTimestamp >= startTimestamp,
+ "Ending timestamp %s should be >= starting timestamp %s.",
endTimestamp,
startTimestamp);
- if (startTimestamp > latestSnapshot.timeMillis()
+
+ if (startTimestamp == endTimestamp
+ || startTimestamp > latestSnapshot.timeMillis()
|| endTimestamp < earliestSnapshot.timeMillis()) {
return new EmptyResultStartingScanner(snapshotManager);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java
index 6f18dde393..80a56e4f0c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScanner.java
@@ -43,7 +43,6 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
@@ -72,11 +71,6 @@ public class IncrementalDeltaStartingScanner extends AbstractStartingScanner {
@Override
public Result scan(SnapshotReader reader) {
- // Check the validity of scan staring snapshotId.
- Optional<Result> checkResult = checkScanSnapshotIdValidity();
- if (checkResult.isPresent()) {
- return checkResult.get();
- }
Map<Pair<BinaryRow, Integer>, List<DataFileMeta>> grouped = new
ConcurrentHashMap<>();
ManifestsReader manifestsReader = reader.manifestsReader();
@@ -153,39 +147,23 @@ public class IncrementalDeltaStartingScanner extends AbstractStartingScanner {
return StartingScanner.fromPlan(new PlanImpl(null, endingSnapshotId,
result));
}
-    /**
-     * Check the validity of staring snapshotId early.
-     *
-     * @return If the check passes return empty.
-     */
-    private Optional<Result> checkScanSnapshotIdValidity() {
-        Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
-        Long latestSnapshotId = snapshotManager.latestSnapshotId();
-
-        if (earliestSnapshotId == null || latestSnapshotId == null) {
-            LOG.warn("There is currently no snapshot. Waiting for snapshot generation.");
-            return Optional.of(new NoSnapshot());
-        }
-
-        checkArgument(
-                startingSnapshotId <= endingSnapshotId,
-                "Starting snapshotId %s must less than ending snapshotId %s.",
-                startingSnapshotId,
-                endingSnapshotId);
+    /**
+     * Creates a delta scanner for the snapshot range (startId, endId], which is left-open and
+     * right-closed.
+     *
+     * <p>If the table has no snapshot yet, an {@link EmptyResultStartingScanner} is returned
+     * instead of failing.
+     *
+     * @throws IllegalArgumentException if the range is outside the available snapshot ids
+     */
+    public static StartingScanner betweenSnapshotIds(
+            long startId, long endId, SnapshotManager snapshotManager, ScanMode scanMode) {
+        // The earliest/latest ids may be null when there is no snapshot. Keep them boxed so that
+        // case is handled explicitly instead of throwing a NullPointerException on unboxing.
+        Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
+        Long latestSnapshotId = snapshotManager.latestSnapshotId();
+        if (earliestSnapshotId == null || latestSnapshotId == null) {
+            return new EmptyResultStartingScanner(snapshotManager);
+        }
 
         // because of the left open right closed rule of IncrementalStartingScanner that is
         // different from StaticFromStartingScanner, so we should allow starting snapshotId to be
         // equal to the earliestSnapshotId - 1.
         checkArgument(
-                startingSnapshotId >= earliestSnapshotId - 1
-                        && endingSnapshotId <= latestSnapshotId,
+                startId >= earliestSnapshotId - 1 && endId <= latestSnapshotId,
                 "The specified scan snapshotId range [%s, %s] is out of available snapshotId range [%s, %s].",
-                startingSnapshotId,
-                endingSnapshotId,
+                startId,
+                endId,
                 earliestSnapshotId,
                 latestSnapshotId);
-        return Optional.empty();
+        return new IncrementalDeltaStartingScanner(snapshotManager, startId, endId, scanMode);
     }
public static IncrementalDeltaStartingScanner betweenTimestamps(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDiffStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDiffStartingScanner.java
index 942ee23ffa..37c6498e95 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDiffStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalDiffStartingScanner.java
@@ -67,7 +67,7 @@ public class IncrementalDiffStartingScanner extends AbstractStartingScanner {
return
StartingScanner.fromPlan(reader.withSnapshot(end).readIncrementalDiff(start));
}
- public static IncrementalDiffStartingScanner betweenTags(
+ public static StartingScanner betweenTags(
Tag startTag,
Tag endTag,
SnapshotManager snapshotManager,
@@ -82,24 +82,22 @@ public class IncrementalDiffStartingScanner extends AbstractStartingScanner {
end.id());
checkArgument(
- end.id() > start.id(),
- "Tag end %s with snapshot id %s should be larger than tag
start %s with snapshot id %s",
+ end.id() >= start.id(),
+ "Tag end %s with snapshot id %s should be >= tag start %s with
snapshot id %s",
incrementalBetween.getRight(),
end.id(),
incrementalBetween.getLeft(),
start.id());
+ if (start.id() == end.id()) {
+ return new EmptyResultStartingScanner(snapshotManager);
+ }
+
return new IncrementalDiffStartingScanner(snapshotManager, start, end);
}
- public static IncrementalDiffStartingScanner betweenSnapshotIds(
+ public static StartingScanner betweenSnapshotIds(
long startId, long endId, SnapshotManager snapshotManager) {
- checkArgument(
- endId > startId,
- "Ending snapshotId should be larger than starting snapshotId
%s.",
- endId,
- startId);
-
Snapshot start = snapshotManager.snapshot(startId);
Snapshot end = snapshotManager.snapshot(endId);
return new IncrementalDiffStartingScanner(snapshotManager, start, end);
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
index 416f375bdc..4810e9f4f5 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
@@ -33,6 +33,7 @@ import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
import org.junit.jupiter.api.Test;
@@ -42,6 +43,7 @@ import java.util.Collections;
import java.util.List;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
+import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP;
import static org.apache.paimon.CoreOptions.INCREMENTAL_TO_AUTO_TAG;
import static org.apache.paimon.data.BinaryString.fromString;
import static org.apache.paimon.io.DataFileTestUtils.row;
@@ -287,7 +289,7 @@ public class IncrementalTableTest extends TableTestBase {
assertThatThrownBy(() -> read(table, Pair.of(INCREMENTAL_BETWEEN,
"TAG2,TAG1")))
.hasMessageContaining(
- "Tag end TAG1 with snapshot id 1 should be larger than
tag start TAG2 with snapshot id 2");
+ "Tag end TAG1 with snapshot id 1 should be >= tag
start TAG2 with snapshot id 2");
}
@Test
@@ -406,6 +408,74 @@ public class IncrementalTableTest extends TableTestBase {
.containsExactly(GenericRow.of(3,
BinaryString.fromString("c")));
}
+    @Test
+    public void testIncrementalEmptyResult() throws Exception {
+        Identifier identifier = identifier("T");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.STRING())
+                        .primaryKey("a")
+                        .option("bucket", "1")
+                        .build();
+        catalog.createTable(identifier, schema, false);
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+
+        // no snapshot
+        assertThat(read(table, Pair.of(INCREMENTAL_BETWEEN, "1,2"))).isEmpty();
+        assertThat(read(table, Pair.of(INCREMENTAL_BETWEEN_TIMESTAMP, "2025-01-01,2025-01-02")))
+                .isEmpty();
+
+        TableWriteImpl<?> write = table.newWrite(commitUser);
+        TableCommitImpl commit = table.newCommit(commitUser).ignoreEmptyCommit(false);
+        SnapshotManager snapshotManager = table.snapshotManager();
+
+        write.write(GenericRow.of(1, BinaryString.fromString("a")));
+        List<CommitMessage> commitMessages = write.prepareCommit(false, 0);
+        commit.commit(0, commitMessages);
+
+        write.write(GenericRow.of(2, BinaryString.fromString("b")));
+        commitMessages = write.prepareCommit(false, 1);
+        commit.commit(1, commitMessages);
+
+        table.createTag("tag1", 1);
+
+        long earliestTimestamp = snapshotManager.earliestSnapshot().timeMillis();
+        long latestTimestamp = snapshotManager.latestSnapshot().timeMillis();
+
+        // same tag
+        assertThat(read(table, Pair.of(INCREMENTAL_BETWEEN, "tag1,tag1"))).isEmpty();
+
+        // same snapshot
+        assertThat(read(table, Pair.of(INCREMENTAL_BETWEEN, "1,1"))).isEmpty();
+
+        // same timestamp given as a date string; 2025-01-01 normally predates the snapshots
+        // committed above, so the "before earliest snapshot" rule also applies here
+        assertThat(read(table, Pair.of(INCREMENTAL_BETWEEN_TIMESTAMP, "2025-01-01,2025-01-01")))
+                .isEmpty();
+
+        // same timestamp inside the snapshot time range: neither the "after latest" nor the
+        // "before earliest" rule applies, so only the start == end check makes this empty
+        assertThat(
+                        read(
+                                table,
+                                Pair.of(
+                                        INCREMENTAL_BETWEEN_TIMESTAMP,
+                                        String.format("%s,%s", latestTimestamp, latestTimestamp))))
+                .isEmpty();
+
+        // startTimestamp > latestSnapshot.timeMillis()
+        assertThat(
+                        read(
+                                table,
+                                Pair.of(
+                                        INCREMENTAL_BETWEEN_TIMESTAMP,
+                                        String.format(
+                                                "%s,%s",
+                                                latestTimestamp + 1, latestTimestamp + 2))))
+                .isEmpty();
+
+        // endTimestamp < earliestSnapshot.timeMillis()
+        assertThat(
+                        read(
+                                table,
+                                Pair.of(
+                                        INCREMENTAL_BETWEEN_TIMESTAMP,
+                                        String.format(
+                                                "%s,%s",
+                                                earliestTimestamp - 2, earliestTimestamp - 1))))
+                .isEmpty();
+    }
+
private static long utcMills(String timestamp) {
return
Timestamp.fromLocalDateTime(LocalDateTime.parse(timestamp)).getMillisecond();
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java
index 100b80c93f..b19d686577 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/IncrementalDeltaStartingScannerTest.java
@@ -110,25 +110,14 @@ public class IncrementalDeltaStartingScannerTest extends ScannerTestBase {
assertThatNoException()
.isThrownBy(
() ->
- new IncrementalDeltaStartingScanner(
- snapshotManager, 0, 4,
ScanMode.DELTA)
+
IncrementalDeltaStartingScanner.betweenSnapshotIds(
+ 0, 4, snapshotManager,
ScanMode.DELTA)
.scan(snapshotReader));
- // Starting snapshotId must less than ending snapshotId.
assertThatThrownBy(
() ->
- new IncrementalDeltaStartingScanner(
- snapshotManager, 4, 3,
ScanMode.DELTA)
- .scan(snapshotReader))
- .satisfies(
- anyCauseMatches(
- IllegalArgumentException.class,
- "Starting snapshotId 4 must less than ending
snapshotId 3."));
-
- assertThatThrownBy(
- () ->
- new IncrementalDeltaStartingScanner(
- snapshotManager, 1, 5,
ScanMode.DELTA)
+
IncrementalDeltaStartingScanner.betweenSnapshotIds(
+ 1, 5, snapshotManager,
ScanMode.DELTA)
.scan(snapshotReader))
.satisfies(
anyCauseMatches(