Re: changing serializer affects resuming from checkpoint

2023-06-11 Thread Hangxiang Yu
Hi, Peng.
Maybe you could check whether the two jars contain any of the same code, or
whether the two jobs share any configuration (since they run in session mode).
If these are completely separate, then in my experience the consumer job
should not affect the producer job.


On Sun, Jun 11, 2023 at 10:01 AM Peng Peng  wrote:

> Hi Hangxiang,
>
> Thanks for your reply. I don't think the two jobs have any dependencies;
> they are packaged in different jars, although they run on the same cluster
> in session mode. The producer job does some filtering and writes the records
> to Kafka using customized serialization logic that turns the key and value
> into byte arrays. The consumer job reads the same Kafka topic, deserializes
> the keys and values with the matching customized deserialization logic, and
> then does some further processing. I only turned off Kryo for the consumer
> job, so I don't understand why the producer job is affected and can no
> longer resume from its checkpoint.

Re: changing serializer affects resuming from checkpoint

2023-06-10 Thread Peng Peng
Hi Hangxiang,

Thanks for your reply. I don't think the two jobs have any dependencies;
they are packaged in different jars, although they run on the same cluster
in session mode. The producer job does some filtering and writes the records
to Kafka using customized serialization logic that turns the key and value
into byte arrays. The consumer job reads the same Kafka topic, deserializes
the keys and values with the matching customized deserialization logic, and
then does some further processing. I only turned off Kryo for the consumer
job, so I don't understand why the producer job is affected and can no longer
resume from its checkpoint.
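
Since the thread does not show the actual serialization code, here is a minimal, hypothetical sketch (in Scala) of the kind of customized key/value handling described above; the Event type, its byte layout, and the class names are all invented for illustration:

import java.nio.charset.StandardCharsets

import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema}
import org.apache.flink.api.common.typeinfo.TypeInformation

// Hypothetical record type; the real key/value classes are not shown in the thread.
case class Event(id: String, payload: String)

// Producer side: turn an Event into the byte array that is written to Kafka.
class EventSerializationSchema extends SerializationSchema[Event] {
  override def serialize(element: Event): Array[Byte] =
    s"${element.id}|${element.payload}".getBytes(StandardCharsets.UTF_8)
}

// Consumer side: rebuild the Event from the same byte layout.
class EventDeserializationSchema extends DeserializationSchema[Event] {
  override def deserialize(message: Array[Byte]): Event = {
    val Array(id, payload) = new String(message, StandardCharsets.UTF_8).split('|')
    Event(id, payload)
  }
  override def isEndOfStream(nextElement: Event): Boolean = false
  override def getProducedType: TypeInformation[Event] = TypeInformation.of(classOf[Event])
}

Schemas like these only describe the bytes on the Kafka topic; they are independent of the serializers Flink chooses for checkpointed state, which is where the Kryo/Avro setting matters.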

The error message details are as follows:

org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=50, backoffTimeMS=1)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
    at sun.reflect.GeneratedMethodAccessor79.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Error while getting state
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:210)
    at com.visa.flink.functions.DynamicFilterFunction.open(DynamicFilterFunction.scala:50)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.open(CoBroadcastWithKeyedOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
    at
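
The "Caused by" section shows the failure happening in DynamicFilterFunction.open when it asks the runtime for keyed ListState (the operator is a CoBroadcastWithKeyedOperator, i.e. a keyed broadcast function). A rough sketch of that call pattern, with a hypothetical function, state name, and element type standing in for the real ones:

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.util.Collector

// Hypothetical stand-in for DynamicFilterFunction (key/input/output types invented).
class DynamicFilterSketch
    extends KeyedBroadcastProcessFunction[String, String, String, String] {

  @transient private var seen: ListState[String] = _

  override def open(parameters: Configuration): Unit = {
    val descriptor = new ListStateDescriptor[String]("seen-values", classOf[String])
    // On restore, the serializer derived from this descriptor is checked against the
    // serializer recorded in the checkpoint; if the job's serializer configuration
    // changed in between, this call can fail and is surfaced as the
    // "Error while getting state" RuntimeException above.
    seen = getRuntimeContext.getListState(descriptor)
  }

  override def processElement(
      value: String,
      ctx: KeyedBroadcastProcessFunction[String, String, String, String]#ReadOnlyContext,
      out: Collector[String]): Unit = {
    seen.add(value)
    out.collect(value)
  }

  override def processBroadcastElement(
      value: String,
      ctx: KeyedBroadcastProcessFunction[String, String, String, String]#Context,
      out: Collector[String]): Unit = {
    // Broadcast side omitted in this sketch.
  }
}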

Re: changing serializer affects resuming from checkpoint

2023-06-06 Thread Hangxiang Yu
Hi, Peng.
Do these two jobs have any dependency on each other?
If convenient, could you please share the specific logic of the two jobs?
Could you also share the failure message of the producer job?
In my opinion, if the two jobs have no other association, then, as you said,
the consumer job may fail due to unsupported schema evolution, but it should
not affect the producer job.
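
For reference, the consumer-side change described in the quoted message below (disabling generic types/Kryo and switching to Avro) roughly corresponds to a sketch like this; the object name and the rest of the pipeline are placeholders:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object ConsumerJobSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Fail fast instead of silently falling back to Kryo for non-POJO types.
    env.getConfig.disableGenericTypes()
    // Serialize POJO types with Avro instead of Flink's built-in POJO serializer.
    env.getConfig.enableForceAvro()
    // ... build the Kafka source, deserialization, and further processing here ...
    // env.execute("consumer-job")
  }
}

Because these settings also change which serializers are used for checkpointed state, a job restored from a checkpoint taken with the old settings may hit exactly this kind of serializer-compatibility failure; they should not, by themselves, reach into a different job running in the same session cluster.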


On Tue, Jun 6, 2023 at 2:58 PM Peng Peng  wrote:

> Hi,
>
> I have 2 Flink jobs: one produces records to Kafka using the Kryo
> serializer, and the other consumes those records from Kafka and deserializes
> them with Kryo. This has been working fine. Then I stopped both jobs with
> checkpoints and changed the consumer job to disable generic types and Kryo
> and use Avro serialization instead. However, when I resumed the 2 jobs from
> their checkpoints, both failed. It makes sense that the consumer job would
> fail, but why is the producer job also affected?
>
> Thanks,
> Peng
>


-- 
Best,
Hangxiang.