Re: [PR] [FLINK-35200][table] Support the execution of suspend, resume materialized table in full refresh mode [flink]
lsyldliu closed pull request #24877: [FLINK-35200][table] Support the execution of suspend, resume materialized table in full refresh mode URL: https://github.com/apache/flink/pull/24877 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-35200][table] Support the execution of suspend, resume materialized table in full refresh mode [flink]
hackergin commented on code in PR #24877:
URL: https://github.com/apache/flink/pull/24877#discussion_r1624617251

## flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java: ##
@@ -652,6 +654,158 @@ void testAlterMaterializedTableWithoutSavepointDirConfigured() throws Exception
                 "Savepoint directory is not configured, can't stop job with savepoint.");
     }

+    @Test
+    void testAlterMaterializedSuspendAndResumeInFullMode() throws Exception {
+        createMaterializedTableInFullMode("users_shops", Collections.emptyList());
+
+        ResolvedCatalogMaterializedTable activeMaterializedTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(activeMaterializedTable.getRefreshStatus())
+                .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);
+
+        // suspend materialized table
+        String alterMaterializedTableSuspendDDL = "ALTER MATERIALIZED TABLE users_shops SUSPEND";
+        OperationHandle alterMaterializedTableSuspendHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableSuspendDDL, -1, new Configuration());
+
+        awaitOperationTermination(service, sessionHandle, alterMaterializedTableSuspendHandle);
+
+        ResolvedCatalogMaterializedTable suspendMaterializedTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(suspendMaterializedTable.getRefreshStatus())
+                .isEqualTo(CatalogMaterializedTable.RefreshStatus.SUSPENDED);
+
+        // verify workflow is suspended
+        byte[] refreshHandler = suspendMaterializedTable.getSerializedRefreshHandler();
+        EmbeddedRefreshHandler suspendRefreshHandler =
+                EmbeddedRefreshHandlerSerializer.INSTANCE.deserialize(
+                        refreshHandler, getClass().getClassLoader());
+
+        String workflowName = suspendRefreshHandler.getWorkflowName();
+        String workflowGroup = suspendRefreshHandler.getWorkflowGroup();
+        EmbeddedQuartzScheduler embeddedWorkflowScheduler =
+                SQL_GATEWAY_REST_ENDPOINT_EXTENSION
+                        .getSqlGatewayRestEndpoint()
+                        .getQuartzScheduler();
+        JobKey jobKey = new JobKey(workflowName, workflowGroup);
+        Trigger.TriggerState suspendTriggerState =
+                embeddedWorkflowScheduler
+                        .getQuartzScheduler()
+                        .getTriggerState(new TriggerKey(workflowName, workflowGroup));
+
+        assertThat(suspendTriggerState).isEqualTo(Trigger.TriggerState.PAUSED);
+
+        // resume materialized table
+        String alterMaterializedTableResumeDDL =
+                "ALTER MATERIALIZED TABLE users_shops RESUME WITH ('debezium-json.ignore-parse-errors' = 'true')";
+        OperationHandle alterMaterializedTableResumeHandle =
+                service.executeStatement(
+                        sessionHandle, alterMaterializedTableResumeDDL, -1, new Configuration());
+        awaitOperationTermination(service, sessionHandle, alterMaterializedTableResumeHandle);
+
+        ResolvedCatalogMaterializedTable resumedCatalogMaterializedTable =
+                (ResolvedCatalogMaterializedTable)
+                        service.getTable(
+                                sessionHandle,
+                                ObjectIdentifier.of(
+                                        fileSystemCatalogName,
+                                        TEST_DEFAULT_DATABASE,
+                                        "users_shops"));
+
+        assertThat(resumedCatalogMaterializedTable.getOptions())
+                .doesNotContainKey("debezium-json.ignore-parse-errors");
+        assertThat(resumedCatalogMaterializedTable.getRefreshStatus())
+                .isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);
+
+        // verify workflow is resumed
+        refreshHandler = resumedCatalogMaterializedTable.getSerializedRefreshHandler();
+        EmbeddedRefreshHandler resumeRefreshHandler =
+                EmbeddedRefreshHandlerSerializer.INSTANCE.deserialize(
+                        refreshHandler, getClass().getClassLoader());
+
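The test above asserts that the option passed in `RESUME WITH (...)` does not end up in the table's stored options. A minimal sketch of how such behaviour could be modelled, assuming the dynamic options are overlaid on a copy of the stored options for the refresh run only. `DynamicOptionsSketch`, `effectiveOptions`, and the `format` entry are hypothetical names for illustration; this is not the code in the PR.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration; not a Flink class. */
final class DynamicOptionsSketch {

    /** Returns the options for one refresh run: stored table options overlaid with dynamic options. */
    static Map<String, String> effectiveOptions(
            Map<String, String> tableOptions, Map<String, String> dynamicOptions) {
        // Copy first, so the stored table options are never mutated.
        Map<String, String> merged = new HashMap<>(tableOptions);
        // Dynamic options win on key conflicts.
        merged.putAll(dynamicOptions);
        return Collections.unmodifiableMap(merged);
    }

    public static void main(String[] args) {
        Map<String, String> stored = new HashMap<>();
        stored.put("format", "debezium-json"); // assumed example entry
        Map<String, String> dynamic = Map.of("debezium-json.ignore-parse-errors", "true");

        Map<String, String> runOptions = effectiveOptions(stored, dynamic);
        System.out.println(runOptions.containsKey("debezium-json.ignore-parse-errors")); // true
        System.out.println(stored.containsKey("debezium-json.ignore-parse-errors")); // false
    }
}
```

Under this assumption the stored options stay unchanged after a resume, which is the property `doesNotContainKey("debezium-json.ignore-parse-errors")` checks in the test.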
Re: [PR] [FLINK-35200][table] Support the execution of suspend, resume materialized table in full refresh mode [flink]
hackergin commented on PR #24877:
URL: https://github.com/apache/flink/pull/24877#issuecomment-2145284586

@flinkbot run azure
Re: [PR] [FLINK-35200][table] Support the execution of suspend, resume materialized table in full refresh mode [flink]
lsyldliu commented on code in PR #24877:
URL: https://github.com/apache/flink/pull/24877#discussion_r1623739592

## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ##
@@ -302,21 +315,47 @@ private ResultFetcher callAlterMaterializedTableSuspend(
                 refreshHandler.getJobId(),
                 savepointPath);

-        CatalogMaterializedTable updatedMaterializedTable =
-                materializedTable.copy(
-                        CatalogMaterializedTable.RefreshStatus.SUSPENDED,
-                        materializedTable.getRefreshHandlerDescription().orElse(null),
-                        serializeContinuousHandler(updateRefreshHandler));
-        List tableChanges = new ArrayList<>();
-        tableChanges.add(
-                TableChange.modifyRefreshStatus(CatalogMaterializedTable.RefreshStatus.ACTIVATED));
-        AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation =
-                new AlterMaterializedTableChangeOperation(
-                        tableIdentifier, tableChanges, updatedMaterializedTable);
+        updateRefreshHandler(
+                operationExecutor,
+                handle,
+                tableIdentifier,
+                materializedTable,
+                CatalogMaterializedTable.RefreshStatus.SUSPENDED,
+                updateRefreshHandler.asSummaryString(),
+                serializeContinuousHandler(updateRefreshHandler));
+    }

-        operationExecutor.callExecutableOperation(handle, alterMaterializedTableChangeOperation);
+    private void suspendRefreshWorkflow(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            ObjectIdentifier tableIdentifier,
+            CatalogMaterializedTable materializedTable) {

-        return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+        try {
+            RefreshHandlerSerializer refreshHandlerSerializer =
+                    workflowScheduler.getRefreshHandlerSerializer();

Review Comment:
You should check that `workflowScheduler` is not null before suspending.

## flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java: ##
@@ -328,24 +367,81 @@ private ResultFetcher callAlterMaterializedTableResume(
                 getCatalogMaterializedTable(operationExecutor, tableIdentifier);

         if (CatalogMaterializedTable.RefreshMode.CONTINUOUS
-                != catalogMaterializedTable.getRefreshMode()) {
-            throw new SqlExecutionException(
-                    "Only support resume continuous refresh job currently.");
+                == catalogMaterializedTable.getRefreshMode()) {
+            resumeContinuousRefreshJob(
+                    operationExecutor,
+                    handle,
+                    tableIdentifier,
+                    catalogMaterializedTable,
+                    op.getDynamicOptions());
+        } else {
+            resumeRefreshWorkflow(
+                    operationExecutor,
+                    handle,
+                    tableIdentifier,
+                    catalogMaterializedTable,
+                    op.getDynamicOptions());
         }

-        ContinuousRefreshHandler continuousRefreshHandler =
+        return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+    }
+
+    private void resumeContinuousRefreshJob(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            ObjectIdentifier tableIdentifier,
+            CatalogMaterializedTable catalogMaterializedTable,
+            Map dynamicOptions) {
+        ContinuousRefreshHandler refreshHandler =
                 deserializeContinuousHandler(
                         catalogMaterializedTable.getSerializedRefreshHandler());
-        Optional restorePath = continuousRefreshHandler.getRestorePath();
+
+        Optional restorePath = refreshHandler.getRestorePath();
         executeContinuousRefreshJob(
                 operationExecutor,
                 handle,
                 catalogMaterializedTable,
                 tableIdentifier,
-                op.getDynamicOptions(),
+                dynamicOptions,
                 restorePath);
+    }

-        return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+    private void resumeRefreshWorkflow(
+            OperationExecutor operationExecutor,
+            OperationHandle handle,
+            ObjectIdentifier tableIdentifier,
+            CatalogMaterializedTable catalogMaterializedTable,
+            Map dynamicOptions) {
+        try {
+            RefreshHandlerSerializer refreshHandlerSerializer =

Review Comment:
```suggestion
            RefreshHandlerSerializer refreshHandlerSerializer =
```

##
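
For reference, the null check requested in the first review comment could look roughly like the sketch below. It assumes the scheduler field may be null when no workflow scheduler is configured. `WorkflowGuardSketch`, its `Scheduler` interface, and the exception message are hypothetical stand-ins, not the types or wording used in `MaterializedTableManager`.

```java
/** Hypothetical stand-in for the manager class; not Flink code. */
final class WorkflowGuardSketch {

    /** Minimal stand-in for a workflow scheduler. */
    interface Scheduler {
        void suspend(String workflowName);
    }

    // Assumed to be null when no workflow scheduler is configured.
    private final Scheduler workflowScheduler;

    WorkflowGuardSketch(Scheduler workflowScheduler) {
        this.workflowScheduler = workflowScheduler;
    }

    void suspendRefreshWorkflow(String workflowName) {
        // Fail fast with a descriptive error instead of a NullPointerException further down.
        if (workflowScheduler == null) {
            throw new IllegalStateException(
                    "Workflow scheduler is not configured, can't suspend workflow "
                            + workflowName
                            + ".");
        }
        workflowScheduler.suspend(workflowName);
    }

    public static void main(String[] args) {
        new WorkflowGuardSketch(name -> System.out.println("suspended " + name))
                .suspendRefreshWorkflow("users_shops");
        try {
            new WorkflowGuardSketch(null).suspendRefreshWorkflow("users_shops");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The same guard would presumably apply to the resume path, since `resumeRefreshWorkflow` starts with the same serializer lookup.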
Re: [PR] [FLINK-35200][table] Support the execution of suspend, resume materialized table in full refresh mode [flink]
hackergin commented on PR #24877:
URL: https://github.com/apache/flink/pull/24877#issuecomment-2143766184

@flinkbot run azure
Re: [PR] [FLINK-35200][table] Support the execution of suspend, resume materialized table in full refresh mode [flink]
flinkbot commented on PR #24877:
URL: https://github.com/apache/flink/pull/24877#issuecomment-2143497653

## CI report:

* 4940feeb60b1438b3fe1ac7ee487a48f78e2d629 UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:
- `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-35200][table] Support the execution of suspend, resume materialized table in full refresh mode [flink]
hackergin opened a new pull request, #24877:
URL: https://github.com/apache/flink/pull/24877

## What is the purpose of the change

*Support the execution of suspend, resume materialized table in full refresh mode*

## Brief change log

* Add dynamic options for ResumeRefreshWorkflow interface.
* Support the execution of suspend, resume materialized table in full refresh mode

## Verifying this change

* Add test case testAlterMaterializedSuspendAndResumeInFullMode in MaterializedTableStatementITCase to verify the suspend and resume operation for materialized table in full mode.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
- The S3 file system connector: (no)

## Documentation

- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (will be added in another pr)
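
The suspend and resume behaviour described above can be pictured as two pieces of state that move together: the table's refresh status and the scheduler trigger's state. Below is a minimal in-memory sketch of that pairing. Every type here (`FullModeSuspendResumeSketch`, `InMemoryScheduler`, the two enums) is a hypothetical stand-in and not a Flink or Quartz class, and rejecting a repeated suspend or resume is an assumption of the sketch rather than something stated in this PR.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of suspend/resume in full refresh mode. */
final class FullModeSuspendResumeSketch {

    enum RefreshStatus { ACTIVATED, SUSPENDED }

    enum TriggerState { NORMAL, PAUSED }

    /** Stand-in scheduler that only tracks one trigger state per workflow name. */
    static final class InMemoryScheduler {
        private final Map<String, TriggerState> triggers = new HashMap<>();

        void register(String workflow) {
            triggers.put(workflow, TriggerState.NORMAL);
        }

        void pause(String workflow) {
            requireKnown(workflow);
            triggers.put(workflow, TriggerState.PAUSED);
        }

        void resume(String workflow) {
            requireKnown(workflow);
            triggers.put(workflow, TriggerState.NORMAL);
        }

        TriggerState state(String workflow) {
            requireKnown(workflow);
            return triggers.get(workflow);
        }

        private void requireKnown(String workflow) {
            if (!triggers.containsKey(workflow)) {
                throw new IllegalArgumentException("Unknown workflow: " + workflow);
            }
        }
    }

    private final InMemoryScheduler scheduler;
    private final String workflow;
    private RefreshStatus status = RefreshStatus.ACTIVATED;

    FullModeSuspendResumeSketch(InMemoryScheduler scheduler, String workflow) {
        this.scheduler = scheduler;
        this.workflow = workflow;
        scheduler.register(workflow);
    }

    /** Pauses the workflow first, then records the new status. */
    void suspend() {
        if (status == RefreshStatus.SUSPENDED) {
            throw new IllegalStateException("Table is already suspended.");
        }
        scheduler.pause(workflow);
        status = RefreshStatus.SUSPENDED;
    }

    /** Resumes the workflow first, then records the new status. */
    void resume() {
        if (status == RefreshStatus.ACTIVATED) {
            throw new IllegalStateException("Table is already activated.");
        }
        scheduler.resume(workflow);
        status = RefreshStatus.ACTIVATED;
    }

    RefreshStatus status() {
        return status;
    }

    public static void main(String[] args) {
        InMemoryScheduler scheduler = new InMemoryScheduler();
        FullModeSuspendResumeSketch table =
                new FullModeSuspendResumeSketch(scheduler, "users_shops");
        table.suspend();
        System.out.println(table.status() + " / " + scheduler.state("users_shops"));
        table.resume();
        System.out.println(table.status() + " / " + scheduler.state("users_shops"));
    }
}
```

Updating the scheduler before the status means a failed scheduler call leaves the recorded status unchanged, which is one reasonable ordering for this kind of paired update.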