Can we bump the version of Flink in Beam? Is there a reason to stick with 1.0.3? In my experience this tends to become harder and harder to change later on.
2016-09-05 18:41 GMT+02:00 Aljoscha Krettek <[email protected]>:

> I actually just got it to work by updating the Akka dependency of Flink
> to 2.4.9 (Akka 2.4.x does not have a Protobuf dependency anymore, and
> Protobuf only came into Flink as a transitive dependency). I did this on
> Flink 1.1.2, so I also had to update the Flink Runner to 1.1.2 (I also
> changed the Runner pom to create a shaded "bundled" jar). Here are the
> two branches that you can use to get it to work:
>
> - https://github.com/aljoscha/flink/tree/flink-1.1.2-akka-2.4.9
> - https://github.com/aljoscha/incubator-beam/tree/flink-1.1.2
>
> The steps I did:
> - checkout Flink
> - run "mvn clean install -DskipTests"
> - checkout Beam
> - run "mvn clean install -DskipTests"
> - build your testing project using "mvn clean package"
> - copy beam-runners-flink_2.10-bundled-0.3.0-incubating-SNAPSHOT.jar
>   from <beam-root>/runners/flink/runner/target to the lib/ folder of
>   the Flink install
> - copy the jar from your testing project to the lib/ folder as well
>   (this is important)
> - only now start the cluster
> - run using "bin/flink run" while also specifying your jar
>
> One caveat is that you have to move the program jar to the lib folder
> as well because of some class loader issues; it doesn't work if you
> simply give it as an argument to "bin/flink run". Also, the Web
> Dashboard seems not to work with those two jars in the lib folder,
> probably because there is some stuff in those jars that shouldn't
> really be there.
>
> In the future we should probably provide ready-made packages for this
> and update both Flink and Beam.
>
> Cheers,
> Aljoscha
>
> P.S. While writing this I just saw your second mail. Good that you also
> found a solution! :-)
>
> On Mon, 5 Sep 2016 at 17:42 Pawel Szczur <[email protected]> wrote:
>
>> Thanks.
>>
>> As far as I can tell, Protobuf is used in two places independently in
>> Beam:
>> - ProtoCoder, a direct dependency in
>>   https://github.com/apache/incubator-beam/blob/master/sdks/java/core/pom.xml
>> - in the FlinkRunner code, where it's transitive, coming in from
>>   org.apache.flink
>>
>> Now I'm thinking: would it be possible to shade com.google.protobuf in
>> the whole Flink cluster and flink-runner.jar while leaving Beam alone
>> (I could use version 3.0.0-beta1 for my proto)?
>> (I've tried, and I think it doesn't really work.)
>>
>> I will try your approach of shading the proto in Beam and my program.
>> I'm using jarjar to replace the package name in the jar.
>>
>> 2016-09-05 15:41 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>
>>> This seems to be a bigger problem. I got it to compile and (almost)
>>> run by shading away the protobuf dependency of Beam in the Flink
>>> Runner jar:
>>> https://github.com/apache/incubator-beam/blob/c83783209f739fc541cea25f22cfe542b75ffa55/runners/flink/runner/pom.xml#L223.
>>> This does not really work, though, since now your code will not use a
>>> ProtoCoder but a SerializableCoder for your protobuf-generated class.
>>> The problem is that Beam uses reflection to determine whether a
>>> ProtoCoder can be used on user classes. Now, your user class will be
>>> derived from com.google.protobuf.MessageOrBuilder but the Beam code
>>> will look for something like
>>> flink.relocated.com.google.protobuf.MessageOrBuilder, so that doesn't
>>> work.
>>>
>>> The only solution I see for this is to compile both Beam and the user
>>> program, create a fat jar from this and then create another fat jar
>>> where Protobuf is relocated in all the code, i.e. in both the Beam
>>> code and the user code. That's not a very nice solution, though.
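[For readers who want to reproduce the relocation described above: in
Maven, this kind of package rewriting is done with the maven-shade-plugin.
Below is a minimal sketch for a user project's pom.xml; it reuses the
flink.relocated prefix from the mail purely as an illustration and is not
the actual Flink Runner configuration. jarjar, which Pawel mentions above,
performs the same rewriting outside the Maven build.]

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <relocations>
              <!-- Rewrites every com.google.protobuf.* reference in the
                   bundled classes, which is exactly why reflection on the
                   original name (com.google.protobuf.MessageOrBuilder)
                   stops matching after relocation. -->
              <relocation>
                <pattern>com.google.protobuf</pattern>
                <shadedPattern>flink.relocated.com.google.protobuf</shadedPattern>
              </relocation>
            </relocations>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>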
>>> On Sat, 3 Sep 2016 at 15:13 Pawel Szczur <[email protected]> wrote:
>>>
>>>> I've tried version 2.5.0, no difference. I've found that the problem
>>>> is in the Beam Coder code: it assumes Protobuf 3.0 while Flink
>>>> enforces 2.5.0. I'm not sure if it's possible to enforce a different
>>>> proto version in Flink, or to rename the package by adding some
>>>> prefix, like 'org.apache.flink' + 'com.google.protobuf', wherever it
>>>> is used in Flink.
>>>>
>>>> 2016-09-02 12:06 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>>
>>>>> I imagine that it's caused by the classpath being different when you
>>>>> run it using the Flink command. It might also be that your program
>>>>> fails at a different point once you fix the first problem, due to
>>>>> the protobuf mismatches.
>>>>>
>>>>> On Fri, 2 Sep 2016 at 11:59 Pawel Szczur <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Ok, I will try 2.5.0 (I use proto2 syntax anyway), but then, how is
>>>>>> it that when I run it as a binary it works fine, but sent to the
>>>>>> local cluster it fails?
>>>>>>
>>>>>> 2016-09-02 11:42 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>>>>
>>>>>>> Yep, as I said, the problem is most likely that Flink has a
>>>>>>> dependency on a different version of protobuf, so that clashes
>>>>>>> with the version that Beam provides or that you have as a
>>>>>>> dependency. Have you tried setting 2.5.0 as the version, since
>>>>>>> that's what Flink uses? In the end that's not a proper solution,
>>>>>>> however, and both Flink and Beam should likely shade their
>>>>>>> protobuf dependency; not sure if that's possible, though.
>>>>>>>
>>>>>>> On Fri, 2 Sep 2016 at 11:31 Pawel Szczur <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks for the reply.
>>>>>>>>
>>>>>>>> I've tried that; I think it didn't work. I've explicitly tried
>>>>>>>> version 3.0.0-beta1 to be compatible with Beam and, in another
>>>>>>>> test, 3.0.0 just out of curiosity. It still didn't work.
>>>>>>>>
>>>>>>>> I've added explicitly to pom.xml:
>>>>>>>>
>>>>>>>> <dependency>
>>>>>>>>   <groupId>com.google.protobuf</groupId>
>>>>>>>>   <artifactId>protobuf-java</artifactId>
>>>>>>>>   <version>3.0.0-beta1</version>
>>>>>>>> </dependency>
>>>>>>>>
>>>>>>>> Did I miss some param?
>>>>>>>>
>>>>>>>> 2016-09-02 11:22 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> I think this is the classic problem of dependency version
>>>>>>>>> conflicts. Beam has a dependency on protobuf 3.0.0-beta1 while
>>>>>>>>> Flink has a dependency on protobuf 2.5.0 (through Akka). I think
>>>>>>>>> when running your program through the bin/flink command the
>>>>>>>>> order in the classpath might be different and you're getting the
>>>>>>>>> wrong version.
>>>>>>>>>
>>>>>>>>> As an immediate fix, I think you could try having your own
>>>>>>>>> dependency on protobuf and shading it, so that you don't have
>>>>>>>>> version conflicts.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Aljoscha
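[A side note on the pom.xml snippet quoted above: a direct <dependency>
entry only makes that version the "nearest" candidate in Maven's
resolution; it cannot change what is already inside the jars on the
cluster's classpath. To at least force a single protobuf version within
the user's own build, Maven's <dependencyManagement> is the usual
mechanism. A sketch, using the 2.5.0 pin Aljoscha suggests; whether this
removes the runtime clash is exactly what the thread is probing.]

<dependencyManagement>
  <dependencies>
    <!-- Pins every resolution of protobuf-java, including transitive
         ones, to one version instead of relying on nearest-wins. -->
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
  </dependencies>
</dependencyManagement>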
>>>>>>>>> On Thu, 1 Sep 2016 at 16:15 Pawel Szczur <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I'm trying to run a simple pipeline on a local cluster, using
>>>>>>>>>> Protocol Buffers to pass data between Beam functions.
>>>>>>>>>>
>>>>>>>>>> Everything works fine if I run it through:
>>>>>>>>>>
>>>>>>>>>> java -jar target/dataflow-test-1.0-SNAPSHOT.jar
>>>>>>>>>>   --runner=org.apache.beam.runners.flink.FlinkRunner
>>>>>>>>>>   --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt
>>>>>>>>>>
>>>>>>>>>> But it fails when trying to run on a Flink cluster:
>>>>>>>>>>
>>>>>>>>>> flink run target/dataflow-test-1.0-SNAPSHOT.jar
>>>>>>>>>>   --runner=org.apache.beam.runners.flink.FlinkRunner
>>>>>>>>>>   --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt
>>>>>>>>>>
>>>>>>>>>> The ready-to-run project:
>>>>>>>>>> https://github.com/orian/beam-flink-local-cluster
>>>>>>>>>>
>>>>>>>>>> Any clues?
>>>>>>>>>>
>>>>>>>>>> Cheers, Pawel
>>>>>>>>>>
>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>> The program finished with the following exception:
>>>>>>>>>>
>>>>>>>>>> java.lang.NoSuchMethodError: com.google.protobuf.ExtensionRegistry.getAllImmutableExtensionsByExtendedType(Ljava/lang/String;)Ljava/util/Set;
>>>>>>>>>> at com.mycompany.dataflow.WordCount.main(WordCount.java:109)
>>>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>>>> at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>>>> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
>>>>>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
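[A closing note for anyone landing on this stack trace:
ExtensionRegistry.getAllImmutableExtensionsByExtendedType is present in
the protobuf 3.0.0-beta1 that Beam builds against (the standalone
java -jar run works) but not in the 2.5.0 that Flink pulls in, so the
NoSuchMethodError surfaces only when Flink's copy wins on the classpath.
A quick way to see which protobuf versions a project actually resolves is
to run "mvn dependency:tree -Dincludes=com.google.protobuf:protobuf-java"
(a standard Maven diagnostic, not a command used in this thread).]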
