This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 004268ad2f4 [FLINK-38968][table] Add toCatalogTable method to
CatalogMaterializedTable interface
004268ad2f4 is described below
commit 004268ad2f43f43108679bafdfe6a2be6b0044b4
Author: Ramin Gharib <[email protected]>
AuthorDate: Fri Mar 27 23:29:57 2026 +0100
[FLINK-38968][table] Add toCatalogTable method to CatalogMaterializedTable
interface
---
.../catalog/CatalogBaseTableResolutionTest.java | 39 ++++++++++++++++++++++
.../table/catalog/CatalogMaterializedTable.java | 12 +++++++
.../catalog/ResolvedCatalogMaterializedTable.java | 11 +-----
3 files changed, 52 insertions(+), 10 deletions(-)
diff --git
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
index 92035ccc944..cc94ab86fbf 100644
---
a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
+++
b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/CatalogBaseTableResolutionTest.java
@@ -354,6 +354,45 @@ class CatalogBaseTableResolutionTest {
+ "distributed table must be at least 1.");
}
+ @Test
+ void testCatalogMaterializedTableToCatalogTable() {
+ final CatalogMaterializedTable materializedTable =
catalogMaterializedTable();
+
+ final CatalogTable catalogTable = materializedTable.toCatalogTable();
+
+ // Verify the conversion preserves properties
+ assertThat(catalogTable.getUnresolvedSchema())
+ .isEqualTo(materializedTable.getUnresolvedSchema());
+
assertThat(catalogTable.getComment()).isEqualTo(materializedTable.getComment());
+
assertThat(catalogTable.getPartitionKeys()).isEqualTo(materializedTable.getPartitionKeys());
+
assertThat(catalogTable.getOptions()).isEqualTo(materializedTable.getOptions());
+
assertThat(catalogTable.getDistribution()).isEqualTo(materializedTable.getDistribution());
+
assertThat(catalogTable.getSnapshot()).isEqualTo(materializedTable.getSnapshot());
+ }
+
+ @Test
+ void testResolvedCatalogMaterializedTableToResolvedCatalogTable() {
+ final CatalogMaterializedTable materializedTable =
catalogMaterializedTable();
+
+ final ResolvedCatalogMaterializedTable resolvedMaterializedTable =
+
resolveCatalogBaseTable(ResolvedCatalogMaterializedTable.class,
materializedTable);
+
+ final ResolvedCatalogTable resolvedCatalogTable =
+ resolvedMaterializedTable.toResolvedCatalogTable();
+
+ // Verify schema is preserved
+ assertThat(resolvedCatalogTable.getResolvedSchema())
+ .isEqualTo(resolvedMaterializedTable.getResolvedSchema());
+
+ // Verify origin properties are preserved
+ assertThat(resolvedCatalogTable.getComment())
+ .isEqualTo(resolvedMaterializedTable.getComment());
+ assertThat(resolvedCatalogTable.getPartitionKeys())
+ .isEqualTo(resolvedMaterializedTable.getPartitionKeys());
+ assertThat(resolvedCatalogTable.getOptions())
+ .isEqualTo(resolvedMaterializedTable.getOptions());
+ }
+
//
--------------------------------------------------------------------------------------------
// Utilities
//
--------------------------------------------------------------------------------------------
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
index 92546ecee3b..0597fdd3a50 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogMaterializedTable.java
@@ -180,6 +180,18 @@ public interface CatalogMaterializedTable extends
CatalogBaseTable {
@Nullable
byte[] getSerializedRefreshHandler();
+ /** Convert this object to a {@link CatalogTable} object for planner
optimize query. */
+ default CatalogTable toCatalogTable() {
+ return CatalogTable.newBuilder()
+ .schema(getUnresolvedSchema())
+ .comment(getComment())
+ .distribution(getDistribution().orElse(null))
+ .partitionKeys(getPartitionKeys())
+ .options(getOptions())
+ .snapshot(getSnapshot().orElse(null))
+ .build();
+ }
+
/** The logical refresh mode of materialized table. */
@PublicEvolving
enum LogicalRefreshMode {
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java
index b87db09f1f2..722627d74ec 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ResolvedCatalogMaterializedTable.java
@@ -205,15 +205,6 @@ public class ResolvedCatalogMaterializedTable
/** Convert this object to a {@link ResolvedCatalogTable} object for
planner optimize query. */
public ResolvedCatalogTable toResolvedCatalogTable() {
- return new ResolvedCatalogTable(
- CatalogTable.newBuilder()
- .schema(getUnresolvedSchema())
- .comment(getComment())
- .distribution(getDistribution().orElse(null))
- .partitionKeys(getPartitionKeys())
- .options(getOptions())
- .snapshot(getSnapshot().orElse(null))
- .build(),
- getResolvedSchema());
+ return new ResolvedCatalogTable(origin.toCatalogTable(),
getResolvedSchema());
}
}