Hi Hangxiang,

Thanks for your reply. I don't think these two jobs have any dependencies:
they are packaged in different jars, although they run on the same cluster
in session mode. The producer job does some filtering and sends the results
to Kafka, using customized serialization logic to serialize the key and
value into byte arrays. The consumer job then reads from the same Kafka
topic, deserializes the keys and values with customized deserialization
logic, and proceeds with some further processing. I only turned off Kryo
for the consumer job, so I don't know why the producer job is affected and
can no longer resume from its checkpoint.
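
For illustration only, here is a minimal sketch of the kind of key/value
byte-array serialization described above. It is not the actual job code:
the Event record, its fields, and the KeyValueCodec class are hypothetical
names chosen for the example, and the byte layout is an assumption.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical codec: turns a record into separate key and value byte
// arrays (producer side) and rebuilds the record from them (consumer side).
final class KeyValueCodec {

    // Hypothetical record shape; the real jobs' types are not shown in this thread.
    static final class Event {
        final String id;
        final long timestamp;
        final String payload;

        Event(String id, long timestamp, String payload) {
            this.id = id;
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    // Key bytes: the record id encoded as UTF-8.
    static byte[] serializeKey(Event e) {
        return e.id.getBytes(StandardCharsets.UTF_8);
    }

    // Value bytes: 8-byte timestamp, 4-byte payload length, then the payload.
    static byte[] serializeValue(Event e) {
        byte[] payload = e.payload.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES + payload.length);
        buf.putLong(e.timestamp);
        buf.putInt(payload.length);
        buf.put(payload);
        return buf.array();
    }

    // Consumer side: reads the fields back in the order they were written.
    static Event deserialize(byte[] key, byte[] value) {
        String id = new String(key, StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.wrap(value);
        long timestamp = buf.getLong();
        int length = buf.getInt();
        byte[] payload = new byte[length];
        buf.get(payload);
        return new Event(id, timestamp, new String(payload, StandardCharsets.UTF_8));
    }
}
```

In this sketch the two sides share only the byte layout of the key and
value, which is the sense in which the jobs exchange data through the topic.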

The error message details are as follows:

org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=50, backoffTimeMS=10000)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
    at sun.reflect.GeneratedMethodAccessor79.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: Error while getting state
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
    at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:210)
    at com.visa.flink.functions.DynamicFilterFunction.open(DynamicFilterFunction.scala:50)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.open(CoBroadcastWithKeyedOperator.java:91)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:442)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.flink.util.StateMigrationException: For heap backends, the new state serializer (org.apache.flink.api.common.typeutils.base.ListSerializer@8b6e1a2a) must not be incompatible with the old state serializer (org.apache.flink.api.common.typeutils.base.ListSerializer@cab0f895).
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.tryRegisterStateTable(HeapKeyedStateBackend.java:211)
    at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend.createInternalState(HeapKeyedStateBackend.java:276)
    at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
    at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:301)
    at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:352)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
    at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
    ... 14 more


Thanks,

Peng

On Tue, Jun 6, 2023 at 9:53 PM Hangxiang Yu <master...@gmail.com> wrote:

> Hi, Peng.
> Do these two jobs have any dependency?
> Or could you please share the specific logic of the two jobs, if
> convenient? Could you also share the failure message of the producer job?
> In my opinion, if the two jobs have no other association, as you said,
> the consumer job will fail due to unsupported schema evolution, but it
> should not affect the producer job.
>
>
> On Tue, Jun 6, 2023 at 2:58 PM Peng Peng <pengpeng8...@gmail.com> wrote:
>
>> Hi,
>>
>> I have 2 Flink jobs, of which one produces records to Kafka using the Kryo
>> serializer and the other consumes records from Kafka and deserializes with
>> Kryo. They have been working fine. Then I stopped both jobs with checkpoints
>> and changed the consumer job to disable generic types and Kryo in order to
>> use Avro serialization. However, when I resumed the 2 jobs from the
>> checkpoint, both failed. It made sense that the consumer job would fail,
>> but why is the producer job also affected?
>>
>> Thanks,
>> Peng
>>
>
>
> --
> Best,
> Hangxiang.
>
