Thanks Xiangyu,

I have one issue while running Flink with the Kafka connector. It was
working fine for a couple of days.

But suddenly the Kafka lag went to a negative value.

I am trying to find the root cause for that. Any suggestions?
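
For reference, a minimal sketch of how I understand lag to be computed: per partition, the log end offset minus the committed offset. Under that definition the value goes negative whenever the committed offset is ahead of the end offset the monitoring tool sampled (for example a stale end-offset reading, or a topic that was recreated). The class name, method names, and partition labels below are hypothetical and only illustrate the arithmetic; this is not the Kafka or Flink API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka or Flink: illustrates lag arithmetic only.
final class LagCheck {

    /** Lag for one partition: log end offset minus committed offset. */
    static long lag(long logEndOffset, long committedOffset) {
        return logEndOffset - committedOffset;
    }

    /** Returns the partitions whose computed lag is negative, with that lag. */
    static Map<String, Long> negativeLags(Map<String, Long> endOffsets,
                                          Map<String, Long> committedOffsets) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : committedOffsets.entrySet()) {
            Long end = endOffsets.get(entry.getKey());
            if (end == null) {
                continue; // no end offset was sampled for this partition
            }
            long value = lag(end, entry.getValue());
            if (value < 0) {
                result.put(entry.getKey(), value);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Long> end = new LinkedHashMap<>();
        Map<String, Long> committed = new LinkedHashMap<>();
        end.put("topic-0", 1000L);
        committed.put("topic-0", 990L);   // normal case: consumer is behind
        end.put("topic-1", 500L);
        committed.put("topic-1", 520L);   // committed offset ahead of sampled end offset
        System.out.println(negativeLags(end, committed));
    }
}
```

Comparing the two offsets per partition this way should at least show which partitions report a committed offset beyond the end offset.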


On Sat, Aug 5, 2023 at 5:57 PM xiangyu feng <xiangyu...@gmail.com> wrote:

> Hi Nagireddy,
>
> I'm not particularly familiar with StreamingFileSink, but I checked the
> implementation of HadoopFsCommitter. AFAIK, when committing files to
> HDFS, the committer first checks whether the temp file exists.
>
> In your case, could you check why the temp file being committed does not
> exist on HDFS? Were these files deleted by mistake? From what I found,
> this error may occur when a small-file merge job merges a file that is
> still being written. You can disable small-file merging while writing
> files.
>
> Hope this helps.
>
> Regards,
> Xiangyu
>
>
> On Sat, Aug 5, 2023 at 18:22, Y SREEKARA BHARGAVA REDDY <ynagiredd...@gmail.com> wrote:
>
>> Hi Xiangyu/Dev,
>>
>> Does anyone have a solution for handling the important note below in
>> StreamingFileSink?
>>
>> Caused by: java.io.IOException: Cannot clean commit: Staging file does not exist.
>>     at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream$HadoopFsCommitter.commit(HadoopRecoverableFsDataOutputStream.java:250)
>>
>> Important Note 3: Flink and the StreamingFileSink never overwrites
>> committed data. Given this, when trying to restore from an old
>> checkpoint/savepoint which assumes an in-progress file which was committed
>> by subsequent successful checkpoints, *Flink will refuse to resume and
>> it will throw an exception as it cannot locate the in-progress file*.
>>
>> Currently I am facing the same issue in our PROD code.
>>
>>
>>
>> Regards,
>> Nagireddy Y.
>>
>>
>>
>>
>>
>>
>> On Fri, Aug 4, 2023 at 12:11 PM xiangyu feng <xiangyu...@gmail.com>
>> wrote:
>>
>>> Hi ynagireddy4u,
>>>
>>> From the exception info, I think your application hit an HDFS file
>>> issue during the commit phase of the checkpoint. Can you check why the
>>> 'Staging file does not exist' in the first place?
>>>
>>> Regards,
>>> Xiangyu
>>>
>>> On Fri, Aug 4, 2023 at 12:21, Y SREEKARA BHARGAVA REDDY <ynagiredd...@gmail.com> wrote:
>>>
>>>> Hi Xiangyu/Dev Team,
>>>>
>>>> Thanks for the reply.
>>>>
>>>> In our Flink job, we increased the *checkpoint timeout to 30 min.*
>>>> And the *checkpoint interval is 10 min.*
>>>>
>>>> But while running the job we got the exception below.
>>>>
>>>> java.lang.RuntimeException: Error while confirming checkpoint
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:952)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$7(StreamTask.java:924)
>>>>     at org.apache.flink.util.function.FunctionUtils.lambda$asCallable$5(FunctionUtils.java:125)
>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: java.io.IOException: Cannot clean commit: Staging file does not exist.
>>>>     at org.apache.flink.runtime.fs.hdfs.HadoopRecoverableFsDataOutputStream$HadoopFsCommitter.commit(HadoopRecoverableFsDataOutputStream.java:250)
>>>>     at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onSuccessfulCompletionOfCheckpoint(Bucket.java:300)
>>>>     at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.commitUpToCheckpoint(Buckets.java:216)
>>>>     at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.notifyCheckpointComplete(StreamingFileSink.java:415)
>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.notifyCheckpointComplete(AbstractUdfStreamOperator.java:130)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointComplete$8(StreamTask.java:936)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:101)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:930)
>>>>     ... 12 more
>>>>
>>>> It would be great if you have any workaround for that.
>>>>
>>>> Regards,
>>>> Nagireddy Y.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Aug 3, 2023 at 7:24 AM xiangyu feng <xiangyu...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi ynagireddy4u,
>>>>>
>>>>> We have seen this exception before. It is usually caused by one of
>>>>> the following:
>>>>>
>>>>> 1) The TaskManager is too busy with other work to send the heartbeat
>>>>> to the JobMaster, or the TaskManager process might have already exited;
>>>>> 2) There might be a network issue between this TaskManager and the
>>>>> JobMaster;
>>>>> 3) In certain cases, the JobMaster actor might also be too busy to
>>>>> process the RPC requests from the TaskManager.
>>>>>
>>>>> Please check whether your problem fits any of the above situations.
>>>>>
>>>>> Best,
>>>>> Xiangyu
>>>>>
>>>>>
>>>>> On Mon, Jul 31, 2023 at 20:49, Y SREEKARA BHARGAVA REDDY <ynagiredd...@gmail.com> wrote:
>>>>>
>>>>>> Hi Team,
>>>>>>
>>>>>> Has anyone faced the exception below?
>>>>>> If yes, please share the resolution.
>>>>>>
>>>>>>
>>>>>> 2023-07-28 22:04:16
>>>>>> *java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id container_e19_1690528962823_0382_01_000005 timed out.*
>>>>>>     at org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1147)
>>>>>>     at org.apache.flink.runtime.heartbeat.HeartbeatMonitorImpl.run(HeartbeatMonitorImpl.java:109)
>>>>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
>>>>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>
>>>>>> If you have any suggestions, please share them with me.
>>>>>>
>>>>>> Regards,
>>>>>> Nagireddy Y
>>>>>>
>>>>>
