One thing I noticed is that if I set drain = true, the job can be stopped correctly. Maybe that's because I'm using a Parquet file sink, which is a bulk-encoded format that only writes to disk during checkpoints?
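For reference, the only difference between the two runs is the drain flag in the stop request body. A minimal sketch of how I build it (the bucket path is the same placeholder as in my earlier request; my understanding from the docs, not verified in source, is that drain=true emits MAX_WATERMARK and flushes pending data before shutdown):

```python
import json

def stop_request_body(drain: bool, target_directory: str) -> str:
    """JSON body for POST /jobs/:jobid/stop (Flink REST API).

    drain=True advances the watermark to MAX_WATERMARK so in-flight data is
    flushed before the job stops; drain=False takes the savepoint and stops
    without draining. (My reading of the docs.)
    """
    return json.dumps({"drain": drain, "targetDirectory": target_directory})

# Placeholder bucket, as in the request body quoted below in this thread.
body = stop_request_body(True, "s3://<BUCKET-NAME>/flink-savepoint")
```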
Thomas

On Sat, Jun 5, 2021 at 10:06 AM Thomas Wang <w...@datability.io> wrote:

> Hi Yun,
>
> Thanks for the tips. Yes, I do see some exceptions, as copied below. I'm
> not quite sure what they mean though. Any hints?
>
> Thanks.
>
> Thomas
>
> ```
> 2021-06-05 10:02:51
> java.util.concurrent.ExecutionException: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:161)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:130)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:80)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:302)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:576)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:544)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:642)
>     at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
>     at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator$WatermarkEmitter.emitWatermark(TimestampsAndWatermarksOperator.java:165)
>     at org.apache.flink.api.common.eventtime.BoundedOutOfOrdernessWatermarks.onPeriodicEmit(BoundedOutOfOrdernessWatermarks.java:69)
>     at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.close(TimestampsAndWatermarksOperator.java:125)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$closeOperator$5(StreamOperatorWrapper.java:205)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.closeOperator(StreamOperatorWrapper.java:203)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$deferCloseOperatorToMailbox$3(StreamOperatorWrapper.java:177)
>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorImpl.tryYield(MailboxExecutorImpl.java:90)
>     at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.quiesceTimeServiceAndCloseOperator(StreamOperatorWrapper.java:155)
>     ... 9 more
> Caused by: java.lang.RuntimeException
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:123)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.emitWatermark(OperatorChain.java:762)
>     at org.apache.flink.streaming.api.operators.CountingOutput.emitWatermark(CountingOutput.java:41)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:570)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitWatermark(OperatorChain.java:638)
>     ... 21 more
> Caused by: java.lang.IllegalStateException
>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
>     at org.apache.flink.runtime.io.network.buffer.BufferBuilder.append(BufferBuilder.java:83)
>     at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.copyToBufferBuilder(SpanningRecordSerializer.java:90)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:136)
>     at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.broadcastEmit(ChannelSelectorRecordWriter.java:80)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitWatermark(RecordWriterOutput.java:121)
>     ... 25 more
> ```
>
> On Sat, Jun 5, 2021 at 6:24 AM Yun Gao <yungao...@aliyun.com> wrote:
>
>> Hi Thomas,
>>
>> For querying the savepoint status, a GET request could be issued to
>> /jobs/:jobid/savepoints/:savepointtriggerid [1] to get the status and
>> position of the savepoint. But if the job is running with some kind of
>> per-job mode and the JobMaster is gone after the stop-with-savepoint,
>> the request might not be available.
>>
>> For the Kafka source, have you found any exceptions or messages in the
>> TaskManager's log when it could not be stopped?
>>
>> Best,
>> Yun
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/rest_api/#jobs-jobid-savepoints-triggerid
>>
>> ------------------ Original Mail ------------------
>> *Sender:* Thomas Wang <w...@datability.io>
>> *Send Date:* Sat Jun 5 00:47:47 2021
>> *Recipients:* Yun Gao <yungao...@aliyun.com>
>> *CC:* user <user@flink.apache.org>
>> *Subject:* Re: Failed to cancel a job using the STOP rest API
>>
>>> Hi Yun,
>>>
>>> Thanks for your reply. We are not using any legacy source. For this
>>> specific job, there is only one source, which uses FlinkKafkaConsumer
>>> and which I assume has the correct cancel() method implemented.
>>>
>>> Also, could you suggest how I could use the "request-id" to get the
>>> savepoint location?
>>>
>>> Thanks.
>>>
>>> Thomas
>>>
>>> On Fri, Jun 4, 2021 at 2:31 AM Yun Gao <yungao...@aliyun.com> wrote:
>>>
>>>> Hi Thomas,
>>>>
>>>> I think you are right that the CLI is also using the same REST API
>>>> underneath, and since the response of the REST API is OK and the
>>>> savepoint is triggered successfully, I reckon that it might not be
>>>> due to the REST API process, and we might still first focus on the
>>>> stop-with-savepoint process.
>>>>
>>>> Currently, stop-with-savepoint would first take a savepoint, then
>>>> cancel all the sources to stop the job. Thus, are the sources all
>>>> legacy sources (namely, ones using SourceFunction)? And does the
>>>> source implement the cancel() method correctly?
>>>>
>>>> Best,
>>>> Yun
>>>>
>>>> ------------------------------------------------------------------
>>>> From: Thomas Wang <w...@datability.io>
>>>> Send Time: 2021 Jun. 4 (Fri.) 12:55
>>>> To: user <user@flink.apache.org>
>>>> Subject: Failed to cancel a job using the STOP rest API
>>>>
>>>> Hi, Flink community,
>>>>
>>>> I'm trying to use the STOP REST API to cancel a job. So far, I'm
>>>> seeing some inconsistent results. Sometimes jobs can be cancelled
>>>> successfully, while other times they can't. Either way, the POST
>>>> request is accepted with a status code 202 and a "request-id".
>>>>
>>>> From the Flink UI, I can see the savepoint being completed
>>>> successfully. However, the job is still in the running state
>>>> afterwards. The CLI command `flink stop <JOB ID>` is working OK. I
>>>> can use the CLI to stop the job and get the resulting savepoint
>>>> location. If I understand this correctly, the CLI should be using
>>>> the same REST API behind the scenes, shouldn't it?
>>>>
>>>> Here is my POST request URL:
>>>> `http://<HIDDEN>.ec2.internal:33557/jobs/5dce58f02d1f5e739ab12f88b2f5f031/stop`.
>>>>
>>>> Here is the BODY of the request:
>>>> `{"drain":false,"targetDirectory":"s3://<BUCKET-NAME>/flink-savepoint"}`.
>>>>
>>>> I'm using Flink 1.11.2 Commit ID: DeadD0d0.
>>>>
>>>> Any suggestions on how I can debug this?
>>>>
>>>> Another question: given the response "request-id", which endpoint
>>>> should I query to get the status of the request? Most importantly,
>>>> where can I get the expected savepoint location?
>>>>
>>>> Thanks.
>>>>
>>>> Thomas
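To make Yun's pointer concrete for anyone finding this thread later: the "request-id" returned by the stop call is the savepoint trigger id, so the status URL can be built as below. A minimal sketch; the host and job id are the placeholders from my request above, and the note about where the savepoint path appears in the response is my reading of the docs, not something I've verified here.

```python
def savepoint_status_url(base_url: str, job_id: str, trigger_id: str) -> str:
    """Status endpoint from [1] in Yun's reply:
    GET /jobs/:jobid/savepoints/:savepointtriggerid

    trigger_id is the "request-id" returned by POST /jobs/:jobid/stop.
    """
    return f"{base_url}/jobs/{job_id}/savepoints/{trigger_id}"

url = savepoint_status_url(
    "http://<HIDDEN>.ec2.internal:33557",
    "5dce58f02d1f5e739ab12f88b2f5f031",
    "<request-id>",  # placeholder: the id returned by the stop request
)
# Poll this URL until the status is COMPLETED; the savepoint path should
# then appear in the response body (operation.location, per my reading).
```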