This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 29736b8c01924b7da03d4bcbfd9c812a8e5a08b4
Author: fengli <ldliu...@163.com>
AuthorDate: Mon May 6 20:24:16 2024 +0800

    [FLINK-35195][table] Support execute CreateMaterializedTableOperation for continuous refresh mode in SqlGateway
---
 flink-table/flink-sql-gateway/pom.xml              |   6 +
 .../MaterializedTableManager.java                  | 182 ++++++++++++++
 .../service/operation/OperationExecutor.java       |  25 +-
 .../service/MaterializedTableStatementITCase.java  | 274 +++++++++++++++++++++
 4 files changed, 483 insertions(+), 4 deletions(-)
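
For context, this commit makes a statement like the following executable through the
SQL Gateway. The sketch below is based on the DDL used in the new ITCase; the source
table and column names come from that test and are otherwise illustrative:

    CREATE MATERIALIZED TABLE users_shops
      PARTITIONED BY (ds)
      WITH ('format' = 'debezium-json')
      FRESHNESS = INTERVAL '30' SECOND
      AS SELECT
           user_id,
           shop_id,
           ds,
           SUM(payment_amount_cents) AS payed_buy_fee_sum,
           SUM(1) AS pv
         FROM (
           SELECT user_id, shop_id,
                  DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds,
                  payment_amount_cents
           FROM datagenSource
         ) AS tmp
         GROUP BY (user_id, shop_id, ds)

With a freshness this small the planner derives the CONTINUOUS refresh mode; this is
the only mode the commit supports, and full mode is rejected with an exception.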

diff --git a/flink-table/flink-sql-gateway/pom.xml b/flink-table/flink-sql-gateway/pom.xml
index 1a50d665a18..61f1e75942e 100644
--- a/flink-table/flink-sql-gateway/pom.xml
+++ b/flink-table/flink-sql-gateway/pom.xml
@@ -127,6 +127,12 @@
                        <type>test-jar</type>
                        <scope>test</scope>
                </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-filesystem-test-utils</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
new file mode 100644
index 00000000000..fed60634a3a
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.results.ResultSet;
+import org.apache.flink.table.gateway.service.operation.OperationExecutor;
+import org.apache.flink.table.gateway.service.result.ResultFetcher;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.table.operations.materializedtable.AlterMaterializedTableChangeOperation;
+import org.apache.flink.table.operations.materializedtable.CreateMaterializedTableOperation;
+import org.apache.flink.table.operations.materializedtable.DropMaterializedTableOperation;
+import org.apache.flink.table.operations.materializedtable.MaterializedTableOperation;
+import org.apache.flink.table.refresh.ContinuousRefreshHandler;
+import org.apache.flink.table.refresh.ContinuousRefreshHandlerSerializer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING;
+import static org.apache.flink.configuration.DeploymentOptions.TARGET;
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static org.apache.flink.configuration.PipelineOptions.NAME;
+import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+import static org.apache.flink.table.api.internal.TableResultInternal.TABLE_RESULT_OK;
+
+/** Manager is responsible for executing the {@link MaterializedTableOperation}. */
+@Internal
+public class MaterializedTableManager {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MaterializedTableManager.class);
+
+    public static ResultFetcher callMaterializedTableOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            MaterializedTableOperation op,
+            String statement) {
+        if (op instanceof CreateMaterializedTableOperation) {
+            return callCreateMaterializedTableOperation(
+                    operationExecutor, handle, (CreateMaterializedTableOperation) op);
+        }
+        throw new SqlExecutionException(
+                String.format(
+                        "Unsupported Operation %s for materialized table.", op.asSummaryString()));
+    }
+
+    private static ResultFetcher callCreateMaterializedTableOperation(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            CreateMaterializedTableOperation createMaterializedTableOperation) {
+        CatalogMaterializedTable materializedTable =
+                createMaterializedTableOperation.getCatalogMaterializedTable();
+        if (CatalogMaterializedTable.RefreshMode.CONTINUOUS == materializedTable.getRefreshMode()) {
+            createMaterializedInContinuousMode(
+                    operationExecutor, handle, createMaterializedTableOperation);
+        } else {
+            throw new SqlExecutionException(
+                    "Only support creating materialized tables in continuous refresh mode currently.");
+        }
+        // Just return OK to unify the refresh job info of continuous and full mode;
+        // users should get the refresh job info by describing the table.
+        return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+    }
+
+    private static void createMaterializedInContinuousMode(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            CreateMaterializedTableOperation createMaterializedTableOperation) {
+        // create materialized table first
+        operationExecutor.callExecutableOperation(handle, createMaterializedTableOperation);
+
+        ObjectIdentifier materializedTableIdentifier =
+                createMaterializedTableOperation.getTableIdentifier();
+        CatalogMaterializedTable catalogMaterializedTable =
+                createMaterializedTableOperation.getCatalogMaterializedTable();
+
+        // Set job name, runtime mode, checkpoint interval
+        // TODO: Set minibatch related optimization options.
+        Configuration customConfig = new Configuration();
+        String jobName =
+                String.format(
+                        "Materialized_table_%s_continuous_refresh_job",
+                        materializedTableIdentifier.asSerializableString());
+        customConfig.set(NAME, jobName);
+        customConfig.set(RUNTIME_MODE, STREAMING);
+        customConfig.set(CHECKPOINTING_INTERVAL, catalogMaterializedTable.getFreshness());
+
+        String insertStatement =
+                String.format(
+                        "INSERT INTO %s %s",
+                        materializedTableIdentifier, catalogMaterializedTable.getDefinitionQuery());
+        try {
+            // submit flink streaming job
+            ResultFetcher resultFetcher =
+                    operationExecutor.executeStatement(handle, customConfig, insertStatement);
+
+            // get execution.target and jobId; YARN and K8s are not supported yet, so
+            // the clusterId is not retrieved
+            List<RowData> results = fetchAllResults(resultFetcher);
+            String jobId = results.get(0).getString(0).toString();
+            String executeTarget =
+                    operationExecutor.getSessionContext().getSessionConf().get(TARGET);
+            ContinuousRefreshHandler continuousRefreshHandler =
+                    new ContinuousRefreshHandler(executeTarget, jobId);
+            byte[] serializedBytes =
+                    ContinuousRefreshHandlerSerializer.INSTANCE.serialize(continuousRefreshHandler);
+
+            // update RefreshHandler to Catalog
+            CatalogMaterializedTable updatedMaterializedTable =
+                    catalogMaterializedTable.copy(
+                            CatalogMaterializedTable.RefreshStatus.ACTIVATED,
+                            continuousRefreshHandler.asSummaryString(),
+                            serializedBytes);
+            List<TableChange> tableChanges = new ArrayList<>();
+            tableChanges.add(
+                    TableChange.modifyRefreshStatus(
+                            CatalogMaterializedTable.RefreshStatus.ACTIVATED));
+            tableChanges.add(
+                    TableChange.modifyRefreshHandler(
+                            continuousRefreshHandler.asSummaryString(), serializedBytes));
+
+            AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation =
+                    new AlterMaterializedTableChangeOperation(
+                            materializedTableIdentifier, tableChanges, updatedMaterializedTable);
+            operationExecutor.callExecutableOperation(
+                    handle, alterMaterializedTableChangeOperation);
+        } catch (Exception e) {
+            // drop the materialized table if submitting the Flink streaming job throws
+            // an exception, so that weak atomicity is guaranteed
+            operationExecutor.callExecutableOperation(
+                    handle,
+                    new DropMaterializedTableOperation(materializedTableIdentifier, true, false));
+            // log and throw exception
+            LOG.error(
+                    "Failed to submit the continuous refresh job for materialized table {}.",
+                    materializedTableIdentifier,
+                    e);
+            throw new TableException(
+                    String.format(
+                            "Failed to submit the continuous refresh job for materialized table %s.",
+                            materializedTableIdentifier),
+                    e);
+        }
+    }
+
+    private static List<RowData> fetchAllResults(ResultFetcher resultFetcher) {
+        Long token = 0L;
+        List<RowData> results = new ArrayList<>();
+        while (token != null) {
+            ResultSet result = resultFetcher.fetchResults(token, Integer.MAX_VALUE);
+            results.addAll(result.getData());
+            token = result.getNextToken();
+        }
+        return results;
+    }
+}
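
Note: the manager above derives the continuous refresh job by wrapping the
materialized table's definition query in a plain INSERT statement and submitting it as
a streaming job. For the DDL sketched earlier, the generated statement would look
roughly like this (the target identifier is fully qualified; names are illustrative):

    INSERT INTO `test_catalog`.`test_db`.`users_shops`
    SELECT user_id, shop_id, ds,
           SUM(payment_amount_cents) AS payed_buy_fee_sum,
           SUM(1) AS pv
    FROM (
      SELECT user_id, shop_id,
             DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds,
             payment_amount_cents
      FROM datagenSource
    ) AS tmp
    GROUP BY (user_id, shop_id, ds)

The job id returned by the INSERT, together with the execution target, is serialized
into a ContinuousRefreshHandler and written back to the catalog, so later operations
can locate the running job.
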
diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
index c50ba8c2bbf..ddd0f930155 100644
--- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
+++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java
@@ -65,6 +65,7 @@ import org.apache.flink.table.gateway.api.results.FunctionInfo;
 import org.apache.flink.table.gateway.api.results.TableInfo;
 import org.apache.flink.table.gateway.environment.SqlGatewayStreamExecutionEnvironment;
 import org.apache.flink.table.gateway.service.context.SessionContext;
+import org.apache.flink.table.gateway.service.materializedtable.MaterializedTableManager;
 import org.apache.flink.table.gateway.service.result.ResultFetcher;
 import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
 import org.apache.flink.table.module.ModuleManager;
@@ -96,6 +97,7 @@ import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
 import org.apache.flink.table.operations.ddl.CreateOperation;
 import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
 import org.apache.flink.table.operations.ddl.DropOperation;
+import org.apache.flink.table.operations.materializedtable.MaterializedTableOperation;
 import org.apache.flink.table.resource.ResourceManager;
 import org.apache.flink.table.utils.DateTimeUtils;
 import org.apache.flink.util.CollectionUtil;
@@ -193,9 +195,14 @@ public class OperationExecutor {
     }
 
     public ResultFetcher executeStatement(OperationHandle handle, String statement) {
+        return executeStatement(handle, new Configuration(), statement);
+    }
+
+    public ResultFetcher executeStatement(
+            OperationHandle handle, Configuration customConfig, String statement) {
         // Instantiate the TableEnvironment lazily
         ResourceManager resourceManager = sessionContext.getSessionState().resourceManager.copy();
-        TableEnvironmentInternal tableEnv = getTableEnvironment(resourceManager);
+        TableEnvironmentInternal tableEnv = getTableEnvironment(resourceManager, customConfig);
         PlanCacheManager planCacheManager = sessionContext.getPlanCacheManager();
         CachedPlan cachedPlan = null;
         Operation op = null;
@@ -344,13 +351,16 @@ public class OperationExecutor {
     // --------------------------------------------------------------------------------------------
 
     public TableEnvironmentInternal getTableEnvironment() {
-        return getTableEnvironment(sessionContext.getSessionState().resourceManager);
+        return getTableEnvironment(
+                sessionContext.getSessionState().resourceManager, new Configuration());
     }
 
-    public TableEnvironmentInternal getTableEnvironment(ResourceManager resourceManager) {
+    public TableEnvironmentInternal getTableEnvironment(
+            ResourceManager resourceManager, Configuration customConfig) {
         // checks the value of RUNTIME_MODE
         Configuration operationConfig = sessionContext.getSessionConf().clone();
         operationConfig.addAll(executionConfig);
+        operationConfig.addAll(customConfig);
         final EnvironmentSettings settings =
                 EnvironmentSettings.newInstance().withConfiguration(operationConfig).build();
 
@@ -492,12 +502,15 @@ public class OperationExecutor {
                 || op instanceof CreateCatalogFunctionOperation
                 || op instanceof ShowFunctionsOperation) {
             return callExecutableOperation(handle, (ExecutableOperation) op);
+        } else if (op instanceof MaterializedTableOperation) {
+            return MaterializedTableManager.callMaterializedTableOperation(
+                    this, handle, (MaterializedTableOperation) op, statement);
         } else {
             return callOperation(tableEnv, handle, op);
         }
     }
 
-    private ResultFetcher callExecutableOperation(OperationHandle handle, ExecutableOperation op) {
+    public ResultFetcher callExecutableOperation(OperationHandle handle, ExecutableOperation op) {
         TableResultInternal result =
                 op.execute(
                         new ExecutableOperationContextImpl(
@@ -521,6 +534,10 @@ public class OperationExecutor {
         return tableConfig;
     }
 
+    public SessionContext getSessionContext() {
+        return sessionContext;
+    }
+
     private ResultFetcher callSetOperation(
             TableEnvironmentInternal tableEnv, OperationHandle handle, SetOperation setOp) {
         if (setOp.getKey().isPresent() && setOp.getValue().isPresent()) {
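
Note: the executeStatement overload added above lets MaterializedTableManager inject
per-statement configuration without mutating the session configuration. The options it
passes for the refresh job are roughly equivalent to running the following SET
statements before the INSERT (the pipeline name is abbreviated here; the checkpointing
interval comes from the declared FRESHNESS):

    SET 'pipeline.name' = 'Materialized_table_..._continuous_refresh_job';
    SET 'execution.runtime-mode' = 'streaming';
    SET 'execution.checkpointing.interval' = '30 s';
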
diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
new file mode 100644
index 00000000000..29ab697f384
--- /dev/null
+++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.gateway.service;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.CatalogMaterializedTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.ResolvedCatalogMaterializedTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.gateway.api.operation.OperationHandle;
+import org.apache.flink.table.gateway.api.session.SessionEnvironment;
+import org.apache.flink.table.gateway.api.session.SessionHandle;
+import org.apache.flink.table.gateway.api.utils.MockedEndpointVersion;
+import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
+import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
+import org.apache.flink.table.gateway.service.utils.SqlGatewayServiceExtension;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.apache.flink.table.catalog.CommonCatalogOptions.TABLE_CATALOG_STORE_KIND;
+import static org.apache.flink.table.gateway.service.utils.SqlGatewayServiceTestUtil.awaitOperationTermination;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * ITCase for materialized table related statements via {@link SqlGatewayServiceImpl}. A separate
+ * test class is used rather than adding test cases to {@link SqlGatewayServiceITCase}, both
+ * because the materialized table syntax is relatively independent and to avoid conflicts with
+ * the code in {@link SqlGatewayServiceITCase}.
+ */
+public class MaterializedTableStatementITCase {
+
+    private static final String FILE_CATALOG_STORE = "file_store";
+    private static final String TEST_CATALOG_PREFIX = "test_catalog";
+    private static final String TEST_DEFAULT_DATABASE = "test_db";
+
+    private static final AtomicLong COUNTER = new AtomicLong(0);
+
+    @RegisterExtension
+    @Order(1)
+    static final MiniClusterExtension MINI_CLUSTER =
+            new MiniClusterExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(2)
+                            .build());
+
+    @RegisterExtension
+    @Order(2)
+    static final SqlGatewayServiceExtension SQL_GATEWAY_SERVICE_EXTENSION =
+            new SqlGatewayServiceExtension(MINI_CLUSTER::getClientConfiguration);
+
+    @RegisterExtension
+    @Order(3)
+    static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION =
+            new TestExecutorExtension<>(
+                    () ->
+                            Executors.newCachedThreadPool(
+                                    new ExecutorThreadFactory(
+                                            "SqlGatewayService Test Pool",
+                                            IgnoreExceptionHandler.INSTANCE)));
+
+    private static SqlGatewayServiceImpl service;
+    private static SessionEnvironment defaultSessionEnvironment;
+    private static Path baseCatalogPath;
+
+    private String fileSystemCatalogPath;
+    private String fileSystemCatalogName;
+
+    @BeforeAll
+    static void setUp(@TempDir Path temporaryFolder) throws Exception {
+        service = (SqlGatewayServiceImpl) SQL_GATEWAY_SERVICE_EXTENSION.getService();
+
+        // initialize file catalog store path
+        Path fileCatalogStore = temporaryFolder.resolve(FILE_CATALOG_STORE);
+        Files.createDirectory(fileCatalogStore);
+        Map<String, String> catalogStoreOptions = new HashMap<>();
+        catalogStoreOptions.put(TABLE_CATALOG_STORE_KIND.key(), "file");
+        catalogStoreOptions.put("table.catalog-store.file.path", fileCatalogStore.toString());
+
+        // initialize test-filesystem catalog base path
+        baseCatalogPath = temporaryFolder.resolve(TEST_CATALOG_PREFIX);
+        Files.createDirectory(baseCatalogPath);
+
+        defaultSessionEnvironment =
+                SessionEnvironment.newBuilder()
+                        .addSessionConfig(catalogStoreOptions)
+                        .setSessionEndpointVersion(MockedEndpointVersion.V1)
+                        .build();
+    }
+
+    @BeforeEach
+    void before() throws Exception {
+        String randomStr = String.valueOf(COUNTER.incrementAndGet());
+        // initialize test-filesystem catalog path with a unique per-test suffix
+        Path fileCatalogPath = baseCatalogPath.resolve(randomStr);
+        Files.createDirectory(fileCatalogPath);
+        Path dbPath = fileCatalogPath.resolve(TEST_DEFAULT_DATABASE);
+        Files.createDirectory(dbPath);
+
+        fileSystemCatalogPath = fileCatalogPath.toString();
+        fileSystemCatalogName = TEST_CATALOG_PREFIX + randomStr;
+    }
+
+    @Test
+    void testCreateMaterializedTableInContinuousMode() throws Exception {
+        // initialize session handle, create test-filesystem catalog and register it
+        // to the catalog store
+        SessionHandle sessionHandle = initializeSession();
+
+        String materializedTableDDL =
+                "CREATE MATERIALIZED TABLE users_shops"
+                        + " PARTITIONED BY (ds)\n"
+                        + " WITH(\n"
+                        + "   'format' = 'debezium-json'\n"
+                        + " )\n"
+                        + " FRESHNESS = INTERVAL '30' SECOND\n"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  SUM (payment_amount_cents) AS payed_buy_fee_sum,\n"
+                        + "  SUM (1) AS pv\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+        OperationHandle materializedTableHandle =
+                service.executeStatement(
+                        sessionHandle, materializedTableDDL, -1, new Configuration());
+        awaitOperationTermination(service, sessionHandle, materializedTableHandle);
+
+        // validate materialized table: schema, refresh mode, refresh status, refresh
+        // handler; the data is not checked because it is generated randomly.
+        ResolvedCatalogMaterializedTable actualMaterializedTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        // Expected schema
+        ResolvedSchema expectedSchema =
+                ResolvedSchema.of(
+                        Arrays.asList(
+                                Column.physical("user_id", DataTypes.BIGINT()),
+                                Column.physical("shop_id", DataTypes.BIGINT()),
+                                Column.physical("ds", DataTypes.STRING()),
+                                Column.physical("payed_buy_fee_sum", DataTypes.BIGINT()),
+                                Column.physical("pv", DataTypes.INT().notNull())));
+
+        assertThat(actualMaterializedTable.getResolvedSchema()).isEqualTo(expectedSchema);
+        assertThat(actualMaterializedTable.getFreshness()).isEqualTo(Duration.ofSeconds(30));
+        assertThat(actualMaterializedTable.getLogicalRefreshMode())
+                .isEqualTo(CatalogMaterializedTable.LogicalRefreshMode.AUTOMATIC);
+        assertThat(actualMaterializedTable.getRefreshMode())
+                .isEqualTo(CatalogMaterializedTable.RefreshMode.CONTINUOUS);
+        assertThat(actualMaterializedTable.getRefreshStatus())
+                .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);
+        assertThat(actualMaterializedTable.getRefreshHandlerDescription()).isNotEmpty();
+        assertThat(actualMaterializedTable.getSerializedRefreshHandler()).isNotEmpty();
+    }
+
+    @Test
+    void testCreateMaterializedTableInFullMode() {
+        // initialize session handle, create test-filesystem catalog and register it
+        // to the catalog store
+        SessionHandle sessionHandle = initializeSession();
+
+        String materializedTableDDL =
+                "CREATE MATERIALIZED TABLE users_shops"
+                        + " PARTITIONED BY (ds)\n"
+                        + " WITH(\n"
+                        + "   'format' = 'debezium-json'\n"
+                        + " )\n"
+                        + " FRESHNESS = INTERVAL '1' DAY\n"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  SUM (payment_amount_cents) AS payed_buy_fee_sum,\n"
+                        + "  SUM (1) AS pv\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+        OperationHandle materializedTableHandle =
+                service.executeStatement(
+                        sessionHandle, materializedTableDDL, -1, new Configuration());
+
+        assertThatThrownBy(
+                        () ->
+                                awaitOperationTermination(
+                                        service, sessionHandle, materializedTableHandle))
+                .rootCause()
+                .isInstanceOf(SqlExecutionException.class)
+                .hasMessage(
+                        "Only support creating materialized tables in continuous refresh mode currently.");
+    }
+
+    private SessionHandle initializeSession() {
+        SessionHandle sessionHandle = service.openSession(defaultSessionEnvironment);
+        String catalogDDL =
+                String.format(
+                        "CREATE CATALOG %s\n"
+                                + "WITH (\n"
+                                + "  'type' = 'test-filesystem',\n"
+                                + "  'path' = '%s',\n"
+                                + "  'default-database' = '%s'\n"
+                                + "  )",
+                        fileSystemCatalogName, fileSystemCatalogPath, TEST_DEFAULT_DATABASE);
+        service.configureSession(sessionHandle, catalogDDL, -1);
+        service.configureSession(
+                sessionHandle, String.format("USE CATALOG %s", fileSystemCatalogName), -1);
+
+        // create source table
+        String dataGenSource =
+                "CREATE TABLE datagenSource (\n"
+                        + "  order_id BIGINT,\n"
+                        + "  order_number VARCHAR(20),\n"
+                        + "  user_id BIGINT,\n"
+                        + "  shop_id BIGINT,\n"
+                        + "  product_id BIGINT,\n"
+                        + "  status BIGINT,\n"
+                        + "  order_type BIGINT,\n"
+                        + "  order_created_at TIMESTAMP,\n"
+                        + "  payment_amount_cents BIGINT\n"
+                        + ")\n"
+                        + "WITH (\n"
+                        + "  'connector' = 'datagen',\n"
+                        + "  'rows-per-second' = '10'\n"
+                        + ")";
+        service.configureSession(sessionHandle, dataGenSource, -1);
+        return sessionHandle;
+    }
+}
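
Note: as the manager's comment says, CREATE MATERIALIZED TABLE itself only returns OK,
and the refresh job information is persisted in the catalog instead. In the same
session it can be inspected by describing the table, e.g. (a sketch):

    DESC users_shops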
