This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new aadfb6d83 [flink] Fix that schema validation fails when using 'scan.x'
options to do time travel on schema changed tables (#2543)
aadfb6d83 is described below
commit aadfb6d83de7636752d5773dbfc6e8abf24e0682
Author: yuzelin <[email protected]>
AuthorDate: Thu Dec 21 09:47:54 2023 +0800
[flink] Fix that schema validation fails when using 'scan.x' options to do
time travel on schema changed tables (#2543)
---
.../org/apache/paimon/utils/DateTimeUtils.java | 2 +-
.../paimon/table/AbstractFileStoreTable.java | 27 ++++++++++++++----
.../org/apache/paimon/table/FileStoreTable.java | 3 ++
.../paimon/flink/AbstractFlinkTableFactory.java | 8 ++++--
.../apache/paimon/flink/BatchFileStoreITCase.java | 30 ++++++++++++++++++++
.../paimon/flink/ContinuousFileStoreITCase.java | 32 ++++++++++++++++++++++
6 files changed, 93 insertions(+), 9 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
index ad48b2d14..16d84bc95 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
@@ -407,7 +407,7 @@ public class DateTimeUtils {
// Format
// --------------------------------------------------------------------------------------------
- private static String formatTimestamp(Timestamp ts, int precision) {
+ public static String formatTimestamp(Timestamp ts, int precision) {
LocalDateTime ldt = ts.toLocalDateTime();
String fraction = pad(9, ldt.getNano());
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 7e206d787..14bc66316 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -163,6 +163,22 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
@Override
public FileStoreTable copy(Map<String, String> dynamicOptions) {
+ checkImmutability(dynamicOptions);
+ return copyInternal(dynamicOptions, true);
+ }
+
+ @Override
+ public FileStoreTable copyWithoutTimeTravel(Map<String, String> dynamicOptions) {
+ checkImmutability(dynamicOptions);
+ return copyInternal(dynamicOptions, false);
+ }
+
+ @Override
+ public FileStoreTable internalCopyWithoutCheck(Map<String, String> dynamicOptions) {
+ return copyInternal(dynamicOptions, true);
+ }
+
+ private void checkImmutability(Map<String, String> dynamicOptions) {
Map<String, String> options = tableSchema.options();
// check option is not immutable
dynamicOptions.forEach(
@@ -171,12 +187,9 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
SchemaManager.checkAlterTableOption(k);
}
});
-
- return internalCopyWithoutCheck(dynamicOptions);
}
- @Override
- public FileStoreTable internalCopyWithoutCheck(Map<String, String> dynamicOptions) {
+ private FileStoreTable copyInternal(Map<String, String> dynamicOptions, boolean tryTimeTravel) {
Map<String, String> options = new HashMap<>(tableSchema.options());
// merge non-null dynamic options into schema.options
@@ -203,8 +216,10 @@ public abstract class AbstractFileStoreTable implements FileStoreTable {
// validate schema with new options
SchemaValidation.validateTableSchema(newTableSchema);
- // see if merged options contain time travel option
- newTableSchema = tryTimeTravel(newOptions).orElse(newTableSchema);
+ if (tryTimeTravel) {
+ // see if merged options contain time travel option
+ newTableSchema = tryTimeTravel(newOptions).orElse(newTableSchema);
+ }
return copy(newTableSchema);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index 860bb91be..5ff78a197 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -79,6 +79,9 @@ public interface FileStoreTable extends DataTable {
@Override
FileStoreTable copy(Map<String, String> dynamicOptions);
+ /** Doesn't change table schema even when there exists time travel scan
options. */
+ FileStoreTable copyWithoutTimeTravel(Map<String, String> dynamicOptions);
+
/** Sometimes we have to change some Immutable options to implement
features. */
FileStoreTable internalCopyWithoutCheck(Map<String, String>
dynamicOptions);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
index cc559c190..782a9804e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/AbstractFlinkTableFactory.java
@@ -248,10 +248,14 @@ public abstract class AbstractFlinkTableFactory
newOptions.putAll(origin.getOptions());
newOptions.putAll(dynamicOptions);
+ // notice that the Paimon table schema must be the same with the
Flink's
if (origin instanceof DataCatalogTable) {
- table = ((DataCatalogTable) origin).table().copy(newOptions);
+ FileStoreTable fileStoreTable = (FileStoreTable)
((DataCatalogTable) origin).table();
+ table = fileStoreTable.copyWithoutTimeTravel(newOptions);
} else {
- table =
FileStoreTableFactory.create(createCatalogContext(context)).copy(newOptions);
+ table =
+ FileStoreTableFactory.create(createCatalogContext(context))
+ .copyWithoutTimeTravel(newOptions);
}
Schema schema =
FlinkCatalog.fromCatalogTable(context.getCatalogTable());
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index ac68dcad7..0a1e06473 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.BlockingIterator;
+import org.apache.paimon.utils.DateTimeUtils;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
@@ -386,4 +387,33 @@ public class BatchFileStoreITCase extends CatalogITCaseBase {
Row.ofKind(RowKind.INSERT, 1, "B"),
Row.ofKind(RowKind.INSERT, 2, "B"));
iterator.close();
}
+
+ @Test
+ public void testScanFromOldSchema() throws InterruptedException {
+ sql("CREATE TABLE select_old (f0 INT PRIMARY KEY NOT ENFORCED, f1
STRING)");
+
+ sql("INSERT INTO select_old VALUES (1, 'a'), (2, 'b')");
+
+ Thread.sleep(1_000);
+ long timestamp = System.currentTimeMillis();
+
+ sql("ALTER TABLE select_old ADD f2 STRING");
+ sql("INSERT INTO select_old VALUES (3, 'c', 'C')");
+
+ // this way will initialize source with the latest schema
+ assertThat(
+ sql(
+ "SELECT * FROM select_old /*+ OPTIONS('scan.timestamp-millis'='%s') */",
+ timestamp))
+ // old schema doesn't have column f2
+ .containsExactlyInAnyOrder(Row.of(1, "a", null), Row.of(2,
"b", null));
+
+ // this way will initialize source with time-travelled schema
+ assertThat(
+ sql(
+ "SELECT * FROM select_old FOR SYSTEM_TIME AS OF TIMESTAMP '%s'",
+ DateTimeUtils.formatTimestamp(
+ DateTimeUtils.toInternal(timestamp, 0), 0)))
+ .containsExactlyInAnyOrder(Row.of(1, "a"), Row.of(2, "b"));
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
index b056c9e86..2e88dcbb1 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink;
import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.utils.BlockingIterator;
+import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
@@ -521,4 +522,35 @@ public class ContinuousFileStoreITCase extends CatalogITCaseBase {
sql("INSERT INTO ignore_delete VALUES (1, 'B')");
assertThat(sql("SELECT * FROM
ignore_delete")).containsExactly(Row.of(1, "B"));
}
+
+ @Test
+ public void testScanFromOldSchema() throws Exception {
+ sql("CREATE TABLE select_old (f0 INT PRIMARY KEY NOT ENFORCED, f1
STRING)");
+
+ sql("INSERT INTO select_old VALUES (1, 'a'), (2, 'b')");
+
+ Thread.sleep(1_000);
+ long timestamp = System.currentTimeMillis();
+
+ sql("ALTER TABLE select_old ADD f2 STRING");
+ sql("INSERT INTO select_old VALUES (3, 'c', 'C')");
+
+ // this way will initialize source with the latest schema
+ BlockingIterator<Row, Row> iterator =
+ BlockingIterator.of(
+ streamSqlIter(
+ "SELECT * FROM select_old /*+ OPTIONS('scan.timestamp-millis'='%s') */",
+ timestamp));
+ assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(3,
"c", "C"));
+ iterator.close();
+
+ // this way will initialize source with time-travelled schema
+ iterator =
+ BlockingIterator.of(
+ streamSqlIter(
+ "SELECT * FROM select_old FOR SYSTEM_TIME AS OF TIMESTAMP '%s'",
+ DateTimeUtils.formatTimestamp(
+ DateTimeUtils.toInternal(timestamp, 0), 0)));
+ assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(3,
"c"));
+ }
}