[ 
https://issues.apache.org/jira/browse/FLINK-5633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16241701#comment-16241701
 ] 

Erik van Oosten commented on FLINK-5633:
----------------------------------------

[~StephanEwen] We need to process 130K msg/s, so I guess that counts as
"often" :). Our process is CPU bound and parsing Avro accounts for roughly 15%
of that. Any improvement means we can run with fewer machines.

For every message we create a new SpecificDatumReader. If I follow the code 
correctly that should _not_ give a large overhead. The Schema instances we pass 
to it _are_ cached.

Then we call {{SpecificDatumReader.read}} to parse each Avro message. In that 
call you eventually end up in {{SpecificData.newInstance}} to create a new 
instance of the target class. The constructor of that class is looked up in a 
cache. That cache is declared as {{static}}. I do not understand how 
instantiating a new {{SpecificData}} for every call to {{read}} helps because 
it would still use the same cache. The code I pasted above also uses a 
constructor cache but the cache is not {{static}}. Reversing the class loader 
order should also work.
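
To make the non-static cache idea concrete, here is a minimal sketch. It is
not the code pasted earlier in this thread and not Avro's implementation; the
class name {{ScopedConstructorCache}} and its methods are hypothetical. It only
assumes that the target class has a public no-arg constructor. Because the map
is an instance field, it lives and dies with whatever object owns it (for
example one parser per task), so constructors resolved through an old user
code class loader cannot leak into a re-submitted job.

```java
import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper for illustration; not part of Avro or Flink. */
final class ScopedConstructorCache {
    // Instance field, deliberately NOT static: the cache becomes garbage
    // together with its owner instead of surviving for the life of the JVM.
    private final Map<String, Constructor<?>> constructors = new ConcurrentHashMap<>();

    // The class loader that should resolve the target classes,
    // e.g. the user code class loader of the current task.
    private final ClassLoader loader;

    ScopedConstructorCache(ClassLoader loader) {
        this.loader = loader;
    }

    /** Creates a new instance of the named class, caching its constructor. */
    Object newInstance(String className) {
        Constructor<?> ctor = constructors.computeIfAbsent(className, this::lookup);
        try {
            return ctor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + className, e);
        }
    }

    // Resolves the public no-arg constructor through this cache's own loader.
    private Constructor<?> lookup(String className) {
        try {
            return Class.forName(className, true, loader).getConstructor();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("No public no-arg constructor for " + className, e);
        }
    }

    /** Number of cached constructors; useful for checking cache isolation. */
    int size() {
        return constructors.size();
    }
}
```

Two such caches never share entries, so creating a fresh one per job
submission starts from an empty map, which is the property a {{static}} cache
cannot give you.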

> ClassCastException: X cannot be cast to X when re-submitting a job.
> -------------------------------------------------------------------
>
>                 Key: FLINK-5633
>                 URL: https://issues.apache.org/jira/browse/FLINK-5633
>             Project: Flink
>          Issue Type: Bug
>          Components: Job-Submission, YARN
>    Affects Versions: 1.1.4
>            Reporter: Giuliano Caliari
>            Priority: Minor
>
> I’m running a job on my local cluster and the first time I submit the job 
> everything works but whenever I cancel and re-submit the same job it fails 
> with:
> {quote}
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Job execution failed.
>       at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>       at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
>       at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
>       at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>       at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:634)
>       at au.com.my.package.pTraitor.OneTrait.execute(Traitor.scala:147)
>       at 
> au.com.my.package.pTraitor.TraitorAppOneTrait$.delayedEndpoint$au$com$my$package$pTraitor$TraitorAppOneTrait$1(TraitorApp.scala:22)
>       at 
> au.com.my.package.pTraitor.TraitorAppOneTrait$delayedInit$body.apply(TraitorApp.scala:21)
>       at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
>       at 
> scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
>       at scala.App$$anonfun$main$1.apply(App.scala:76)
>       at scala.App$$anonfun$main$1.apply(App.scala:76)
>       at scala.collection.immutable.List.foreach(List.scala:381)
>       at 
> scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
>       at scala.App$class.main(App.scala:76)
>       at 
> au.com.my.package.pTraitor.TraitorAppOneTrait$.main(TraitorApp.scala:21)
>       at au.com.my.package.pTraitor.TraitorAppOneTrait.main(TraitorApp.scala)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:498)
>       at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>       at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
>       at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
>       at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
>       at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
>       at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
>       at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
>       at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
>       at 
> org.apache.flink.runtime.security.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:29)
>       at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>       at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>       at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>       at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>       at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.RuntimeException: Could not forward element to next 
> operator
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:415)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:397)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:749)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>       at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.collectWithTimestamp(StreamSourceContexts.java:272)
>       at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:261)
>       at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:88)
>       at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:157)
>       at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:255)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>       at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>       at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>       at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:269)
>       at org.apache.flink.runtime.taskmanager.Task.run(Task.java:654)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: 
> au.com.my.package.schema.p.WowTransaction cannot be cast to 
> au.com.my.package.schema.p.WowTransaction
>       at 
> au.com.my.package.pTraitor.OneTrait$$anonfun$execute$4.apply(Traitor.scala:132)
>       at 
> org.apache.flink.streaming.api.scala.DataStream$$anon$1.extractAscendingTimestamp(DataStream.scala:763)
>       at 
> org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor.extractTimestamp(AscendingTimestampExtractor.java:72)
>       at 
> org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:65)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:412)
>       ... 14 more
> {quote}
> This happens on versions 1.1.4 and 1.2
> Here's a great description of the problem, provided by Yury Ruchin:
> {quote}
> In YARN setup there are several sources where classes are loaded from: Flink 
> lib directory, YARN lib directories, user code. The first two sources are 
> handled by system classloader, the last one is loaded by 
> FlinkUserCodeClassLoader.
> My streaming job parses Avro-encoded data using SpecificRecord facility. In 
> essence, the job looks like this: Source -> Avro parser (Map) -> Sink. 
> Parallelism is 1. Job operates inside a long-lived YARN session. I have a 
> subclass of SpecificRecord, say its name is MySpecificRecord. From class 
> loading perspective, Avro library classes, including the SpecificRecord, are 
> loaded by system class loader from YARN lib dir - such classes are shared 
> across different Flink tasks within task manager. On the other side, 
> MySpecificRecord is in the job fat jar, so it gets loaded by 
> FlinkUserCodeClassLoader. Upon every job restart, task gets a new 
> FlinkUserCodeClassLoader instance, so classes from user code are confined to 
> a task instance.
> Simply put, the parsing itself looks like this:
> val bean = new SpecificDatumReader[MySpecificRecord](MySpecificRecord.getClassSchema).read(...)
> Now, the scenario:
> 1. I start my job. Parsing is initiated, so the SpecificDatumReader and 
> SpecificData get loaded by system classloader. A new FlinkUserCodeClassloader 
> is instantiated, let's denote its instance as "A". MySpecificRecord then gets 
> loaded by A.
> 2. SpecificData gets a singleton SpecificData.INSTANCE that holds a cache 
> that maps some string key derived from Avro schema to the implementing class. 
> So during parsing I get MySpecificRecord (A) cached there.
> 3. I stop the job and re-submit it. The JVM process is the same, so all 
> standard Avro classes, including SpecificData, remain loaded. A new task 
> instance is created and gets a new FlinkUserCodeClassLoader instance, let's 
> name it "B". A new MySpecificRecord class incarnation is loaded by B. From 
> JVM standpoint MySpecificRecord (B) is different from MySpecificRecord (A), 
> even though their bytecode is identical.
> 4. The job starts parsing again. SpecificDatumReader consults 
> SpecificData.INSTANCE's cache for any stashed classes and finds 
> MySpecificRecord (A) there.
> 5. SpecificDatumReader uses the cached MySpecificRecord (A) to instantiate a 
> bean for filling the parsed data in.
> 6. SpecificDatumReader hands the filled instance of MySpecificRecord (A) back 
> to job.
> 7. Job tries to cast MySpecificRecord (A) to MySpecificRecord (B).
> 8. ClassCastException :^(
> I fixed the issue by not using the SpecificData.INSTANCE singleton (even 
> though this is considered a common and expected practice). I feed every 
> parser a new instance of SpecificData. This way the class cache is confined 
> to a parser instance and gets recycled along with it.
> {quote}
> A discussion of the error can be found at:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/How-to-get-help-on-ClassCastException-when-re-submitting-a-job-td10972.html



