One thing I noticed: if I set drain = true, the job stops correctly. Maybe
that's because I'm using a Parquet file sink, which is a bulk-encoded format
and only flushes to disk during checkpoints?
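
For reference, here is roughly how I'm issuing the stop call, as a minimal
sketch using Python's standard library (the host, port, job ID, and bucket
are placeholders; the endpoint and body match what I describe below):

```python
import json
from urllib import request

def build_stop_request(base_url: str, job_id: str,
                       target_directory: str, drain: bool) -> request.Request:
    """Build the POST for Flink's stop-with-savepoint REST endpoint:
    POST /jobs/:jobid/stop with a JSON body of {drain, targetDirectory}."""
    url = f"{base_url}/jobs/{job_id}/stop"
    body = json.dumps({"drain": drain,
                       "targetDirectory": target_directory}).encode("utf-8")
    return request.Request(url, data=body,
                           headers={"Content-Type": "application/json"},
                           method="POST")

# Placeholders only; the request is built but not sent here.
req = build_stop_request("http://jobmanager:8081",
                         "5dce58f02d1f5e739ab12f88b2f5f031",
                         "s3://my-bucket/flink-savepoint",
                         drain=True)
```

The 202 response to this request carries the "request-id" used to poll for
the savepoint result.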

Thomas

On Sat, Jun 5, 2021 at 10:06 AM Thomas Wang <w...@datability.io> wrote:

> Hi Yun,
>
> Thanks for the tips. Yes, I do see some exceptions as copied below. I'm
> not quite sure what they mean though. Any hints?
>
> Thanks.
>
> Thomas
>
> ```
> 2021-06-05 10:02:51
> java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:576)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
>     at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
>     at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(TimestampsAndWatermarksOperator.java:165)
>     at org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks.onPeriodicEmit(BoundedOutOfOrdernessWatermarks.java:69)
>     at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator.java:125)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:90)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
>     ... 9 more
> Caused by: java.lang.RuntimeException
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:123)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:762)
>     at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
>     ... 21 more
> Caused by: java.lang.IllegalStateException
>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
>     at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:83)
>     at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer.java:90)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:136)
>     at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:80)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
>     ... 25 more
> ```
>
> On Sat, Jun 5, 2021 at 6:24 AM Yun Gao <yungao...@aliyun.com> wrote:
>
>> Hi Thomas,
>>
>> To query the savepoint status, you can issue a GET request to
>> /jobs/:jobid/savepoints/:savepointtriggerid [1], which returns the status
>> and location of the savepoint. But if the job runs in some kind of per-job
>> mode and the JobMaster is gone after stop-with-savepoint, that endpoint
>> might no longer be reachable.
>>
>> For the Kafka source, did you find any exceptions or messages in the
>> TaskManager's log when the job could not be stopped?
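
To make the polling concrete, here is a small sketch that interprets the
documented JSON response of `GET /jobs/:jobid/savepoints/:triggerid` (field
names follow the Flink REST docs referenced in [1]; the host and savepoint
path below are made up):

```python
def savepoint_status(response: dict):
    """Interpret the response of GET /jobs/:jobid/savepoints/:triggerid.

    Returns ("IN_PROGRESS", None) while the savepoint is running,
    ("COMPLETED", location) on success, or ("FAILED", failure_cause)
    when the operation field carries a failure-cause instead.
    """
    status = response["status"]["id"]
    if status == "IN_PROGRESS":
        return status, None
    operation = response.get("operation", {})
    if "location" in operation:
        return "COMPLETED", operation["location"]
    return "FAILED", operation.get("failure-cause")

# Example response shapes (values are hypothetical):
done = {"status": {"id": "COMPLETED"},
        "operation": {"location": "s3://my-bucket/flink-savepoint/savepoint-5dce58-deadbeef"}}
print(savepoint_status(done))
```

One would poll this endpoint with the "request-id" returned by the stop call
until the status leaves IN_PROGRESS; the `location` field is the savepoint
path the CLI prints.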
>>
>> Best,
>> Yun
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/rest_api/#jobs-jobid-savepoints-triggerid
>>
>>
>>
>> ------------------Original Mail ------------------
>> *Sender:*Thomas Wang <w...@datability.io>
>> *Send Date:*Sat Jun 5 00:47:47 2021
>> *Recipients:*Yun Gao <yungao...@aliyun.com>
>> *CC:*user <user@flink.apache.org>
>> *Subject:*Re: Failed to cancel a job using the STOP rest API
>>
>>> Hi Yun,
>>>
>>> Thanks for your reply. We are not using any legacy source. For this
>>> specific job, there is only one source, which uses FlinkKafkaConsumer; I
>>> assume it has the cancel() method implemented correctly.
>>>
>>> Also could you suggest how I could use the "request-id" to get the
>>> savepoint location?
>>>
>>> Thanks.
>>>
>>> Thomas
>>>
>>> On Fri, Jun 4, 2021 at 2:31 AM Yun Gao <yungao...@aliyun.com> wrote:
>>>
>>>> Hi Thomas,
>>>>
>>>> I think you are right that the CLI uses the same REST API under the hood.
>>>> Since the REST response was OK and the savepoint was triggered
>>>> successfully, I reckon the problem is not in the REST API handling itself,
>>>> so we should first focus on the stop-with-savepoint process.
>>>>
>>>> Currently, stop-with-savepoint first takes a savepoint and then cancels
>>>> all the sources to stop the job. Are the sources all legacy sources
>>>> (namely, ones using SourceFunction)? And does each source implement the
>>>> cancel() method correctly?
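
The cancel() contract in question — run() must loop on a flag that cancel()
clears from another thread — is language-agnostic. Flink's actual interface
is the Java SourceFunction; the following is only a Python sketch of that
contract, with hypothetical names throughout:

```python
import threading
import time

class LegacySourceSketch:
    """Sketch of the SourceFunction cancellation contract:
    run() keeps emitting while a flag is set; cancel() clears it."""

    def __init__(self):
        self.running = True  # in Java this field would be volatile

    def run(self, collect):
        while self.running:      # cancel() must be able to break this loop
            collect("record")
            time.sleep(0.01)

    def cancel(self):
        self.running = False     # invoked from a different thread

source = LegacySourceSketch()
out = []
worker = threading.Thread(target=source.run, args=(out.append,))
worker.start()
time.sleep(0.05)                 # let the source emit a few records
source.cancel()
worker.join(timeout=1.0)
print("stopped:", not worker.is_alive())
```

A source that blocks indefinitely inside run() without checking the flag (or
without being interruptible) is exactly the kind that leaves a job "running"
after stop-with-savepoint.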
>>>>
>>>> Best,
>>>> Yun
>>>>
>>>> ------------------------------------------------------------------
>>>> From:Thomas Wang <w...@datability.io>
>>>> Send Time:2021 Jun. 4 (Fri.) 12:55
>>>> To:user <user@flink.apache.org>
>>>> Subject:Failed to cancel a job using the STOP rest API
>>>>
>>>> Hi, Flink community,
>>>>
>>>> I'm trying to use the STOP REST API to cancel a job. So far, I'm seeing
>>>> inconsistent results: sometimes jobs are cancelled successfully, while
>>>> other times they aren't. Either way, the POST request is accepted with a
>>>> status code 202 and a "request-id".
>>>>
>>>> From the Flink UI, I can see the savepoint completing successfully;
>>>> however, the job is still in the running state afterwards. The CLI command
>>>> `flink stop <JOB ID>` works fine: I can use the CLI to stop the job and
>>>> get the resulting savepoint location. If I understand this correctly, the
>>>> CLI should be using the same REST API behind the scenes, shouldn't it?
>>>>
>>>> Here is my POST request URL:
>>>> `http://<HIDDEN>.ec2.internal:33557/jobs/5dce58f02d1f5e739ab12f88b2f5f031/stop`.
>>>>
>>>> Here is the BODY of the request:
>>>> `{"drain":false,"targetDirectory":"s3://<BUCKET-NAME>/flink-savepoint"}`.
>>>>
>>>> I'm using Flink 1.11.2 Commit ID: DeadD0d0.
>>>>
>>>> Any suggestions on how I can debug this?
>>>>
>>>> Another question is, given the response "request-id", which endpoint
>>>> should I query to get the status of the request? Most importantly, where
>>>> can I get the expected savepoint location?
>>>>
>>>> Thanks.
>>>>
>>>> Thomas
>>>>
