I imagine that it's caused by the classpath being different when you run it using the Flink command. It might also be that your program fails at a different point once you fix the first problem, due to the protobuf mismatches.
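
To check which protobuf jar actually wins in each case, a small diagnostic along these lines might help. It is only a sketch: the class name `WhichJar` and its `locationOf` helpers are made up for illustration and are not part of Flink or Beam. It uses only the standard `ProtectionDomain`/`CodeSource` API to report where a class was loaded from.

```java
import java.security.CodeSource;

// Hypothetical helper (not part of Flink or Beam): reports which jar or
// directory a class was loaded from, to compare the two launch modes.
public class WhichJar {

    /** Returns the code source location of a class, or a placeholder if there is none. */
    static String locationOf(Class<?> cls) {
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            // Classes from the JDK's bootstrap loader usually have no code source.
            return "(bootstrap or unknown)";
        }
        return source.getLocation().toString();
    }

    /** Looks the class up by name, without initializing it, and reports its location. */
    static String locationOf(String className) {
        try {
            Class<?> cls = Class.forName(className, false, WhichJar.class.getClassLoader());
            return locationOf(cls);
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath: " + e + ")";
        }
    }

    public static void main(String[] args) {
        // Defaults to the class named in the NoSuchMethodError from the stack trace.
        String name = args.length > 0 ? args[0] : "com.google.protobuf.ExtensionRegistry";
        System.out.println(name + " -> " + locationOf(name));
    }
}
```

Calling `WhichJar.locationOf("com.google.protobuf.ExtensionRegistry")` at the top of `WordCount.main` and running the jar both ways should show whether the two launches pick up different protobuf jars.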
On Fri, 2 Sep 2016 at 11:59 Pawel Szczur <[email protected]> wrote:
> Ok, I will try 2.5.0 (I use proto2 syntax anyway), but then, how is it
> that when I run it as a binary it works fine, but sent to local cluster it
> fails?
>
> 2016-09-02 11:42 GMT+02:00 Aljoscha Krettek <[email protected]>:
>
>> Yep, as I said the problem is most likely that Flink has a dependency on
>> a different version of protobuf so that clashes with the version that Beam
>> provides or that you have as a dependency. Have you tried setting 2.5.0 as
>> the version, since that's what Flink uses. In the end that's not a proper
>> solution, however, and both Flink and Beam should likely shade their
>> protobuf dependency, not sure of that's possible, though.
>>
>> On Fri, 2 Sep 2016 at 11:31 Pawel Szczur <[email protected]> wrote:
>>
>>> Thanks for reply.
>>>
>>> I've tried that, I think it didn't work . I've explicitly tried version
>>> 3.0.0-beta1 to be compatible with Beam and in another test 3.0.0 just for
>>> curiosity. It still didn't work.
>>>
>>> I've added explicitly to pom.xml:
>>>
>>> <dependency>
>>>   <groupId>com.google.protobuf</groupId>
>>>   <artifactId>protobuf-java</artifactId>
>>>   <version>3.0.0-beta1</version>
>>> </dependency>
>>>
>>> Did I miss some param?
>>>
>>> 2016-09-02 11:22 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>
>>>> Hi,
>>>> I think this is the classic problem of dependency version conflicts.
>>>> Beam has a dependency on protobuf 3.0.0-beta1 while Flink has a dependency
>>>> on protobuf 2.5.0 (through Akka). I think when running your program through
>>>> the bin/flink command the order in the classpath might be different and
>>>> you're getting the wrong version.
>>>>
>>>> As an immediate fix, I think you could try having your own dependency
>>>> on protobuf and shade that, so that you don't have version conflicts.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Thu, 1 Sep 2016 at 16:15 Pawel Szczur <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'm trying to run a simple pipeline on local cluster using Protocol
>>>>> Buffer to pass data between Beam functions.
>>>>>
>>>>> Everything works fine if I run it through:
>>>>> java -jar target/dataflow-test-1.0-SNAPSHOT.jar
>>>>> --runner=org.apache.beam.runners.flink.FlinkRunner
>>>>> --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt
>>>>>
>>>>> But it fails when trying to run on flink cluster:
>>>>> flink run target/dataflow-test-1.0-SNAPSHOT.jar
>>>>> --runner=org.apache.beam.runners.flink.FlinkRunner
>>>>> --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt
>>>>>
>>>>> The ready to run project:
>>>>> https://github.com/orian/beam-flink-local-cluster
>>>>>
>>>>> Any clues?
>>>>>
>>>>> Cheers, Pawel
>>>>>
>>>>> ------------------------------------------------------------
>>>>> The program finished with the following exception:
>>>>>
>>>>> java.lang.NoSuchMethodError: com.google.protobuf.ExtensionRegistry.getAllImmutableExtensionsByExtendedType(Ljava/lang/String;)Ljava/util/Set;
>>>>> at com.mycompany.dataflow.WordCount.main(WordCount.java:109)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>> at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
>>>>>
>>>>
>>>
>
