This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6c54255abd [rest] Supports global system tables (#4880)
6c54255abd is described below
commit 6c54255abdd7cd3bfa9472d93925251fe63aeb9c
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jan 10 19:55:41 2025 +0800
[rest] Supports global system tables (#4880)
---
.../org/apache/paimon/catalog/AbstractCatalog.java | 30 +------
.../org/apache/paimon/catalog/CatalogUtils.java | 30 +++++++
.../java/org/apache/paimon/rest/RESTCatalog.java | 2 +-
.../paimon/table/system/AllTableOptionsTable.java | 92 +++++++-------------
.../org/apache/paimon/catalog/CatalogTestBase.java | 8 --
.../org/apache/paimon/rest/RESTCatalogTest.java | 5 --
.../table/system/AllTableOptionsTableTest.java | 3 +-
.../apache/paimon/flink/CatalogTableITCase.java | 71 +++++++++++++++-
.../org/apache/paimon/flink/SystemTableITCase.java | 98 ----------------------
9 files changed, 132 insertions(+), 207 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index a4c47f54a6..702d5229cf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -39,8 +39,6 @@ import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.object.ObjectTable;
import org.apache.paimon.table.sink.BatchWriteBuilder;
-import org.apache.paimon.table.system.AllTableOptionsTable;
-import org.apache.paimon.table.system.CatalogOptionsTable;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
@@ -66,8 +64,6 @@ import static org.apache.paimon.catalog.CatalogUtils.listPartitionsFromFileSyste
import static org.apache.paimon.catalog.CatalogUtils.validateAutoCreateClose;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
-import static
org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
-import static
org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
@@ -372,15 +368,7 @@ public abstract class AbstractCatalog implements Catalog {
@Override
public Table getTable(Identifier identifier) throws TableNotExistException
{
if (isSystemDatabase(identifier.getDatabaseName())) {
- String tableName = identifier.getTableName();
- switch (tableName.toLowerCase()) {
- case ALL_TABLE_OPTIONS:
- return new AllTableOptionsTable(fileIO, allTablePaths());
- case CATALOG_OPTIONS:
- return new CatalogOptionsTable(catalogOptions);
- default:
- throw new TableNotExistException(identifier);
- }
+ return
CatalogUtils.createGlobalSystemTable(identifier.getTableName(), this);
} else if (identifier.isSystemTable()) {
Table originTable =
getDataOrFormatTable(
@@ -454,22 +442,6 @@ public abstract class AbstractCatalog implements Catalog {
return newDatabasePath(warehouse(), database);
}
- public Map<String, Map<String, Path>> allTablePaths() {
- try {
- Map<String, Map<String, Path>> allPaths = new HashMap<>();
- for (String database : listDatabases()) {
- Map<String, Path> tableMap =
- allPaths.computeIfAbsent(database, d -> new
HashMap<>());
- for (String table : listTables(database)) {
- tableMap.put(table,
getTableLocation(Identifier.create(database, table)));
- }
- }
- return allPaths;
- } catch (DatabaseNotExistException e) {
- throw new RuntimeException("Database is deleted while listing", e);
- }
- }
-
protected TableMeta getDataTableMeta(Identifier identifier) throws
TableNotExistException {
return new TableMeta(getDataTableSchema(identifier), null);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index 9267532f9d..9b69248d6d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -26,11 +26,14 @@ import org.apache.paimon.partition.Partition;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.system.AllTableOptionsTable;
+import org.apache.paimon.table.system.CatalogOptionsTable;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Preconditions;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -39,6 +42,8 @@ import static org.apache.paimon.CoreOptions.PARTITION_GENERATE_LEGCY_NAME;
import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX;
import static
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
+import static
org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
+import static
org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Utils for {@link Catalog}. */
@@ -121,6 +126,31 @@ public class CatalogUtils {
CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
}
+ public static Table createGlobalSystemTable(String tableName, Catalog
catalog)
+ throws Catalog.TableNotExistException {
+ switch (tableName.toLowerCase()) {
+ case ALL_TABLE_OPTIONS:
+ try {
+ Map<Identifier, Map<String, String>> allOptions = new
HashMap<>();
+ for (String database : catalog.listDatabases()) {
+ for (String name : catalog.listTables(database)) {
+ Identifier identifier =
Identifier.create(database, name);
+ Table table = catalog.getTable(identifier);
+ allOptions.put(identifier, table.options());
+ }
+ }
+ return new AllTableOptionsTable(allOptions);
+ } catch (Catalog.DatabaseNotExistException |
Catalog.TableNotExistException e) {
+ throw new RuntimeException("Database is deleted while
listing", e);
+ }
+ case CATALOG_OPTIONS:
+ return new
CatalogOptionsTable(Options.fromMap(catalog.options()));
+ default:
+ throw new Catalog.TableNotExistException(
+ Identifier.create(SYSTEM_DATABASE_NAME, tableName));
+ }
+ }
+
public static Table createSystemTable(Identifier identifier, Table
originTable)
throws Catalog.TableNotExistException {
if (!(originTable instanceof FileStoreTable)) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 3f7647ca84..a807ad2c9d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -273,7 +273,7 @@ public class RESTCatalog implements Catalog {
@Override
public Table getTable(Identifier identifier) throws TableNotExistException
{
if (SYSTEM_DATABASE_NAME.equals(identifier.getDatabaseName())) {
- throw new UnsupportedOperationException("TODO support global
system tables.");
+ return
CatalogUtils.createGlobalSystemTable(identifier.getTableName(), this);
} else if (identifier.isSystemTable()) {
return getSystemTable(identifier);
} else {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java
index 13b5366a6a..b354a263c7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AllTableOptionsTable.java
@@ -18,15 +18,13 @@
package org.apache.paimon.table.system;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
@@ -45,7 +43,6 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.Iterators;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -68,13 +65,10 @@ public class AllTableOptionsTable implements ReadonlyTable {
public static final String ALL_TABLE_OPTIONS = "all_table_options";
- private final FileIO fileIO;
- private final Map<String, Map<String, Path>> allTablePaths;
+ private final Map<Identifier, Map<String, String>> allOptions;
- public AllTableOptionsTable(FileIO fileIO, Map<String, Map<String, Path>>
allTablePaths) {
- // allTablePath is the map of <database, <table_name, properties>>
- this.fileIO = fileIO;
- this.allTablePaths = allTablePaths;
+ public AllTableOptionsTable(Map<Identifier, Map<String, String>>
allOptions) {
+ this.allOptions = allOptions;
}
@Override
@@ -104,12 +98,12 @@ public class AllTableOptionsTable implements ReadonlyTable {
@Override
public InnerTableRead newRead() {
- return new AllTableOptionsRead(fileIO);
+ return new AllTableOptionsRead();
}
@Override
public Table copy(Map<String, String> dynamicOptions) {
- return new AllTableOptionsTable(fileIO, allTablePaths);
+ return new AllTableOptionsTable(allOptions);
}
private class AllTableOptionsScan extends ReadOnceTableScan {
@@ -121,7 +115,7 @@ public class AllTableOptionsTable implements ReadonlyTable {
@Override
public Plan innerPlan() {
- return () -> Collections.singletonList(new
AllTableSplit(allTablePaths));
+ return () -> Collections.singletonList(new
AllTableSplit(allOptions));
}
}
@@ -129,10 +123,10 @@ public class AllTableOptionsTable implements ReadonlyTable {
private static final long serialVersionUID = 1L;
- private final Map<String, Map<String, Path>> allTablePaths;
+ private final Map<Identifier, Map<String, String>> allOptions;
- private AllTableSplit(Map<String, Map<String, Path>> allTablePaths) {
- this.allTablePaths = allTablePaths;
+ private AllTableSplit(Map<Identifier, Map<String, String>> allOptions)
{
+ this.allOptions = allOptions;
}
@Override
@@ -144,24 +138,19 @@ public class AllTableOptionsTable implements ReadonlyTable {
return false;
}
AllTableSplit that = (AllTableSplit) o;
- return Objects.equals(allTablePaths, that.allTablePaths);
+ return Objects.equals(allOptions, that.allOptions);
}
@Override
public int hashCode() {
- return Objects.hash(allTablePaths);
+ return Objects.hash(allOptions);
}
}
private static class AllTableOptionsRead implements InnerTableRead {
- private final FileIO fileIO;
private RowType readType;
- public AllTableOptionsRead(FileIO fileIO) {
- this.fileIO = fileIO;
- }
-
@Override
public InnerTableRead withFilter(Predicate predicate) {
return this;
@@ -183,29 +172,12 @@ public class AllTableOptionsTable implements ReadonlyTable {
if (!(split instanceof AllTableSplit)) {
throw new IllegalArgumentException("Unsupported split: " +
split.getClass());
}
- Map<String, Map<String, Path>> location = ((AllTableSplit)
split).allTablePaths;
- Iterator<InternalRow> rows = toRow(options(fileIO, location));
- if (readType != null) {
- rows =
- Iterators.transform(
- rows,
- row ->
- ProjectedRow.from(
- readType,
AggregationFieldsTable.TABLE_TYPE)
- .replaceRow(row));
- }
- return new IteratorRecordReader<>(rows);
- }
- }
-
- protected static Iterator<InternalRow> toRow(
- Map<String, Map<String, Map<String, String>>> option) {
- List<InternalRow> rows = new ArrayList<>();
- for (Map.Entry<String, Map<String, Map<String, String>>> entry0 :
option.entrySet()) {
- String database = entry0.getKey();
- for (Map.Entry<String, Map<String, String>> entry1 :
entry0.getValue().entrySet()) {
- String tableName = entry1.getKey();
- for (Map.Entry<String, String> entry2 :
entry1.getValue().entrySet()) {
+ List<InternalRow> rows = new ArrayList<>();
+ for (Map.Entry<Identifier, Map<String, String>> entry :
+ ((AllTableSplit) split).allOptions.entrySet()) {
+ String database = entry.getKey().getDatabaseName();
+ String tableName = entry.getKey().getTableName();
+ for (Map.Entry<String, String> entry2 :
entry.getValue().entrySet()) {
String key = entry2.getKey();
String value = entry2.getValue();
rows.add(
@@ -216,25 +188,17 @@ public class AllTableOptionsTable implements ReadonlyTable {
BinaryString.fromString(value)));
}
}
- }
- return rows.iterator();
- }
-
- protected static Map<String, Map<String, Map<String, String>>> options(
- FileIO fileIO, Map<String, Map<String, Path>> allTablePaths) {
- Map<String, Map<String, Map<String, String>>> allOptions = new
HashMap<>();
- for (Map.Entry<String, Map<String, Path>> entry0 :
allTablePaths.entrySet()) {
- Map<String, Map<String, String>> m0 =
- allOptions.computeIfAbsent(entry0.getKey(), k -> new
HashMap<>());
- for (Map.Entry<String, Path> entry1 :
entry0.getValue().entrySet()) {
- Map<String, String> options =
- new SchemaManager(fileIO, entry1.getValue())
- .latest()
- .orElseThrow(() -> new RuntimeException("Table
not exists."))
- .options();
- m0.put(entry1.getKey(), options);
+ Iterator<InternalRow> iterator = rows.iterator();
+ if (readType != null) {
+ iterator =
+ Iterators.transform(
+ iterator,
+ row ->
+ ProjectedRow.from(
+ readType,
AggregationFieldsTable.TABLE_TYPE)
+ .replaceRow(row));
}
+ return new IteratorRecordReader<>(iterator);
}
- return allOptions;
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index f7aa4ab5a6..6448972cde 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -426,10 +426,6 @@ public abstract class CatalogTestBase {
() ->
catalog.getTable(Identifier.create("non_existing_db", "test_table")))
.withMessage("Table non_existing_db.test_table does not
exist.");
- // Get all table options from system database
- if (!supportGetFromSystemDatabase()) {
- return;
- }
Table allTableOptionsTable =
catalog.getTable(Identifier.create(SYSTEM_DATABASE_NAME,
ALL_TABLE_OPTIONS));
assertThat(allTableOptionsTable).isNotNull();
@@ -1029,10 +1025,6 @@ public abstract class CatalogTestBase {
.isGreaterThan(0);
}
- protected boolean supportGetFromSystemDatabase() {
- return true;
- }
-
protected boolean supportsAlterDatabase() {
return false;
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index b34ca1e5ac..4bbfcde215 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -67,11 +67,6 @@ class RESTCatalogTest extends CatalogTestBase {
restCatalogServer.shutdown();
}
- @Override
- protected boolean supportGetFromSystemDatabase() {
- return false;
- }
-
@Test
void testInitFailWhenDefineWarehouse() {
Options options = new Options();
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/AllTableOptionsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/AllTableOptionsTableTest.java
index 764c0f4e16..16e3baadfa 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/system/AllTableOptionsTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/AllTableOptionsTableTest.java
@@ -59,11 +59,12 @@ public class AllTableOptionsTableTest extends TableTestBase {
}
@Test
- public void testSchemasTable() throws Exception {
+ public void testAllTableOptionsTable() throws Exception {
List<String> result =
read(allTableOptionsTable).stream()
.map(Objects::toString)
.collect(Collectors.toList());
+ result = result.stream().filter(r ->
!r.contains("path")).collect(Collectors.toList());
assertThat(result)
.containsExactlyInAnyOrder(
"+I(default,T,fields.sales.aggregate-function,sum)",
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index b82b511b63..8cd6afbb4d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -158,7 +158,10 @@ public class CatalogTableITCase extends CatalogITCaseBase {
sql("CREATE TABLE T (a INT, b INT) with ('a.aa.aaa'='val1',
'b.bb.bbb'='val2')");
sql("ALTER TABLE T SET ('c.cc.ccc' = 'val3')");
- List<Row> result = sql("SELECT * FROM sys.all_table_options");
+ List<Row> result =
+ sql("SELECT * FROM sys.all_table_options").stream()
+ .filter(row -> !row.getField(2).equals("path"))
+ .collect(Collectors.toList());
assertThat(result)
.containsExactly(
Row.of("default", "T", "a.aa.aaa", "val1"),
@@ -1100,6 +1103,72 @@ public class CatalogTableITCase extends CatalogITCaseBase {
innerTestReadOptimizedTable();
}
+ @Test
+ public void testBinlogTableStreamRead() throws Exception {
+ sql(
+ "CREATE TABLE T (a INT, b INT, primary key (a) NOT ENFORCED)
with ('changelog-producer' = 'lookup', "
+ + "'bucket' = '2')");
+ BlockingIterator<Row, Row> iterator =
+ streamSqlBlockIter("SELECT * FROM T$binlog /*+
OPTIONS('scan.mode' = 'latest') */");
+ sql("INSERT INTO T VALUES (1, 2)");
+ sql("INSERT INTO T VALUES (1, 3)");
+ sql("INSERT INTO T VALUES (2, 2)");
+ List<Row> rows = iterator.collect(3);
+ assertThat(rows)
+ .containsExactly(
+ Row.of("+I", new Integer[] {1}, new Integer[] {2}),
+ Row.of("+U", new Integer[] {1, 1}, new Integer[] {2,
3}),
+ Row.of("+I", new Integer[] {2}, new Integer[] {2}));
+ iterator.close();
+ }
+
+ @Test
+ public void testBinlogTableBatchRead() throws Exception {
+ sql(
+ "CREATE TABLE T (a INT, b INT, primary key (a) NOT ENFORCED)
with ('changelog-producer' = 'lookup', "
+ + "'bucket' = '2')");
+ sql("INSERT INTO T VALUES (1, 2)");
+ sql("INSERT INTO T VALUES (1, 3)");
+ sql("INSERT INTO T VALUES (2, 2)");
+ List<Row> rows = sql("SELECT * FROM T$binlog /*+ OPTIONS('scan.mode' =
'latest') */");
+ assertThat(rows)
+ .containsExactly(
+ Row.of("+I", new Integer[] {1}, new Integer[] {3}),
+ Row.of("+I", new Integer[] {2}, new Integer[] {2}));
+ }
+
+ @Test
+ public void testIndexesTable() {
+ sql(
+ "CREATE TABLE T (pt STRING, a INT, b STRING, PRIMARY KEY (pt,
a) NOT ENFORCED)"
+ + " PARTITIONED BY (pt) with
('deletion-vectors.enabled'='true')");
+ sql(
+ "INSERT INTO T VALUES ('2024-10-01', 1,
'aaaaaaaaaaaaaaaaaaa'), ('2024-10-01', 2, 'b'), ('2024-10-01', 3, 'c')");
+ sql("INSERT INTO T VALUES ('2024-10-01', 1, 'a_new1'), ('2024-10-01',
3, 'c_new1')");
+
+ List<Row> rows = sql("SELECT * FROM `T$table_indexes` WHERE index_type
= 'HASH'");
+ assertThat(rows.size()).isEqualTo(1);
+ Row row = rows.get(0);
+ assertThat(row.getField(0)).isEqualTo("{2024-10-01}");
+ assertThat(row.getField(1)).isEqualTo(0);
+ assertThat(row.getField(2)).isEqualTo("HASH");
+ assertThat(row.getField(3).toString().startsWith("index-")).isTrue();
+ assertThat(row.getField(4)).isEqualTo(12L);
+ assertThat(row.getField(5)).isEqualTo(3L);
+ assertThat(row.getField(6)).isNull();
+
+ rows = sql("SELECT * FROM `T$table_indexes` WHERE index_type =
'DELETION_VECTORS'");
+ assertThat(rows.size()).isEqualTo(1);
+ row = rows.get(0);
+ assertThat(row.getField(0)).isEqualTo("{2024-10-01}");
+ assertThat(row.getField(1)).isEqualTo(0);
+ assertThat(row.getField(2)).isEqualTo("DELETION_VECTORS");
+ assertThat(row.getField(3).toString().startsWith("index-")).isTrue();
+ assertThat(row.getField(4)).isEqualTo(33L);
+ assertThat(row.getField(5)).isEqualTo(1L);
+ assertThat(row.getField(6)).isNotNull();
+ }
+
private void innerTestReadOptimizedTable() {
// full compaction will always be performed at the end of batch jobs,
as long as
// full-compaction.delta-commits is set, regardless of its value
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SystemTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SystemTableITCase.java
deleted file mode 100644
index e28078052b..0000000000
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SystemTableITCase.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink;
-
-import org.apache.paimon.utils.BlockingIterator;
-
-import org.apache.flink.types.Row;
-import org.junit.jupiter.api.Test;
-
-import java.util.List;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** ITCase for system table. */
-public class SystemTableITCase extends CatalogTableITCase {
-
- @Test
- public void testBinlogTableStreamRead() throws Exception {
- sql(
- "CREATE TABLE T (a INT, b INT, primary key (a) NOT ENFORCED)
with ('changelog-producer' = 'lookup', "
- + "'bucket' = '2')");
- BlockingIterator<Row, Row> iterator =
- streamSqlBlockIter("SELECT * FROM T$binlog /*+
OPTIONS('scan.mode' = 'latest') */");
- sql("INSERT INTO T VALUES (1, 2)");
- sql("INSERT INTO T VALUES (1, 3)");
- sql("INSERT INTO T VALUES (2, 2)");
- List<Row> rows = iterator.collect(3);
- assertThat(rows)
- .containsExactly(
- Row.of("+I", new Integer[] {1}, new Integer[] {2}),
- Row.of("+U", new Integer[] {1, 1}, new Integer[] {2,
3}),
- Row.of("+I", new Integer[] {2}, new Integer[] {2}));
- iterator.close();
- }
-
- @Test
- public void testBinlogTableBatchRead() throws Exception {
- sql(
- "CREATE TABLE T (a INT, b INT, primary key (a) NOT ENFORCED)
with ('changelog-producer' = 'lookup', "
- + "'bucket' = '2')");
- sql("INSERT INTO T VALUES (1, 2)");
- sql("INSERT INTO T VALUES (1, 3)");
- sql("INSERT INTO T VALUES (2, 2)");
- List<Row> rows = sql("SELECT * FROM T$binlog /*+ OPTIONS('scan.mode' =
'latest') */");
- assertThat(rows)
- .containsExactly(
- Row.of("+I", new Integer[] {1}, new Integer[] {3}),
- Row.of("+I", new Integer[] {2}, new Integer[] {2}));
- }
-
- @Test
- public void testIndexesTable() {
- sql(
- "CREATE TABLE T (pt STRING, a INT, b STRING, PRIMARY KEY (pt,
a) NOT ENFORCED)"
- + " PARTITIONED BY (pt) with
('deletion-vectors.enabled'='true')");
- sql(
- "INSERT INTO T VALUES ('2024-10-01', 1,
'aaaaaaaaaaaaaaaaaaa'), ('2024-10-01', 2, 'b'), ('2024-10-01', 3, 'c')");
- sql("INSERT INTO T VALUES ('2024-10-01', 1, 'a_new1'), ('2024-10-01',
3, 'c_new1')");
-
- List<Row> rows = sql("SELECT * FROM `T$table_indexes` WHERE index_type
= 'HASH'");
- assertThat(rows.size()).isEqualTo(1);
- Row row = rows.get(0);
- assertThat(row.getField(0)).isEqualTo("{2024-10-01}");
- assertThat(row.getField(1)).isEqualTo(0);
- assertThat(row.getField(2)).isEqualTo("HASH");
- assertThat(row.getField(3).toString().startsWith("index-")).isTrue();
- assertThat(row.getField(4)).isEqualTo(12L);
- assertThat(row.getField(5)).isEqualTo(3L);
- assertThat(row.getField(6)).isNull();
-
- rows = sql("SELECT * FROM `T$table_indexes` WHERE index_type =
'DELETION_VECTORS'");
- assertThat(rows.size()).isEqualTo(1);
- row = rows.get(0);
- assertThat(row.getField(0)).isEqualTo("{2024-10-01}");
- assertThat(row.getField(1)).isEqualTo(0);
- assertThat(row.getField(2)).isEqualTo("DELETION_VECTORS");
- assertThat(row.getField(3).toString().startsWith("index-")).isTrue();
- assertThat(row.getField(4)).isEqualTo(33L);
- assertThat(row.getField(5)).isEqualTo(1L);
- assertThat(row.getField(6)).isNotNull();
- }
-}