[ 
https://issues.apache.org/jira/browse/FLINK-5633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16130917#comment-16130917
 ] 

Svend Vanderveken commented on FLINK-5633:
------------------------------------------

Hi all, 

I am hitting exactly the same issue, although I am not using the 
`SpecificDatumReader` directly but rather Confluent's Avro deserializer. 
We chose it because it is integrated with the Schema Registry and thus 
validates schemas at runtime. 

I am not the first person to encounter this exact situation: I found and 
commented on the PR below, which essentially suggests applying the fix 
mentioned by [~gcaliari] to that deserializer.

https://github.com/confluentinc/schema-registry/pull/509

[~StephanEwen], [~gcaliari], feel free to comment on that PR if you think 
it should move forward. I think this kind of update would greatly help to 
integrate Kafka and Flink through Avro and the Schema Registry. 


> ClassCastException: X cannot be cast to X when re-submitting a job.
> -------------------------------------------------------------------
>
>                 Key: FLINK-5633
>                 URL: https://issues.apache.org/jira/browse/FLINK-5633
>             Project: Flink
>          Issue Type: Bug
>          Components: Job-Submission, YARN
>    Affects Versions: 1.1.4
>            Reporter: Giuliano Caliari
>            Priority: Minor
>
> I’m running a job on my local cluster and the first time I submit the job 
> everything works but whenever I cancel and re-submit the same job it fails 
> with:
> {quote}
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Job execution failed.
>       at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>       at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
>       at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
>       at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>       at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:634)
>       at au.com.my.package.pTraitor.OneTrait.execute(Traitor.scala:147)
>       at 
> au.com.my.package.pTraitor.TraitorAppOneTrait$.delayedEndpoint$au$com$my$package$pTraitor$TraitorAppOneTrait$1(TraitorApp.scala:22)
>       at 
> au.com.my.package.pTraitor.TraitorAppOneTrait$delayedInit$body.apply(TraitorApp.scala:21)
>       at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>       at 
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>       at scala.App$$anonfun$main$1.apply(App.scala:76)
>       at scala.App$$anonfun$main$1.apply(App.scala:76)
>       at scala.collection.immutable.List.foreach(List.scala:381)
>       at 
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>       at scala.App$class.main(App.scala:76)
>       at 
> au.com.my.package.pTraitor.TraitorAppOneTrait$.main(TraitorApp.scala:21)
>       at au.com.my.package.pTraitor.TraitorAppOneTrait.main(TraitorApp.scala)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>       at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
>       at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
>       at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
>       at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
>       at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
>       at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
>       at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
>       at 
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:29)
>       at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>       at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>       at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>       at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: Could not forward element to next 
> operator
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:415)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:397)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:749)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>       at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.collectWithTimestamp(StreamSourceContexts.java:272)
>       at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:261)
>       at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:88)
>       at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:157)
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:255)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>       at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: 
> au.com.my.package.schema.p.WowTransaction cannot be cast to 
> au.com.my.package.schema.p.WowTransaction
>       at 
> au.com.my.package.pTraitor.OneTrait$$anonfun$execute$4.apply(Traitor.scala:132)
>       at 
> org.apache.flink.streaming.api.scala.DataStream$$anon$1.extractAscendingTimestamp(DataStream.scala:763)
>       at 
> org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor.extractTimestamp(AscendingTimestampExtractor.java:72)
>       at 
> org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:65)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:412)
>       ... 14 more
> {quote}
> This happens on versions 1.1.4 and 1.2
> Here's a great description of the problem, provided by Yury Ruchin:
> {quote}
> In YARN setup there are several sources where classes are loaded from: Flink 
> lib directory, YARN lib directories, user code. The first two sources are 
> handled by system classloader, the last one is loaded by 
> FlinkUserCodeClassLoader.
> My streaming job parses Avro-encoded data using SpecificRecord facility. In 
> essence, the job looks like this: Source -> Avro parser (Map) -> Sink. 
> Parallelism is 1. Job operates inside a long-lived YARN session. I have a 
> subclass of SpecificRecord, say its name is MySpecificRecord. From class 
> loading perspective, Avro library classes, including the SpecificRecord, are 
> loaded by system class loader from YARN lib dir - such classes are shared 
> across different Flink tasks within task manager. On the other side, 
> MySpecificRecord is in the job fat jar, so it gets loaded by 
> FlinkUserCodeClassLoader. Upon every job restart, task gets a new 
> FlinkUserCodeClassLoader instance, so classes from user code are confined to 
> a task instance.
> Simply put, the parsing itself looks like this:
> val bean = new SpecificDatumReader[MySpecificRecord](MySpecificRecord.getClassSchema).read(...)
> Now, the scenario:
> 1. I start my job. Parsing is initiated, so the SpecificDatumReader and 
> SpecificData get loaded by system classloader. A new FlinkUserCodeClassloader 
> is instantiated, let's denote its instance as "A". MySpecificRecord then gets 
> loaded by A.
> 2. SpecificData gets a singleton SpecificData.INSTANCE that holds a cache 
> that maps some string key derived from Avro schema to the implementing class. 
> So during parsing I get MySpecificRecord (A) cached there.
> 3. I stop the job and re-submit it. The JVM process is the same, so all 
> standard Avro classes, including SpecificData, remain loaded. A new task 
> instance is created and gets a new FlinkUserCodeClassLoader instance, let's 
> name it "B". A new MySpecificRecord class incarnation is loaded by B. From 
> JVM standpoint MySpecificRecord (B) is different from MySpecificRecord (A), 
> even though their bytecode is identical.
> 4. The job starts parsing again. SpecificDatumReader consults 
> SpecificData.INSTANCE's cache for any stashed classes and finds 
> MySpecificRecord (A) there.
> 5. SpecificDatumReader uses the cached MySpecificRecord (A) to instantiate a 
> bean for filling the parsed data in.
> 6. SpecificDatumReader hands the filled instance of MySpecificRecord (A) back 
> to job.
> 7. Job tries to cast MySpecificRecord (A) to MySpecificRecord (B).
> 8. ClassCastException :^(
> I fixed the issue by not using the SpecificData.INSTANCE singleton (even 
> though this is considered a common and expected practice). I feed every 
> parser a new instance of SpecificData. This way the class cache is confined 
> to a parser instance and gets recycled along with it.
> {quote}
> A discussion of the error can be found at:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-get-help-on-ClassCastException-when-re-submitting-a-job-td10972.html
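
The scenario in steps 1 to 8 above, and the fix of confining the class cache, 
can be reproduced without Flink or Avro. What follows is only a minimal sketch 
in plain Java (9 or later), not the actual Avro or Flink code. All names in it 
are hypothetical stand-ins: `MyRecord` for a generated SpecificRecord subclass, 
`IsolatingLoader` for FlinkUserCodeClassLoader, `ClassCache` for the class 
cache held by SpecificData, and `castSucceeds` for the cast done by the job. 
It assumes the compiled classes can be read as resources from the class path.

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;

final class ClassLoaderIdentityDemo {

    /** Hypothetical stand-in for a generated record class such as MySpecificRecord. */
    public static class MyRecord {
        public MyRecord() {}
    }

    /** Hypothetical stand-in for a per-job user-code loader: defines one class itself. */
    static final class IsolatingLoader extends ClassLoader {
        private final String isolatedName;

        IsolatingLoader(String isolatedName) {
            super(ClassLoaderIdentityDemo.class.getClassLoader());
            this.isolatedName = isolatedName;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(isolatedName)) {
                return super.loadClass(name, resolve); // shared classes come from the parent
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> loaded = findLoadedClass(name);
                if (loaded == null) {
                    // Same bytecode, but defined again by this loader: a distinct class.
                    byte[] bytes = readClassBytes(name);
                    loaded = defineClass(name, bytes, 0, bytes.length);
                }
                return loaded;
            }
        }

        private byte[] readClassBytes(String name) throws ClassNotFoundException {
            String path = name.replace('.', '/') + ".class";
            try (InputStream in = getParent().getResourceAsStream(path)) {
                if (in == null) {
                    throw new ClassNotFoundException(name);
                }
                return in.readAllBytes();
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    /** Hypothetical stand-in for the name-to-class cache described in step 2. */
    static final class ClassCache {
        private final Map<String, Class<?>> byName = new HashMap<>();

        Class<?> resolve(String name, ClassLoader loader) throws ClassNotFoundException {
            Class<?> cached = byName.get(name);
            if (cached == null) {
                cached = Class.forName(name, true, loader);
                byName.put(name, cached);
            }
            return cached; // on a cache hit the loader argument is ignored
        }
    }

    /** True if a bean built from the cached class can be cast to the job's own class. */
    static boolean castSucceeds(ClassCache cache, ClassLoader jobLoader) throws Exception {
        String name = MyRecord.class.getName();
        Object bean = cache.resolve(name, jobLoader).getDeclaredConstructor().newInstance();
        Class<?> expected = Class.forName(name, true, jobLoader);
        try {
            expected.cast(bean);
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        String name = MyRecord.class.getName();
        ClassCache shared = new ClassCache();       // plays the role of a JVM-wide singleton
        ClassLoader a = new IsolatingLoader(name);  // first submission
        ClassLoader b = new IsolatingLoader(name);  // re-submission in the same JVM
        System.out.println(castSucceeds(shared, a));
        System.out.println(castSucceeds(shared, b));
        System.out.println(castSucceeds(new ClassCache(), b));
    }
}
```

Compiled with javac and run, this should print true, false, true. The first 
submission works. The second gets the class cached from loader "A" and fails 
the cast. A cache created per parser, as in the fix quoted above, works again 
because it resolves the class through loader "B".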



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
