Thanks everyone. I'm using Flink on EMR. I just updated to EMR 6.3 which uses Flink 1.12.1. I will report back whether this resolves the issue.
Thomas

On Wed, Jun 9, 2021 at 11:15 PM Yun Gao <yungao...@aliyun.com> wrote:

> Many thanks, Kezhu, for the catch; it also looks to me like the same issue as
> FLINK-21028.
>
> ------------------------------------------------------------------
> From:Piotr Nowojski <pnowoj...@apache.org>
> Send Time:2021 Jun. 9 (Wed.) 22:12
> To:Kezhu Wang <kez...@gmail.com>
> Cc:Thomas Wang <w...@datability.io>; Yun Gao <yungao...@aliyun.com>; user <user@flink.apache.org>
> Subject:Re: Re: Re: Re: Failed to cancel a job using the STOP rest API
>
> Yes, good catch Kezhu, IllegalStateException sounds very much like
> FLINK-21028.
>
> Thomas, could you try upgrading to Flink 1.13.1 or 1.12.4? (1.11.4 hasn't
> been released yet.)
>
> Piotrek
>
> On Tue, Jun 8, 2021 at 17:18 Kezhu Wang <kez...@gmail.com> wrote:
> Could it be the same as FLINK-21028 [1] (titled “Streaming application
> didn’t stop properly”, fixed in 1.11.4, 1.12.2, 1.13.0)?
>
> [1]: https://issues.apache.org/jira/browse/FLINK-21028
>
> Best,
> Kezhu Wang
>
> On June 8, 2021 at 22:54:10, Yun Gao (yungao...@aliyun.com) wrote:
> Hi Thomas,
>
> I tried but have not been able to reproduce the exception yet. I have filed
> an issue for the exception first [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-22928
>
> ------------------Original Mail ------------------
> *Sender:*Thomas Wang <w...@datability.io>
> *Send Date:*Tue Jun 8 07:45:52 2021
> *Recipients:*Yun Gao <yungao...@aliyun.com>
> *CC:*user <user@flink.apache.org>
> *Subject:*Re: Re: Re: Failed to cancel a job using the STOP rest API
> This is actually a very simple job that reads from Kafka and writes to S3
> using the StreamingFileSink w/ Parquet format. I'm only using Flink's APIs,
> nothing custom.
>
> Thomas
>
> On Sun, Jun 6, 2021 at 6:43 PM Yun Gao <yungao...@aliyun.com> wrote:
> Hi Thomas,
>
> Many thanks for reporting the exceptions; this does not look like the
> expected behavior to me...
> Could you also show us the DAG of the job? And do any operators in the
> source task use multiple threads to emit records?
>
> Best,
> Yun
>
> ------------------Original Mail ------------------
> *Sender:*Thomas Wang <w...@datability.io>
> *Send Date:*Sun Jun 6 04:02:20 2021
> *Recipients:*Yun Gao <yungao...@aliyun.com>
> *CC:*user <user@flink.apache.org>
> *Subject:*Re: Re: Failed to cancel a job using the STOP rest API
> One thing I noticed is that if I set drain = true, the job could be
> stopped correctly. Maybe that's because I'm using a Parquet file sink which
> is a bulk-encoded format and only writes to disk during checkpoints?
>
> Thomas
>
> On Sat, Jun 5, 2021 at 10:06 AM Thomas Wang <w...@datability.io> wrote:
> Hi Yun,
>
> Thanks for the tips. Yes, I do see some exceptions as copied below. I'm
> not quite sure what they mean though. Any hints?
>
> Thanks.
>
> Thomas
>
> ```
> 2021-06-05 10:02:51
> java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:576)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
>     at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
>     at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(TimestampsAndWatermarksOperator.java:165)
>     at org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks.onPeriodicEmit(BoundedOutOfOrdernessWatermarks.java:69)
>     at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator.java:125)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:90)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
>     ... 9 more
> Caused by: java.lang.RuntimeException
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:123)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:762)
>     at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
>     ... 21 more
> Caused by: java.lang.IllegalStateException
>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
>     at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:83)
>     at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer.java:90)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:136)
>     at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:80)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
>     ... 25 more
> ```
>
> On Sat, Jun 5, 2021 at 6:24 AM Yun Gao <yungao...@aliyun.com> wrote:
> Hi Thomas,
>
> For querying the savepoint status, a GET request could be issued to
> /jobs/:jobid/savepoints/:savepointtriggerid [1] to get the status and
> location of the savepoint. But if the job is running in some kind of
> per-job mode and the JobMaster is gone after the stop-with-savepoint, the
> request might not be available.
>
> For the Kafka source, have you found any exceptions or messages in the
> TaskManager's log when it could not be stopped?
>
> Best,
> Yun
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/rest_api/#jobs-jobid-savepoints-triggerid
>
> ------------------Original Mail ------------------
> *Sender:*Thomas Wang <w...@datability.io>
> *Send Date:*Sat Jun 5 00:47:47 2021
> *Recipients:*Yun Gao <yungao...@aliyun.com>
> *CC:*user <user@flink.apache.org>
> *Subject:*Re: Failed to cancel a job using the STOP rest API
> Hi Yun,
>
> Thanks for your reply. We are not using any legacy source. For this
> specific job, there is only one source that is using FlinkKafkaConsumer
> which I assume has the correct cancel() method implemented.
>
> Also could you suggest how I could use the "request-id" to get the
> savepoint location?
>
> Thanks.
>
> Thomas
>
> On Fri, Jun 4, 2021 at 2:31 AM Yun Gao <yungao...@aliyun.com> wrote:
> Hi Thomas,
>
> I think you are right that the CLI is also using the same REST API
> underneath, and since the response of the REST API is OK and the savepoint
> is triggered successfully, I reckon that it might not be due to the REST
> API process, and we should first focus on the stop-with-savepoint process.
>
> Currently stop-with-savepoint first takes a savepoint, then cancels all
> the sources to stop the job. So are the sources all legacy sources (namely
> ones using SourceFunction)? And does the source implement the cancel()
> method correctly?
>
> Best,
> Yun
>
> ------------------------------------------------------------------
> From:Thomas Wang <w...@datability.io>
> Send Time:2021 Jun. 4 (Fri.) 12:55
> To:user <user@flink.apache.org>
> Subject:Failed to cancel a job using the STOP rest API
>
> Hi, Flink community,
>
> I'm trying to use the STOP rest API to cancel a job. So far, I'm seeing
> some inconsistent results. Sometimes, jobs could be cancelled successfully
> while other times, they couldn't. Either way, the POST request is accepted
> with a status code 202 and a "request-id".
>
> From the Flink UI, I can see the savepoint being completed successfully.
> However the job is still in running state afterwards. The CLI command
> `flink stop <JOB ID>` is working ok. I can use the CLI to stop the job and
> get the resulting savepoint location. If I understand this correctly, the
> CLI should be using the same REST API behind the scenes, shouldn't it?
>
> Here is my POST request URL:
> `http://<HIDDEN>.ec2.internal:33557/jobs/5dce58f02d1f5e739ab12f88b2f5f031/stop`.
>
> Here is the BODY of the request:
> `{"drain":false,"targetDirectory":"s3://<BUCKET-NAME>/flink-savepoint"}`.
>
> I'm using Flink 1.11.2 Commit ID: DeadD0d0.
>
> Any suggestions on how I can debug this?
>
> Another question is, given the response "request-id", which endpoint
> should I query to get the status of the request? Most importantly, where
> can I get the expected savepoint location?
>
> Thanks.
>
> Thomas
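
For reference, the request flow discussed in this thread (POST to `/jobs/:jobid/stop`, then GET `/jobs/:jobid/savepoints/:triggerid` with the returned "request-id") could be sketched roughly as below. This is only a minimal sketch, not what the Flink CLI actually does. The helper names are hypothetical. The response fields `status.id`, `operation.location` and `operation.failure-cause` are my reading of the REST API docs linked above, so please verify them against the Flink version you run.

```python
import json
import time
from typing import Callable, Optional, Tuple
from urllib.request import Request, urlopen


def build_stop_request(base_url: str, job_id: str, target_dir: str,
                       drain: bool = False) -> Tuple[str, bytes]:
    """Build the URL and JSON body for POST /jobs/:jobid/stop."""
    url = f"{base_url.rstrip('/')}/jobs/{job_id}/stop"
    body = json.dumps({"drain": drain, "targetDirectory": target_dir})
    return url, body.encode("utf-8")


def savepoint_status_url(base_url: str, job_id: str, request_id: str) -> str:
    """Build the URL for GET /jobs/:jobid/savepoints/:triggerid."""
    return f"{base_url.rstrip('/')}/jobs/{job_id}/savepoints/{request_id}"


def parse_savepoint_status(payload: dict) -> Tuple[str, Optional[str], Optional[str]]:
    """Extract (status id, savepoint location, failure text) from a response.

    Field names are assumptions taken from the REST API docs.
    """
    status = (payload.get("status") or {}).get("id", "UNKNOWN")
    operation = payload.get("operation") or {}
    failure = operation.get("failure-cause")
    failure_text = json.dumps(failure) if failure else None
    return status, operation.get("location"), failure_text


def wait_for_savepoint(fetch: Callable[[str], dict], url: str,
                       attempts: int = 30, delay_s: float = 2.0) -> str:
    """Poll the status URL until the operation completes; return the location."""
    for _ in range(attempts):
        status, location, failure = parse_savepoint_status(fetch(url))
        if status == "COMPLETED":
            if failure:
                raise RuntimeError(f"savepoint failed: {failure}")
            if location:
                return location
            raise RuntimeError("operation completed without a location")
        time.sleep(delay_s)
    raise TimeoutError(f"savepoint not completed after {attempts} attempts")


def http_post_json(url: str, body: bytes) -> dict:
    """POST a JSON body and decode the JSON response (no retries)."""
    req = Request(url, data=body, method="POST",
                  headers={"Content-Type": "application/json"})
    with urlopen(req, timeout=10) as resp:
        return json.load(resp)


def http_get_json(url: str) -> dict:
    """GET a URL and decode the JSON response (no retries)."""
    with urlopen(url, timeout=10) as resp:
        return json.load(resp)
```

Usage would look something like the following, with the host, job ID and bucket filled in for your cluster:

    url, body = build_stop_request("http://<HOST>:<PORT>", "<JOB ID>", "s3://<BUCKET-NAME>/flink-savepoint")
    request_id = http_post_json(url, body)["request-id"]
    location = wait_for_savepoint(http_get_json, savepoint_status_url("http://<HOST>:<PORT>", "<JOB ID>", request_id))

As Yun notes above, the status endpoint may stop answering once the JobMaster is gone in per-job mode, so a connection error while polling does not necessarily mean the savepoint failed.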