This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new bfd0a0a8b4 [core] Throw exception if increment query with rescale bucket (#4984)
bfd0a0a8b4 is described below
commit bfd0a0a8b48e75465ade9ff2c14c995b4c4ee706
Author: yuzelin <[email protected]>
AuthorDate: Thu Jan 23 10:25:00 2025 +0800
[core] Throw exception if increment query with rescale bucket (#4984)
---
.../paimon/table/source/AbstractDataTableScan.java | 5 +++-
.../snapshot/IncrementalTagStartingScanner.java | 14 ++++++++++-
.../table/source/snapshot/TimeTravelUtil.java | 24 ++++++++++++++++--
.../apache/paimon/flink/BatchFileStoreITCase.java | 29 ++++++++++++++++++++++
4 files changed, 68 insertions(+), 4 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 59b11281cc..5bb9ba1378 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -245,7 +245,10 @@ public abstract class AbstractDataTableScan implements DataTableScan {
Options conf = options.toConfiguration();
TagManager tagManager =
- new TagManager(snapshotManager.fileIO(),
snapshotManager.tablePath());
+ new TagManager(
+ snapshotManager.fileIO(),
+ snapshotManager.tablePath(),
+ snapshotManager.branch());
if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN)) {
Pair<String, String> incrementalBetween =
options.incrementalBetween();
Optional<Tag> startTag =
tagManager.get(incrementalBetween.getLeft());
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
index 8c4b5d5cec..835b7595a3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.tag.Tag;
import org.apache.paimon.tag.TagPeriodHandler;
import org.apache.paimon.utils.Pair;
@@ -50,6 +51,14 @@ public class IncrementalTagStartingScanner extends AbstractStartingScanner {
this.start = start;
this.end = end;
this.startingSnapshotId = start.id();
+
+ TimeTravelUtil.checkRescaleBucketForIncrementalTagQuery(
+ new SchemaManager(
+ snapshotManager.fileIO(),
+ snapshotManager.tablePath(),
+ snapshotManager.branch()),
+ start.schemaId(),
+ end.schemaId());
}
@Override
@@ -66,7 +75,10 @@ public class IncrementalTagStartingScanner extends AbstractStartingScanner {
endTagName);
TagManager tagManager =
- new TagManager(snapshotManager.fileIO(),
snapshotManager.tablePath());
+ new TagManager(
+ snapshotManager.fileIO(),
+ snapshotManager.tablePath(),
+ snapshotManager.branch());
Optional<Tag> endTag = tagManager.get(endTagName);
if (!endTag.isPresent()) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
index 5b4ee4e58c..4a0f4290df 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java
@@ -20,8 +20,9 @@ package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.SnapshotNotExistException;
import org.apache.paimon.utils.TagManager;
@@ -30,6 +31,7 @@ import java.util.ArrayList;
import java.util.List;
import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
/** The util class of resolve snapshot from scan params for time travel. */
public class TimeTravelUtil {
@@ -58,7 +60,7 @@ public class TimeTravelUtil {
return snapshotManager.latestSnapshot();
}
- Preconditions.checkArgument(
+ checkArgument(
scanHandleKey.size() == 1,
String.format(
"Only one of the following parameters may be set :
[%s, %s, %s, %s]",
@@ -124,4 +126,22 @@ public class TimeTravelUtil {
new TagManager(snapshotManager.fileIO(),
snapshotManager.tablePath());
return tagManager.getOrThrow(tagName).trimToSnapshot();
}
+
+ public static void checkRescaleBucketForIncrementalTagQuery(
+ SchemaManager schemaManager, long schemaId1, long schemaId2) {
+ if (schemaId1 != schemaId2) {
+ int bucketNumber1 = bucketNumber(schemaManager, schemaId1);
+ int bucketNumber2 = bucketNumber(schemaManager, schemaId2);
+ checkArgument(
+ bucketNumber1 == bucketNumber2,
+ "The bucket number of two tags are different (%s, %s),
which is not supported in incremental tag query.",
+ bucketNumber1,
+ bucketNumber2);
+ }
+ }
+
+ private static int bucketNumber(SchemaManager schemaManager, long
schemaId) {
+ TableSchema schema = schemaManager.schema(schemaId);
+ return CoreOptions.fromMap(schema.options()).bucket();
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index ee24dc8ef3..d9a5cab1d4 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.SnapshotNotExistException;
@@ -37,6 +38,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import java.math.BigDecimal;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -652,6 +654,33 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
assertThat(result).containsExactlyInAnyOrder(Row.of(1, 11, 111),
Row.of(2, 22, 222));
}
+ @Test
+ public void testIncrementTagQueryWithRescaleBucket() throws Exception {
+ sql("CREATE TABLE test (a INT PRIMARY KEY NOT ENFORCED, b INT) WITH
('bucket' = '1')");
+ Table table = paimonTable("test");
+
+ sql("INSERT INTO test VALUES (1, 11), (2, 22)");
+ sql("ALTER TABLE test SET ('bucket' = '2')");
+ sql("INSERT OVERWRITE test SELECT * FROM test");
+ sql("INSERT INTO test VALUES (3, 33)");
+
+ table.createTag("2024-01-01", 1);
+ table.createTag("2024-01-02", 3);
+
+ List<String> incrementalOptions =
+ Arrays.asList(
+ "'incremental-between'='2024-01-01,2024-01-02'",
+ "'incremental-to-auto-tag'='2024-01-02'");
+
+ for (String option : incrementalOptions) {
+ assertThatThrownBy(() -> sql("SELECT * FROM test /*+ OPTIONS (%s)
*/", option))
+ .satisfies(
+ anyCauseMatches(
+ IllegalArgumentException.class,
+ "The bucket number of two tags are
different (1, 2), which is not supported in incremental tag query."));
+ }
+ }
+
private void validateCount1PushDown(String sql) {
Transformation<?> transformation = AbstractTestBase.translate(tEnv,
sql);
while (!transformation.getInputs().isEmpty()) {