This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1c6574d2ff6 [FLINK-38470][table] Make
`CreateMaterializedTableOperation` return `ResolvedCatalogMaterializedTable`
1c6574d2ff6 is described below
commit 1c6574d2ff6736011170e6efed8ebd38e61b7054
Author: Ramin Gharib <[email protected]>
AuthorDate: Thu Oct 2 14:08:13 2025 +0200
[FLINK-38470][table] Make `CreateMaterializedTableOperation` return
`ResolvedCatalogMaterializedTable`
---
.../materializedtable/MaterializedTableManager.java | 6 +++---
.../sql/parser/ddl/SqlCreateMaterializedTable.java | 12 ++++++------
.../CreateMaterializedTableOperation.java | 5 ++---
...lMaterializedTableNodeToOperationConverterTest.java | 18 ++++++------------
4 files changed, 17 insertions(+), 24 deletions(-)
diff --git
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
index 854ea910c28..323e0ca17b7 100644
---
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
+++
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
@@ -197,7 +197,7 @@ public class MaterializedTableManager {
OperationExecutor operationExecutor,
OperationHandle handle,
CreateMaterializedTableOperation createMaterializedTableOperation)
{
- CatalogMaterializedTable materializedTable =
+ ResolvedCatalogMaterializedTable materializedTable =
createMaterializedTableOperation.getCatalogMaterializedTable();
if (CatalogMaterializedTable.RefreshMode.CONTINUOUS ==
materializedTable.getRefreshMode()) {
createMaterializedTableInContinuousMode(
@@ -220,7 +220,7 @@ public class MaterializedTableManager {
ObjectIdentifier materializedTableIdentifier =
createMaterializedTableOperation.getTableIdentifier();
- CatalogMaterializedTable catalogMaterializedTable =
+ ResolvedCatalogMaterializedTable catalogMaterializedTable =
createMaterializedTableOperation.getCatalogMaterializedTable();
try {
@@ -257,7 +257,7 @@ public class MaterializedTableManager {
ObjectIdentifier materializedTableIdentifier =
createMaterializedTableOperation.getTableIdentifier();
- CatalogMaterializedTable catalogMaterializedTable =
+ ResolvedCatalogMaterializedTable catalogMaterializedTable =
createMaterializedTableOperation.getCatalogMaterializedTable();
// convert duration to cron expression
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
index 626c3c63ebd..5a9245c5dd0 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
@@ -51,11 +51,11 @@ public class SqlCreateMaterializedTable extends SqlCreate {
private final SqlIdentifier tableName;
- private final SqlCharStringLiteral comment;
+ private final @Nullable SqlTableConstraint tableConstraint;
- private final SqlTableConstraint tableConstraint;
+ private final @Nullable SqlCharStringLiteral comment;
- private final SqlDistribution distribution;
+ private final @Nullable SqlDistribution distribution;
private final SqlNodeList partitionKeyList;
@@ -63,7 +63,7 @@ public class SqlCreateMaterializedTable extends SqlCreate {
private final SqlIntervalLiteral freshness;
- @Nullable private final SqlLiteral refreshMode;
+ private final @Nullable SqlLiteral refreshMode;
private final SqlNode asQuery;
@@ -80,8 +80,8 @@ public class SqlCreateMaterializedTable extends SqlCreate {
SqlNode asQuery) {
super(OPERATOR, pos, false, false);
this.tableName = requireNonNull(tableName, "tableName should not be
null");
- this.comment = comment;
this.tableConstraint = tableConstraint;
+ this.comment = comment;
this.distribution = distribution;
this.partitionKeyList =
requireNonNull(partitionKeyList, "partitionKeyList should not
be null");
@@ -124,7 +124,7 @@ public class SqlCreateMaterializedTable extends SqlCreate {
return Optional.ofNullable(tableConstraint);
}
- public SqlDistribution getDistribution() {
+ public @Nullable SqlDistribution getDistribution() {
return distribution;
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/CreateMaterializedTableOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/CreateMaterializedTableOperation.java
index d4eff00254d..9fb68dc71b0 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/CreateMaterializedTableOperation.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/CreateMaterializedTableOperation.java
@@ -21,7 +21,6 @@ package org.apache.flink.table.operations.materializedtable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.internal.TableResultImpl;
import org.apache.flink.table.api.internal.TableResultInternal;
-import org.apache.flink.table.catalog.CatalogMaterializedTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
import org.apache.flink.table.operations.Operation;
@@ -38,7 +37,7 @@ public class CreateMaterializedTableOperation
implements CreateOperation, MaterializedTableOperation {
private final ObjectIdentifier tableIdentifier;
- private final CatalogMaterializedTable materializedTable;
+ private final ResolvedCatalogMaterializedTable materializedTable;
public CreateMaterializedTableOperation(
ObjectIdentifier tableIdentifier, ResolvedCatalogMaterializedTable
materializedTable) {
@@ -57,7 +56,7 @@ public class CreateMaterializedTableOperation
return tableIdentifier;
}
- public CatalogMaterializedTable getCatalogMaterializedTable() {
+ public ResolvedCatalogMaterializedTable getCatalogMaterializedTable() {
return materializedTable;
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
index 49d74708c67..acc651b1e5b 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlMaterializedTableNodeToOperationConverterTest.java
@@ -126,8 +126,7 @@ public class
SqlMaterializedTableNodeToOperationConverterTest
assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
CreateMaterializedTableOperation op =
(CreateMaterializedTableOperation) operation;
- CatalogMaterializedTable materializedTable =
op.getCatalogMaterializedTable();
-
assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class);
+ ResolvedCatalogMaterializedTable materializedTable =
op.getCatalogMaterializedTable();
Map<String, String> options = new HashMap<>();
options.put("connector", "filesystem");
@@ -152,8 +151,7 @@ public class
SqlMaterializedTableNodeToOperationConverterTest
.definitionQuery("SELECT *\n" + "FROM
`builtin`.`default`.`t1`")
.build();
- assertThat(((ResolvedCatalogMaterializedTable)
materializedTable).getOrigin())
- .isEqualTo(expected);
+ assertThat(materializedTable.getOrigin()).isEqualTo(expected);
}
@Test
@@ -202,8 +200,7 @@ public class
SqlMaterializedTableNodeToOperationConverterTest
assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
CreateMaterializedTableOperation op =
(CreateMaterializedTableOperation) operation;
- CatalogMaterializedTable materializedTable =
op.getCatalogMaterializedTable();
-
assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class);
+ ResolvedCatalogMaterializedTable materializedTable =
op.getCatalogMaterializedTable();
assertThat(materializedTable.getLogicalRefreshMode())
.isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC);
@@ -220,8 +217,7 @@ public class
SqlMaterializedTableNodeToOperationConverterTest
assertThat(operation2).isInstanceOf(CreateMaterializedTableOperation.class);
CreateMaterializedTableOperation op2 =
(CreateMaterializedTableOperation) operation2;
- CatalogMaterializedTable materializedTable2 =
op2.getCatalogMaterializedTable();
-
assertThat(materializedTable2).isInstanceOf(ResolvedCatalogMaterializedTable.class);
+ ResolvedCatalogMaterializedTable materializedTable2 =
op2.getCatalogMaterializedTable();
assertThat(materializedTable2.getLogicalRefreshMode())
.isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.CONTINUOUS);
@@ -240,8 +236,7 @@ public class
SqlMaterializedTableNodeToOperationConverterTest
assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class);
CreateMaterializedTableOperation op =
(CreateMaterializedTableOperation) operation;
- CatalogMaterializedTable materializedTable =
op.getCatalogMaterializedTable();
-
assertThat(materializedTable).isInstanceOf(ResolvedCatalogMaterializedTable.class);
+ ResolvedCatalogMaterializedTable materializedTable =
op.getCatalogMaterializedTable();
assertThat(materializedTable.getLogicalRefreshMode())
.isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC);
@@ -258,8 +253,7 @@ public class
SqlMaterializedTableNodeToOperationConverterTest
assertThat(operation2).isInstanceOf(CreateMaterializedTableOperation.class);
CreateMaterializedTableOperation op2 =
(CreateMaterializedTableOperation) operation2;
- CatalogMaterializedTable materializedTable2 =
op2.getCatalogMaterializedTable();
-
assertThat(materializedTable2).isInstanceOf(ResolvedCatalogMaterializedTable.class);
+ ResolvedCatalogMaterializedTable materializedTable2 =
op2.getCatalogMaterializedTable();
assertThat(materializedTable2.getLogicalRefreshMode())
.isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.FULL);