Re: [PR] [FLINK-35200][table] Support the execution of suspend, resume materialized table in full refresh mode [flink]

2024-06-04 Thread via GitHub


lsyldliu closed pull request #24877: [FLINK-35200][table] Support the execution of suspend, resume materialized table in full refresh mode
URL: https://github.com/apache/flink/pull/24877


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-35200][table] Support the execution of suspend, resume materialized table in full refresh mode [flink]

2024-06-03 Thread via GitHub


hackergin commented on code in PR #24877:
URL: https://github.com/apache/flink/pull/24877#discussion_r1624617251


##
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/MaterializedTableStatementITCase.java:
##
@@ -652,6 +654,158 @@ void testAlterMaterializedTableWithoutSavepointDirConfigured() throws Exception
 "Savepoint directory is not configured, can't stop job with savepoint.");
 }
 
+@Test
+void testAlterMaterializedSuspendAndResumeInFullMode() throws Exception {
+createMaterializedTableInFullMode("users_shops", Collections.emptyList());
+
+ResolvedCatalogMaterializedTable activeMaterializedTable =
+(ResolvedCatalogMaterializedTable)
+service.getTable(
+sessionHandle,
+ObjectIdentifier.of(
+fileSystemCatalogName,
+TEST_DEFAULT_DATABASE,
+"users_shops"));
+
+assertThat(activeMaterializedTable.getRefreshStatus())
+.isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);
+
+// suspend materialized table
+String alterMaterializedTableSuspendDDL = "ALTER MATERIALIZED TABLE users_shops SUSPEND";
+OperationHandle alterMaterializedTableSuspendHandle =
+service.executeStatement(
+sessionHandle, alterMaterializedTableSuspendDDL, -1, new Configuration());
+
+awaitOperationTermination(service, sessionHandle, alterMaterializedTableSuspendHandle);
+
+ResolvedCatalogMaterializedTable suspendMaterializedTable =
+(ResolvedCatalogMaterializedTable)
+service.getTable(
+sessionHandle,
+ObjectIdentifier.of(
+fileSystemCatalogName,
+TEST_DEFAULT_DATABASE,
+"users_shops"));
+
+assertThat(suspendMaterializedTable.getRefreshStatus())
+.isEqualTo(CatalogMaterializedTable.RefreshStatus.SUSPENDED);
+
+// verify workflow is suspended
+byte[] refreshHandler = suspendMaterializedTable.getSerializedRefreshHandler();
+EmbeddedRefreshHandler suspendRefreshHandler =
+EmbeddedRefreshHandlerSerializer.INSTANCE.deserialize(
+refreshHandler, getClass().getClassLoader());
+
+String workflowName = suspendRefreshHandler.getWorkflowName();
+String workflowGroup = suspendRefreshHandler.getWorkflowGroup();
+EmbeddedQuartzScheduler embeddedWorkflowScheduler =
+SQL_GATEWAY_REST_ENDPOINT_EXTENSION
+.getSqlGatewayRestEndpoint()
+.getQuartzScheduler();
+JobKey jobKey = new JobKey(workflowName, workflowGroup);
+Trigger.TriggerState suspendTriggerState =
+embeddedWorkflowScheduler
+.getQuartzScheduler()
+.getTriggerState(new TriggerKey(workflowName, workflowGroup));
+
+assertThat(suspendTriggerState).isEqualTo(Trigger.TriggerState.PAUSED);
+
+// resume materialized table
+String alterMaterializedTableResumeDDL =
+"ALTER MATERIALIZED TABLE users_shops RESUME WITH ('debezium-json.ignore-parse-errors' = 'true')";
+OperationHandle alterMaterializedTableResumeHandle =
+service.executeStatement(
+sessionHandle, alterMaterializedTableResumeDDL, -1, new Configuration());
+awaitOperationTermination(service, sessionHandle, alterMaterializedTableResumeHandle);
+
+ResolvedCatalogMaterializedTable resumedCatalogMaterializedTable =
+(ResolvedCatalogMaterializedTable)
+service.getTable(
+sessionHandle,
+ObjectIdentifier.of(
+fileSystemCatalogName,
+TEST_DEFAULT_DATABASE,
+"users_shops"));
+
+assertThat(resumedCatalogMaterializedTable.getOptions())
+.doesNotContainKey("debezium-json.ignore-parse-errors");
+assertThat(resumedCatalogMaterializedTable.getRefreshStatus())
+.isEqualTo(CatalogMaterializedTable.RefreshStatus.ACTIVATED);
+
+// verify workflow is resumed
+refreshHandler = resumedCatalogMaterializedTable.getSerializedRefreshHandler();
+EmbeddedRefreshHandler resumeRefreshHandler =
+EmbeddedRefreshHandlerSerializer.INSTANCE.deserialize(
+refreshHandler, getClass().getClassLoader());
+
+

Re: [PR] [FLINK-35200][table] Support the execution of suspend, resume materialized table in full refresh mode [flink]

2024-06-03 Thread via GitHub


hackergin commented on PR #24877:
URL: https://github.com/apache/flink/pull/24877#issuecomment-2145284586

   @flinkbot run azure





Re: [PR] [FLINK-35200][table] Support the execution of suspend, resume materialized table in full refresh mode [flink]

2024-06-02 Thread via GitHub


lsyldliu commented on code in PR #24877:
URL: https://github.com/apache/flink/pull/24877#discussion_r1623739592


##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##
@@ -302,21 +315,47 @@ private ResultFetcher callAlterMaterializedTableSuspend(
 refreshHandler.getJobId(),
 savepointPath);
 
-CatalogMaterializedTable updatedMaterializedTable =
-materializedTable.copy(
-CatalogMaterializedTable.RefreshStatus.SUSPENDED,
-materializedTable.getRefreshHandlerDescription().orElse(null),
-serializeContinuousHandler(updateRefreshHandler));
-List tableChanges = new ArrayList<>();
-tableChanges.add(
-TableChange.modifyRefreshStatus(CatalogMaterializedTable.RefreshStatus.ACTIVATED));
-AlterMaterializedTableChangeOperation alterMaterializedTableChangeOperation =
-new AlterMaterializedTableChangeOperation(
-tableIdentifier, tableChanges, updatedMaterializedTable);
+updateRefreshHandler(
+operationExecutor,
+handle,
+tableIdentifier,
+materializedTable,
+CatalogMaterializedTable.RefreshStatus.SUSPENDED,
+updateRefreshHandler.asSummaryString(),
+serializeContinuousHandler(updateRefreshHandler));
+}
 
-operationExecutor.callExecutableOperation(handle, alterMaterializedTableChangeOperation);
+private void suspendRefreshWorkflow(
+OperationExecutor operationExecutor,
+OperationHandle handle,
+ObjectIdentifier tableIdentifier,
+CatalogMaterializedTable materializedTable) {
 
-return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+try {
+RefreshHandlerSerializer refreshHandlerSerializer =
+workflowScheduler.getRefreshHandlerSerializer();

Review Comment:
   You should check that `workflowScheduler` is not null before suspending.
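
A minimal sketch of the kind of guard this comment asks for, assuming the scheduler field is simply left `null` when no workflow scheduler is configured. `WorkflowSchedulerStub` and `SuspendGuardSketch` are hypothetical stand-ins rather than Flink classes, and the exception type and message are illustrative only:

```java
// Hypothetical stand-in for the scheduler the manager delegates to; not a Flink interface.
interface WorkflowSchedulerStub {
    void suspend(String workflowName);
}

final class SuspendGuardSketch {
    // Assumption: this field is null when no workflow scheduler has been configured.
    private final WorkflowSchedulerStub workflowScheduler;

    SuspendGuardSketch(WorkflowSchedulerStub workflowScheduler) {
        this.workflowScheduler = workflowScheduler;
    }

    void suspendRefreshWorkflow(String workflowName) {
        // Fail fast with a readable message instead of a NullPointerException further down.
        if (workflowScheduler == null) {
            throw new IllegalStateException(
                    "Cannot suspend workflow '"
                            + workflowName
                            + "': no workflow scheduler is configured.");
        }
        workflowScheduler.suspend(workflowName);
    }
}
```

The same check would presumably apply on the resume path, since it reads the scheduler in the same way.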



##
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/materializedtable/MaterializedTableManager.java:
##
@@ -328,24 +367,81 @@ private ResultFetcher callAlterMaterializedTableResume(
 getCatalogMaterializedTable(operationExecutor, tableIdentifier);
 
 if (CatalogMaterializedTable.RefreshMode.CONTINUOUS
-!= catalogMaterializedTable.getRefreshMode()) {
-throw new SqlExecutionException(
-"Only support resume continuous refresh job currently.");
+== catalogMaterializedTable.getRefreshMode()) {
+resumeContinuousRefreshJob(
+operationExecutor,
+handle,
+tableIdentifier,
+catalogMaterializedTable,
+op.getDynamicOptions());
+} else {
+resumeRefreshWorkflow(
+operationExecutor,
+handle,
+tableIdentifier,
+catalogMaterializedTable,
+op.getDynamicOptions());
 }
 
-ContinuousRefreshHandler continuousRefreshHandler =
+return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+}
+
+private void resumeContinuousRefreshJob(
+OperationExecutor operationExecutor,
+OperationHandle handle,
+ObjectIdentifier tableIdentifier,
+CatalogMaterializedTable catalogMaterializedTable,
+Map<String, String> dynamicOptions) {
+ContinuousRefreshHandler refreshHandler =
 deserializeContinuousHandler(
 catalogMaterializedTable.getSerializedRefreshHandler());
-Optional<String> restorePath = continuousRefreshHandler.getRestorePath();
+
+Optional<String> restorePath = refreshHandler.getRestorePath();
 executeContinuousRefreshJob(
 operationExecutor,
 handle,
 catalogMaterializedTable,
 tableIdentifier,
-op.getDynamicOptions(),
+dynamicOptions,
 restorePath);
+}
 
-return ResultFetcher.fromTableResult(handle, TABLE_RESULT_OK, false);
+private void resumeRefreshWorkflow(
+OperationExecutor operationExecutor,
+OperationHandle handle,
+ObjectIdentifier tableIdentifier,
+CatalogMaterializedTable catalogMaterializedTable,
+Map<String, String> dynamicOptions) {
+try {
+RefreshHandlerSerializer refreshHandlerSerializer =

Review Comment:
   ```suggestion
   RefreshHandlerSerializer refreshHandlerSerializer =
   ```



##

Re: [PR] [FLINK-35200][table] Support the execution of suspend, resume materialized table in full refresh mode [flink]

2024-06-02 Thread via GitHub


hackergin commented on PR #24877:
URL: https://github.com/apache/flink/pull/24877#issuecomment-2143766184

   @flinkbot run azure





Re: [PR] [FLINK-35200][table] Support the execution of suspend, resume materialized table in full refresh mode [flink]

2024-06-01 Thread via GitHub


flinkbot commented on PR #24877:
URL: https://github.com/apache/flink/pull/24877#issuecomment-2143497653

   
   ## CI report:
   
   * 4940feeb60b1438b3fe1ac7ee487a48f78e2d629 UNKNOWN
   
   
   Bot commands
   The @flinkbot bot supports the following commands:
   
   - `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-35200][table] Support the execution of suspend, resume materialized table in full refresh mode [flink]

2024-06-01 Thread via GitHub


hackergin opened a new pull request, #24877:
URL: https://github.com/apache/flink/pull/24877

   ## What is the purpose of the change
   
   *Support the execution of suspend and resume for materialized tables in full refresh mode.*
   
   
   ## Brief change log
   
   * Add dynamic options to the `ResumeRefreshWorkflow` interface.
   * Support the execution of suspend and resume for materialized tables in full refresh mode.
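
As a rough illustration of the second item, resume now branches on the table's refresh mode instead of rejecting everything that is not continuous. The sketch below is a self-contained toy, not the Flink code: `RefreshModeSketch` and `ResumeDispatchSketch` are hypothetical names, and the returned strings only stand in for the real side effects.

```java
import java.util.Map;

// Local enum mirroring the two refresh modes discussed in this PR; not Flink's own type.
enum RefreshModeSketch {
    CONTINUOUS,
    FULL
}

final class ResumeDispatchSketch {
    // Routes a resume request to the path that matches the table's refresh mode.
    static String resume(RefreshModeSketch mode, Map<String, String> dynamicOptions) {
        if (mode == RefreshModeSketch.CONTINUOUS) {
            return resumeContinuousRefreshJob(dynamicOptions);
        }
        return resumeRefreshWorkflow(dynamicOptions);
    }

    // Continuous mode: a streaming job is restarted (placeholder result only).
    private static String resumeContinuousRefreshJob(Map<String, String> dynamicOptions) {
        return "restart streaming job with " + dynamicOptions.size() + " dynamic option(s)";
    }

    // Full mode: the scheduled workflow is resumed (placeholder result only).
    private static String resumeRefreshWorkflow(Map<String, String> dynamicOptions) {
        return "resume scheduled workflow with " + dynamicOptions.size() + " dynamic option(s)";
    }
}
```

Both branches receive the dynamic options, which is why the workflow-facing resume interface needs to carry them as well.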
   
   
   ## Verifying this change
   
   * Add test case `testAlterMaterializedSuspendAndResumeInFullMode` in `MaterializedTableStatementITCase` to verify the suspend and resume operations for a materialized table in full refresh mode.
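
The behaviour this test checks can be summarised with a toy state model: the status goes from ACTIVATED to SUSPENDED and back, and options passed on `RESUME WITH (...)` are used for the resume but are not written into the table's stored options. This is only a sketch under those assumptions; `MaterializedTableSketch` and `RefreshStatusSketch` are hypothetical names, not Flink classes.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Local enum for the two statuses the test asserts on; not Flink's own type.
enum RefreshStatusSketch {
    ACTIVATED,
    SUSPENDED
}

final class MaterializedTableSketch {
    // Options stored with the table definition; resume must not change these.
    private final Map<String, String> persistedOptions = new HashMap<>();
    // Options supplied on the most recent resume; kept separately on purpose.
    private Map<String, String> lastDynamicOptions = Collections.emptyMap();
    private RefreshStatusSketch status = RefreshStatusSketch.ACTIVATED;

    void suspend() {
        status = RefreshStatusSketch.SUSPENDED;
    }

    void resume(Map<String, String> dynamicOptions) {
        // Dynamic options apply to this resume only and are not persisted.
        lastDynamicOptions = Collections.unmodifiableMap(new HashMap<>(dynamicOptions));
        status = RefreshStatusSketch.ACTIVATED;
    }

    RefreshStatusSketch getStatus() {
        return status;
    }

    Map<String, String> getPersistedOptions() {
        return Collections.unmodifiableMap(persistedOptions);
    }

    Map<String, String> getLastDynamicOptions() {
        return lastDynamicOptions;
    }
}
```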
   
   ## Does this pull request potentially affect one of the following parts:
   
   - Dependencies (does it add or upgrade a dependency): (no)
   - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
   - The serializers: (no)
   - The runtime per-record code paths (performance sensitive): (no)
   - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
   - The S3 file system connector: (no)
   
   ## Documentation
   
   - Does this pull request introduce a new feature? (yes)
   - If yes, how is the feature documented? (will be added in another PR)
   

