fsk119 commented on code in PR #21292:
URL: https://github.com/apache/flink/pull/21292#discussion_r1050549461
##########
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestUtils.java:
##########
@@ -181,4 +186,24 @@ public static void waitUntilJobCanceled(JobID jobId,
ClusterClient<?> client)
Thread.sleep(50);
}
}
+
+ /**
+ * Wait until at least one job turns into RUNNING status in the cluster.
+ * Applicable for single job scenarios.
+ *
+ * @param client ClusterClient which could be {@link
+ * org.apache.flink.test.junit5.InjectClusterClient}.
+ */
+ public static void waitUntilJobIsRunning(ClusterClient<?> client) throws
Exception {
+ Collection<JobStatusMessage> statusMessages = client.listJobs().get();
Review Comment:
This should be in the while-loop.
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/operation/OperationExecutor.java:
##########
@@ -405,4 +426,115 @@ private Set<TableInfo> listViews(String catalogName,
String databaseName) {
TableKind.VIEW))
.collect(Collectors.toSet()));
}
+
+ public ResultFetcher callStopJobOperation(
+ OperationHandle operationHandle, StopJobOperation stopJobOperation)
+ throws SqlExecutionException {
+ String jobId = stopJobOperation.getJobId();
+ boolean isWithSavepoint = stopJobOperation.isWithSavepoint();
+ boolean isWithDrain = stopJobOperation.isWithDrain();
+ Duration clientTimeout =
+ Configuration.fromMap(sessionContext.getConfigMap())
+ .get(ClientOptions.CLIENT_TIMEOUT);
+ Optional<String> savepoint;
+ try {
+ savepoint =
+ runClusterAction(
+ operationHandle,
+ clusterClient -> {
+ if (isWithSavepoint) {
+ // blocking get savepoint path
+ try {
+ return Optional.of(
+ clusterClient
+ .stopWithSavepoint(
+
JobID.fromHexString(jobId),
+ isWithDrain,
+
executionConfig.get(
+
CheckpointingOptions
+
.SAVEPOINT_DIRECTORY),
+
SavepointFormatType.DEFAULT)
+ .get(
+
clientTimeout.toMillis(),
+
TimeUnit.MILLISECONDS));
+ } catch (Exception e) {
+ throw new FlinkException(
+ "Could not stop job "
+ + stopJobOperation
Review Comment:
The exception msg is
```
Could not stop job
org.apache.flink.table.operations.command.StopJobOperation@4a951f6b in session
aa057ac2-8107-4f14-9fa1-06f637fe2b8e.
```
I think we can print the job id here.
##########
flink-table/flink-sql-gateway/src/test/java/org/apache/flink/table/gateway/service/SqlGatewayServiceITCase.java:
##########
@@ -379,21 +392,62 @@ public void testExecuteSqlWithConfig() {
-1,
Configuration.fromMap(Collections.singletonMap(key,
value)));
- Long token = 0L;
- List<RowData> settings = new ArrayList<>();
- while (token != null) {
- ResultSet result =
- service.fetchResults(sessionHandle, operationHandle,
token, Integer.MAX_VALUE);
- settings.addAll(result.getData());
- token = result.getNextToken();
- }
+ List<RowData> settings = fetchAllResults(sessionHandle,
operationHandle);
assertThat(settings)
.contains(
GenericRowData.of(
StringData.fromString(key),
StringData.fromString(value)));
}
+ @ParameterizedTest
+ @CsvSource({"WITH SAVEPOINT,true", "WITH SAVEPOINT WITH DRAIN,true",
"'',false"})
+ public void testStopJobStatementWithSavepoint(
+ String option,
+ boolean hasSavepoint,
+ @InjectClusterClient RestClusterClient<?> restClusterClient,
+ @TempDir File tmpDir)
+ throws Exception {
+ Configuration configuration = new
Configuration(MINI_CLUSTER.getClientConfiguration());
+ configuration.setBoolean(TableConfigOptions.TABLE_DML_SYNC, false);
+ File savepointDir = new File(tmpDir, "savepoints");
+ configuration.set(
+ CheckpointingOptions.SAVEPOINT_DIRECTORY,
savepointDir.toURI().toString());
+
+ SessionHandle sessionHandle =
service.openSession(defaultSessionEnvironment);
+
+ String sourceDdl = "CREATE TABLE source (a STRING) WITH
('connector'='datagen');";
+ String sinkDdl = "CREATE TABLE sink (a STRING) WITH
('connector'='blackhole');";
+ String insertSql = "INSERT INTO sink SELECT * FROM source;";
+ String stopSqlTemplate = "STOP JOB '%s' %s;";
+
+ service.executeStatement(sessionHandle, sourceDdl, -1, configuration);
+ service.executeStatement(sessionHandle, sinkDdl, -1, configuration);
+
+ OperationHandle insertOperationHandle =
+ service.executeStatement(sessionHandle, insertSql, -1,
configuration);
+
+ List<RowData> results = fetchAllResults(sessionHandle,
insertOperationHandle);
+ assertThat(results.size()).isEqualTo(1);
+ String jobId = results.get(0).getString(0).toString();
+
+ TestUtils.waitUntilJobIsRunning(restClusterClient);
Review Comment:
I think we should wait until all tasks are running. We can do as
`SavepointITCase` does.
```
public static void waitUntilAllTasksAreRunning(
RestClusterClient<?> restClusterClient, JobID jobId) throws
Exception {
// access the REST endpoint of the cluster to determine the state of
each
// ExecutionVertex
final JobDetailsHeaders detailsHeaders =
JobDetailsHeaders.getInstance();
final JobMessageParameters params =
detailsHeaders.getUnresolvedMessageParameters();
params.jobPathParameter.resolve(jobId);
CommonTestUtils.waitUntilCondition(
() ->
restClusterClient
.sendRequest(detailsHeaders, params,
EmptyRequestBody.getInstance())
.thenApply(
detailsInfo ->
allVerticesRunning(
detailsInfo.getJobVerticesPerState()))
.get());
}
private static boolean allVerticesRunning(Map<ExecutionState, Integer>
states) {
return states.entrySet().stream()
.allMatch(
entry -> {
if (entry.getKey() == ExecutionState.RUNNING) {
return entry.getValue() > 0;
} else {
return entry.getValue() == 0; // no vertices
in non-running state.
}
});
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]