Thanks Aljoscha, your solution works. For future reference: after building a fat jar, one must relocate the protobuf package inside the jar:

jarjar --mode process --rules rules.txt target/dataflow-test-1.0-SNAPSHOT.jar --output target/dataflow-test-1.0-SNAPSHOT.shade.jar
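(The rules.txt referenced here would contain a relocation rule roughly like the one below; the target prefix "relocated" is only an illustrative guess, not necessarily what the example repo uses. The @1 substitutes whatever the ** wildcard matched, so every com.google.protobuf class ends up under the new prefix.)

    rule com.google.protobuf.** relocated.com.google.protobuf.@1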
Then run it on Flink:

flink run --class com.mycompany.dataflow.WordCount target/dataflow-test-1.0-SNAPSHOT.shade.jar --runner=org.apache.beam.runners.flink.FlinkRunner --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt

I will update the example repo with the solution in case someone runs into the same problem.

2016-09-05 17:41 GMT+02:00 Pawel Szczur <[email protected]>:

Thanks.

As far as I can tell, Protobuf is used in two places independently in Beam:
- in ProtoCoder, a dependency declared in https://github.com/apache/incubator-beam/blob/master/sdks/java/core/pom.xml
- in the FlinkRunner code, as a transitive dependency coming from org.apache.flink

Now I'm wondering: would it be possible to shade com.google.protobuf in the whole Flink cluster and in flink-runner.jar while leaving Beam untouched (so I could use version 3.0.0-beta1 for my proto)? (I've tried this, and I don't think it really works.)

I will try your approach of shading the proto in Beam and in my program. I'm using jarjar to rename the package inside the jar.

2016-09-05 15:41 GMT+02:00 Aljoscha Krettek <[email protected]>:

This seems to be a bigger problem. I got it to compile and (almost) run by shading away the protobuf dependency of Beam in the Flink Runner jar: https://github.com/apache/incubator-beam/blob/c83783209f739fc541cea25f22cfe542b75ffa55/runners/flink/runner/pom.xml#L223. This does not really work, though, since now your code will not use a ProtoCoder but a SerializableCoder for your protobuf-generated class. The problem is that Beam uses reflection to determine whether a ProtoCoder can be used on user classes. Now, your user class will be derived from com.google.protobuf.MessageOrBuilder, but the Beam code will look for something like flink.relocated.com.google.protobuf.MessageOrBuilder, so that doesn't work.

The only solution I see for this is to compile both Beam and the user program, create a fat jar from this, and then create another fat jar where Protobuf is relocated in all the code, i.e. in both the Beam code and the user code. That's not a very nice solution, though.
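(For readers following along: the relocation linked above is expressed with the Maven Shade plugin. A minimal sketch of that kind of relocation in a pom.xml could look roughly like this; the shadedPattern value is an illustrative assumption, not the exact prefix used in the Flink Runner pom:

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <relocations>
              <relocation>
                <!-- move protobuf classes to a new package so they cannot clash
                     with the copy Flink ships; prefix is illustrative only -->
                <pattern>com.google.protobuf</pattern>
                <shadedPattern>relocated.com.google.protobuf</shadedPattern>
              </relocation>
            </relocations>
          </configuration>
        </execution>
      </executions>
    </plugin>

The caveat described in the message above still applies: if only part of the code is relocated, Beam's reflection check for ProtoCoder no longer matches the relocated MessageOrBuilder.)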
On Sat, 3 Sep 2016 at 15:13 Pawel Szczur <[email protected]> wrote:

I've tried version 2.5.0, no difference. I've found that the problem is in the Beam coder code: it assumes ProtoBuf 3.0 while Flink enforces 2.5.0. I'm not sure if it's possible to force another proto version in Flink, or to rename it by adding some prefix like 'org.apache.flink' + 'com.google.protobuf' where it is used in Flink.

2016-09-02 12:06 GMT+02:00 Aljoscha Krettek <[email protected]>:

I imagine that it's caused by the classpath being different when you run it using the Flink command. It might also be that your program fails at a different point once you fix the first problem, due to the protobuf mismatches.

On Fri, 2 Sep 2016 at 11:59 Pawel Szczur <[email protected]> wrote:

Ok, I will try 2.5.0 (I use proto2 syntax anyway), but then, how is it that when I run it as a binary it works fine, but when it is sent to the local cluster it fails?

2016-09-02 11:42 GMT+02:00 Aljoscha Krettek <[email protected]>:

Yep, as I said, the problem is most likely that Flink has a dependency on a different version of protobuf, so it clashes with the version that Beam provides or that you have as a dependency. Have you tried setting 2.5.0 as the version, since that's what Flink uses? In the end that's not a proper solution, however, and both Flink and Beam should likely shade their protobuf dependency; I'm not sure whether that's possible, though.

On Fri, 2 Sep 2016 at 11:31 Pawel Szczur <[email protected]> wrote:

Thanks for the reply.

I've tried that; I don't think it worked. I explicitly tried version 3.0.0-beta1 to be compatible with Beam, and in another test 3.0.0 just out of curiosity. It still didn't work.

I've added this explicitly to pom.xml:

    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>3.0.0-beta1</version>
    </dependency>

Did I miss some parameter?

2016-09-02 11:22 GMT+02:00 Aljoscha Krettek <[email protected]>:

Hi,
I think this is the classic problem of dependency version conflicts. Beam has a dependency on protobuf 3.0.0-beta1 while Flink has a dependency on protobuf 2.5.0 (through Akka). I think when running your program through the bin/flink command the order in the classpath might be different and you're getting the wrong version.

As an immediate fix, I think you could try having your own dependency on protobuf and shade that, so that you don't have version conflicts.

Cheers,
Aljoscha
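(A quick way to confirm which protobuf versions a build actually pulls in, not mentioned in the thread but a standard Maven diagnostic, is the dependency tree filtered to the protobuf group:

    # list every dependency path that brings in com.google.protobuf artifacts
    mvn dependency:tree -Dincludes=com.google.protobuf

Running it against the user project makes the 2.5.0 vs 3.0.0-beta1 clash described above visible.)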
On Thu, 1 Sep 2016 at 16:15 Pawel Szczur <[email protected]> wrote:

Hi,

I'm trying to run a simple pipeline on a local cluster, using Protocol Buffers to pass data between Beam functions.

Everything works fine if I run it through:
java -jar target/dataflow-test-1.0-SNAPSHOT.jar --runner=org.apache.beam.runners.flink.FlinkRunner --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt

But it fails when I try to run it on the Flink cluster:
flink run target/dataflow-test-1.0-SNAPSHOT.jar --runner=org.apache.beam.runners.flink.FlinkRunner --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt

The ready-to-run project: https://github.com/orian/beam-flink-local-cluster

Any clues?

Cheers, Pawel

------------------------------------------------------------
The program finished with the following exception:

java.lang.NoSuchMethodError: com.google.protobuf.ExtensionRegistry.getAllImmutableExtensionsByExtendedType(Ljava/lang/String;)Ljava/util/Set;
    at com.mycompany.dataflow.WordCount.main(WordCount.java:109)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
