Thanks Xiangyu. I have one more issue while running Flink with the Kafka connector. It was working fine for a couple of days.
But suddenly the Kafka lag went to a negative value, and I am trying to find the root cause. Any suggestions?

On Sat, Aug 5, 2023 at 5:57 PM xiangyu feng <xiangyu...@gmail.com> wrote:

> Hi Nagireddy,
>
> I'm not particularly familiar with StreamingFileSink, but I checked the
> implementation of HadoopFsCommitter. AFAIK, when committing files to
> HDFS, the committer first checks whether the temp file exists.
> [image: image.png]
>
> In your case, could you check why the temp file being committed does not
> exist on HDFS? Were these files deleted by mistake? From what I found,
> this error may occur when a small-file merge picks up a file that is
> still being written. You can disable small-file merging when writing files.
>
> Hope this helps.
>
> Regards,
> Xiangyu
>
> Y SREEKARA BHARGAVA REDDY <ynagiredd...@gmail.com> wrote on Sat, Aug 5, 2023 at 18:22:
>
>> Hi Xiangyu/Dev,
>>
>> Does anyone have a solution for handling the important note below in StreamingFileSink?
>>
>> Caused by: java.io.IOException: Cannot clean commit: Staging file does not exist.
>>     at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream$HadoopFsCommitter.commit(HadoopRecoverableFsDataOutputStream.java:250)
>>
>> Important Note 3: Flink and the StreamingFileSink never overwrites
>> committed data. Given this, when trying to restore from an old
>> checkpoint/savepoint which assumes an in-progress file which was committed
>> by subsequent successful checkpoints, *Flink will refuse to resume and
>> it will throw an exception as it cannot locate the in-progress file*.
>>
>> Currently I am facing the same issue in our PROD code.
>>
>> Regards,
>> Nagireddy Y.
>>
>> On Fri, Aug 4, 2023 at 12:11 PM xiangyu feng <xiangyu...@gmail.com> wrote:
>>
>>> Hi ynagireddy4u,
>>>
>>> From the exception info, I think your application hit an HDFS file
>>> issue during the commit phase of a checkpoint. Can you check why the
>>> 'Staging file does not exist' in the first place?
>>>
>>> Regards,
>>> Xiangyu
>>>
>>> Y SREEKARA BHARGAVA REDDY <ynagiredd...@gmail.com> wrote on Fri, Aug 4, 2023 at 12:21:
>>>
>>>> Hi Xiangyu/Dev Team,
>>>>
>>>> Thanks for the reply.
>>>>
>>>> In our Flink job, we increased the *checkpoint timeout to 30 min.*
>>>> The *checkpoint interval is 10 min.*
>>>>
>>>> But while running the job we got the exception below.
>>>>
>>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:952)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$7(StreamTask.java:924)
>>>>     at org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:125)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.io.IOException: Cannot clean commit: Staging file does not exist.
>>>>     at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream$HadoopFsCommitter.commit(HadoopRecoverableFsDataOutputStream.java:250)
>>>>     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onSuccessfulCompletionOfCheckpoint(Bucket.java:300)
>>>>     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.commitUpToCheckpoint(Buckets.java:216)
>>>>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.notifyCheckpointComplete(StreamingFileSink.java:415)
>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointComplete$8(StreamTask.java:936)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:101)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:930)
>>>>     ... 12 more
>>>>
>>>> It would be great if you have any workaround for that.
>>>>
>>>> Regards,
>>>> Nagireddy Y.
>>>>
>>>> On Thu, Aug 3, 2023 at 7:24 AM xiangyu feng <xiangyu...@gmail.com> wrote:
>>>>
>>>>> Hi ynagireddy4u,
>>>>>
>>>>> We have met this exception before. It is usually caused by one of the
>>>>> following:
>>>>>
>>>>> 1) The TaskManager is too busy with other work to send the heartbeat to
>>>>> the JobMaster, or the TaskManager process may have already exited;
>>>>> 2) There might be a network issue between this TaskManager and the
>>>>> JobMaster;
>>>>> 3) In certain cases, the JobMaster actor might itself be too busy to
>>>>> process the RPC requests from the TaskManager.
>>>>>
>>>>> Please check whether your problem fits any of the above situations.
>>>>>
>>>>> Best,
>>>>> Xiangyu
>>>>>
>>>>> Y SREEKARA BHARGAVA REDDY <ynagiredd...@gmail.com> wrote on Mon, Jul 31, 2023 at 20:49:
>>>>>
>>>>>> Hi Team,
>>>>>>
>>>>>> Has anyone faced the exception below?
>>>>>> If yes, please share the resolution.
>>>>>>
>>>>>> 2023-07-28 22:04:16
>>>>>> java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_e19_1690528962823_0382_01_000005 timed out.
>>>>>>     at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
>>>>>>     at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
>>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>
>>>>>> If you have any suggestions, please share them with me.
>>>>>>
>>>>>> Regards,
>>>>>> Nagireddy Y
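
On the negative Kafka lag raised at the top of this thread: lag is commonly computed per partition as the log end offset minus the committed offset, so it goes negative whenever the committed offset is ahead of the end offset the monitor sees. That can happen, for example, if the topic was deleted and recreated, or if the two numbers were sampled at different moments. The following is only a minimal sketch of that arithmetic in plain Java. It assumes you have already collected both offsets per partition by some other means; the class name, method names, and sample numbers are hypothetical and not part of Flink or the Kafka client.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class KafkaLagCheck {

    /** Lag as commonly defined: log end offset minus committed offset. */
    static long lag(long logEndOffset, long committedOffset) {
        return logEndOffset - committedOffset;
    }

    /**
     * Returns partition -> lag for every partition whose lag is negative,
     * i.e. whose committed offset is ahead of the observed log end offset.
     */
    static Map<Integer, Long> negativeLags(Map<Integer, Long> logEndOffsets,
                                           Map<Integer, Long> committedOffsets) {
        Map<Integer, Long> result = new LinkedHashMap<>();
        for (Map.Entry<Integer, Long> entry : committedOffsets.entrySet()) {
            Long end = logEndOffsets.get(entry.getKey());
            if (end == null) {
                continue; // no end offset known for this partition, nothing to compare
            }
            long value = lag(end, entry.getValue());
            if (value < 0) {
                result.put(entry.getKey(), value);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Hypothetical offsets: partition 1 looks like a recreated topic,
        // where the end offset restarted low but the old commit is still stored.
        Map<Integer, Long> logEnd = new LinkedHashMap<>();
        logEnd.put(0, 1500L);
        logEnd.put(1, 40L);

        Map<Integer, Long> committed = new LinkedHashMap<>();
        committed.put(0, 1200L);
        committed.put(1, 900L);

        System.out.println(negativeLags(logEnd, committed)); // prints {1=-860}
    }
}
```

If only some partitions show up here, comparing their committed offsets against the current end offsets on the broker is a reasonable first step before looking at the Flink job itself.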