Thanks everyone. I'm using Flink on EMR. I just updated to EMR 6.3 which
uses Flink 1.12.1. I will report back whether this resolves the issue.

Thomas

On Wed, Jun 9, 2021 at 11:15 PM Yun Gao <yungao...@aliyun.com> wrote:

> Many thanks, Kezhu, for the catch. It also looks to me like the same issue
> as FLINK-21028.
>
> ------------------------------------------------------------------
> From:Piotr Nowojski <pnowoj...@apache.org>
> Send Time:2021 Jun. 9 (Wed.) 22:12
> To:Kezhu Wang <kez...@gmail.com>
> Cc:Thomas Wang <w...@datability.io>; Yun Gao <yungao...@aliyun.com>; user <
> user@flink.apache.org>
> Subject:Re: Re: Re: Re: Failed to cancel a job using the STOP rest API
>
> Yes good catch Kezhu, IllegalStateException sounds very much like
> FLINK-21028.
>
> Thomas, could you try upgrading to Flink 1.13.1 or 1.12.4? (1.11.4 hasn't
> been released yet.)
>
> Piotrek
>
> On Tue, 8 Jun 2021 at 17:18, Kezhu Wang <kez...@gmail.com> wrote:
> Could it be the same as FLINK-21028 [1] (titled “Streaming application
> didn’t stop properly”, fixed in 1.11.4, 1.12.2, 1.13.0)?
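
The fix versions listed above can be turned into a quick check of whether a given Flink release should contain the FLINK-21028 fix. This is only a sketch: the function name is made up for illustration, and treating release lines newer than 1.13 as fixed (and older than 1.11 as unfixed) is an assumption, not something stated in the thread.

```python
# Fix versions quoted in the thread for FLINK-21028, keyed by release line.
FIXED_IN = {(1, 11): 4, (1, 12): 2, (1, 13): 0}


def has_flink_21028_fix(version):
    """Return True if `version` (e.g. "1.12.4") should include the fix.

    Hypothetical helper: lines newer than 1.13 are assumed fixed,
    lines older than 1.11 are assumed unfixed.
    """
    major, minor, patch = (int(part) for part in version.split(".")[:3])
    line = (major, minor)
    if line in FIXED_IN:
        return patch >= FIXED_IN[line]
    return line > max(FIXED_IN)
```

For example, `has_flink_21028_fix("1.11.2")` is false, while `has_flink_21028_fix("1.12.4")` and `has_flink_21028_fix("1.13.1")` are true.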
>
>
> [1]: https://issues.apache.org/jira/browse/FLINK-21028
>
>
> Best,
> Kezhu Wang
>
> On June 8, 2021 at 22:54:10, Yun Gao (yungao...@aliyun.com) wrote:
> Hi Thomas,
>
> I tried but have not been able to reproduce the exception yet. In the
> meantime, I have filed an issue for the exception [1].
>
>
>
> [1] https://issues.apache.org/jira/browse/FLINK-22928
>
>
> ------------------Original Mail ------------------
> *Sender:*Thomas Wang <w...@datability.io>
> *Send Date:*Tue Jun 8 07:45:52 2021
> *Recipients:*Yun Gao <yungao...@aliyun.com>
> *CC:*user <user@flink.apache.org>
> *Subject:*Re: Re: Re: Failed to cancel a job using the STOP rest API
> This is actually a very simple job that reads from Kafka and writes to S3
> using the StreamingFileSink with the Parquet format. I'm using only Flink's
> APIs and nothing custom.
>
> Thomas
>
> On Sun, Jun 6, 2021 at 6:43 PM Yun Gao <yungao...@aliyun.com> wrote:
> Hi Thomas,
>
> Many thanks for reporting the exceptions. This does not seem to be working
> as expected to me...
> Could you also show us the DAG of the job? And do any operators in the
> source task use multiple threads to emit records?
>
> Best,
> Yun
>
>
> ------------------Original Mail ------------------
> *Sender:*Thomas Wang <w...@datability.io>
> *Send Date:*Sun Jun 6 04:02:20 2021
> *Recipients:*Yun Gao <yungao...@aliyun.com>
> *CC:*user <user@flink.apache.org>
> *Subject:*Re: Re: Failed to cancel a job using the STOP rest API
> One thing I noticed is that if I set drain = true, the job could be
> stopped correctly. Maybe that's because I'm using a Parquet file sink which
> is a bulk-encoded format and only writes to disk during checkpoints?
>
> Thomas
>
> On Sat, Jun 5, 2021 at 10:06 AM Thomas Wang <w...@datability.io> wrote:
> Hi Yun,
>
> Thanks for the tips. Yes, I do see some exceptions as copied below. I'm
> not quite sure what they mean though. Any hints?
>
> Thanks.
>
> Thomas
>
> ```
> 2021-06-05 10:02:51
> java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:576)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
>     at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
>     at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(TimestampsAndWatermarksOperator.java:165)
>     at org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks.onPeriodicEmit(BoundedOutOfOrdernessWatermarks.java:69)
>     at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator.java:125)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:90)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
>     ... 9 more
> Caused by: java.lang.RuntimeException
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:123)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:762)
>     at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
>     ... 21 more
> Caused by: java.lang.IllegalStateException
>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
>     at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:83)
>     at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer.java:90)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:136)
>     at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:80)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
>     ... 25 more
> ```
>
> On Sat, Jun 5, 2021 at 6:24 AM Yun Gao <yungao...@aliyun.com> wrote:
> Hi Thomas,
>
> To query the savepoint status, you can issue a GET request to
> /jobs/:jobid/savepoints/:savepointtriggerid [1] to get the status and
> location of the savepoint. However, if the job is running in some kind of
> per-job mode and the JobMaster is gone after the stop-with-savepoint, the
> endpoint might no longer be available.
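
A minimal sketch of how the status lookup described above might be scripted. It builds the status URL from the "request-id" returned by the stop call and extracts the savepoint location from the response. The function names are hypothetical, and the response shape (`status.id`, `operation.location`, `operation.failure-cause`) is an assumption that should be checked against the REST API documentation for the Flink version in use.

```python
import json


def savepoint_status_url(base_url, job_id, trigger_id):
    """Build the URL for GET /jobs/:jobid/savepoints/:triggerid."""
    return f"{base_url.rstrip('/')}/jobs/{job_id}/savepoints/{trigger_id}"


def parse_savepoint_status(response_text):
    """Return (status, location, failure_cause) from a status response body.

    Assumed response shape (not confirmed in this thread):
      {"status": {"id": "COMPLETED"}, "operation": {"location": "..."}}
    """
    payload = json.loads(response_text)
    status = payload.get("status", {}).get("id")
    # "operation" may be null while the savepoint is still in progress.
    operation = payload.get("operation") or {}
    return status, operation.get("location"), operation.get("failure-cause")
```

A caller would poll the URL until the status is no longer in progress, then read the location. As noted above, the endpoint may disappear if the JobMaster shuts down after the stop.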
>
> For the Kafka source, have you found any exceptions or messages in the
> TaskManager's log when the job could not be stopped?
>
> Best,
> Yun
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/rest_api/#jobs-jobid-savepoints-triggerid
>
>
>
> ------------------Original Mail ------------------
> *Sender:*Thomas Wang <w...@datability.io>
> *Send Date:*Sat Jun 5 00:47:47 2021
> *Recipients:*Yun Gao <yungao...@aliyun.com>
> *CC:*user <user@flink.apache.org>
> *Subject:*Re: Failed to cancel a job using the STOP rest API
> Hi Yun,
>
> Thanks for your reply. We are not using any legacy source. For this
> specific job, there is only one source that is using FlinkKafkaConsumer
> which I assume has the correct cancel() method implemented.
>
> Also could you suggest how I could use the "request-id" to get the
> savepoint location?
>
> Thanks.
>
> Thomas
>
> On Fri, Jun 4, 2021 at 2:31 AM Yun Gao <yungao...@aliyun.com> wrote:
> Hi Thomas,
>
> I think you are right that the CLI uses the same REST API under the hood.
> Since the response of the REST API is OK and the savepoint is triggered
> successfully, I reckon the problem is not in the REST API handling, so we
> should first focus on the stop-with-savepoint process.
>
> Currently, stop-with-savepoint first takes a savepoint and then cancels all
> the sources to stop the job. So are all the sources legacy sources (namely,
> ones using SourceFunction)? And do the sources implement the cancel()
> method correctly?
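
To illustrate what a correct cancel() means here, the following is a language-neutral sketch in Python of the cooperative cancellation pattern. It is not Flink code: `PollingSource` is a hypothetical stand-in for a legacy source whose run() loop must exit once cancel() is called from another thread.

```python
import threading
import time


class PollingSource:
    """Hypothetical stand-in for a source with run()/cancel() semantics."""

    def __init__(self):
        self._running = threading.Event()
        self._running.set()
        self.emitted = 0

    def run(self):
        # Re-check the flag on every iteration so cancel() takes effect promptly.
        while self._running.is_set():
            self.emitted += 1  # placeholder for emitting one record
            time.sleep(0.01)

    def cancel(self):
        # Called from a different thread; must cause run() to return.
        self._running.clear()
```

If run() never re-checks the flag, or blocks indefinitely without being interrupted, the source keeps running after cancel() and the job cannot finish stopping.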
>
> Best,
> Yun
>
> ------------------------------------------------------------------
> From:Thomas Wang <w...@datability.io>
> Send Time:2021 Jun. 4 (Fri.) 12:55
> To:user <user@flink.apache.org>
> Subject:Failed to cancel a job using the STOP rest API
>
> Hi, Flink community,
>
> I'm trying to use the STOP rest API to cancel a job. So far, I'm seeing
> some inconsistent results. Sometimes, jobs could be cancelled successfully
> while other times, they couldn't. Either way, the POST request is accepted
> with a status code 202 and a "request-id".
>
> From the Flink UI, I can see the savepoint being completed successfully.
> However the job is still in running state afterwards. The CLI command
> `flink stop <JOB ID>` is working ok. I can use the CLI to stop the job and
> get the resulting savepoint location. If I understand this correctly, the
> CLI should be using the same REST API behind the scenes, shouldn't it?
>
> Here is my POST request URL:
> `http://<HIDDEN>.ec2.internal:33557/jobs/5dce58f02d1f5e739ab12f88b2f5f031/stop`.
>
> Here is the BODY of the request:
> `{"drain":false,"targetDirectory":"s3://<BUCKET-NAME>/flink-savepoint"}`.
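
For reference, the request above could be assembled in a script roughly as follows. This is a sketch using only the Python standard library. The helper names are made up for illustration, and the base URL passed in is whatever host and port the JobManager REST endpoint listens on.

```python
import json
import urllib.request


def build_stop_request(base_url, job_id, target_directory, drain=False):
    """Build (but do not send) the POST request for /jobs/<job_id>/stop."""
    url = f"{base_url.rstrip('/')}/jobs/{job_id}/stop"
    body = json.dumps({"drain": drain, "targetDirectory": target_directory})
    return urllib.request.Request(
        url,
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def extract_request_id(response_text):
    """Pull the "request-id" out of the 202 response body."""
    return json.loads(response_text)["request-id"]
```

Sending the request is then a matter of `urllib.request.urlopen(build_stop_request(...))` and passing the decoded response body to `extract_request_id`. Note that a 202 only means the operation was accepted, not that the job has stopped.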
>
> I'm using Flink 1.11.2 Commit ID: DeadD0d0.
>
> Any suggestions on how I can debug this?
>
> Another question is, given the response "request-id", which endpoint
> should I query to get the status of the request? Most importantly, where
> can I get the expected savepoint location?
>
> Thanks.
>
> Thomas
>
>
>
