[ https://issues.apache.org/jira/browse/BEAM-2943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Maximilian Michels resolved BEAM-2943.
--------------------------------------
       Resolution: Fixed
         Assignee: Maximilian Michels  (was: Aljoscha Krettek)
    Fix Version/s: Not applicable

The Flink Runner page has been updated.

> Beam Flink deployment results in ClassNotFoundException
> -------------------------------------------------------
>
>                 Key: BEAM-2943
>                 URL: https://issues.apache.org/jira/browse/BEAM-2943
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.1.0
>         Environment: Debian 9.1 / 4.9.0-3-amd64 #1 SMP Debian 4.9.30-2+deb9u3 (2017-08-06) x86_64 GNU/Linux
>            Reporter: Guenther Grill
>            Assignee: Maximilian Michels
>            Priority: Major
>              Labels: flink
>             Fix For: Not applicable
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> Hi,
> I followed the guide https://beam.apache.org/get-started/quickstart-java/ to
> run a Beam program within a Flink cluster.
> The output of the dependency command is:
> {code}
> mvn dependency:tree -Pflink-runner |grep flink
>
> [INFO] \- org.apache.beam:beam-runners-flink_2.10:jar:2.1.0:runtime
> [INFO]    +- org.apache.flink:flink-clients_2.10:jar:1.3.0:runtime
> [INFO]    |  +- org.apache.flink:flink-optimizer_2.10:jar:1.3.0:runtime
> [INFO]    |  \- org.apache.flink:force-shading:jar:1.3.0:runtime
> [INFO]    +- org.apache.flink:flink-core:jar:1.3.0:runtime
> [INFO]    |  +- org.apache.flink:flink-annotations:jar:1.3.0:runtime
> [INFO]    +- org.apache.flink:flink-metrics-core:jar:1.3.0:runtime
> [INFO]    +- org.apache.flink:flink-java:jar:1.3.0:runtime
> [INFO]    |  +- org.apache.flink:flink-shaded-hadoop2:jar:1.3.0:runtime
> [INFO]    +- org.apache.flink:flink-runtime_2.10:jar:1.3.0:runtime
> [INFO]    +- org.apache.flink:flink-streaming-java_2.10:jar:1.3.0:runtime
> {code}
> Then I started the Flink cluster with the matching version using docker-compose:
> {code}
> export JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
> export FLINK_DOCKER_IMAGE_NAME=flink:1.3.0-hadoop27-scala_2.10
> docker-compose up -d
> {code}
> The compose file looks like this:
> {code}
> version: '3.3'
> services:
>   jobmanager:
>     image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
>     expose:
>       - "6123"
>     ports:
>       - "6123:6123"
>       - "8081:8081"
>     volumes:
>       - /tmp:/tmp
>     command: jobmanager
>     environment:
>       - JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
>   taskmanager:
>     image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
>     expose:
>       - "6121"
>       - "6122"
>     depends_on:
>       - jobmanager
>     command: taskmanager
>     environment:
>       - JOB_MANAGER_RPC_ADDRESS=[HOST_IP]
> {code}
> The Flink cluster works, but when I execute
> {code}
> mvn exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
>     -Pflink-runner \
>     -Dexec.args="--runner=FlinkRunner \
>       --inputFile=pom.xml \
>       --output=/path/to/counts \
>       --flinkMaster=[HOST_IP]:6123 \
>       --filesToStage=target/word-count-beam-bundled-0.1.jar"
> {code}
> I get:
> {code}
> 2017-09-12 06:39:57,226 INFO org.apache.flink.runtime.jobmanager.JobManager - Submitting job a913f922506053e65e732eeb8336b3bd (wordcount-grg-0912063956-c7ea6199).
> 2017-09-12 06:39:57,227 INFO org.apache.flink.runtime.jobmanager.JobManager - Using restart strategy NoRestartStrategy for a913f922506053e65e732eeb8336b3bd.
> 2017-09-12 06:39:57,227 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job recovers via failover strategy: full graph restart
> 2017-09-12 06:39:57,229 INFO org.apache.flink.runtime.jobmanager.JobManager - Running initialization on master for job wordcount-grg-0912063956-c7ea6199 (a913f922506053e65e732eeb8336b3bd).
> 2017-09-12 06:39:57,230 ERROR org.apache.flink.runtime.jobmanager.JobManager - Failed to submit job a913f922506053e65e732eeb8336b3bd (wordcount-grg-0912063956-c7ea6199)
> org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'DataSource (at Read(CompressedSource) (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat))': Deserializing the InputFormat (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat@58e7a91a) failed: Could not read the user code wrapper: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:153)
>     at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1315)
>     at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:495)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>     at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>     at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>     at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>     at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>     at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.lang.Exception: Deserializing the InputFormat (org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat@58e7a91a) failed: Could not read the user code wrapper: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
>     at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:66)
>     at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:150)
>     ... 24 more
> Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
>     at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:290)
>     at org.apache.flink.runtime.jobgraph.InputFormatVertex.initializeOnMaster(InputFormatVertex.java:63)
>     ... 25 more
> Caused by: java.lang.ClassNotFoundException: org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Class.java:348)
>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:64)
>     at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1826)
>     at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>     at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)
>     at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)
>     at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:290)
>     at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:248)
>     at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288)
>     ... 26 more
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
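The innermost cause in the trace is a ClassNotFoundException for org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat while the JobManager deserializes the job. One way to narrow this down is to check whether the jar passed via --filesToStage actually contains that class. The sketch below is only a diagnostic aid and not part of the ticket's resolution; the helper name jar_contains_class is hypothetical, and it assumes the staged jar is the only place the JobManager could load Beam runner classes from.

```python
import zipfile


def jar_contains_class(jar_path: str, class_name: str) -> bool:
    """Return True if the jar at jar_path has an entry for class_name.

    Hypothetical helper for illustration; not part of Beam or Flink.
    """
    # A class a.b.C is stored inside a jar as the entry a/b/C.class.
    entry = class_name.replace(".", "/") + ".class"
    # A jar is a zip archive, so the standard zipfile module can list it.
    with zipfile.ZipFile(jar_path) as jar:
        return entry in jar.namelist()
```

Called from the project directory as jar_contains_class("target/word-count-beam-bundled-0.1.jar", "org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat"), a False result would suggest that the bundled jar was built without the Flink runner classes. That reading is an assumption about the cause, not something the report confirms.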