This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 24040de22af994e624b3fa14dac8b68e2c25a4c8
Author: fengli <ldliu...@163.com>
AuthorDate: Tue Jun 25 20:46:07 2024 +0800

    [FLINK-35691][table] Fix unexpected behavior of repeated suspend and resume materialized table
---
 .../MaterializedTableManager.java                  |  55 +++++++
 .../service/MaterializedTableStatementITCase.java  | 181 +++++++++++++++++++++
 2 files changed, 236 insertions(+)

diff --git 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
index a4cd306db60..293634e66d1 100644
--- 
a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
+++ 
b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java
@@ -295,6 +295,15 @@ public class MaterializedTableManager {
         CatalogMaterializedTable materializedTable =
                 getCatalogMaterializedTable(operationExecutor, 
tableIdentifier);
 
+        // Initialization phase doesn't support suspend operation.
+        if (CatalogMaterializedTable.RefreshStatus.INITIALIZING
+                == materializedTable.getRefreshStatus()) {
+            throw new SqlExecutionException(
+                    String.format(
+                            "Materialized table %s is being initialized and 
does not support suspend operation.",
+                            tableIdentifier));
+        }
+
         if (CatalogMaterializedTable.RefreshMode.CONTINUOUS == 
materializedTable.getRefreshMode()) {
             suspendContinuousRefreshJob(
                     operationExecutor, handle, tableIdentifier, 
materializedTable);
@@ -313,6 +322,14 @@ public class MaterializedTableManager {
             ContinuousRefreshHandler refreshHandler =
                     
deserializeContinuousHandler(materializedTable.getSerializedRefreshHandler());
 
+            if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
+                    == materializedTable.getRefreshStatus()) {
+                throw new SqlExecutionException(
+                        String.format(
+                                "Materialized table %s continuous refresh job 
has been suspended, jobId is %s.",
+                                tableIdentifier, refreshHandler.getJobId()));
+            }
+
             String savepointPath =
                     stopJobWithSavepoint(operationExecutor, handle, 
refreshHandler.getJobId());
 
@@ -344,6 +361,14 @@ public class MaterializedTableManager {
             OperationHandle handle,
             ObjectIdentifier tableIdentifier,
             CatalogMaterializedTable materializedTable) {
+        if (CatalogMaterializedTable.RefreshStatus.SUSPENDED
+                == materializedTable.getRefreshStatus()) {
+            throw new SqlExecutionException(
+                    String.format(
+                            "Materialized table %s refresh workflow has been 
suspended.",
+                            tableIdentifier));
+        }
+
         if (workflowScheduler == null) {
             throw new SqlExecutionException(
                     "The workflow scheduler must be configured when suspending 
materialized table in full refresh mode.");
@@ -384,6 +409,15 @@ public class MaterializedTableManager {
         CatalogMaterializedTable catalogMaterializedTable =
                 getCatalogMaterializedTable(operationExecutor, 
tableIdentifier);
 
+        // Initialization phase doesn't support resume operation.
+        if (CatalogMaterializedTable.RefreshStatus.INITIALIZING
+                == catalogMaterializedTable.getRefreshStatus()) {
+            throw new SqlExecutionException(
+                    String.format(
+                            "Materialized table %s is being initialized and 
does not support resume operation.",
+                            tableIdentifier));
+        }
+
         if (CatalogMaterializedTable.RefreshMode.CONTINUOUS
                 == catalogMaterializedTable.getRefreshMode()) {
             resumeContinuousRefreshJob(
@@ -414,6 +448,18 @@ public class MaterializedTableManager {
                 deserializeContinuousHandler(
                         
catalogMaterializedTable.getSerializedRefreshHandler());
 
+        // Repeated resume continuous refresh job is not supported
+        if (CatalogMaterializedTable.RefreshStatus.ACTIVATED
+                == catalogMaterializedTable.getRefreshStatus()) {
+            JobStatus jobStatus = getJobStatus(operationExecutor, handle, 
refreshHandler);
+            if (!jobStatus.isGloballyTerminalState()) {
+                throw new SqlExecutionException(
+                        String.format(
+                                "Materialized table %s continuous refresh job 
has been resumed, jobId is %s.",
+                                tableIdentifier, refreshHandler.getJobId()));
+            }
+        }
+
         Optional<String> restorePath = refreshHandler.getRestorePath();
         try {
             executeContinuousRefreshJob(
@@ -438,6 +484,15 @@ public class MaterializedTableManager {
             ObjectIdentifier tableIdentifier,
             CatalogMaterializedTable catalogMaterializedTable,
             Map<String, String> dynamicOptions) {
+        // Repeated resume refresh workflow is not supported
+        if (CatalogMaterializedTable.RefreshStatus.ACTIVATED
+                == catalogMaterializedTable.getRefreshStatus()) {
+            throw new SqlExecutionException(
+                    String.format(
+                            "Materialized table %s refresh workflow has been 
resumed.",
+                            tableIdentifier));
+        }
+
         if (workflowScheduler == null) {
             throw new SqlExecutionException(
                     "The workflow scheduler must be configured when resuming 
materialized table in full refresh mode.");
diff --git 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
index f090a6cada7..98ee00d48ee 100644
--- 
a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
+++ 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java
@@ -654,6 +654,114 @@ public class MaterializedTableStatementITCase extends 
AbstractMaterializedTableS
                         "Savepoint directory is not configured, can't stop job 
with savepoint.");
     }
 
+    @Test
+    void 
testAlterMaterializedTableWithRepeatedSuspendAndResumeInContinuousMode(
+            @TempDir Path temporaryPath) throws Exception {
+        String materializedTableDDL =
+                "CREATE MATERIALIZED TABLE users_shops"
+                        + " PARTITIONED BY (ds)\n"
+                        + " WITH(\n"
+                        + "   'format' = 'debezium-json'\n"
+                        + " )\n"
+                        + " FRESHNESS = INTERVAL '30' SECOND\n"
+                        + " AS SELECT \n"
+                        + "  user_id,\n"
+                        + "  shop_id,\n"
+                        + "  ds,\n"
+                        + "  SUM (payment_amount_cents) AS 
payed_buy_fee_sum,\n"
+                        + "  SUM (1) AS pv\n"
+                        + " FROM (\n"
+                        + "    SELECT user_id, shop_id, 
DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM 
datagenSource"
+                        + " ) AS tmp\n"
+                        + " GROUP BY (user_id, shop_id, ds)";
+
+        OperationHandle materializedTableHandle =
+                service.executeStatement(
+                        sessionHandle, materializedTableDDL, -1, new 
Configuration());
+        awaitOperationTermination(service, sessionHandle, 
materializedTableHandle);
+
+        ResolvedCatalogMaterializedTable activeMaterializedTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+        waitUntilAllTasksAreRunning(
+                restClusterClient,
+                JobID.fromHexString(
+                        ContinuousRefreshHandlerSerializer.INSTANCE
+                                .deserialize(
+                                        
activeMaterializedTable.getSerializedRefreshHandler(),
+                                        getClass().getClassLoader())
+                                .getJobId()));
+
+        // set the savepoint directory required to suspend the continuous refresh job
+        String savepointDir = temporaryPath.toString();
+        String alterJobSavepointDDL =
+                String.format(
+                        "SET 'execution.checkpointing.savepoint-dir' = 
'file://%s'", savepointDir);
+        OperationHandle alterMaterializedTableSavepointHandle =
+                service.executeStatement(
+                        sessionHandle, alterJobSavepointDDL, -1, new 
Configuration());
+        awaitOperationTermination(service, sessionHandle, 
alterMaterializedTableSavepointHandle);
+
+        // suspend materialized table
+        String alterMaterializedTableSuspendDDL = "ALTER MATERIALIZED TABLE 
users_shops SUSPEND";
+        OperationHandle alterMaterializedTableSuspendHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableSuspendDDL, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, 
alterMaterializedTableSuspendHandle);
+
+        // verify repeated suspend materialized table
+        OperationHandle repeatedAlterMaterializedTableSuspendHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableSuspendDDL, -1, 
new Configuration());
+        assertThatThrownBy(
+                        () ->
+                                awaitOperationTermination(
+                                        service,
+                                        sessionHandle,
+                                        
repeatedAlterMaterializedTableSuspendHandle))
+                .rootCause()
+                .isInstanceOf(SqlExecutionException.class)
+                .hasMessageContaining(
+                        String.format(
+                                "Materialized table %s continuous refresh job 
has been suspended",
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops")));
+
+        // resume materialized table
+        String alterMaterializedTableResumeDDL = "ALTER MATERIALIZED TABLE 
users_shops RESUME";
+        OperationHandle alterMaterializedTableResumeHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableResumeDDL, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, 
alterMaterializedTableResumeHandle);
+
+        // verify repeated resume materialized table
+        OperationHandle repeatedAlterMaterializedTableResumeHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableResumeDDL, -1, 
new Configuration());
+        assertThatThrownBy(
+                        () ->
+                                awaitOperationTermination(
+                                        service,
+                                        sessionHandle,
+                                        
repeatedAlterMaterializedTableResumeHandle))
+                .rootCause()
+                .isInstanceOf(SqlExecutionException.class)
+                .hasMessageContaining(
+                        String.format(
+                                "Materialized table %s continuous refresh job 
has been resumed",
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops")));
+    }
+
     @Test
     void testAlterMaterializedTableSuspendAndResumeInFullMode() throws 
Exception {
         createAndVerifyCreateMaterializedTableWithData(
@@ -753,6 +861,79 @@ public class MaterializedTableStatementITCase extends 
AbstractMaterializedTableS
                 .containsEntry("debezium-json.ignore-parse-errors", "true");
     }
 
+    @Test
+    void testAlterMaterializedTableWithRepeatedSuspendAndResumeInFullMode() 
throws Exception {
+        createAndVerifyCreateMaterializedTableWithData(
+                "users_shops", Collections.emptyList(), 
Collections.emptyMap(), RefreshMode.FULL);
+
+        ResolvedCatalogMaterializedTable activeMaterializedTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(activeMaterializedTable.getRefreshStatus())
+                .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);
+
+        // suspend materialized table
+        String alterMaterializedTableSuspendDDL = "ALTER MATERIALIZED TABLE 
users_shops SUSPEND";
+        OperationHandle alterMaterializedTableSuspendHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableSuspendDDL, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, 
alterMaterializedTableSuspendHandle);
+
+        // verify repeated suspend materialized table
+        OperationHandle repeatedAlterMaterializedTableSuspendHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableSuspendDDL, -1, 
new Configuration());
+        assertThatThrownBy(
+                        () ->
+                                awaitOperationTermination(
+                                        service,
+                                        sessionHandle,
+                                        
repeatedAlterMaterializedTableSuspendHandle))
+                .rootCause()
+                .isInstanceOf(SqlExecutionException.class)
+                .hasMessageContaining(
+                        String.format(
+                                "Materialized table %s refresh workflow has 
been suspended.",
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops")));
+
+        // resume materialized table
+        String alterMaterializedTableResumeDDL =
+                "ALTER MATERIALIZED TABLE users_shops RESUME WITH 
('debezium-json.ignore-parse-errors' = 'true')";
+        OperationHandle alterMaterializedTableResumeHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableResumeDDL, -1, 
new Configuration());
+        awaitOperationTermination(service, sessionHandle, 
alterMaterializedTableResumeHandle);
+
+        // verify repeated resume materialized table
+        OperationHandle repeatedAlterMaterializedTableResumeHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableResumeDDL, -1, 
new Configuration());
+        assertThatThrownBy(
+                        () ->
+                                awaitOperationTermination(
+                                        service,
+                                        sessionHandle,
+                                        
repeatedAlterMaterializedTableResumeHandle))
+                .rootCause()
+                .isInstanceOf(SqlExecutionException.class)
+                .hasMessageContaining(
+                        String.format(
+                                "Materialized table %s refresh workflow has 
been resumed.",
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops")));
+    }
+
     @Test
     void testDropMaterializedTableInContinuousMode() throws Exception {
         String materializedTableDDL =

Reply via email to