This is actually a very simple job that reads from Kafka and writes to S3
using the StreamingFileSink with the Parquet format. I'm only using Flink's
APIs and nothing custom.


On Sun, Jun 6, 2021 at 6:43 PM Yun Gao <> wrote:

> Hi Thomas,
> Thanks very much for reporting the exceptions; this does not look like the
> expected behavior to me.
> Could you also show us the DAG of the job? And do any operators in the
> source task use multiple threads to emit records?
> Best,
> Yun
> ------------------Original Mail ------------------
> *Sender:*Thomas Wang <>
> *Send Date:*Sun Jun 6 04:02:20 2021
> *Recipients:*Yun Gao <>
> *CC:*user <>
> *Subject:*Re: Re: Failed to cancel a job using the STOP rest API
>> One thing I noticed is that if I set drain = true, the job stops
>> correctly. Maybe that's because I'm using a Parquet file sink, which is a
>> bulk-encoded format and only writes to disk during checkpoints?
>> Thomas
>> On Sat, Jun 5, 2021 at 10:06 AM Thomas Wang <> wrote:
>>> Hi Yun,
>>> Thanks for the tips. Yes, I do see some exceptions as copied below. I'm
>>> not quite sure what they mean though. Any hints?
>>> Thanks.
>>> Thomas
>>> ```
>>> 2021-06-05 10:02:51
>>> java.util.concurrent.ExecutionException:
>>> org.apache.flink.streaming.runtime.tasks.
>>> ExceptionInChainedOperatorException: Could not forward element to next
>>> operator
>>>     at java.util.concurrent.CompletableFuture.reportGet(
>>>     at java.util.concurrent.CompletableFuture.get(CompletableFuture
>>> .java:1928)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>>> .quiesceTimeServiceAndCloseOperator(
>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>>> .close(
>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>>> .close(
>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>>> .close(
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .closeOperators(
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(
>>>     at
>>>     at
>>> Caused by: org.apache.flink.streaming.runtime.tasks.
>>> ExceptionInChainedOperatorException: Could not forward element to next
>>> operator
>>>     at org.apache.flink.streaming.runtime.tasks.
>>> OperatorChain$ChainingOutput.emitWatermark(
>>>     at org.apache.flink.streaming.api.operators.CountingOutput
>>> .emitWatermark(
>>>     at org.apache.flink.streaming.runtime.operators.
>>> TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(
>>>     at org.apache.flink.api.common.eventtime.
>>> BoundedOutOfOrdernessWatermarks.onPeriodicEmit(
>>>     at org.apache.flink.streaming.runtime.operators.
>>> TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator
>>> .java:125)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>>> .lambda$closeOperator$5(
>>>     at org.apache.flink.streaming.runtime.tasks.
>>> StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor
>>> .runThrowing(
>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>>> .closeOperator(
>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>>> .lambda$deferCloseOperatorToMailbox$3(
>>>     at org.apache.flink.streaming.runtime.tasks.
>>> StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor
>>> .runThrowing(
>>>     at
>>> .java:78)
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.
>>> MailboxExecutorImpl.tryYield(
>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper
>>> .quiesceTimeServiceAndCloseOperator(
>>>     ... 9 more
>>> Caused by: java.lang.RuntimeException
>>>     at
>>> .emitWatermark(
>>>     at org.apache.flink.streaming.runtime.tasks.
>>> OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain
>>> .java:762)
>>>     at org.apache.flink.streaming.api.operators.CountingOutput
>>> .emitWatermark(
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator
>>> .processWatermark(
>>>     at org.apache.flink.streaming.runtime.tasks.
>>> OperatorChain$ChainingOutput.emitWatermark(
>>>     ... 21 more
>>> Caused by: java.lang.IllegalStateException
>>>     at org.apache.flink.util.Preconditions.checkState(Preconditions
>>> .java:179)
>>>     at
>>>     at
>>> SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer
>>> .java:90)
>>>     at
>>> .copyFromSerializerToTargetChannel(
>>>     at
>>> ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter
>>> .java:80)
>>>     at
>>> .emitWatermark(
>>>     ... 25 more
>>> ```
>>> On Sat, Jun 5, 2021 at 6:24 AM Yun Gao <> wrote:
>>>> Hi Thomas,
>>>> To query the savepoint status, you can issue a GET request to
>>>> /jobs/:jobid/savepoints/:savepointtriggerid [1] to get the status and
>>>> location of the savepoint. However, if the job is running in a per-job
>>>> mode and the JobMaster is gone after the stop-with-savepoint, the
>>>> endpoint might no longer be available.
>>>> For the Kafka source, did you find any exceptions or messages in the
>>>> TaskManager's log when the job could not be stopped?
>>>> Best,
>>>> Yun
>>>> [1]
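A minimal Python sketch of the status query described above, assuming the response shape that the Flink REST API documentation gives for savepoint operations: `status.id` is `IN_PROGRESS` or `COMPLETED`, and once completed, `operation` holds either `location` or `failure-cause`. It also assumes that, for stop-with-savepoint, the `request-id` returned by the stop call serves as the trigger id. `wait_for_savepoint` and `http_fetcher` are hypothetical helper names; the fetch function is injected so the polling logic can be exercised without a cluster.

```python
import json
import time
import urllib.request
from typing import Callable, Optional


def wait_for_savepoint(fetch: Callable[[], str], poll_interval: float = 1.0,
                       timeout: float = 300.0) -> Optional[str]:
    """Poll until the savepoint is COMPLETED and return its location.

    `fetch` returns the raw JSON body of GET /jobs/:jobid/savepoints/:triggerid.
    Raises RuntimeError if the finished operation reports a failure cause,
    and TimeoutError if it is still in progress when the timeout expires.
    """
    deadline = time.monotonic() + timeout
    while True:
        data = json.loads(fetch())
        if data["status"]["id"] == "COMPLETED":
            # "operation" is only populated once the savepoint has finished.
            operation = data.get("operation") or {}
            if operation.get("failure-cause"):
                raise RuntimeError(f"savepoint failed: {operation['failure-cause']}")
            return operation.get("location")
        if time.monotonic() >= deadline:
            raise TimeoutError("savepoint did not complete before the timeout")
        time.sleep(poll_interval)


def http_fetcher(base_url: str, job_id: str, request_id: str) -> Callable[[], str]:
    # Assumption: the "request-id" from POST /jobs/:jobid/stop is the trigger id.
    url = f"{base_url.rstrip('/')}/jobs/{job_id}/savepoints/{request_id}"

    def fetch() -> str:
        with urllib.request.urlopen(url, timeout=10) as resp:
            return resp.read().decode("utf-8")

    return fetch
```

Given the per-job caveat above, a caller may want to treat an HTTP error from `http_fetcher` as "JobMaster gone" rather than as a failed savepoint, and fall back to checking the target directory.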
>>>> ------------------Original Mail ------------------
>>>> *Sender:*Thomas Wang <>
>>>> *Send Date:*Sat Jun 5 00:47:47 2021
>>>> *Recipients:*Yun Gao <>
>>>> *CC:*user <>
>>>> *Subject:*Re: Failed to cancel a job using the STOP rest API
>>>>> Hi Yun,
>>>>> Thanks for your reply. We are not using any legacy source. For this
>>>>> specific job, there is only one source, a FlinkKafkaConsumer, which I
>>>>> assume implements the cancel() method correctly.
>>>>> Also, could you suggest how I can use the "request-id" to get the
>>>>> savepoint location?
>>>>> Thanks.
>>>>> Thomas
>>>>> On Fri, Jun 4, 2021 at 2:31 AM Yun Gao <> wrote:
>>>>>> Hi Thomas,
>>>>>> I think you are right that the CLI uses the same REST API under the
>>>>>> hood. Since the REST API responds successfully and the savepoint is
>>>>>> triggered successfully, I reckon the problem is not in the REST API
>>>>>> handling, so we should first focus on the stop-with-savepoint process.
>>>>>> Currently, stop-with-savepoint first takes a savepoint and then cancels
>>>>>> all the sources to stop the job. So, are the sources all legacy sources
>>>>>> (namely, ones implementing SourceFunction)? And does each source
>>>>>> implement the cancel() method correctly?
>>>>>> Best,
>>>>>> Yun
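To illustrate the cancel() contract asked about here: the SourceFunction Javadoc describes the typical pattern in which run() loops on a volatile running flag and cancel() clears it, so run() breaks out of its loop once cancel() is called. The sketch below is a Python analogue of that pattern, not Flink's Java API; `CancellableSource` and its `poll` supplier are hypothetical, and `threading.Event` stands in for the volatile flag.

```python
import threading
import time
from typing import Callable


class CancellableSource:
    """Python analogue of a legacy SourceFunction's run()/cancel() contract.

    Hypothetical class for illustration only. cancel() may be called from
    another thread while run() is looping, and run() must notice and return.
    """

    def __init__(self, poll: Callable[[], object]):
        self._poll = poll                  # hypothetical record supplier
        self._running = threading.Event()  # stands in for a volatile boolean
        self._running.set()

    def run(self, emit: Callable[[object], None]) -> None:
        # Check the flag on every iteration so cancellation takes effect promptly.
        while self._running.is_set():
            emit(self._poll())
            time.sleep(0.001)  # avoid a busy spin in this toy example

    def cancel(self) -> None:
        # Only flips the flag; run() exits on its next loop check.
        self._running.clear()
```

A source whose run() blocks somewhere that never re-checks the flag would keep its thread alive after cancel(), which is the kind of hang Yun is asking about.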
>>>>>> ------------------------------------------------------------------
>>>>>> From:Thomas Wang <>
>>>>>> Send Time:2021 Jun. 4 (Fri.) 12:55
>>>>>> To:user <>
>>>>>> Subject:Failed to cancel a job using the STOP rest API
>>>>>> Hi, Flink community,
>>>>>> I'm trying to use the STOP REST API to cancel a job, and so far I'm
>>>>>> seeing inconsistent results: sometimes the job is cancelled
>>>>>> successfully, other times it isn't. Either way, the POST request is
>>>>>> accepted with status code 202 and a "request-id".
>>>>>> From the Flink UI, I can see the savepoint complete successfully.
>>>>>> However, the job is still in the RUNNING state afterwards. The CLI
>>>>>> command `flink stop <JOB ID>` works fine: I can use it to stop the
>>>>>> job and get the resulting savepoint location. If I understand
>>>>>> correctly, the CLI uses the same REST API behind the scenes, doesn't it?
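One way to detect this inconsistent outcome without the UI is to read the job's state after the savepoint completes. A sketch, assuming the `state` field returned by Flink's `GET /jobs/:jobid` endpoint and assuming that a successful stop-with-savepoint leaves the job `FINISHED`; the helper names are hypothetical.

```python
import json
import urllib.request


def job_state_from_body(body: str) -> str:
    # GET /jobs/:jobid returns job details; "state" holds e.g. RUNNING or FINISHED.
    return json.loads(body)["state"]


def fetch_job_state(base_url: str, job_id: str, timeout: float = 10.0) -> str:
    url = f"{base_url.rstrip('/')}/jobs/{job_id}"
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return job_state_from_body(resp.read().decode("utf-8"))


def stop_took_effect(state: str) -> bool:
    # Assumption: after a completed stop-with-savepoint the job should be FINISHED;
    # seeing RUNNING instead reproduces the problem described in this thread.
    return state == "FINISHED"
```

Calling `fetch_job_state` with the same base URL and job ID as the stop request and getting `RUNNING` back after the savepoint has completed would flag the failure case programmatically.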
>>>>>> Here is my POST request URL:
>>>>>> `http://<HIDDEN>.ec2.internal:33557/jobs/5dce58f02d1f5e739ab12f88b2f5f031/stop`.
>>>>>> Here is the BODY of the request:
>>>>>> `{"drain":false,"targetDirectory":"s3://<BUCKET-NAME>/flink-savepoint"}`.
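For reference, the same request can be built in Python with only the standard library. This is a sketch that mirrors the URL path and JSON body shown above; `build_stop_request` and `send_stop` are hypothetical helper names, and the `drain` argument maps to the `drain` field in the body.

```python
import json
import urllib.request


def build_stop_request(base_url: str, job_id: str, target_directory: str,
                       drain: bool = False) -> urllib.request.Request:
    """Build POST /jobs/:jobid/stop with a {"drain", "targetDirectory"} body."""
    body = json.dumps({"drain": drain, "targetDirectory": target_directory}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url.rstrip('/')}/jobs/{job_id}/stop",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_stop(req: urllib.request.Request, timeout: float = 10.0) -> str:
    # A 202 Accepted response carries the trigger id under "request-id".
    with urllib.request.urlopen(req, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))["request-id"]
```

Passing `drain=True` asks Flink to emit MAX_WATERMARK before taking the savepoint and stopping, which is the setting reported in this thread to stop the job correctly.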
>>>>>> I'm using Flink 1.11.2 Commit ID: DeadD0d0.
>>>>>> Any suggestions on how I can debug this?
>>>>>> Another question: given the "request-id" in the response, which endpoint
>>>>>> should I query to get the status of the request? Most importantly,
>>>>>> where can I find the resulting savepoint location?
>>>>>> Thanks.
>>>>>> Thomas
