This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 24040de22af994e624b3fa14dac8b68e2c25a4c8 Author: fengli <ldliu...@163.com> AuthorDate: Tue Jun 25 20:46:07 2024 +0800 [FLINK-35691][table] Fix unexpected behavior of repeated suspend and resume materialized table --- .../MaterializedTableManager.java | 55 +++++++ .../service/MaterializedTableStatementITCase.java | 181 +++++++++++++++++++++ 2 files changed, 236 insertions(+) diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java index a4cd306db60..293634e66d1 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java @@ -295,6 +295,15 @@ public class MaterializedTableManager { CatalogMaterializedTable materializedTable = getCatalogMaterializedTable(operationExecutor, tableIdentifier); + // Initialization phase doesn't support suspend operation. 
+ if (CatalogMaterializedTable.RefreshStatus.INITIALIZING + == materializedTable.getRefreshStatus()) { + throw new SqlExecutionException( + String.format( + "Materialized table %s is being initialized and does not support suspend operation.", + tableIdentifier)); + } + if (CatalogMaterializedTable.RefreshMode.CONTINUOUS == materializedTable.getRefreshMode()) { suspendContinuousRefreshJob( operationExecutor, handle, tableIdentifier, materializedTable); @@ -313,6 +322,14 @@ public class MaterializedTableManager { ContinuousRefreshHandler refreshHandler = deserializeContinuousHandler(materializedTable.getSerializedRefreshHandler()); + if (CatalogMaterializedTable.RefreshStatus.SUSPENDED + == materializedTable.getRefreshStatus()) { + throw new SqlExecutionException( + String.format( + "Materialized table %s continuous refresh job has been suspended, jobId is %s.", + tableIdentifier, refreshHandler.getJobId())); + } + String savepointPath = stopJobWithSavepoint(operationExecutor, handle, refreshHandler.getJobId()); @@ -344,6 +361,14 @@ public class MaterializedTableManager { OperationHandle handle, ObjectIdentifier tableIdentifier, CatalogMaterializedTable materializedTable) { + if (CatalogMaterializedTable.RefreshStatus.SUSPENDED + == materializedTable.getRefreshStatus()) { + throw new SqlExecutionException( + String.format( + "Materialized table %s refresh workflow has been suspended.", + tableIdentifier)); + } + if (workflowScheduler == null) { throw new SqlExecutionException( "The workflow scheduler must be configured when suspending materialized table in full refresh mode."); @@ -384,6 +409,15 @@ public class MaterializedTableManager { CatalogMaterializedTable catalogMaterializedTable = getCatalogMaterializedTable(operationExecutor, tableIdentifier); + // Initialization phase doesn't support resume operation. 
+ if (CatalogMaterializedTable.RefreshStatus.INITIALIZING + == catalogMaterializedTable.getRefreshStatus()) { + throw new SqlExecutionException( + String.format( + "Materialized table %s is being initialized and does not support resume operation.", + tableIdentifier)); + } + if (CatalogMaterializedTable.RefreshMode.CONTINUOUS == catalogMaterializedTable.getRefreshMode()) { resumeContinuousRefreshJob( @@ -414,6 +448,18 @@ public class MaterializedTableManager { deserializeContinuousHandler( catalogMaterializedTable.getSerializedRefreshHandler()); + // Repeated resume continuous refresh job is not supported + if (CatalogMaterializedTable.RefreshStatus.ACTIVATED + == catalogMaterializedTable.getRefreshStatus()) { + JobStatus jobStatus = getJobStatus(operationExecutor, handle, refreshHandler); + if (!jobStatus.isGloballyTerminalState()) { + throw new SqlExecutionException( + String.format( + "Materialized table %s continuous refresh job has been resumed, jobId is %s.", + tableIdentifier, refreshHandler.getJobId())); + } + } + Optional<String> restorePath = refreshHandler.getRestorePath(); try { executeContinuousRefreshJob( @@ -438,6 +484,15 @@ public class MaterializedTableManager { ObjectIdentifier tableIdentifier, CatalogMaterializedTable catalogMaterializedTable, Map<String, String> dynamicOptions) { + // Repeated resume refresh workflow is not supported + if (CatalogMaterializedTable.RefreshStatus.ACTIVATED + == catalogMaterializedTable.getRefreshStatus()) { + throw new SqlExecutionException( + String.format( + "Materialized table %s refresh workflow has been resumed.", + tableIdentifier)); + } + if (workflowScheduler == null) { throw new SqlExecutionException( "The workflow scheduler must be configured when resuming materialized table in full refresh mode."); diff --git a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java 
b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java index f090a6cada7..98ee00d48ee 100644 --- a/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java +++ b/flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java @@ -654,6 +654,114 @@ public class MaterializedTableStatementITCase extends AbstractMaterializedTableS "Savepoint directory is not configured, can't stop job with savepoint."); } + @Test + void testAlterMaterializedTableWithRepeatedSuspendAndResumeInContinuousMode( + @TempDir Path temporaryPath) throws Exception { + String materializedTableDDL = + "CREATE MATERIALIZED TABLE users_shops" + + " PARTITIONED BY (ds)\n" + + " WITH(\n" + + " 'format' = 'debezium-json'\n" + + " )\n" + + " FRESHNESS = INTERVAL '30' SECOND\n" + + " AS SELECT \n" + + " user_id,\n" + + " shop_id,\n" + + " ds,\n" + + " SUM (payment_amount_cents) AS payed_buy_fee_sum,\n" + + " SUM (1) AS pv\n" + + " FROM (\n" + + " SELECT user_id, shop_id, DATE_FORMAT(order_created_at, 'yyyy-MM-dd') AS ds, payment_amount_cents FROM datagenSource" + + " ) AS tmp\n" + + " GROUP BY (user_id, shop_id, ds)"; + + OperationHandle materializedTableHandle = + service.executeStatement( + sessionHandle, materializedTableDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, materializedTableHandle); + + ResolvedCatalogMaterializedTable activeMaterializedTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + waitUntilAllTasksAreRunning( + restClusterClient, + JobID.fromHexString( + ContinuousRefreshHandlerSerializer.INSTANCE + .deserialize( + activeMaterializedTable.getSerializedRefreshHandler(), + getClass().getClassLoader()) + .getJobId())); + + // suspend 
materialized table + String savepointDir = temporaryPath.toString(); + String alterJobSavepointDDL = + String.format( + "SET 'execution.checkpointing.savepoint-dir' = 'file://%s'", savepointDir); + OperationHandle alterMaterializedTableSavepointHandle = + service.executeStatement( + sessionHandle, alterJobSavepointDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, alterMaterializedTableSavepointHandle); + + // suspend materialized table + String alterMaterializedTableSuspendDDL = "ALTER MATERIALIZED TABLE users_shops SUSPEND"; + OperationHandle alterMaterializedTableSuspendHandle = + service.executeStatement( + sessionHandle, alterMaterializedTableSuspendDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, alterMaterializedTableSuspendHandle); + + // verify repeated suspend materialized table + OperationHandle repeatedAlterMaterializedTableSuspendHandle = + service.executeStatement( + sessionHandle, alterMaterializedTableSuspendDDL, -1, new Configuration()); + assertThatThrownBy( + () -> + awaitOperationTermination( + service, + sessionHandle, + repeatedAlterMaterializedTableSuspendHandle)) + .rootCause() + .isInstanceOf(SqlExecutionException.class) + .hasMessageContaining( + String.format( + "Materialized table %s continuous refresh job has been suspended", + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops"))); + + // resume materialized table + String alterMaterializedTableResumeDDL = "ALTER MATERIALIZED TABLE users_shops RESUME"; + OperationHandle alterMaterializedTableResumeHandle = + service.executeStatement( + sessionHandle, alterMaterializedTableResumeDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, alterMaterializedTableResumeHandle); + + // verify repeated resume materialized table + OperationHandle repeatedAlterMaterializedTableResumeHandle = + service.executeStatement( + sessionHandle, alterMaterializedTableResumeDDL, -1, 
new Configuration()); + assertThatThrownBy( + () -> + awaitOperationTermination( + service, + sessionHandle, + repeatedAlterMaterializedTableResumeHandle)) + .rootCause() + .isInstanceOf(SqlExecutionException.class) + .hasMessageContaining( + String.format( + "Materialized table %s continuous refresh job has been resumed", + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops"))); + } + @Test void testAlterMaterializedTableSuspendAndResumeInFullMode() throws Exception { createAndVerifyCreateMaterializedTableWithData( @@ -753,6 +861,79 @@ public class MaterializedTableStatementITCase extends AbstractMaterializedTableS .containsEntry("debezium-json.ignore-parse-errors", "true"); } + @Test + void testAlterMaterializedTableWithRepeatedSuspendAndResumeInFullMode() throws Exception { + createAndVerifyCreateMaterializedTableWithData( + "users_shops", Collections.emptyList(), Collections.emptyMap(), RefreshMode.FULL); + + ResolvedCatalogMaterializedTable activeMaterializedTable = + (ResolvedCatalogMaterializedTable) + service.getTable( + sessionHandle, + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops")); + + assertThat(activeMaterializedTable.getRefreshStatus()) + .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED); + + // suspend materialized table + String alterMaterializedTableSuspendDDL = "ALTER MATERIALIZED TABLE users_shops SUSPEND"; + OperationHandle alterMaterializedTableSuspendHandle = + service.executeStatement( + sessionHandle, alterMaterializedTableSuspendDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, alterMaterializedTableSuspendHandle); + + // repeated suspend materialized table + OperationHandle repeatedAlterMaterializedTableSuspendHandle = + service.executeStatement( + sessionHandle, alterMaterializedTableSuspendDDL, -1, new Configuration()); + assertThatThrownBy( + () -> + awaitOperationTermination( + service, + sessionHandle, + 
repeatedAlterMaterializedTableSuspendHandle)) + .rootCause() + .isInstanceOf(SqlExecutionException.class) + .hasMessageContaining( + String.format( + "Materialized table %s refresh workflow has been suspended.", + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops"))); + + // resume materialized table + String alterMaterializedTableResumeDDL = + "ALTER MATERIALIZED TABLE users_shops RESUME WITH ('debezium-json.ignore-parse-errors' = 'true')"; + OperationHandle alterMaterializedTableResumeHandle = + service.executeStatement( + sessionHandle, alterMaterializedTableResumeDDL, -1, new Configuration()); + awaitOperationTermination(service, sessionHandle, alterMaterializedTableResumeHandle); + + // verify repeated resume materialized table + OperationHandle repeatedAlterMaterializedTableResumeHandle = + service.executeStatement( + sessionHandle, alterMaterializedTableResumeDDL, -1, new Configuration()); + assertThatThrownBy( + () -> + awaitOperationTermination( + service, + sessionHandle, + repeatedAlterMaterializedTableResumeHandle)) + .rootCause() + .isInstanceOf(SqlExecutionException.class) + .hasMessageContaining( + String.format( + "Materialized table %s refresh workflow has been resumed.", + ObjectIdentifier.of( + fileSystemCatalogName, + TEST_DEFAULT_DATABASE, + "users_shops"))); + } + @Test void testDropMaterializedTableInContinuousMode() throws Exception { String materializedTableDDL =