This is actually a very simple job that reads from Kafka and writes to S3
using the StreamingFileSink with Parquet format. I'm using only Flink's
APIs and nothing custom.
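
For reference, the whole pipeline is roughly the following (a minimal
sketch; the event type, topic, and paths here are placeholders rather than
our real config):

```
import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaToS3Job {

    // Stand-in for our actual record type.
    public static class MyEvent {
        public long timestamp;
        public String payload;

        public MyEvent() {}

        public MyEvent(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Bulk formats like Parquet only roll files on checkpoints,
        // so checkpointing must be enabled.
        env.enableCheckpointing(60_000L);

        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "<BROKERS>");
        kafkaProps.setProperty("group.id", "<GROUP>");

        FlinkKafkaConsumer<String> source =
                new FlinkKafkaConsumer<>("<TOPIC>", new SimpleStringSchema(), kafkaProps);

        StreamingFileSink<MyEvent> sink = StreamingFileSink
                .forBulkFormat(
                        new Path("s3://<BUCKET-NAME>/output"),
                        ParquetAvroWriters.forReflectRecord(MyEvent.class))
                .build();

        env.addSource(source)
                .map(line -> new MyEvent(System.currentTimeMillis(), line))
                .returns(MyEvent.class)
                // This matches the BoundedOutOfOrdernessWatermarks frames
                // in the stack trace quoted below.
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                                .withTimestampAssigner((event, ts) -> event.timestamp))
                .addSink(sink);

        env.execute("kafka-to-s3-parquet");
    }
}
```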

Thomas

On Sun, Jun 6, 2021 at 6:43 PM Yun Gao <yungao...@aliyun.com> wrote:

> Hi Thomas,
>
> Many thanks for reporting the exceptions; it does not seem to be working
> as expected to me...
> Could you also show us the DAG of the job? And do any operators in the
> source task use multiple threads to emit records?
>
> Best,
> Yun
>
>
> ------------------Original Mail ------------------
> *Sender:*Thomas Wang <w...@datability.io>
> *Send Date:*Sun Jun 6 04:02:20 2021
> *Recipients:*Yun Gao <yungao...@aliyun.com>
> *CC:*user <user@flink.apache.org>
> *Subject:*Re: Re: Failed to cancel a job using the STOP rest API
>
>> One thing I noticed is that if I set drain = true, the job can be
>> stopped correctly. Maybe that's because I'm using a Parquet file sink,
>> which is a bulk-encoded format and only writes to disk during checkpoints?
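>>
>> (For context, that's the same stop request as in my original email
>> below, just with the body changed to
>> `{"drain":true,"targetDirectory":"s3://<BUCKET-NAME>/flink-savepoint"}`.)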
>>
>> Thomas
>>
>> On Sat, Jun 5, 2021 at 10:06 AM Thomas Wang <w...@datability.io> wrote:
>>
>>> Hi Yun,
>>>
>>> Thanks for the tips. Yes, I do see some exceptions as copied below. I'm
>>> not quite sure what they mean though. Any hints?
>>>
>>> Thanks.
>>>
>>> Thomas
>>>
>>> ```
>>> 2021-06-05 10:02:51
>>> java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:576)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
>>>     at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
>>>     at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(TimestampsAndWatermarksOperator.java:165)
>>>     at org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks.onPeriodicEmit(BoundedOutOfOrdernessWatermarks.java:69)
>>>     at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator.java:125)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:90)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
>>>     ... 9 more
>>> Caused by: java.lang.RuntimeException
>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:123)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:762)
>>>     at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
>>>     ... 21 more
>>> Caused by: java.lang.IllegalStateException
>>>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
>>>     at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:83)
>>>     at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer.java:90)
>>>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:136)
>>>     at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:80)
>>>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
>>>     ... 25 more
>>> ```
>>>
>>> On Sat, Jun 5, 2021 at 6:24 AM Yun Gao <yungao...@aliyun.com> wrote:
>>>
>>>> Hi Thomas,
>>>>
>>>> For querying the savepoint status, a GET request can be issued to
>>>> /jobs/:jobid/savepoints/:savepointtriggerid [1] to get the status and
>>>> location of the savepoint. But if the job is running in some kind of
>>>> per-job mode and the JobMaster is gone after the stop-with-savepoint,
>>>> the request might no longer be available.
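>>>>
>>>> Concretely, with the "request-id" returned by the stop call, the query
>>>> looks like `GET /jobs/<jobid>/savepoints/<request-id>`. Per [1], once
>>>> the savepoint completes, the response contains the location, roughly
>>>> `{"status":{"id":"COMPLETED"},"operation":{"location":"s3://..."}}`
>>>> (values here are only illustrative).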
>>>>
>>>> For the Kafka source, have you found any exceptions or messages in the
>>>> TaskManager's log when the job could not be stopped?
>>>>
>>>> Best,
>>>> Yun
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/rest_api/#jobs-jobid-savepoints-triggerid
>>>>
>>>>
>>>>
>>>> ------------------Original Mail ------------------
>>>> *Sender:*Thomas Wang <w...@datability.io>
>>>> *Send Date:*Sat Jun 5 00:47:47 2021
>>>> *Recipients:*Yun Gao <yungao...@aliyun.com>
>>>> *CC:*user <user@flink.apache.org>
>>>> *Subject:*Re: Failed to cancel a job using the STOP rest API
>>>>
>>>>> Hi Yun,
>>>>>
>>>>> Thanks for your reply. We are not using any legacy sources. For this
>>>>> specific job, there is only one source, which uses FlinkKafkaConsumer,
>>>>> and I assume it has the correct cancel() method implemented.
>>>>>
>>>>> Also, could you suggest how I could use the "request-id" to get the
>>>>> savepoint location?
>>>>>
>>>>> Thanks.
>>>>>
>>>>> Thomas
>>>>>
>>>>> On Fri, Jun 4, 2021 at 2:31 AM Yun Gao <yungao...@aliyun.com> wrote:
>>>>>
>>>>>> Hi Thomas,
>>>>>>
>>>>>> I think you are right that the CLI is also using the same REST API
>>>>>> underneath, and since the response of the REST API is OK and the
>>>>>> savepoint is triggered successfully, I reckon it might not be an
>>>>>> issue in the REST API processing, and we might still first focus on
>>>>>> the stop-with-savepoint process.
>>>>>>
>>>>>> Currently, stop-with-savepoint first takes a savepoint and then
>>>>>> cancels all the sources to stop the job. Are the sources all legacy
>>>>>> sources (namely, ones using SourceFunction)? And does each source
>>>>>> implement the cancel() method correctly?
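>>>>>>
>>>>>> For reference, a legacy source can only be stopped cleanly if it
>>>>>> follows this pattern (a minimal sketch of the contract, not the
>>>>>> actual FlinkKafkaConsumer code):
>>>>>>
>>>>>> ```
>>>>>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>>>>>
>>>>>> // Sketch of the cancel() contract: run() must observe the flag
>>>>>> // that cancel() flips, otherwise the task never leaves run().
>>>>>> public class PollingSource implements SourceFunction<String> {
>>>>>>
>>>>>>     private volatile boolean running = true;
>>>>>>
>>>>>>     @Override
>>>>>>     public void run(SourceContext<String> ctx) throws Exception {
>>>>>>         while (running) {
>>>>>>             // Emit under the checkpoint lock so records and
>>>>>>             // checkpoints do not interleave.
>>>>>>             synchronized (ctx.getCheckpointLock()) {
>>>>>>                 ctx.collect(poll());
>>>>>>             }
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     @Override
>>>>>>     public void cancel() {
>>>>>>         running = false;
>>>>>>     }
>>>>>>
>>>>>>     private String poll() {
>>>>>>         return "record"; // stand-in for reading the real input
>>>>>>     }
>>>>>> }
>>>>>> ```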
>>>>>>
>>>>>> Best,
>>>>>> Yun
>>>>>>
>>>>>> ------------------------------------------------------------------
>>>>>> From:Thomas Wang <w...@datability.io>
>>>>>> Send Time:2021 Jun. 4 (Fri.) 12:55
>>>>>> To:user <user@flink.apache.org>
>>>>>> Subject:Failed to cancel a job using the STOP rest API
>>>>>>
>>>>>> Hi, Flink community,
>>>>>>
>>>>>> I'm trying to use the STOP rest API to cancel a job. So far, I'm
>>>>>> seeing some inconsistent results: sometimes jobs can be cancelled
>>>>>> successfully, while other times they can't. Either way, the POST
>>>>>> request is accepted with a status code 202 and a "request-id".
>>>>>>
>>>>>> From the Flink UI, I can see the savepoint completing successfully.
>>>>>> However, the job is still in the RUNNING state afterwards. The CLI
>>>>>> command `flink stop <JOB ID>` works fine: I can use the CLI to stop
>>>>>> the job and get the resulting savepoint location. If I understand
>>>>>> this correctly, the CLI should be using the same REST API behind the
>>>>>> scenes, shouldn't it?
>>>>>>
>>>>>> Here is my POST request URL:
>>>>>> `http://<HIDDEN>.ec2.internal:33557/jobs/5dce58f02d1f5e739ab12f88b2f5f031/stop`.
>>>>>>
>>>>>> Here is the BODY of the request:
>>>>>> `{"drain":false,"targetDirectory":"s3://<BUCKET-NAME>/flink-savepoint"}`.
>>>>>>
>>>>>> I'm using Flink 1.11.2 Commit ID: DeadD0d0.
>>>>>>
>>>>>> Any suggestions on how I can debug this?
>>>>>>
>>>>>> Another question is, given the response "request-id", which endpoint
>>>>>> should I query to get the status of the request? Most importantly, where
>>>>>> can I get the expected savepoint location?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> Thomas
>>>>>>
>>>>>>
>>>>>>
