[ 
https://issues.apache.org/jira/browse/BEAM-1970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15990139#comment-15990139
 ] 

Luke Cwik commented on BEAM-1970:
---------------------------------

The issue in Avro/Flink seems to be that they use class caches for several 
things (which is ok when using multiple versions of the same class since they 
won't be equal) but they have one location where they map strings to classes 
(and cache the first instance):
http://grepcode.com/file/repo1.maven.org/maven2/org.apache.avro/avro/1.7.7/org/apache/avro/reflect/ReflectData.java#303

It seems as though if you use a schema and not a class you run into this 
problem because Avro looks into the cache and pulls out a class from a previous 
run. So any program that uses the same schema across multiple runs is doomed to 
fail.

> Cannot run UserScore on Flink runner due to AvroCoder classload issues
> ----------------------------------------------------------------------
>
>                 Key: BEAM-1970
>                 URL: https://issues.apache.org/jira/browse/BEAM-1970
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Ahmet Altay
>            Assignee: Kenneth Knowles
>             Fix For: First stable release
>
>
> Fails with error:
> ClassCastException: 
> org.apache.beam.examples.complete.game.UserScore$GameActionInfo cannot be 
> cast to org.apache.beam.examples.complete.game.UserScore$GameActionInfo
> full stack:
> ------------------------------------------------------------
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main method 
> caused an error.
>         at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545)
>         at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
>         at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
>         at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
>         at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
>         at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
>         at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
>         at 
> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:422)
>         at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
>         at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
> Caused by: java.lang.RuntimeException: Pipeline execution failed
>         at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:119)
>         at org.apache.beam.sdk.Pipeline.run(Pipeline.java:265)
>         at 
> org.apache.beam.examples.complete.game.UserScore.main(UserScore.java:238)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>         ... 13 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The 
> program execution failed: Job execution failed.
>         at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>         at 
> org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:210)
>         at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
>         at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:387)
>         at 
> org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
>         at 
> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:111)
>         at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:116)
>         ... 20 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job 
> execution failed.
>         at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>         at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>         at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>         at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>         at 
> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>         at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>         at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.beam.sdk.util.UserCodeException: 
> java.lang.ClassCastException: 
> org.apache.beam.examples.complete.game.UserScore$GameActionInfo cannot
>  be cast to org.apache.beam.examples.complete.game.UserScore$GameActionInfo
>         at 
> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
>         at 
> org.apache.beam.sdk.transforms.MapElements$1$auxiliary$CfEWT9ws.invokeProcessElement(Unknown
>  Source)
>         at 
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:199)
>         at 
> org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:160)
>         at 
> org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction.mapPartition(FlinkDoFnFunction.java:109)
>         at 
> org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
>         at 
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
>         at 
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
>         at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: 
> org.apache.beam.examples.complete.game.UserScore$GameActionInfo cannot be 
> cast to org.apache.beam.examples.complet
> e.game.UserScore$GameActionInfo
>         at 
> org.apache.beam.sdk.transforms.SimpleFunction.apply(SimpleFunction.java:65)
>         at 
> org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:122)
> 2017-04-13 21:11:11,155 INFO  org.apache.flink.yarn.YarnClusterClient         
>               - Disconnecting YarnClusterClient from ApplicationMaster



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to