This is an automated email from the ASF dual-hosted git repository.
hongshun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new b85ba5a26 [flink] Get table info of primary lake table. (#2152)
b85ba5a26 is described below
commit b85ba5a263495a7ba18bb1639be6e353db42dd59
Author: Hongshun Wang <[email protected]>
AuthorDate: Tue Dec 16 19:22:19 2025 +0800
[flink] Get table info of primary lake table. (#2152)
---
.../apache/fluss/flink/catalog/Flink21Catalog.java | 22 +++++-
.../fluss/flink/catalog/FlinkCatalog21Test.java | 61 +++++++++++++++
.../apache/fluss/flink/catalog/FlinkCatalog.java | 19 ++++-
.../fluss/flink/catalog/FlinkCatalogTest.java | 91 ++++++++++++++++------
4 files changed, 166 insertions(+), 27 deletions(-)
diff --git a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java
index 4f567aae0..7a6f737e0 100644
--- a/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java
+++ b/fluss-flink/fluss-flink-2.1/src/main/java/org/apache/fluss/flink/catalog/Flink21Catalog.java
@@ -18,8 +18,10 @@
package org.apache.fluss.flink.catalog;
+import org.apache.fluss.flink.lake.LakeFlinkCatalog;
import org.apache.fluss.metadata.TableInfo;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
@@ -44,11 +46,29 @@ public class Flink21Catalog extends FlinkCatalog {
super(name, defaultDatabase, bootstrapServers, classLoader, securityConfigs);
}
+ @VisibleForTesting
+ public Flink21Catalog(
+ String name,
+ String defaultDatabase,
+ String bootstrapServers,
+ ClassLoader classLoader,
+ Map<String, String> securityConfigs,
+ LakeFlinkCatalog lakeFlinkCatalog) {
+ super(
+ name,
+ defaultDatabase,
+ bootstrapServers,
+ classLoader,
+ securityConfigs,
+ lakeFlinkCatalog);
+ }
+
@Override
public CatalogBaseTable getTable(ObjectPath objectPath)
throws TableNotExistException, CatalogException {
CatalogBaseTable catalogBaseTable = super.getTable(objectPath);
- if (!(catalogBaseTable instanceof CatalogTable)) {
+ if (!(catalogBaseTable instanceof CatalogTable)
+ || objectPath.getObjectName().contains(LAKE_TABLE_SPLITTER)) {
return catalogBaseTable;
}
diff --git a/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java
new file mode 100644
index 000000000..e66a625e5
--- /dev/null
+++ b/fluss-flink/fluss-flink-2.1/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalog21Test.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.flink.catalog;
+
+import org.apache.fluss.flink.lake.LakeFlinkCatalog;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.DefaultIndex;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+/** Test for {@link Flink21Catalog}. */
+public class FlinkCatalog21Test extends FlinkCatalogTest {
+
+ @Override
+ protected FlinkCatalog initCatalog(
+ String catalogName,
+ String databaseName,
+ String bootstrapServers,
+ LakeFlinkCatalog lakeFlinkCatalog) {
+ return new Flink21Catalog(
+ catalogName,
+ databaseName,
+ bootstrapServers,
+ Thread.currentThread().getContextClassLoader(),
+ Collections.emptyMap(),
+ lakeFlinkCatalog);
+ }
+
+ protected ResolvedSchema createSchema() {
+ return new ResolvedSchema(
+ Arrays.asList(
+ Column.physical("first", DataTypes.STRING().notNull()),
+ Column.physical("second", DataTypes.INT()),
+ Column.physical("third", DataTypes.STRING().notNull())),
+ Collections.emptyList(),
+ UniqueConstraint.primaryKey("PK_first_third", Arrays.asList("first", "third")),
+ Collections.singletonList(
+ DefaultIndex.newIndex(
+ "INDEX_first_third", Arrays.asList("first", "third"))));
+ }
+}
diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
index 16dc60c86..16b05089a 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/catalog/FlinkCatalog.java
@@ -124,13 +124,30 @@ public class FlinkCatalog extends AbstractCatalog {
String bootstrapServers,
ClassLoader classLoader,
Map<String, String> securityConfigs) {
+ this(
+ name,
+ defaultDatabase,
+ bootstrapServers,
+ classLoader,
+ securityConfigs,
+ new LakeFlinkCatalog(name, classLoader));
+ }
+
+ @VisibleForTesting
+ public FlinkCatalog(
+ String name,
+ String defaultDatabase,
+ String bootstrapServers,
+ ClassLoader classLoader,
+ Map<String, String> securityConfigs,
+ LakeFlinkCatalog lakeFlinkCatalog) {
super(name, defaultDatabase);
this.catalogName = name;
this.defaultDatabase = defaultDatabase;
this.bootstrapServers = bootstrapServers;
this.classLoader = classLoader;
this.securityConfigs = securityConfigs;
- this.lakeFlinkCatalog = new LakeFlinkCatalog(catalogName, classLoader);
+ this.lakeFlinkCatalog = lakeFlinkCatalog;
}
@Override
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
index abaf8e7e0..c32492c4b 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogTest.java
@@ -22,6 +22,7 @@ import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.IllegalConfigurationException;
import org.apache.fluss.exception.InvalidPartitionException;
import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.flink.lake.LakeFlinkCatalog;
import org.apache.fluss.flink.utils.FlinkConversionsTest;
import org.apache.fluss.server.testutils.FlussClusterExtension;
import org.apache.fluss.utils.ExceptionUtils;
@@ -36,6 +37,7 @@ import org.apache.flink.table.catalog.CatalogMaterializedTable;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.IntervalFreshness;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
@@ -58,8 +60,7 @@ import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.factories.FactoryUtil;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -78,6 +79,7 @@ import static org.apache.fluss.config.ConfigOptions.TABLE_DATALAKE_FORMAT;
import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_KEY;
import static org.apache.fluss.flink.FlinkConnectorOptions.BUCKET_NUMBER;
import static org.apache.fluss.flink.FlinkConnectorOptions.SCAN_STARTUP_MODE;
+import static org.apache.fluss.flink.adapter.CatalogTableAdapter.toCatalogTable;
import static org.apache.fluss.flink.utils.CatalogTableTestUtils.addOptions;
import static org.apache.fluss.flink.utils.CatalogTableTestUtils.checkEqualsIgnoreSchema;
import static org.apache.fluss.flink.utils.CatalogTableTestUtils.checkEqualsRespectSchema;
@@ -101,7 +103,8 @@ class FlinkCatalogTest {
private static final FlinkConversionsTest.TestRefreshHandler REFRESH_HANDLER =
new FlinkConversionsTest.TestRefreshHandler("jobID: xxx, clusterId: yyy");
- static Catalog catalog;
+ private Catalog catalog;
+ private MockLakeFlinkCatalog mockLakeCatalog;
private final ObjectPath tableInDefaultDb = new ObjectPath(DEFAULT_DB, "t1");
private static Configuration initConfig() {
@@ -110,7 +113,7 @@ class FlinkCatalogTest {
return configuration;
}
- private ResolvedSchema createSchema() {
+ protected ResolvedSchema createSchema() {
return new ResolvedSchema(
Arrays.asList(
Column.physical("first", DataTypes.STRING().notNull()),
@@ -128,7 +131,7 @@ class FlinkCatalogTest {
private CatalogTable newCatalogTable(
ResolvedSchema resolvedSchema, Map<String, String> options) {
CatalogTable origin =
- CatalogTable.of(
+ toCatalogTable(
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
"test comment",
Collections.emptyList(),
@@ -158,29 +161,36 @@ class FlinkCatalogTest {
return new ResolvedCatalogMaterializedTable(origin, resolvedSchema);
}
- @BeforeAll
- static void beforeAll() {
+ protected FlinkCatalog initCatalog(
+ String catalogName,
+ String databaseName,
+ String bootstrapServers,
+ LakeFlinkCatalog lakeFlinkCatalog) {
+ return new FlinkCatalog(
+ catalogName,
+ databaseName,
+ bootstrapServers,
+ Thread.currentThread().getContextClassLoader(),
+ Collections.emptyMap(),
+ lakeFlinkCatalog);
+ }
+
+ @BeforeEach
+ void beforeEach() throws Exception {
// set fluss conf
Configuration flussConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+
+ mockLakeCatalog =
+ new MockLakeFlinkCatalog(
+ CATALOG_NAME, Thread.currentThread().getContextClassLoader());
catalog =
- new FlinkCatalog(
+ initCatalog(
CATALOG_NAME,
DEFAULT_DB,
String.join(",", flussConf.get(BOOTSTRAP_SERVERS)),
- Thread.currentThread().getContextClassLoader(),
- Collections.emptyMap());
+ mockLakeCatalog);
catalog.open();
- }
- @AfterAll
- static void afterAll() {
- if (catalog != null) {
- catalog.close();
- }
- }
-
- @BeforeEach
- void beforeEach() throws Exception {
// First check if database exists, and drop it if it does
if (catalog.databaseExists(DEFAULT_DB)) {
catalog.dropDatabase(DEFAULT_DB, true, true);
@@ -198,6 +208,13 @@ class FlinkCatalogTest {
}
}
+ @AfterEach
+ void afterEach() {
+ if (catalog != null) {
+ catalog.close();
+ }
+ }
+
@Test
void testCreateTable() throws Exception {
Map<String, String> options = new HashMap<>();
@@ -270,7 +287,7 @@ class FlinkCatalogTest {
ResolvedSchema resolvedSchema = this.createSchema();
CatalogTable table2 =
new ResolvedCatalogTable(
- CatalogTable.of(
+ toCatalogTable(
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
"test comment",
Collections.singletonList("first"),
@@ -306,6 +323,11 @@ class FlinkCatalogTest {
CatalogTable table = this.newCatalogTable(options);
catalog.createTable(lakeTablePath, table, false);
assertThat(catalog.tableExists(lakeTablePath)).isTrue();
+ // get the lake table from lake catalog.
+ mockLakeCatalog.registerLakeTable(lakeTablePath, table);
+ assertThat((CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, "lake_table$lake")))
+ .isEqualTo(table);
+
// drop fluss table
catalog.dropTable(lakeTablePath, false);
assertThat(catalog.tableExists(lakeTablePath)).isFalse();
@@ -363,7 +385,7 @@ class FlinkCatalogTest {
UniqueConstraint.primaryKey(
"PK_first",
Collections.singletonList("first")));
CatalogTable origin =
- CatalogTable.of(
+ toCatalogTable(
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
"test comment",
Collections.emptyList(),
@@ -648,7 +670,7 @@ class FlinkCatalogTest {
ResolvedSchema resolvedSchema = this.createSchema();
CatalogTable table2 =
new ResolvedCatalogTable(
- CatalogTable.of(
+ toCatalogTable(
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
"test comment",
Collections.singletonList("first"),
@@ -758,7 +780,7 @@ class FlinkCatalogTest {
ObjectPath partitionedPath = new ObjectPath(DEFAULT_DB, "partitioned_table1");
CatalogTable partitionedTable =
new ResolvedCatalogTable(
- CatalogTable.of(
+ toCatalogTable(
Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
"test comment",
Collections.singletonList("first"),
@@ -845,7 +867,7 @@ class FlinkCatalogTest {
ResolvedSchema schema = createSchema();
CatalogTable partTable =
new ResolvedCatalogTable(
- CatalogTable.of(
+ toCatalogTable(
Schema.newBuilder().fromResolvedSchema(schema).build(),
"partitioned table for stats",
Collections.singletonList("first"),
@@ -974,4 +996,23 @@ class FlinkCatalogTest {
checkEqualsRespectSchema((CatalogTable) tableCreated, table);
catalog.dropTable(tablePath, false);
}
+
+ private static class MockLakeFlinkCatalog extends LakeFlinkCatalog {
+ private final GenericInMemoryCatalog catalog;
+
+ public MockLakeFlinkCatalog(String catalogName, ClassLoader classLoader) {
+ super(catalogName, classLoader);
+ catalog = new GenericInMemoryCatalog(catalogName, DEFAULT_DB);
+ }
+
+ @Override
+ public Catalog getLakeCatalog(Configuration tableOptions) {
+ return catalog;
+ }
+
+ void registerLakeTable(ObjectPath tablePath, CatalogTable table)
+ throws TableAlreadyExistException, DatabaseNotExistException {
+ catalog.createTable(tablePath, table, false);
+ }
+ }
}