Hi Thomas,

Thanks for reporting the exceptions; this does not look like expected behavior 
to me. Could you also show us the DAG of the job? And do any of the operators 
in the source task use multiple threads to emit records?

Best,
Yun



 ------------------Original Mail ------------------
Sender:Thomas Wang <w...@datability.io>
Send Date:Sun Jun 6 04:02:20 2021
Recipients:Yun Gao <yungao...@aliyun.com>
CC:user <user@flink.apache.org>
Subject:Re: Re: Failed to cancel a job using the STOP rest API

One thing I noticed is that if I set drain = true, the job can be stopped 
correctly. Maybe that's because I'm using a Parquet file sink, which uses a 
bulk-encoded format and only writes to disk during checkpoints?

Thomas
On Sat, Jun 5, 2021 at 10:06 AM Thomas Wang <w...@datability.io> wrote:

Hi Yun,

Thanks for the tips. Yes, I do see some exceptions as copied below. I'm not 
quite sure what they mean though. Any hints?

Thanks.

Thomas

```
2021-06-05 10:02:51
java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
    at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:576)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
    at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(TimestampsAndWatermarksOperator.java:165)
    at org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks.onPeriodicEmit(BoundedOutOfOrdernessWatermarks.java:69)
    at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator.java:125)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:90)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
    ... 9 more
Caused by: java.lang.RuntimeException
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:123)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:762)
    at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
    ... 21 more
Caused by: java.lang.IllegalStateException
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
    at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:83)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer.java:90)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:136)
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:80)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
    ... 25 more
```
On Sat, Jun 5, 2021 at 6:24 AM Yun Gao <yungao...@aliyun.com> wrote:

Hi Thomas,

For querying the savepoint status, a GET request can be issued to
/jobs/:jobid/savepoints/:savepointtriggerid [1] to get the status and location
of the savepoint. But if the job is running in some kind of per-job mode and the
JobMaster is gone after stop-with-savepoint, the request might not be
available.
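As a concrete sketch of that query: the JobManager address and both ids below are placeholders for illustration, not values from a real cluster.

```java
import java.net.URI;

public class SavepointStatus {
    // Placeholder JobManager REST address (Flink's default REST port is 8081).
    static final String JM = "http://localhost:8081";

    // :savepointtriggerid is the "request-id" returned by the stop call.
    static URI statusUri(String jobId, String triggerId) {
        return URI.create(JM + "/jobs/" + jobId + "/savepoints/" + triggerId);
    }

    public static void main(String[] args) {
        // Both ids are hypothetical, for illustration only.
        URI uri = statusUri("5dce58f02d1f5e739ab12f88b2f5f031",
                "8d1bb7f3b1a3a6a0d2f9d4c5e6f7a8b9");
        System.out.println("GET " + uri);
        // The JSON response contains "status": {"id": "IN_PROGRESS"|"COMPLETED"}
        // and, once completed, "operation": {"location": "<savepoint path>"}.
    }
}
```

Issuing a GET against that URI on a live cluster returns the async-operation status document described in [1].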

For the Kafka source, have you found any exceptions or messages in the
TaskManager's log when the job could not be stopped?

Best,
Yun

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/rest_api/#jobs-jobid-savepoints-triggerid




 ------------------Original Mail ------------------
Sender:Thomas Wang <w...@datability.io>
Send Date:Sat Jun 5 00:47:47 2021
Recipients:Yun Gao <yungao...@aliyun.com>
CC:user <user@flink.apache.org>
Subject:Re: Failed to cancel a job using the STOP rest API

Hi Yun,

Thanks for your reply. We are not using any legacy source. For this specific 
job, there is only one source, which uses FlinkKafkaConsumer, and I assume it 
implements the cancel() method correctly.

Also, could you suggest how I could use the "request-id" to get the savepoint 
location?

Thanks.

Thomas
On Fri, Jun 4, 2021 at 2:31 AM Yun Gao <yungao...@aliyun.com> wrote:

Hi Thomas,

I think you are right that the CLI uses the same REST API underneath, and since
the REST API response is OK and the savepoint is triggered successfully, I
reckon the issue is probably not in the REST API processing, so we should first
focus on the stop-with-savepoint process.

Currently, stop-with-savepoint first takes a savepoint, then cancels all the
sources to stop the job. So: are the sources all legacy sources (namely, ones
implementing SourceFunction)? And does each source implement the cancel()
method correctly?
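For reference, the usual legacy-source pattern is a volatile running flag that run() checks on every loop iteration and cancel() clears from another thread. Below is a self-contained sketch using a stand-in class rather than Flink's actual SourceFunction interface (which also passes a SourceContext to run()), so the shape is illustrative only:

```java
public class CancelableSource {
    // Volatile so the write in cancel() is immediately visible to run().
    private volatile boolean running = true;

    // Stand-in for SourceFunction.run(): loops until cancelled.
    public void run() throws InterruptedException {
        while (running) {
            Thread.sleep(1); // stand-in for "emit one record"
        }
        // Loop exits promptly once cancel() flips the flag.
    }

    // Called by the runtime from a different thread on stop/cancel.
    public void cancel() {
        running = false;
    }

    public static void main(String[] args) throws InterruptedException {
        CancelableSource source = new CancelableSource();
        Thread t = new Thread(() -> {
            try { source.run(); } catch (InterruptedException ignored) { }
        });
        t.start();
        source.cancel();
        t.join();
        System.out.println("source stopped cleanly");
    }
}
```

If run() blocks indefinitely without re-checking such a flag, the source never observes the cancel and the job can get stuck in RUNNING after the savepoint completes.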

Best,
Yun


------------------------------------------------------------------
From:Thomas Wang <w...@datability.io>
Send Time:2021 Jun. 4 (Fri.) 12:55
To:user <user@flink.apache.org>
Subject:Failed to cancel a job using the STOP rest API

Hi, Flink community,

I'm trying to use the STOP REST API to cancel a job. So far, I'm seeing
inconsistent results: sometimes jobs are cancelled successfully, while other
times they aren't. Either way, the POST request is accepted with a status code
202 and a "request-id".

From the Flink UI, I can see the savepoint completing successfully; however,
the job is still in the RUNNING state afterwards. The CLI command `flink stop
<JOB ID>` works fine: I can use the CLI to stop the job and get the resulting
savepoint location. If I understand this correctly, the CLI should be using the
same REST API behind the scenes, shouldn't it?

Here is my POST request URL: 
`http://<HIDDEN>.ec2.internal:33557/jobs/5dce58f02d1f5e739ab12f88b2f5f031/stop`.

Here is the BODY of the request: 
`{"drain":false,"targetDirectory":"s3://<BUCKET-NAME>/flink-savepoint"}`.
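For completeness, here is a minimal Java sketch of that same stop request. The JobManager address, job id, and bucket name are placeholders, not values from my cluster, and the actual send is left as a comment since it needs a live cluster:

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class StopJob {
    // Placeholder JobManager REST address (Flink's default REST port is 8081).
    static final String JM = "http://localhost:8081";

    // Builds the POST /jobs/:jobid/stop request with a JSON body.
    static HttpRequest stopRequest(String jobId, boolean drain, String targetDirectory) {
        String body = "{\"drain\":" + drain
                + ",\"targetDirectory\":\"" + targetDirectory + "\"}";
        return HttpRequest.newBuilder(URI.create(JM + "/jobs/" + jobId + "/stop"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest req = stopRequest("5dce58f02d1f5e739ab12f88b2f5f031",
                false, "s3://my-bucket/flink-savepoint");
        System.out.println(req.method() + " " + req.uri());
        // On a live cluster:
        //   var resp = java.net.http.HttpClient.newHttpClient()
        //       .send(req, java.net.http.HttpResponse.BodyHandlers.ofString());
        // A 202 response body carries {"request-id": "..."} for later polling.
    }
}
```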

I'm using Flink 1.11.2 (commit ID: DeadD0d0).

Any suggestions on how I can debug this?

Another question is, given the response "request-id", which endpoint should I 
query to get the status of the request? Most importantly, where can I get the 
expected savepoint location?

Thanks.

Thomas
