Hi Pawel,

https://github.com/apache/incubator-beam/blob/master/runners/flink/pom.xml#L42
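(That line pins the Flink version the Runner builds against — presumably a property along these lines; this is a hypothetical sketch, the actual property name and value are in the pom behind the link:

<properties>
  <!-- hypothetical: the version property the link above points at;
       bumping it here is where a Flink upgrade would start -->
  <flink.version>1.0.3</flink.version>
</properties>
)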
Dan

On Fri, Nov 4, 2016 at 2:08 PM, Pawel Szczur <[email protected]> wrote:

Can we bump the version of Flink in Beam? Is there a reason to stick with 1.0.3? In my experience these upgrades only get harder the longer they are postponed.

2016-09-05 18:41 GMT+02:00 Aljoscha Krettek <[email protected]>:

I actually just got it to work by updating the Akka dependency of Flink to 2.4.9 (Akka 2.4.x no longer has a Protobuf dependency, and Protobuf only came into Flink as a transitive dependency). I did this on Flink 1.1.2, so I also had to update the Flink Runner to 1.1.2 (I also changed the Runner pom to create a shaded "bundled" jar). Here are the two branches that you can use to get it to work:

- https://github.com/aljoscha/flink/tree/flink-1.1.2-akka-2.4.9
- https://github.com/aljoscha/incubator-beam/tree/flink-1.1.2

The steps I did:
- check out Flink
- run "mvn clean install -DskipTests"
- check out Beam
- run "mvn clean install -DskipTests"
- build your testing project using "mvn clean package"
- copy beam-runners-flink_2.10-bundled-0.3.0-incubating-SNAPSHOT.jar from <beam-root>/runners/flink/runner/target to the lib/ folder of the Flink install
- copy the jar from your testing project to the lib/ folder as well (this is important)
- only now start the cluster
- run using "bin/flink run" while also specifying your jar

One caveat is that you have to move the program jar to the lib folder as well, because of some class-loader issues; it doesn't work if you simply give it as an argument to "bin/flink run". Also, the Web Dashboard seems not to work with those two jars in the lib folder, probably because there is some stuff in those jars that shouldn't really be there.

In the future we should probably provide ready-made packages for this and update both Flink and Beam.

Cheers,
Aljoscha

P.S. While writing this I just saw your second mail. Good that you also found a solution! :-)

On Mon, 5 Sep 2016 at 17:42 Pawel Szczur <[email protected]> wrote:

Thanks.

As far as I can tell, Protobuf is used in two independent places in Beam:
- ProtoCoder, a dependency in https://github.com/apache/incubator-beam/blob/master/sdks/java/core/pom.xml
- in the FlinkRunner code, where it comes in transitively from org.apache.flink

Now I'm wondering: would it be possible to shade com.google.protobuf in the whole Flink cluster and in flink-runner.jar while leaving Beam untouched (so I could use version 3.0.0-beta1 for my proto)? (I've tried, and I don't think it really works.)

I will try your approach of shading the proto in Beam and my program. I'm using jarjar to replace the package name in the jar.

2016-09-05 15:41 GMT+02:00 Aljoscha Krettek <[email protected]>:

This seems to be a bigger problem. I got it to compile and (almost) run by shading away the protobuf dependency of Beam in the Flink Runner jar: https://github.com/apache/incubator-beam/blob/c83783209f739fc541cea25f22cfe542b75ffa55/runners/flink/runner/pom.xml#L223. This does not really work, though, since your code will then use a SerializableCoder instead of a ProtoCoder for your protobuf-generated class. The problem is that Beam uses reflection to determine whether a ProtoCoder can be used on user classes. Your user class is derived from com.google.protobuf.MessageOrBuilder, but the Beam code will look for something like flink.relocated.com.google.protobuf.MessageOrBuilder, so that doesn't match.

The only solution I see for this is to compile both Beam and the user program, create a fat jar from this, and then create another fat jar in which Protobuf is relocated in all the code, i.e. in both the Beam code and the user code. That's not a very nice solution, though.
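For reference, the relocation in that pom is of this general shape (a minimal sketch assuming the standard maven-shade-plugin relocation mechanism; see the pom link above for the actual patterns and filters):

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <relocation>
            <!-- rewrites the bundled protobuf classes AND every bytecode
                 reference to them inside the shaded jar -->
            <pattern>com.google.protobuf</pattern>
            <shadedPattern>flink.relocated.com.google.protobuf</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>

Classes outside the shaded jar (such as your protobuf-generated user class) keep referring to the original com.google.protobuf names, which is exactly why the reflective ProtoCoder check described above stops matching.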
On Sat, 3 Sep 2016 at 15:13 Pawel Szczur <[email protected]> wrote:

I've tried version 2.5.0, no difference. I've found that the problem is in Beam's coder code: it assumes version 3.0 of Protobuf, while Flink enforces 2.5.0. I'm not sure whether it's possible to force another proto version in Flink, or to rename it by adding some prefix like 'org.apache.flink' + 'com.google.protobuf' where it is used in Flink.

2016-09-02 12:06 GMT+02:00 Aljoscha Krettek <[email protected]>:

I imagine it's caused by the classpath being different when you run it using the Flink command. It might also be that your program fails at a different point once you fix the first problem, due to the protobuf mismatches.

On Fri, 2 Sep 2016 at 11:59 Pawel Szczur <[email protected]> wrote:

OK, I will try 2.5.0 (I use proto2 syntax anyway). But then, how is it that it works fine when I run it as a binary, yet fails when sent to the local cluster?

2016-09-02 11:42 GMT+02:00 Aljoscha Krettek <[email protected]>:

Yep, as I said, the problem is most likely that Flink has a dependency on a different version of protobuf, which clashes with the version that Beam provides or that you have as a dependency. Have you tried setting 2.5.0 as the version, since that's what Flink uses? In the end that's not a proper solution, however; both Flink and Beam should probably shade their protobuf dependency. I'm not sure if that's possible, though.

On Fri, 2 Sep 2016 at 11:31 Pawel Szczur <[email protected]> wrote:

Thanks for the reply.

I've tried that, and I don't think it worked. I explicitly tried version 3.0.0-beta1 to be compatible with Beam, and in another test 3.0.0, just out of curiosity. It still didn't work.

I've added this explicitly to pom.xml:

<dependency>
  <groupId>com.google.protobuf</groupId>
  <artifactId>protobuf-java</artifactId>
  <version>3.0.0-beta1</version>
</dependency>

Did I miss some param?

2016-09-02 11:22 GMT+02:00 Aljoscha Krettek <[email protected]>:

Hi,
I think this is the classic problem of dependency version conflicts. Beam has a dependency on protobuf 3.0.0-beta1, while Flink has a dependency on protobuf 2.5.0 (through Akka). I think that when running your program through the bin/flink command the order in the classpath might be different, so you're getting the wrong version.

As an immediate fix, I think you could try having your own dependency on protobuf and shading that, so that you don't have version conflicts.
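A minimal sketch of that, assuming your project builds its jar with the maven-shade-plugin (the relocated package name here is just an example):

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <relocation>
            <!-- move your own protobuf copy out of the way of the
                 2.5.0 that Flink puts on the cluster classpath -->
            <pattern>com.google.protobuf</pattern>
            <shadedPattern>mycompany.shaded.com.google.protobuf</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>

Note, though, that this runs into the ProtoCoder reflection issue discussed above: once relocated, your generated classes no longer implement the com.google.protobuf.MessageOrBuilder that Beam looks for.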
Cheers,
Aljoscha

On Thu, 1 Sep 2016 at 16:15 Pawel Szczur <[email protected]> wrote:

Hi,

I'm trying to run a simple pipeline on a local cluster, using Protocol Buffers to pass data between Beam functions.

Everything works fine if I run it through:
java -jar target/dataflow-test-1.0-SNAPSHOT.jar --runner=org.apache.beam.runners.flink.FlinkRunner --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt

But it fails when run on a Flink cluster:
flink run target/dataflow-test-1.0-SNAPSHOT.jar --runner=org.apache.beam.runners.flink.FlinkRunner --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt

The ready-to-run project: https://github.com/orian/beam-flink-local-cluster

Any clues?

Cheers, Pawel

------------------------------------------------------------
The program finished with the following exception:

java.lang.NoSuchMethodError: com.google.protobuf.ExtensionRegistry.getAllImmutableExtensionsByExtendedType(Ljava/lang/String;)Ljava/util/Set;
    at com.mycompany.dataflow.WordCount.main(WordCount.java:109)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
