[jira] [Commented] (BEAM-1970) Cannot run UserScore on Flink runner due to AvroCoder classload issues
[ https://issues.apache.org/jira/browse/BEAM-1970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15991047#comment-15991047 ]

ASF GitHub Bot commented on BEAM-1970:
--------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/beam/pull/2783

> Cannot run UserScore on Flink runner due to AvroCoder classload issues
> ----------------------------------------------------------------------
>
>                 Key: BEAM-1970
>                 URL: https://issues.apache.org/jira/browse/BEAM-1970
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Ahmet Altay
>            Assignee: Kenneth Knowles
>             Fix For: First stable release
>
>
> Fails with error:
> ClassCastException: org.apache.beam.examples.complete.game.UserScore$GameActionInfo cannot be cast to org.apache.beam.examples.complete.game.UserScore$GameActionInfo
> full stack:
> 
> The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
>     at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
>     at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
>     at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
> Caused by: java.lang.RuntimeException: Pipeline execution failed
>     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:119)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:265)
>     at org.apache.beam.examples.complete.game.UserScore.main(UserScore.java:238)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>     ... 13 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>     at org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:210)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:387)
>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
>     at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:111)
>     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:116)
>     ... 20 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>     at
[jira] [Commented] (BEAM-1970) Cannot run UserScore on Flink runner due to AvroCoder classload issues
[ https://issues.apache.org/jira/browse/BEAM-1970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15990266#comment-15990266 ]

Kenneth Knowles commented on BEAM-1970:
---------------------------------------

I wrote a very small unit test for {{AvroCoder}} that should definitely have passed but did not, so this isn't a Flink issue but an Avro one. Flink just happens to exercise the broken code path. It could bite other non-default class loading setups, too. The workaround in {{AvroCoder}} is pretty easy and surgical.
[jira] [Commented] (BEAM-1970) Cannot run UserScore on Flink runner due to AvroCoder classload issues
[ https://issues.apache.org/jira/browse/BEAM-1970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15990139#comment-15990139 ]

Luke Cwik commented on BEAM-1970:
---------------------------------

The issue in Avro/Flink seems to be that they use class caches for several things (which is ok when using multiple versions of the same class since they won't be equal) but they have one location where they map strings to classes (and cache the first instance):
http://grepcode.com/file/repo1.maven.org/maven2/org.apache.avro/avro/1.7.7/org/apache/avro/reflect/ReflectData.java#303

It seems as though if you use a schema and not a class you run into this problem because Avro looks into the cache and pulls out a class from a previous run. So any program that uses the same schema across multiple runs is doomed to fail.
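The failure mode described in this comment can be sketched without Avro or Flink. The following is a simulation only: `LoadedClass`, `resolve`, and the loader labels are hypothetical names made up for illustration, and a "class" is modelled as a (name, loader) pair, which is how the JVM determines class identity. It shows why a JVM-wide cache keyed only by class name hands the second job a class defined by the first job's loader.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class StringKeyedClassCacheSketch {
    /** Stand-in for a loaded class: identity is (name, defining loader). */
    static final class LoadedClass {
        final String name;
        final String loader;

        LoadedClass(String name, String loader) {
            this.name = name;
            this.loader = loader;
        }
    }

    // Hypothetical JVM-wide cache keyed only by the class NAME.
    private static final Map<String, LoadedClass> BY_NAME = new ConcurrentHashMap<>();

    /** Returns the cached class for this name, caching the first one ever seen. */
    static LoadedClass resolve(String name, String currentLoader) {
        return BY_NAME.computeIfAbsent(name, n -> new LoadedClass(n, currentLoader));
    }

    public static void main(String[] args) {
        String name = "UserScore$GameActionInfo";

        // First job: the cache is empty, so its own class is stored.
        LoadedClass first = resolve(name, "job-1-loader");

        // Second job in the same JVM: same name, different loader,
        // but the lookup returns the entry left behind by job 1.
        LoadedClass second = resolve(name, "job-2-loader");

        System.out.println("job 1 got loader: " + first.loader);
        System.out.println("job 2 got loader: " + second.loader); // job-1-loader
    }
}
```

In a real JVM the equivalent mismatch surfaces as the "X cannot be cast to X" ClassCastException quoted in the issue, because the two classes share a name but not a defining class loader.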
[jira] [Commented] (BEAM-1970) Cannot run UserScore on Flink runner due to AvroCoder classload issues
[ https://issues.apache.org/jira/browse/BEAM-1970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15990041#comment-15990041 ]

ASF GitHub Bot commented on BEAM-1970:
--------------------------------------

GitHub user kennknowles opened a pull request:

    https://github.com/apache/beam/pull/2783

    [BEAM-1970] Use a new ReflectData for each AvroCoder instance

    Be sure to do all of the following to help us incorporate your contribution
    quickly and easily:

     - [ ] Make sure the PR title is formatted like:
       `[BEAM-] Description of pull request`
     - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
       Travis-CI on your fork and ensure the whole test matrix passes).
     - [ ] Replace `` in the title with the actual Jira issue
       number, if there is one.
     - [ ] If this contribution is large, please file an Apache
       [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).

    ---

    This addresses an issue where Avro might have cached a class from a different ClassLoader.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kennknowles/beam AvroCoder

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/beam/pull/2783.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2783

commit 72c189def87262c506c5c5c65552afcf31be8f04
Author: Kenneth Knowles
Date:   2017-04-29T21:06:37Z

    Use a new ReflectData for each AvroCoder instance

    This addresses an issue where Avro might have cached a class from a different ClassLoader.
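The idea in the PR title, giving each coder its own resolver instead of sharing one process-wide instance, can be illustrated with a small simulation. This is a sketch under stated assumptions, not the code from the pull request: it does not use Avro's API, and `Resolver`, `Coder`, `LoadedClass`, and the loader labels are hypothetical stand-ins. The point it shows is that when the name-to-class cache is an instance field, a coder created by a later job cannot see entries left behind by an earlier job.

```java
import java.util.HashMap;
import java.util.Map;

class PerInstanceCacheSketch {
    /** Stand-in for a loaded class: identity is (name, defining loader). */
    static final class LoadedClass {
        final String name;
        final String loader;

        LoadedClass(String name, String loader) {
            this.name = name;
            this.loader = loader;
        }
    }

    /** Hypothetical resolver whose name-to-class cache is an instance field, not static. */
    static final class Resolver {
        private final String loader;
        private final Map<String, LoadedClass> byName = new HashMap<>();

        Resolver(String loader) {
            this.loader = loader;
        }

        // synchronized so one resolver can be shared by threads of the same job
        synchronized LoadedClass resolve(String name) {
            return byName.computeIfAbsent(name, n -> new LoadedClass(n, loader));
        }
    }

    /** Hypothetical coder that owns a fresh resolver bound to its job's loader. */
    static final class Coder {
        private final Resolver resolver;

        Coder(String loader) {
            this.resolver = new Resolver(loader);
        }

        /** Returns the loader of the class this coder would instantiate. */
        String loaderUsedFor(String className) {
            return resolver.resolve(className).loader;
        }
    }

    public static void main(String[] args) {
        String name = "UserScore$GameActionInfo";
        Coder job1Coder = new Coder("job-1-loader");
        Coder job2Coder = new Coder("job-2-loader");

        System.out.println(job1Coder.loaderUsedFor(name)); // job-1-loader
        System.out.println(job2Coder.loaderUsedFor(name)); // job-2-loader
    }
}
```

The trade-off in this sketch is that each coder rebuilds its cache from scratch, which costs some repeated work in exchange for isolation between jobs that share a JVM.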
[jira] [Commented] (BEAM-1970) Cannot run UserScore on Flink runner due to AvroCoder classload issues
[ https://issues.apache.org/jira/browse/BEAM-1970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15990038#comment-15990038 ]

Aljoscha Krettek commented on BEAM-1970:
----------------------------------------

Thanks for that hint, [~StephanEwen]! I think AVRO-1283 could be what's biting us. If the caches were in a ThreadLocal we wouldn't have a problem since new Jobs run in new threads. Having them static is a problem because they survive inside TaskManager JVMs.

> Cannot run UserScore on Flink runner due to AvroCoder classload issues
> ----------------------------------------------------------------------
>
>                 Key: BEAM-1970
>                 URL: https://issues.apache.org/jira/browse/BEAM-1970
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Ahmet Altay
>            Assignee: Aljoscha Krettek
>             Fix For: First stable release
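The contrast drawn in this comment between static and thread-local caches can be sketched as follows. This is a simulation with hypothetical names (`runJob`, `STATIC_CACHE`, `THREAD_CACHE`, and the loader labels); it assumes, as the comment does, that each job runs on a new thread inside a long-lived JVM. Each simulated job looks up the same class name in both caches and reports which loader it was handed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class StaticVsThreadLocalCacheSketch {
    // Lives as long as the JVM, so it outlives any single job.
    private static final Map<String, String> STATIC_CACHE = new ConcurrentHashMap<>();

    // One independent map per thread; a new thread starts with an empty map.
    private static final ThreadLocal<Map<String, String>> THREAD_CACHE =
            ThreadLocal.withInitial(HashMap::new);

    /**
     * Runs one simulated job on a fresh thread.
     * Returns {loader seen via the static cache, loader seen via the thread-local cache}.
     */
    static String[] runJob(String className, String jobLoader) {
        String[] seen = new String[2];
        Thread worker = new Thread(() -> {
            seen[0] = STATIC_CACHE.computeIfAbsent(className, n -> jobLoader);
            seen[1] = THREAD_CACHE.get().computeIfAbsent(className, n -> jobLoader);
        });
        worker.start();
        try {
            worker.join(); // join() makes the worker's writes to 'seen' visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for job", e);
        }
        return seen;
    }

    public static void main(String[] args) {
        String name = "UserScore$GameActionInfo";
        runJob(name, "job-1-loader");
        String[] job2 = runJob(name, "job-2-loader");

        System.out.println("static cache gave job 2:       " + job2[0]); // job-1-loader
        System.out.println("thread-local cache gave job 2: " + job2[1]); // job-2-loader
    }
}
```

The thread-local variant only isolates jobs as long as threads are not reused between them; with a pooled executor the stale entry would reappear, so this is an illustration of the argument rather than a general remedy.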
[jira] [Commented] (BEAM-1970) Cannot run UserScore on Flink runner due to AvroCoder classload issues
[ https://issues.apache.org/jira/browse/BEAM-1970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15989945#comment-15989945 ]

Kenneth Knowles commented on BEAM-1970:
---------------------------------------

From [~StephanEwen] on u...@beam.apache.org here is some help: https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/debugging_classloading.html

Perhaps we can stymie the caching somewhere in how {{AvroCoder}} uses Avro, based on that write up and discussion?
[jira] [Commented] (BEAM-1970) Cannot run UserScore on Flink runner due to AvroCoder classload issues
[ https://issues.apache.org/jira/browse/BEAM-1970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15989315#comment-15989315 ]

Aljoscha Krettek commented on BEAM-1970:
----------------------------------------

I think we cannot solve it from the Flink side, at least not in the short term. The long-term vision is to bring up Flink workers for a job as needed and release them when they are no longer needed, but that doesn't help us with our problem.

I'll look into whether the {{ThreadLocals}} are causing the problem or if it's something else. The situation is a bit tricky because {{Coders}} need to be thread safe.
[jira] [Commented] (BEAM-1970) Cannot run UserScore on Flink runner due to AvroCoder classload issues
[ https://issues.apache.org/jira/browse/BEAM-1970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15989092#comment-15989092 ]

Ahmet Altay commented on BEAM-1970:
-----------------------------------

Thank you [~aljoscha]. I was re-using the same Flink cluster when this happened, good find.

> Cannot run UserScore on Flink runner due to AvroCoder classload issues
> ----------------------------------------------------------------------
>
>                 Key: BEAM-1970
>                 URL: https://issues.apache.org/jira/browse/BEAM-1970
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Ahmet Altay
>            Assignee: Aljoscha Krettek
>             Fix For: First stable release
>
>
> Fails with error:
> ClassCastException: org.apache.beam.examples.complete.game.UserScore$GameActionInfo cannot be cast to org.apache.beam.examples.complete.game.UserScore$GameActionInfo
> full stack:
>
> The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:545)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:339)
>     at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:831)
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:256)
>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
>     at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
>     at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
>     at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
> Caused by: java.lang.RuntimeException: Pipeline execution failed
>     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:119)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:265)
>     at org.apache.beam.examples.complete.game.UserScore.main(UserScore.java:238)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
>     ... 13 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
>     at org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:210)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:387)
>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
>     at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.executePipeline(FlinkPipelineExecutionEnvironment.java:111)
>     at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:116)
>     ... 20 more
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>     at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>     at
[jira] [Commented] (BEAM-1970) Cannot run UserScore on Flink runner due to AvroCoder classload issues
[ https://issues.apache.org/jira/browse/BEAM-1970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15988959#comment-15988959 ]

Aljoscha Krettek commented on BEAM-1970:
----------------------------------------

[~altay] I reproduced the problem but it only ever occurs when running the same program on a cluster that was already used to run the program once. I did this:
* start Flink cluster
* run program, executes fine
* run program, fails with the exception you mentioned
* stop Flink cluster
* start Flink cluster again
* run program, runs fine again
* ...

I think the problem is that Flink keeps JVMs around and that the {{AvroCoder}} caches classes.
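The diagnosis in Aljoscha Krettek's comment above (a JVM that outlives a single job, plus a cache that holds on to classes) can be reproduced in isolation. The following is a minimal sketch under stated assumptions, not Beam's or Flink's actual code: `StaleClassCacheDemo`, `JobClassLoader`, `CACHE`, and `runJob` are hypothetical names, and the nested `GameActionInfo` only borrows its name from the example. Each simulated job gets a fresh classloader, as a long-running cluster would give each submission, while a static map keyed by class name stands in for the cache that should not survive across jobs.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;

public class StaleClassCacheDemo {

    /** Stand-in for a user-defined record type shipped with the job. */
    public static class GameActionInfo {}

    /** Hypothetical JVM-wide cache keyed by class NAME; it outlives any single job. */
    static final Map<String, Class<?>> CACHE = new HashMap<>();

    /** Hypothetical per-job loader: defines its own copy of GameActionInfo. */
    static final class JobClassLoader extends ClassLoader {
        JobClassLoader() {
            super(StaleClassCacheDemo.class.getClassLoader());
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(GameActionInfo.class.getName())) {
                return super.loadClass(name, resolve); // everything else: normal delegation
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> c = findLoadedClass(name);
                if (c == null) {
                    byte[] bytes = readClassBytes(name);
                    c = defineClass(name, bytes, 0, bytes.length);
                }
                if (resolve) {
                    resolveClass(c);
                }
                return c;
            }
        }

        private byte[] readClassBytes(String name) throws ClassNotFoundException {
            String resource = name.replace('.', '/') + ".class";
            try (InputStream in = getParent().getResourceAsStream(resource)) {
                if (in == null) {
                    throw new ClassNotFoundException(name);
                }
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                byte[] buf = new byte[4096];
                int n;
                while ((n = in.read(buf)) != -1) {
                    out.write(buf, 0, n);
                }
                return out.toByteArray();
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    /** Simulates one job submission; returns true if the cast succeeds. */
    static boolean runJob() throws Exception {
        ClassLoader jobLoader = new JobClassLoader();
        String name = GameActionInfo.class.getName();
        Class<?> jobClass = jobLoader.loadClass(name); // the class as this job sees it

        // "Decoding": instantiate via the cache, which may hold a class from an earlier job.
        Class<?> cached = CACHE.get(name);
        if (cached == null) {
            cached = jobClass;
            CACHE.put(name, cached);
        }
        Object decoded = cached.getDeclaredConstructor().newInstance();

        try {
            jobClass.cast(decoded); // same name, but possibly a different defining loader
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("first run cast ok:  " + runJob());
        System.out.println("second run cast ok: " + runJob());
    }
}
```

In this sketch the first call to `runJob()` populates the cache and the cast succeeds; the second call reuses the class defined by the first job's loader, so the cast fails even though both classes have the same name. Clearing `CACHE` between runs plays the role of restarting the cluster in the steps above.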
[jira] [Commented] (BEAM-1970) Cannot run UserScore on Flink runner due to AvroCoder classload issues
[ https://issues.apache.org/jira/browse/BEAM-1970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15976028#comment-15976028 ]

Kenneth Knowles commented on BEAM-1970:
---------------------------------------

I've now paged out the details, but there is a class getting cached that should not be.