LadyForest commented on code in PR #21717:
URL: https://github.com/apache/flink/pull/21717#discussion_r1089615342
##########
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutorImplITCase.java:
##########
@@ -414,50 +462,130 @@ void testStopJob() throws Exception {
final String insert = "INSERT INTO snk SELECT a FROM src;";
try {
- executor.executeOperation(executor.parseStatement(srcDdl));
- executor.executeOperation(executor.parseStatement(snkDdl));
- TableResult result =
executor.executeOperation(executor.parseStatement(insert));
- JobClient jobClient = result.getJobClient().get();
- JobID jobId = jobClient.getJobID();
+ executor.configureSession(srcDdl);
+ executor.configureSession(snkDdl);
+ ClientResult result = executor.executeStatement(insert);
+ JobID jobID = result.getJobId();
// wait till the job turns into running status or the test times
out
- JobStatus jobStatus;
- do {
- Thread.sleep(2_000L);
- jobStatus = jobClient.getJobStatus().get();
- } while (jobStatus != JobStatus.RUNNING);
-
- Optional<String> savepoint = executor.stopJob(jobId.toString(),
true, true);
- assertThat(savepoint).isPresent();
+ TestUtils.waitUntilAllTasksAreRunning(clusterClient, jobID);
+ StringData savepointPath =
+ CollectionUtil.iteratorToList(
+ executor.executeStatement(
+ String.format("STOP JOB '%s' WITH
SAVEPOINT", jobID)))
+ .get(0)
+ .getString(0);
+ assertThat(
+ Files.exists(
+ Paths.get(
+ URI.create(
+
Preconditions.checkNotNull(savepointPath)
+ .toString()))))
+ .isTrue();
Review Comment:
```suggestion
assertThat(savepointPath)
.isNotNull()
.matches(
stringData ->
Files.exists(Paths.get(URI.create(stringData.toString()))));
```
##########
flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/rest/message/session/ConfigureSessionRequestBody.java:
##########
@@ -39,6 +39,10 @@ public class ConfigureSessionRequestBody implements
RequestBody {
@Nullable
private final Long timeout;
+ public ConfigureSessionRequestBody(String statement) {
+ this(statement, null);
+ }
+
public ConfigureSessionRequestBody(
Review Comment:
> The annotation `JsonCreator` is used when serde the json object. However,
the developer uses the creator here to build the object conveniently. Do you
mean we should add annotation `JsonCreator` for the constructor
`ConfigureSessionRequestBody(@JsonProperty(FIELD_NAME_STATEMENT) String
statement, @Nullable @JsonProperty(FIELD_NAME_EXECUTION_TIMEOUT) Long timeout)`?
Yes. The base interface `RequestBody` says
> Subclass instances are converted to JSON using jackson-databind.
Subclasses must have a constructor that accepts all fields of the JSON request,
that should be annotated with @JsonCreator.
##########
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutorImplITCase.java:
##########
@@ -414,50 +462,130 @@ void testStopJob() throws Exception {
final String insert = "INSERT INTO snk SELECT a FROM src;";
try {
- executor.executeOperation(executor.parseStatement(srcDdl));
- executor.executeOperation(executor.parseStatement(snkDdl));
- TableResult result =
executor.executeOperation(executor.parseStatement(insert));
- JobClient jobClient = result.getJobClient().get();
- JobID jobId = jobClient.getJobID();
+ executor.configureSession(srcDdl);
+ executor.configureSession(snkDdl);
+ ClientResult result = executor.executeStatement(insert);
+ JobID jobID = result.getJobId();
// wait till the job turns into running status or the test times
out
- JobStatus jobStatus;
- do {
- Thread.sleep(2_000L);
- jobStatus = jobClient.getJobStatus().get();
- } while (jobStatus != JobStatus.RUNNING);
-
- Optional<String> savepoint = executor.stopJob(jobId.toString(),
true, true);
- assertThat(savepoint).isPresent();
+ TestUtils.waitUntilAllTasksAreRunning(clusterClient, jobID);
+ StringData savepointPath =
+ CollectionUtil.iteratorToList(
+ executor.executeStatement(
+ String.format("STOP JOB '%s' WITH
SAVEPOINT", jobID)))
+ .get(0)
+ .getString(0);
+ assertThat(
+ Files.exists(
+ Paths.get(
+ URI.create(
+
Preconditions.checkNotNull(savepointPath)
+ .toString()))))
+ .isTrue();
Review Comment:
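Btw, I found that the updated configuration is not picked up: after setting `state.savepoints.dir` via `SET` (it is listed in the `SET;` output below), `STOP JOB ... WITH SAVEPOINT` still fails with "Config key [state.savepoints.dir] is not set":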
```sql
Flink SQL> CREATE TABLE src (a STRING) WITH ('connector' = 'datagen');
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE snk (a STRING) WITH ('connector' = 'blackhole');
[INFO] Execute statement succeed.
Flink SQL> INSERT INTO snk SELECT a FROM src;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 9bafe0a3408cf8db4ae8b4fc4110322f
Flink SQL> show jobs;
+----------------------------------+--------------------------------------------------+---------+-------------------------+
| job id |
job name | status | start time |
+----------------------------------+--------------------------------------------------+---------+-------------------------+
| 9bafe0a3408cf8db4ae8b4fc4110322f |
insert-into_default_catalog.default_database.snk | RUNNING |
2023-01-28T03:47:56.161 |
+----------------------------------+--------------------------------------------------+---------+-------------------------+
1 row in set
Flink SQL> stop job '9bafe0a3408cf8db4ae8b4fc4110322f' with savepoint;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.runtime.rest.util.RestClientException:
[org.apache.flink.runtime.rest.handler.RestHandlerException: Config key
[state.savepoints.dir] is not set. Property [targetDirectory] must be provided.
Flink SQL> set 'state.savepoints.dir' = '/tmp/test';
[INFO] Execute statement succeed.
Flink SQL> stop job '9bafe0a3408cf8db4ae8b4fc4110322f' with savepoint;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.runtime.rest.util.RestClientException:
[org.apache.flink.runtime.rest.handler.RestHandlerException: Config key
[state.savepoints.dir] is not set. Property [targetDirectory] must be provided.
Flink SQL> set;
+--------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+
| key |
value |
+--------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+
| execution.attached |
true |
| execution.savepoint-restore-mode |
NO_CLAIM |
| execution.savepoint.ignore-unclaimed-state |
false |
| execution.shutdown-on-attached-exit |
false |
| execution.target |
remote |
| jobmanager.bind-host |
localhost |
| jobmanager.execution.failover-strategy |
region |
| jobmanager.memory.process.size |
1600m |
| jobmanager.rpc.address |
localhost |
| jobmanager.rpc.port |
6123 |
| parallelism.default |
1 |
| pipeline.classpaths |
|
| pipeline.jars |
file:${baseDir}/flink/flink-dist/target/flink-1.17-SNAPSHOT-bin/flink-1.17-SNAPSHOT/opt/flink-python-1.17-SNAPSHOT.jar
|
| rest.address |
localhost |
| rest.bind-address |
localhost |
| sql-gateway.endpoint.rest.address |
localhost |
| sql-gateway.endpoint.rest.port |
58950 |
| state.savepoints.dir |
/tmp/test |
| table.resources.download-dir |
/var/folders/xd/9dp1y4vd3h56kjkvdk426l500000gp/T/sql-gateway-3623ad21-994f-46e3-bfa5-2fb01a99d3c6
|
| taskmanager.bind-host |
localhost |
| taskmanager.host |
localhost |
| taskmanager.memory.process.size |
1728m |
| taskmanager.numberOfTaskSlots |
1 |
+--------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+
23 rows in set
Flink SQL> set sql-client.verbose=true;
[INFO] Execute statement succeed.
Flink SQL> stop job '9bafe0a3408cf8db4ae8b4fc4110322f' with savepoint;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.runtime.rest.util.RestClientException: [Internal server
error., <Exception on server side:
org.apache.flink.table.gateway.api.utils.SqlGatewayException:
org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to
fetchResults.
at
org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:85)
at
org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:84)
at
org.apache.flink.table.gateway.rest.handler.AbstractSqlGatewayRestHandler.respondToRequest(AbstractSqlGatewayRestHandler.java:52)
at
org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
at
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
at java.util.Optional.ifPresent(Optional.java:159)
at
org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
at
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
at
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
at
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
at
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
at
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
at
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:208)
at
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69)
at
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:336)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:308)
at
org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.gateway.api.utils.SqlGatewayException:
Failed to fetchResults.
at
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.fetchResults(SqlGatewayServiceImpl.java:229)
at
org.apache.flink.table.gateway.rest.handler.statement.FetchResultsHandler.handleRequest(FetchResultsHandler.java:83)
... 48 more
Caused by:
org.apache.flink.table.gateway.service.utils.SqlExecutionException: Failed to
execute the operation d6d8fe6f-b0d0-4040-8420-80c605cf6b34.
at
org.apache.flink.table.gateway.service.operation.OperationManager$Operation.processThrowable(OperationManager.java:398)
at
org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:251)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by:
org.apache.flink.table.gateway.service.utils.SqlExecutionException: Could not
stop job 9bafe0a3408cf8db4ae8b4fc4110322f for operation
d6d8fe6f-b0d0-4040-8420-80c605cf6b34.
at
org.apache.flink.table.gateway.service.operation.OperationExecutor.callStopJobOperation(OperationExecutor.java:520)
at
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeOperation(OperationExecutor.java:332)
at
org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:181)
at
org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212)
at
org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:110)
at
org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:242)
... 7 more
Caused by:
org.apache.flink.table.gateway.service.utils.SqlExecutionException: Failed to
run cluster action.
at
org.apache.flink.table.gateway.service.operation.OperationExecutor.runClusterAction(OperationExecutor.java:600)
at
org.apache.flink.table.gateway.service.operation.OperationExecutor.callStopJobOperation(OperationExecutor.java:487)
... 12 more
Caused by: org.apache.flink.util.FlinkException: Could not stop job
9bafe0a3408cf8db4ae8b4fc4110322f in session
d6d8fe6f-b0d0-4040-8420-80c605cf6b34.
at
org.apache.flink.table.gateway.service.operation.OperationExecutor.lambda$callStopJobOperation$10(OperationExecutor.java:510)
at
org.apache.flink.table.gateway.service.operation.OperationExecutor.runClusterAction(OperationExecutor.java:598)
... 13 more
Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.rest.util.RestClientException:
[org.apache.flink.runtime.rest.handler.RestHandlerException: Config key
[state.savepoints.dir] is not set. Property [targetDirectory] must be provided.
at
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$StopWithSavepointHandler.triggerOperation(SavepointHandlers.java:200)
at
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointHandlerBase.handleRequest(SavepointHandlers.java:149)
at
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$StopWithSavepointHandler.handleRequest(SavepointHandlers.java:170)
at
org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)
at
org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
at
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
at java.util.Optional.ifPresent(Optional.java:159)
at
org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
at
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
at
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
at
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
at
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
at
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
at
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:208)
at
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69)
at
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:336)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:308)
at
org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
]
at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at
org.apache.flink.table.gateway.service.operation.OperationExecutor.lambda$callStopJobOperation$10(OperationExecutor.java:502)
... 14 more
Caused by: org.apache.flink.runtime.rest.util.RestClientException:
[org.apache.flink.runtime.rest.handler.RestHandlerException: Config key
[state.savepoints.dir] is not set. Property [targetDirectory] must be provided.
at
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$StopWithSavepointHandler.triggerOperation(SavepointHandlers.java:200)
at
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$SavepointHandlerBase.handleRequest(SavepointHandlers.java:149)
at
org.apache.flink.runtime.rest.handler.job.savepoints.SavepointHandlers$StopWithSavepointHandler.handleRequest(SavepointHandlers.java:170)
at
org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)
at
org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
at
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
at java.util.Optional.ifPresent(Optional.java:159)
at
org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
at
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
at
org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
at
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
at
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
at
org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
at
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:208)
at
org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69)
at
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:336)
at
org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:308)
at
org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at
org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at
org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at
org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at
org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
]
at
org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:536)
at
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:516)
at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
... 3 more
End of exception on server side>]
at
org.apache.flink.runtime.rest.RestClient.parseResponse(RestClient.java:536)
at
org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$3(RestClient.java:516)
at
java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
at
java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
at
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
##########
flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutorImplITCase.java:
##########
@@ -414,50 +462,130 @@ void testStopJob() throws Exception {
final String insert = "INSERT INTO snk SELECT a FROM src;";
try {
- executor.executeOperation(executor.parseStatement(srcDdl));
- executor.executeOperation(executor.parseStatement(snkDdl));
- TableResult result =
executor.executeOperation(executor.parseStatement(insert));
- JobClient jobClient = result.getJobClient().get();
- JobID jobId = jobClient.getJobID();
+ executor.configureSession(srcDdl);
+ executor.configureSession(snkDdl);
+ ClientResult result = executor.executeStatement(insert);
+ JobID jobID = result.getJobId();
// wait till the job turns into running status or the test times
out
- JobStatus jobStatus;
- do {
- Thread.sleep(2_000L);
- jobStatus = jobClient.getJobStatus().get();
- } while (jobStatus != JobStatus.RUNNING);
-
- Optional<String> savepoint = executor.stopJob(jobId.toString(),
true, true);
- assertThat(savepoint).isPresent();
+ TestUtils.waitUntilAllTasksAreRunning(clusterClient, jobID);
+ StringData savepointPath =
+ CollectionUtil.iteratorToList(
+ executor.executeStatement(
+ String.format("STOP JOB '%s' WITH
SAVEPOINT", jobID)))
+ .get(0)
+ .getString(0);
+ assertThat(
+ Files.exists(
+ Paths.get(
+ URI.create(
+
Preconditions.checkNotNull(savepointPath)
+ .toString()))))
+ .isTrue();
Review Comment:
Also, `stop job` does not seem to work.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]