Thanks. As far as I can tell, Protobuf is used in two places in Beam, independently:

- ProtoCoder, declared as a dependency in
  https://github.com/apache/incubator-beam/blob/master/sdks/java/core/pom.xml
- in the FlinkRunner code, where it is a transitive dependency coming from org.apache.flink
Now I'm wondering: would it be possible to shade com.google.protobuf in the whole Flink cluster and in flink-runner.jar, while leaving Beam untouched (so I could use version 3.0.0-beta1 for my protos)? (I've tried this, and I don't think it really works.) I will try your approach of shading the proto in both Beam and my program. I'm using jarjar to rewrite the package name inside the jar.

2016-09-05 15:41 GMT+02:00 Aljoscha Krettek <[email protected]>:

> This seems to be a bigger problem. I got it to compile and (almost) run by
> shading away the protobuf dependency of Beam in the Flink Runner jar:
> https://github.com/apache/incubator-beam/blob/c83783209f739fc541cea25f22cfe542b75ffa55/runners/flink/runner/pom.xml#L223
> This does not really work, though, since now your code will not use a
> ProtoCoder but a SerializableCoder for your protobuf-generated class. The
> problem is that Beam uses reflection to determine whether a ProtoCoder can
> be used on user classes. Now, your user class will be derived from
> com.google.protobuf.MessageOrBuilder, but the Beam code will look for
> something like flink.relocated.com.google.protobuf.MessageOrBuilder, so
> that doesn't work.
>
> The only solution I see for this is to compile both Beam and the user
> program, create a fat jar from this, and then create another fat jar in
> which Protobuf is relocated in all of the code, i.e. in both the Beam code
> and the user code. That's not a very nice solution, though.
>
> On Sat, 3 Sep 2016 at 15:13 Pawel Szczur <[email protected]> wrote:
>
>> I've tried version 2.5.0, no difference. I've found that the problem is
>> in the Beam Coder code. It assumes version 3.0 of Protobuf, while Flink
>> enforces 2.5.0. I'm not sure if it's possible to enforce another proto
>> version in Flink, or to rename it by adding some prefix like
>> 'org.apache.flink' + 'com.google.protobuf' when used in Flink.
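[Editor's note, not part of the original thread: the relocation approach described above — building one fat jar and then relocating Protobuf in all of the code, Beam's and the user's — can be sketched with the maven-shade-plugin. The shadedPattern prefix below is an arbitrary example, not the name Beam or Flink actually uses.]

```xml
<!-- Sketch only: relocate protobuf classes inside the final fat jar.
     Must run over a jar that already contains both the Beam classes
     and the user's generated protobuf classes, so that both sides see
     the same (relocated) package name. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>com.google.protobuf</pattern>
            <!-- example prefix; pick any package not on the cluster classpath -->
            <shadedPattern>shaded.com.google.protobuf</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```

As the thread explains, relocating only one side does not work: ProtoCoder's reflection check compares class names, so a user class extending the relocated MessageOrBuilder no longer matches the unrelocated one (or vice versa).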
>>
>> 2016-09-02 12:06 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>
>>> I imagine that it's caused by the classpath being different when you run
>>> it using the Flink command. It might also be that your program fails at
>>> a different point once you fix the first problem, due to the protobuf
>>> mismatches.
>>>
>>> On Fri, 2 Sep 2016 at 11:59 Pawel Szczur <[email protected]> wrote:
>>>
>>>> Ok, I will try 2.5.0 (I use proto2 syntax anyway), but then, how is it
>>>> that when I run it as a plain binary it works fine, but submitted to
>>>> the local cluster it fails?
>>>>
>>>> 2016-09-02 11:42 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>>
>>>>> Yep, as I said, the problem is most likely that Flink has a dependency
>>>>> on a different version of protobuf, so that clashes with the version
>>>>> that Beam provides or that you have as a dependency. Have you tried
>>>>> setting 2.5.0 as the version, since that's what Flink uses? In the end
>>>>> that's not a proper solution, however, and both Flink and Beam should
>>>>> probably shade their protobuf dependency; I'm not sure that's
>>>>> possible, though.
>>>>>
>>>>> On Fri, 2 Sep 2016 at 11:31 Pawel Szczur <[email protected]> wrote:
>>>>>
>>>>>> Thanks for the reply.
>>>>>>
>>>>>> I've tried that, and I think it didn't work. I explicitly tried
>>>>>> version 3.0.0-beta1 to be compatible with Beam, and in another test
>>>>>> 3.0.0, just out of curiosity. It still didn't work.
>>>>>>
>>>>>> I added this explicitly to pom.xml:
>>>>>>
>>>>>> <dependency>
>>>>>>   <groupId>com.google.protobuf</groupId>
>>>>>>   <artifactId>protobuf-java</artifactId>
>>>>>>   <version>3.0.0-beta1</version>
>>>>>> </dependency>
>>>>>>
>>>>>> Did I miss some param?
>>>>>>
>>>>>> 2016-09-02 11:22 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>>>>
>>>>>>> Hi,
>>>>>>> I think this is the classic problem of dependency version conflicts.
>>>>>>> Beam has a dependency on protobuf 3.0.0-beta1, while Flink has a
>>>>>>> dependency on protobuf 2.5.0 (through Akka). I think that when
>>>>>>> running your program through the bin/flink command, the classpath
>>>>>>> order might be different and you're getting the wrong version.
>>>>>>>
>>>>>>> As an immediate fix, I think you could try having your own
>>>>>>> dependency on protobuf and shading that, so that you don't have
>>>>>>> version conflicts.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Aljoscha
>>>>>>>
>>>>>>> On Thu, 1 Sep 2016 at 16:15 Pawel Szczur <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I'm trying to run a simple pipeline on a local cluster, using
>>>>>>>> Protocol Buffers to pass data between Beam functions.
>>>>>>>>
>>>>>>>> Everything works fine if I run it with:
>>>>>>>>
>>>>>>>> java -jar target/dataflow-test-1.0-SNAPSHOT.jar \
>>>>>>>>   --runner=org.apache.beam.runners.flink.FlinkRunner \
>>>>>>>>   --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt
>>>>>>>>
>>>>>>>> But it fails when submitted to the Flink cluster:
>>>>>>>>
>>>>>>>> flink run target/dataflow-test-1.0-SNAPSHOT.jar \
>>>>>>>>   --runner=org.apache.beam.runners.flink.FlinkRunner \
>>>>>>>>   --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt
>>>>>>>>
>>>>>>>> A ready-to-run project:
>>>>>>>> https://github.com/orian/beam-flink-local-cluster
>>>>>>>>
>>>>>>>> Any clues?
>>>>>>>>
>>>>>>>> Cheers, Pawel
>>>>>>>>
>>>>>>>> ------------------------------------------------------------
>>>>>>>> The program finished with the following exception:
>>>>>>>>
>>>>>>>> java.lang.NoSuchMethodError: com.google.protobuf.ExtensionRegistry.getAllImmutableExtensionsByExtendedType(Ljava/lang/String;)Ljava/util/Set;
>>>>>>>>   at com.mycompany.dataflow.WordCount.main(WordCount.java:109)
>>>>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>   at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>>   at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>>   at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>>   at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>>   at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
>>>>>>>>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
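[Editor's note, not part of the original thread: a quick way to confirm the version clash diagnosed in this thread is to ask Maven which protobuf-java version it actually resolves in the user project. `mvn dependency:tree` with an `includes` filter is a standard Maven command; run it from the project directory.]

```
# Sketch: show every path through which protobuf-java is pulled in,
# and which version Maven resolves (conflicting versions are marked
# in verbose mode).
mvn dependency:tree -Dincludes=com.google.protobuf:protobuf-java -Dverbose
```

If this reports 2.5.0 winning (e.g. via a Flink or Akka artifact) while Beam's ProtoCoder expects a 3.x API such as ExtensionRegistry.getAllImmutableExtensionsByExtendedType, that matches the NoSuchMethodError above.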
