lsyldliu commented on code in PR #24765:
URL: https://github.com/apache/flink/pull/24765#discussion_r1596318366


##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableResumeOperation.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import java.util.Map;
+
+/** Operation to describe a ALTER MATERIALIZED TABLE ... SUSPEND statement. */
+@Internal
+public class AlterMaterializedTableResumeOperation extends 
AlterMaterializedTableOperation {
+
+    private final Map<String, String> options;

Review Comment:
   It would be better to rename it to `dynamicOptions`.



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableResumeOperation.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import java.util.Map;
+
+/** Operation to describe a ALTER MATERIALIZED TABLE ... SUSPEND statement. */
+@Internal
+public class AlterMaterializedTableResumeOperation extends 
AlterMaterializedTableOperation {
+
+    private final Map<String, String> options;
+
+    public AlterMaterializedTableResumeOperation(
+            ObjectIdentifier tableIdentifier, Map<String, String> options) {
+        super(tableIdentifier);
+        this.options = options;
+    }
+
+    public Map<String, String> getOptions() {
+        return options;
+    }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        throw new TableException("This method shouldn't be called.");

Review Comment:
   We should throw the following exception:
   ```
   throw new UnsupportedOperationException(
                   "AlterMaterializedTableResumeOperation doesn't support 
ExecutableOperation yet.")
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -169,6 +284,75 @@ private static void createMaterializedInContinuousMode(
         }
     }
 
+    private static String stopJobWithSavepoint(
+            OperationExecutor executor,
+            OperationHandle handle,
+            ContinuousRefreshHandler refreshHandler) {
+        ResultFetcher resultFetcher =
+                executor.callStopJobOperation(
+                        executor.getTableEnvironment(),
+                        handle,
+                        new StopJobOperation(refreshHandler.getJobId(), true, 
false));
+        List<RowData> results = fetchAllResults(resultFetcher);
+        return results.get(0).getString(0).toString();
+    }
+
+    private static ContinuousRefreshHandler deserializeContinuousHandler(
+            byte[] serializedRefreshHandler, ClassLoader classLoader) {
+        try {
+            return ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
+                    serializedRefreshHandler, classLoader);
+        } catch (IOException | ClassNotFoundException e) {
+            throw new TableException("Deserialize ContinuousRefreshHandler 
occur exception.", e);
+        }
+    }
+
+    private static byte[] serializeContinuousHandler(ContinuousRefreshHandler 
refreshHandler) {
+        try {
+            return 
ContinuousRefreshHandlerSerializer.INSTANCE.serialize(refreshHandler);
+        } catch (IOException e) {
+            throw new TableException("Serialize ContinuousRefreshHandler occur 
exception.", e);
+        }
+    }
+
+    private static CatalogMaterializedTable getCatalogMaterializedTable(

Review Comment:
   We should get the materialized table by following way:
   ```
       private static CatalogMaterializedTable getCatalogMaterializedTable(
               OperationExecutor operationExecutor, ObjectIdentifier 
tableIdentifier) {
           ResolvedCatalogBaseTable<?> resolvedCatalogBaseTable =
                   operationExecutor.getTable(tableIdentifier);
           if (CatalogBaseTable.TableKind.MATERIALIZED_TABLE
                   != resolvedCatalogBaseTable.getTableKind()) {
               throw new SqlExecutionException(
                       String.format(
                               "Table %s is not a materialized table, does not 
support materialized table related operation.",
                               tableIdentifier));
           }
   
           return (ResolvedCatalogMaterializedTable) resolvedCatalogBaseTable;
       }
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -100,6 +117,115 @@ private static void createMaterializedInContinuousMode(
         CatalogMaterializedTable catalogMaterializedTable =
                 createMaterializedTableOperation.getCatalogMaterializedTable();
 
+        try {
+            executeContinuousRefreshJob(
+                    operationExecutor,
+                    handle,
+                    catalogMaterializedTable,
+                    materializedTableIdentifier,
+                    Collections.emptyMap(),
+                    Optional.empty());
+        } catch (Exception e) {
+            // drop materialized table while submit flink streaming job occur 
exception. Thus, weak
+            // atomicity is guaranteed
+            LOG.warn("Submit continuous refresh job occur exception, drop 
materialized table.", e);
+            operationExecutor.callExecutableOperation(
+                    handle,
+                    new 
DropMaterializedTableOperation(materializedTableIdentifier, true, false));
+            throw e;

Review Comment:
   I think we should unify the exception type to `SqlExecutionException` in SQL 
Gateway, and keep a consistent style. Can you help to unify it?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -100,6 +117,115 @@ private static void createMaterializedInContinuousMode(
         CatalogMaterializedTable catalogMaterializedTable =
                 createMaterializedTableOperation.getCatalogMaterializedTable();
 
+        try {
+            executeContinuousRefreshJob(
+                    operationExecutor,
+                    handle,
+                    catalogMaterializedTable,
+                    materializedTableIdentifier,
+                    Collections.emptyMap(),
+                    Optional.empty());
+        } catch (Exception e) {
+            // drop materialized table while submit flink streaming job occur 
exception. Thus, weak
+            // atomicity is guaranteed
+            LOG.warn("Submit continuous refresh job occur exception, drop 
materialized table.", e);
+            operationExecutor.callExecutableOperation(
+                    handle,
+                    new 
DropMaterializedTableOperation(materializedTableIdentifier, true, false));
+            throw e;
+        }
+    }
+
+    private static ResultFetcher callAlterMaterializedTableSuspend(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableSuspendOperation op) {
+        ObjectIdentifier tableIdentifier = op.getTableIdentifier();
+        CatalogMaterializedTable materializedTable =
+                getCatalogMaterializedTable(operationExecutor, 
tableIdentifier);
+
+        if (materializedTable.getRefreshMode() != 
CatalogMaterializedTable.RefreshMode.CONTINUOUS) {

Review Comment:
   Enumeration constants are better preceded by comparison logic:
   ```
   CatalogMaterializedTable.RefreshMode.CONTINUOUS != 
materializedTable.getRefreshMode()
   ```



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -271,4 +461,65 @@ private SessionHandle initializeSession() {
         service.configureSession(sessionHandle, dataGenSource, -1);
         return sessionHandle;
     }
+
+    private List<RowData> fetchAllResults(
+            SessionHandle sessionHandle, OperationHandle operationHandle) {
+        Long token = 0L;
+        List<RowData> results = new ArrayList<>();
+        while (token != null) {
+            ResultSet result =
+                    service.fetchResults(sessionHandle, operationHandle, 
token, Integer.MAX_VALUE);
+            results.addAll(result.getData());
+            token = result.getNextToken();
+        }
+        return results;
+    }
+
+    private CheckpointConfigInfo getCheckpointConfigInfo(
+            ClusterClient<?> clusterClient, String jobId) throws Exception {
+        Configuration configuration = new Configuration();
+        final RestClient restClient =
+                new RestClient(

Review Comment:
   I think we should use `TestingRestClient` directly, please refer to 
https://github.com/apache/flink/blob/4fe66e0697471105e0f0a3f8519bb0c0ac559709/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/rest/SqlGatewayRestEndpointITCase.java#L80



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -236,6 +282,150 @@ void testCreateMaterializedTableInFullMode() {
                         "Only support create materialized table in continuous 
refresh mode currently.");
     }
 
+    @Test
+    void testAlterMaterializedTableSuspendAndResume(
+            @TempDir Path temporaryPath,
+            @InjectClusterClient RestClusterClient<?> restClusterClient)
+            throws Exception {
+        // initialize session handle, create test-filesystem catalog and 
register it to catalog
+        // store
+        SessionHandle sessionHandle = initializeSession();
+
+        String materializedTableDDL =
+                "CREATE MATERIALIZED TABLE users_shops"
+                        + " PARTITIONED BY (ds)\n"
+                        + " WITH(\n"
+                        + "   'format' = 'debezium-json'\n"
+                        + " )\n"
+                        + " FRESHNESS = INTERVAL '30' SECOND\n"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  SUM (payment_amount_cents) AS 
payed_buy_fee_sum,\n"
+                        + "  SUM (1) AS pv\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, 
DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM 
datagenSource"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+
+        OperationHandle materializedTableHandle =
+                service.executeStatement(
+                        sessionHandle, materializedTableDDL, -1, new 
Configuration());
+        awaitOperationTermination(service, sessionHandle, 
materializedTableHandle);
+
+        // set up savepoint dir
+        String savepointDir = temporaryPath.toString();
+        String alterMaterializedTableSavepointDDL =
+                String.format("SET 'state.savepoints.dir' = 'file://%s'", 
savepointDir);

Review Comment:
   I think we don't need to add the prefix `file:`?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -100,6 +117,115 @@ private static void createMaterializedInContinuousMode(
         CatalogMaterializedTable catalogMaterializedTable =
                 createMaterializedTableOperation.getCatalogMaterializedTable();
 
+        try {
+            executeContinuousRefreshJob(
+                    operationExecutor,
+                    handle,
+                    catalogMaterializedTable,
+                    materializedTableIdentifier,
+                    Collections.emptyMap(),
+                    Optional.empty());
+        } catch (Exception e) {
+            // drop materialized table while submit flink streaming job occur 
exception. Thus, weak
+            // atomicity is guaranteed
+            LOG.warn("Submit continuous refresh job occur exception, drop 
materialized table.", e);
+            operationExecutor.callExecutableOperation(
+                    handle,
+                    new 
DropMaterializedTableOperation(materializedTableIdentifier, true, false));
+            throw e;
+        }
+    }
+
+    private static ResultFetcher callAlterMaterializedTableSuspend(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableSuspendOperation op) {
+        ObjectIdentifier tableIdentifier = op.getTableIdentifier();
+        CatalogMaterializedTable materializedTable =
+                getCatalogMaterializedTable(operationExecutor, 
tableIdentifier);
+
+        if (materializedTable.getRefreshMode() != 
CatalogMaterializedTable.RefreshMode.CONTINUOUS) {
+            throw new TableException("Only support suspend continuous refresh 
job currently.");
+        }
+
+        ContinuousRefreshHandler refreshHandler =
+                deserializeContinuousHandler(
+                        materializedTable.getSerializedRefreshHandler(),
+                        
operationExecutor.getSessionContext().getUserClassloader());
+
+        String savepointPath = stopJobWithSavepoint(operationExecutor, handle, 
refreshHandler);
+
+        ContinuousRefreshHandler updateRefreshHandler =
+                new ContinuousRefreshHandler(
+                        refreshHandler.getExecutionTarget(),
+                        refreshHandler.getJobId(),
+                        savepointPath);
+
+        CatalogMaterializedTable updatedMaterializedTable =
+                materializedTable.copy(
+                        CatalogMaterializedTable.RefreshStatus.SUSPENDED,
+                        
materializedTable.getRefreshHandlerDescription().orElse(null),
+                        serializeContinuousHandler(updateRefreshHandler));
+        AlterMaterializedTableChangeOperation 
alterMaterializedTableChangeOperation =
+                new AlterMaterializedTableChangeOperation(
+                        tableIdentifier, Collections.emptyList(), 
updatedMaterializedTable);
+
+        operationExecutor.callExecutableOperation(handle, 
alterMaterializedTableChangeOperation);
+
+        return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+    }
+
+    private static ResultFetcher callAlterMaterializedTableResume(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableResumeOperation op) {
+        ObjectIdentifier tableIdentifier = op.getTableIdentifier();
+        CatalogMaterializedTable catalogMaterializedTable =
+                getCatalogMaterializedTable(operationExecutor, 
tableIdentifier);
+
+        if (catalogMaterializedTable.getRefreshMode()

Review Comment:
   ```
           if (CatalogMaterializedTable.RefreshMode.CONTINUOUS
                   != catalogMaterializedTable.getRefreshMode()) {
               throw new SqlExecutionException(
                       "Only support resume materialized table in continuous 
mode currently.");
           }
   ```



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableResumeOperation.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+import java.util.Map;
+
+/** Operation to describe a ALTER MATERIALIZED TABLE ... SUSPEND statement. */
+@Internal
+public class AlterMaterializedTableResumeOperation extends 
AlterMaterializedTableOperation {
+
+    private final Map<String, String> options;
+
+    public AlterMaterializedTableResumeOperation(
+            ObjectIdentifier tableIdentifier, Map<String, String> options) {
+        super(tableIdentifier);
+        this.options = options;
+    }
+
+    public Map<String, String> getOptions() {
+        return options;
+    }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        throw new TableException("This method shouldn't be called.");
+    }
+
+    @Override
+    public String asSummaryString() {
+        return String.format(
+                "ALTER MATERIALIZED TABLE %s RESUME", 
tableIdentifier.asSummaryString());

Review Comment:
   The `dynamicOptions` also should be one part of the summary string.
   ```
           StringBuilder builder =
                   new StringBuilder(
                           String.format(
                                   "ALTER MATERIALIZED TABLE %s RESUME",
                                   tableIdentifier.asSummaryString()));
           if (!dynamicOptions.isEmpty()) {
               builder.append(
                       String.format(" WITH (%s)", 
OperationUtils.formatProperties(dynamicOptions)));
           }
           return builder.toString();
   ```



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/refresh/ContinuousRefreshHandler.java:
##########
@@ -43,6 +55,10 @@ public String getJobId() {
         return jobId;
     }
 
+    public Optional<String> getRestorePath() {
+        return Optional.ofNullable(restorePath);
+    }
+
     @Override
     public String asSummaryString() {
         return String.format("{\n executionTarget: %s,\n jobId: %s\n}", 
executionTarget, jobId);

Review Comment:
   I think we also need to print the `restorePath` if it exists, WDYT?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -169,6 +284,75 @@ private static void createMaterializedInContinuousMode(
         }
     }
 
+    private static String stopJobWithSavepoint(
+            OperationExecutor executor,
+            OperationHandle handle,
+            ContinuousRefreshHandler refreshHandler) {
+        ResultFetcher resultFetcher =
+                executor.callStopJobOperation(
+                        executor.getTableEnvironment(),
+                        handle,
+                        new StopJobOperation(refreshHandler.getJobId(), true, 
false));
+        List<RowData> results = fetchAllResults(resultFetcher);
+        return results.get(0).getString(0).toString();
+    }
+
+    private static ContinuousRefreshHandler deserializeContinuousHandler(
+            byte[] serializedRefreshHandler, ClassLoader classLoader) {
+        try {
+            return ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
+                    serializedRefreshHandler, classLoader);
+        } catch (IOException | ClassNotFoundException e) {
+            throw new TableException("Deserialize ContinuousRefreshHandler 
occur exception.", e);
+        }
+    }
+
+    private static byte[] serializeContinuousHandler(ContinuousRefreshHandler 
refreshHandler) {
+        try {
+            return 
ContinuousRefreshHandlerSerializer.INSTANCE.serialize(refreshHandler);
+        } catch (IOException e) {
+            throw new TableException("Serialize ContinuousRefreshHandler occur 
exception.", e);
+        }
+    }
+
+    private static CatalogMaterializedTable getCatalogMaterializedTable(
+            OperationExecutor operationExecutor, ObjectIdentifier 
tableIdentifier) {
+        ContextResolvedTable contextResolvedTable =
+                operationExecutor
+                        .getSessionContext()
+                        .getSessionState()
+                        .catalogManager
+                        .getTableOrError(tableIdentifier);
+        CatalogBaseTable.TableKind tableKind =
+                contextResolvedTable.getResolvedTable().getTableKind();
+        if (tableKind != CatalogBaseTable.TableKind.MATERIALIZED_TABLE) {
+            throw new TableException(
+                    String.format(
+                            "Table %s is not a materialized table, does not 
support materialized table related operations",
+                            tableIdentifier));
+        }
+
+        return (CatalogMaterializedTable) 
contextResolvedTable.getResolvedTable().getOrigin();
+    }
+
+    private static String generateInsertStatement(
+            CatalogMaterializedTable catalogMaterializedTable,
+            ObjectIdentifier objectIdentifier,
+            Map<String, String> hints) {

Review Comment:
   I think it would be better to call it `dynamicOptions`. Moreover, I think we 
can extract this as a util method, then we can test it separately. 
   ```
       private static String generateInsertStatement(
               ObjectIdentifier materializedTableIdentifier,
               String definitionQuery,
               Map<String, String> dynamicOptions) {
           StringBuilder builder =
                   new StringBuilder(
                           String.format(
                                   "INSERT INTO %s", 
materializedTableIdentifier.asSerializableString()));
   
           if (!dynamicOptions.isEmpty()) {
               String hints =
                       dynamicOptions.entrySet().stream()
                               .map(e -> String.format("'%s'='%s'", e.getKey(), 
e.getValue()))
                               .collect(Collectors.joining(", "));
               builder.append(String.format(" /*+ OPTIONS(%s) */", hints));
           }
   
           builder.append("\n").append(definitionQuery);
           return builder.toString();
       }
   ```



##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/materializedtable/AlterMaterializedTableSuspendOperation.java:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.materializedtable;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+
+/** Operation to describe a ALTER MATERIALIZED TABLE ... SUSPEND statement. */
+@Internal
+public class AlterMaterializedTableSuspendOperation extends 
AlterMaterializedTableOperation {
+
+    public AlterMaterializedTableSuspendOperation(ObjectIdentifier 
tableIdentifier) {
+        super(tableIdentifier);
+    }
+
+    @Override
+    public TableResultInternal execute(Context ctx) {
+        throw new TableException("This method shouldn't be called.");

Review Comment:
   ditto



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -100,6 +117,115 @@ private static void createMaterializedInContinuousMode(
         CatalogMaterializedTable catalogMaterializedTable =
                 createMaterializedTableOperation.getCatalogMaterializedTable();
 
+        try {
+            executeContinuousRefreshJob(
+                    operationExecutor,
+                    handle,
+                    catalogMaterializedTable,
+                    materializedTableIdentifier,
+                    Collections.emptyMap(),
+                    Optional.empty());
+        } catch (Exception e) {
+            // drop materialized table while submit flink streaming job occur 
exception. Thus, weak
+            // atomicity is guaranteed
+            LOG.warn("Submit continuous refresh job occur exception, drop 
materialized table.", e);

Review Comment:
   ```suggestion
                           LOG.warn(
                       "Submit continuous refresh job occur exception, drop 
materialized table {}.",
                       materializedTableIdentifier,
                       e);
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -169,6 +284,75 @@ private static void createMaterializedInContinuousMode(
         }
     }
 
+    private static String stopJobWithSavepoint(
+            OperationExecutor executor,
+            OperationHandle handle,
+            ContinuousRefreshHandler refreshHandler) {
+        ResultFetcher resultFetcher =
+                executor.callStopJobOperation(

Review Comment:
   I think the savepoint path must be configured in session conf, so here we 
need to check it. If it is not specified, we should throw an exception to 
remind.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -100,6 +117,115 @@ private static void createMaterializedInContinuousMode(
         CatalogMaterializedTable catalogMaterializedTable =
                 createMaterializedTableOperation.getCatalogMaterializedTable();
 
+        try {
+            executeContinuousRefreshJob(
+                    operationExecutor,
+                    handle,
+                    catalogMaterializedTable,
+                    materializedTableIdentifier,
+                    Collections.emptyMap(),
+                    Optional.empty());
+        } catch (Exception e) {
+            // drop materialized table while submit flink streaming job occur 
exception. Thus, weak
+            // atomicity is guaranteed
+            LOG.warn("Submit continuous refresh job occur exception, drop 
materialized table.", e);
+            operationExecutor.callExecutableOperation(
+                    handle,
+                    new 
DropMaterializedTableOperation(materializedTableIdentifier, true, false));
+            throw e;

Review Comment:
   I think we should unify the exception type to `SqlExecutionException` in SQL 
Gateway, and keep a consistent style. Can you help to unify it?



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -100,6 +117,115 @@ private static void createMaterializedInContinuousMode(
         CatalogMaterializedTable catalogMaterializedTable =
                 createMaterializedTableOperation.getCatalogMaterializedTable();
 
+        try {
+            executeContinuousRefreshJob(
+                    operationExecutor,
+                    handle,
+                    catalogMaterializedTable,
+                    materializedTableIdentifier,
+                    Collections.emptyMap(),
+                    Optional.empty());
+        } catch (Exception e) {
+            // drop materialized table while submit flink streaming job occur 
exception. Thus, weak
+            // atomicity is guaranteed
+            LOG.warn("Submit continuous refresh job occur exception, drop 
materialized table.", e);
+            operationExecutor.callExecutableOperation(
+                    handle,
+                    new 
DropMaterializedTableOperation(materializedTableIdentifier, true, false));
+            throw e;
+        }
+    }
+
+    private static ResultFetcher callAlterMaterializedTableSuspend(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableSuspendOperation op) {
+        ObjectIdentifier tableIdentifier = op.getTableIdentifier();
+        CatalogMaterializedTable materializedTable =
+                getCatalogMaterializedTable(operationExecutor, 
tableIdentifier);
+
+        if (materializedTable.getRefreshMode() != 
CatalogMaterializedTable.RefreshMode.CONTINUOUS) {
+            throw new TableException("Only support suspend continuous refresh 
job currently.");

Review Comment:
   ```suggestion
               throw new SqlExecutionException(
                       "Only support suspend materialized table in continuous 
refresh mode currently.");
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -169,6 +284,75 @@ private static void createMaterializedInContinuousMode(
         }
     }
 
+    private static String stopJobWithSavepoint(
+            OperationExecutor executor,
+            OperationHandle handle,
+            ContinuousRefreshHandler refreshHandler) {
+        ResultFetcher resultFetcher =
+                executor.callStopJobOperation(
+                        executor.getTableEnvironment(),
+                        handle,
+                        new StopJobOperation(refreshHandler.getJobId(), true, 
false));
+        List<RowData> results = fetchAllResults(resultFetcher);
+        return results.get(0).getString(0).toString();
+    }
+
+    private static ContinuousRefreshHandler deserializeContinuousHandler(
+            byte[] serializedRefreshHandler, ClassLoader classLoader) {
+        try {
+            return ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
+                    serializedRefreshHandler, classLoader);
+        } catch (IOException | ClassNotFoundException e) {
+            throw new TableException("Deserialize ContinuousRefreshHandler 
occur exception.", e);

Review Comment:
   ```suggestion
                           throw new SqlExecutionException(
                       String.format(
                               "Deserialize ContinuousRefreshHandler for 
materialized table %s occur exception.",
                               materializedTableIdentifier),
                       e);
   ```



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -100,6 +117,115 @@ private static void createMaterializedInContinuousMode(
         CatalogMaterializedTable catalogMaterializedTable =
                 createMaterializedTableOperation.getCatalogMaterializedTable();
 
+        try {
+            executeContinuousRefreshJob(
+                    operationExecutor,
+                    handle,
+                    catalogMaterializedTable,
+                    materializedTableIdentifier,
+                    Collections.emptyMap(),
+                    Optional.empty());
+        } catch (Exception e) {
+            // drop materialized table while submit flink streaming job occur 
exception. Thus, weak
+            // atomicity is guaranteed
+            LOG.warn("Submit continuous refresh job occur exception, drop 
materialized table.", e);
+            operationExecutor.callExecutableOperation(
+                    handle,
+                    new 
DropMaterializedTableOperation(materializedTableIdentifier, true, false));
+            throw e;
+        }
+    }
+
+    private static ResultFetcher callAlterMaterializedTableSuspend(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableSuspendOperation op) {
+        ObjectIdentifier tableIdentifier = op.getTableIdentifier();
+        CatalogMaterializedTable materializedTable =
+                getCatalogMaterializedTable(operationExecutor, 
tableIdentifier);
+
+        if (materializedTable.getRefreshMode() != 
CatalogMaterializedTable.RefreshMode.CONTINUOUS) {
+            throw new TableException("Only support suspend continuous refresh 
job currently.");
+        }
+
+        ContinuousRefreshHandler refreshHandler =
+                deserializeContinuousHandler(
+                        materializedTable.getSerializedRefreshHandler(),
+                        
operationExecutor.getSessionContext().getUserClassloader());
+
+        String savepointPath = stopJobWithSavepoint(operationExecutor, handle, 
refreshHandler);
+
+        ContinuousRefreshHandler updateRefreshHandler =
+                new ContinuousRefreshHandler(
+                        refreshHandler.getExecutionTarget(),
+                        refreshHandler.getJobId(),
+                        savepointPath);
+
+        CatalogMaterializedTable updatedMaterializedTable =
+                materializedTable.copy(
+                        CatalogMaterializedTable.RefreshStatus.SUSPENDED,
+                        
materializedTable.getRefreshHandlerDescription().orElse(null),
+                        serializeContinuousHandler(updateRefreshHandler));
+        AlterMaterializedTableChangeOperation 
alterMaterializedTableChangeOperation =
+                new AlterMaterializedTableChangeOperation(
+                        tableIdentifier, Collections.emptyList(), 
updatedMaterializedTable);
+
+        operationExecutor.callExecutableOperation(handle, 
alterMaterializedTableChangeOperation);
+
+        return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+    }
+
+    private static ResultFetcher callAlterMaterializedTableResume(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableResumeOperation op) {
+        ObjectIdentifier tableIdentifier = op.getTableIdentifier();
+        CatalogMaterializedTable catalogMaterializedTable =
+                getCatalogMaterializedTable(operationExecutor, 
tableIdentifier);
+
+        if (catalogMaterializedTable.getRefreshMode()
+                != CatalogMaterializedTable.RefreshMode.CONTINUOUS) {
+            throw new TableException("Only support resume continuous refresh 
job currently.");
+        }
+
+        CatalogMaterializedTable updatedMaterializedTable =
+                catalogMaterializedTable.copy(
+                        CatalogMaterializedTable.RefreshStatus.INITIALIZING,
+                        
catalogMaterializedTable.getRefreshHandlerDescription().orElse(null),
+                        
catalogMaterializedTable.getSerializedRefreshHandler());
+
+        AlterMaterializedTableChangeOperation 
alterMaterializedTableChangeOperation =
+                new AlterMaterializedTableChangeOperation(
+                        tableIdentifier,
+                        Collections.singletonList(
+                                TableChange.modifyRefreshStatus(
+                                        
CatalogMaterializedTable.RefreshStatus.INITIALIZING)),
+                        updatedMaterializedTable);
+        operationExecutor.callExecutableOperation(handle, 
alterMaterializedTableChangeOperation);
+
+        ContinuousRefreshHandler continuousRefreshHandler =
+                deserializeContinuousHandler(
+                        updatedMaterializedTable.getSerializedRefreshHandler(),
+                        
operationExecutor.getSessionContext().getUserClassloader());
+        Optional<String> restorePath = 
continuousRefreshHandler.getRestorePath();
+        executeContinuousRefreshJob(
+                operationExecutor,
+                handle,
+                updatedMaterializedTable,
+                tableIdentifier,
+                op.getOptions(),
+                restorePath);
+
+        return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+    }
+
+    private static void executeContinuousRefreshJob(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            CatalogMaterializedTable catalogMaterializedTable,
+            ObjectIdentifier materializedTableIdentifier,
+            Map<String, String> hints,

Review Comment:
   dynamicOptions



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -100,6 +117,115 @@ private static void createMaterializedInContinuousMode(
         CatalogMaterializedTable catalogMaterializedTable =
                 createMaterializedTableOperation.getCatalogMaterializedTable();
 
+        try {
+            executeContinuousRefreshJob(
+                    operationExecutor,
+                    handle,
+                    catalogMaterializedTable,
+                    materializedTableIdentifier,
+                    Collections.emptyMap(),
+                    Optional.empty());
+        } catch (Exception e) {
+            // drop materialized table while submit flink streaming job occur 
exception. Thus, weak
+            // atomicity is guaranteed
+            LOG.warn("Submit continuous refresh job occur exception, drop 
materialized table.", e);
+            operationExecutor.callExecutableOperation(
+                    handle,
+                    new 
DropMaterializedTableOperation(materializedTableIdentifier, true, false));
+            throw e;
+        }
+    }
+
+    private static ResultFetcher callAlterMaterializedTableSuspend(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableSuspendOperation op) {
+        ObjectIdentifier tableIdentifier = op.getTableIdentifier();
+        CatalogMaterializedTable materializedTable =
+                getCatalogMaterializedTable(operationExecutor, 
tableIdentifier);
+
+        if (materializedTable.getRefreshMode() != 
CatalogMaterializedTable.RefreshMode.CONTINUOUS) {
+            throw new TableException("Only support suspend continuous refresh 
job currently.");
+        }
+
+        ContinuousRefreshHandler refreshHandler =
+                deserializeContinuousHandler(
+                        materializedTable.getSerializedRefreshHandler(),
+                        
operationExecutor.getSessionContext().getUserClassloader());
+
+        String savepointPath = stopJobWithSavepoint(operationExecutor, handle, 
refreshHandler);
+
+        ContinuousRefreshHandler updateRefreshHandler =
+                new ContinuousRefreshHandler(
+                        refreshHandler.getExecutionTarget(),
+                        refreshHandler.getJobId(),
+                        savepointPath);
+
+        CatalogMaterializedTable updatedMaterializedTable =
+                materializedTable.copy(
+                        CatalogMaterializedTable.RefreshStatus.SUSPENDED,
+                        
materializedTable.getRefreshHandlerDescription().orElse(null),
+                        serializeContinuousHandler(updateRefreshHandler));
+        AlterMaterializedTableChangeOperation 
alterMaterializedTableChangeOperation =
+                new AlterMaterializedTableChangeOperation(
+                        tableIdentifier, Collections.emptyList(), 
updatedMaterializedTable);
+
+        operationExecutor.callExecutableOperation(handle, 
alterMaterializedTableChangeOperation);
+
+        return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+    }
+
+    private static ResultFetcher callAlterMaterializedTableResume(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            AlterMaterializedTableResumeOperation op) {
+        ObjectIdentifier tableIdentifier = op.getTableIdentifier();
+        CatalogMaterializedTable catalogMaterializedTable =
+                getCatalogMaterializedTable(operationExecutor, 
tableIdentifier);
+
+        if (catalogMaterializedTable.getRefreshMode()
+                != CatalogMaterializedTable.RefreshMode.CONTINUOUS) {
+            throw new TableException("Only support resume continuous refresh 
job currently.");
+        }
+
+        CatalogMaterializedTable updatedMaterializedTable =
+                catalogMaterializedTable.copy(

Review Comment:
   Why do we need to change the change refresh status to `INITIALIZING`? The 
state machine change logic should be: `INITIALIZING` -> `ACTIVATED` -> 
`SUSPENDED` -> `ACTIVATED`.



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -236,6 +282,150 @@ void testCreateMaterializedTableInFullMode() {
                         "Only support create materialized table in continuous 
refresh mode currently.");
     }
 
+    @Test
+    void testAlterMaterializedTableSuspendAndResume(
+            @TempDir Path temporaryPath,
+            @InjectClusterClient RestClusterClient<?> restClusterClient)
+            throws Exception {
+        // initialize session handle, create test-filesystem catalog and 
register it to catalog
+        // store
+        SessionHandle sessionHandle = initializeSession();
+
+        String materializedTableDDL =
+                "CREATE MATERIALIZED TABLE users_shops"
+                        + " PARTITIONED BY (ds)\n"
+                        + " WITH(\n"
+                        + "   'format' = 'debezium-json'\n"
+                        + " )\n"
+                        + " FRESHNESS = INTERVAL '30' SECOND\n"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  SUM (payment_amount_cents) AS 
payed_buy_fee_sum,\n"
+                        + "  SUM (1) AS pv\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, 
DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM 
datagenSource"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+
+        OperationHandle materializedTableHandle =
+                service.executeStatement(
+                        sessionHandle, materializedTableDDL, -1, new 
Configuration());
+        awaitOperationTermination(service, sessionHandle, 
materializedTableHandle);
+
+        // set up savepoint dir
+        String savepointDir = temporaryPath.toString();
+        String alterMaterializedTableSavepointDDL =
+                String.format("SET 'state.savepoints.dir' = 'file://%s'", 
savepointDir);
+        OperationHandle alterMaterializedTableSavepointHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableSavepointDDL, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, 
alterMaterializedTableSavepointHandle);
+
+        ResolvedCatalogMaterializedTable activeMaterializedTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(activeMaterializedTable.getRefreshStatus())
+                .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);
+
+        ContinuousRefreshHandler activeRefreshHandler =
+                ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
+                        activeMaterializedTable.getSerializedRefreshHandler(),
+                        getClass().getClassLoader());
+
+        waitUntilAllTasksAreRunning(
+                restClusterClient, 
JobID.fromHexString(activeRefreshHandler.getJobId()));
+
+        // suspend materialized table

Review Comment:
   These code move here would be better:
   ```
           // set up savepoint dir
           String savepointDir = temporaryPath.toString();
           String setSavapointPath =
                   String.format("SET 'state.savepoints.dir' = 'file://%s'", 
savepointDir);
           OperationHandle alterMaterializedTableSavepointHandle =
                   service.executeStatement(
                           sessionHandle, setSavapointPath, -1, new 
Configuration());
           awaitOperationTermination(service, sessionHandle, 
alterMaterializedTableSavepointHandle);
   ```



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -197,6 +220,29 @@ void testCreateMaterializedTableInContinuousMode() throws 
Exception {
                 .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);
         
assertThat(actualMaterializedTable.getRefreshHandlerDescription()).isNotEmpty();
         
assertThat(actualMaterializedTable.getSerializedRefreshHandler()).isNotEmpty();
+
+        ContinuousRefreshHandler continuousRefreshHandler =
+                ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
+                        actualMaterializedTable.getSerializedRefreshHandler(),
+                        getClass().getClassLoader());
+        // check the background job is running
+        String describeJobDDL =
+                String.format("DESCRIBE JOB '%s'", 
continuousRefreshHandler.getJobId());
+        OperationHandle describeJobHandle =
+                service.executeStatement(sessionHandle, describeJobDDL, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, describeJobHandle);
+        List<RowData> jobResults = fetchAllResults(sessionHandle, 
describeJobHandle);
+        
assertThat(jobResults.get(0).getString(2).toString()).isEqualTo("RUNNING");
+
+        // get checkpoint config from the materialized table
+        CheckpointConfigInfo checkpointConfigInfo =
+                getCheckpointConfigInfo(clusterClient, 
continuousRefreshHandler.getJobId());

Review Comment:
   Can we get the JSON string directly? It looks a bit strange to convert the 
CheckpointConfigInfo to a json string, and then read the interval from json 
tree.



##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##########
@@ -169,6 +284,75 @@ private static void createMaterializedInContinuousMode(
         }
     }
 
+    private static String stopJobWithSavepoint(
+            OperationExecutor executor,
+            OperationHandle handle,
+            ContinuousRefreshHandler refreshHandler) {
+        ResultFetcher resultFetcher =
+                executor.callStopJobOperation(
+                        executor.getTableEnvironment(),
+                        handle,
+                        new StopJobOperation(refreshHandler.getJobId(), true, 
false));
+        List<RowData> results = fetchAllResults(resultFetcher);
+        return results.get(0).getString(0).toString();
+    }
+
+    private static ContinuousRefreshHandler deserializeContinuousHandler(
+            byte[] serializedRefreshHandler, ClassLoader classLoader) {
+        try {
+            return ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
+                    serializedRefreshHandler, classLoader);
+        } catch (IOException | ClassNotFoundException e) {
+            throw new TableException("Deserialize ContinuousRefreshHandler 
occur exception.", e);
+        }
+    }
+
+    private static byte[] serializeContinuousHandler(ContinuousRefreshHandler 
refreshHandler) {
+        try {
+            return 
ContinuousRefreshHandlerSerializer.INSTANCE.serialize(refreshHandler);
+        } catch (IOException e) {
+            throw new TableException("Serialize ContinuousRefreshHandler occur 
exception.", e);

Review Comment:
   ```suggestion
                           throw new SqlExecutionException(
                       String.format(
                               "Serialize ContinuousRefreshHandler for 
materialized table %s occur exception.",
                               materializedTableIdentifier),
                       e);
   ```



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -236,6 +282,150 @@ void testCreateMaterializedTableInFullMode() {
                         "Only support create materialized table in continuous 
refresh mode currently.");
     }
 
+    @Test
+    void testAlterMaterializedTableSuspendAndResume(
+            @TempDir Path temporaryPath,
+            @InjectClusterClient RestClusterClient<?> restClusterClient)
+            throws Exception {
+        // initialize session handle, create test-filesystem catalog and 
register it to catalog
+        // store
+        SessionHandle sessionHandle = initializeSession();
+
+        String materializedTableDDL =
+                "CREATE MATERIALIZED TABLE users_shops"
+                        + " PARTITIONED BY (ds)\n"
+                        + " WITH(\n"
+                        + "   'format' = 'debezium-json'\n"
+                        + " )\n"
+                        + " FRESHNESS = INTERVAL '30' SECOND\n"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  SUM (payment_amount_cents) AS 
payed_buy_fee_sum,\n"
+                        + "  SUM (1) AS pv\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, 
DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM 
datagenSource"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+
+        OperationHandle materializedTableHandle =
+                service.executeStatement(
+                        sessionHandle, materializedTableDDL, -1, new 
Configuration());
+        awaitOperationTermination(service, sessionHandle, 
materializedTableHandle);
+
+        // set up savepoint dir
+        String savepointDir = temporaryPath.toString();
+        String alterMaterializedTableSavepointDDL =

Review Comment:
   This is not a materialized table ddl, so this variable name is not suitable.



##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##########
@@ -236,6 +282,150 @@ void testCreateMaterializedTableInFullMode() {
                         "Only support create materialized table in continuous 
refresh mode currently.");
     }
 
+    @Test
+    void testAlterMaterializedTableSuspendAndResume(
+            @TempDir Path temporaryPath,
+            @InjectClusterClient RestClusterClient<?> restClusterClient)
+            throws Exception {
+        // initialize session handle, create test-filesystem catalog and 
register it to catalog
+        // store
+        SessionHandle sessionHandle = initializeSession();
+
+        String materializedTableDDL =
+                "CREATE MATERIALIZED TABLE users_shops"
+                        + " PARTITIONED BY (ds)\n"
+                        + " WITH(\n"
+                        + "   'format' = 'debezium-json'\n"
+                        + " )\n"
+                        + " FRESHNESS = INTERVAL '30' SECOND\n"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  SUM (payment_amount_cents) AS 
payed_buy_fee_sum,\n"
+                        + "  SUM (1) AS pv\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, 
DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM 
datagenSource"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+
+        OperationHandle materializedTableHandle =
+                service.executeStatement(
+                        sessionHandle, materializedTableDDL, -1, new 
Configuration());
+        awaitOperationTermination(service, sessionHandle, 
materializedTableHandle);
+
+        // set up savepoint dir
+        String savepointDir = temporaryPath.toString();
+        String alterMaterializedTableSavepointDDL =
+                String.format("SET 'state.savepoints.dir' = 'file://%s'", 
savepointDir);
+        OperationHandle alterMaterializedTableSavepointHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableSavepointDDL, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, 
alterMaterializedTableSavepointHandle);
+
+        ResolvedCatalogMaterializedTable activeMaterializedTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(activeMaterializedTable.getRefreshStatus())
+                .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);
+
+        ContinuousRefreshHandler activeRefreshHandler =
+                ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
+                        activeMaterializedTable.getSerializedRefreshHandler(),
+                        getClass().getClassLoader());
+
+        waitUntilAllTasksAreRunning(
+                restClusterClient, 
JobID.fromHexString(activeRefreshHandler.getJobId()));
+
+        // suspend materialized table
+        String alterMaterializedTableSuspendDDL = "ALTER MATERIALIZED TABLE 
users_shops SUSPEND";
+        OperationHandle alterMaterializedTableSuspendHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableSuspendDDL, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, 
alterMaterializedTableSuspendHandle);
+
+        ResolvedCatalogMaterializedTable suspendMaterializedTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(suspendMaterializedTable.getRefreshStatus())
+                .isEqualTo(CatalogMaterializedTable.RefreshStatus.SUSPENDED);
+
+        // check background job is stopped
+        byte[] refreshHandler = 
suspendMaterializedTable.getSerializedRefreshHandler();
+        ContinuousRefreshHandler suspendRefreshHandler =
+                ContinuousRefreshHandlerSerializer.INSTANCE.deserialize(
+                        refreshHandler, getClass().getClassLoader());
+        String suspendJobId = suspendRefreshHandler.getJobId();
+
+        String describeJobDDL = String.format("DESCRIBE JOB '%s'", 
suspendJobId);
+        OperationHandle describeJobHandle =
+                service.executeStatement(sessionHandle, describeJobDDL, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, 
alterMaterializedTableSuspendHandle);
+        List<RowData> jobResults = fetchAllResults(sessionHandle, 
describeJobHandle);
+        
assertThat(jobResults.get(0).getString(2).toString()).isEqualTo("FINISHED");
+
+        // check savepoint is created
+        assertThat(suspendRefreshHandler.getRestorePath()).isNotEmpty();
+        String actualSavepointPath = 
suspendRefreshHandler.getRestorePath().get();
+
+        // resume materialized table
+        String alterMaterializedTableResumeDDL =
+                "ALTER MATERIALIZED TABLE users_shops RESUME WITH 
('debezium-json.ignore-parse-errors' = 'true')";
+        OperationHandle alterMaterializedTableResumeHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableResumeDDL, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, 
alterMaterializedTableResumeHandle);
+
+        ResolvedCatalogMaterializedTable resumedCatalogMaterializedTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+        assertThat(resumedCatalogMaterializedTable.getOptions())
+                .doesNotContainKey("debezium-json.ignore-parse-errors");
+        assertThat(resumedCatalogMaterializedTable.getRefreshStatus())
+                .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);
+
+        // check background job is running

Review Comment:
   Should call the following method before check?
   ```
           waitUntilAllTasksAreRunning(
                   restClusterClient, 
JobID.fromHexString(activeRefreshHandler.getJobId()));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to