Can we bump the version of Flink in Beam? Is there a reason to stick with
1.0.3? In my experience, such upgrades tend to get harder and harder the
longer they are postponed.

2016-09-05 18:41 GMT+02:00 Aljoscha Krettek <[email protected]>:

> I actually just got it to work by updating the Akka dependency of Flink to
> 2.4.9 (Akka 2.4.x no longer has a Protobuf dependency, and Protobuf only
> came into Flink as a transitive dependency of Akka). I did this on Flink
> 1.1.2, so I also had to update the Flink Runner to 1.1.2 (I also changed
> the Runner pom to create a shaded "bundled" jar; a simplified sketch of
> that shade configuration follows the steps below). Here are the two
> branches that you can use to get it to work:
>
>  - https://github.com/aljoscha/flink/tree/flink-1.1.2-akka-2.4.9
>  - https://github.com/aljoscha/incubator-beam/tree/flink-1.1.2
>
> The steps I did:
>  - check out Flink
>  - run "mvn clean install -DskipTests"
>  - check out Beam
>  - run "mvn clean install -DskipTests"
>  - build your testing project using "mvn clean package"
>  - copy beam-runners-flink_2.10-bundled-0.3.0-incubating-SNAPSHOT.jar
> from <beam-root>/runners/flink/runner/target to the lib/ folder of the
> Flink install
>  - copy the jar from your testing project to the lib/ folder as well (this
> is important)
>  - only now start the cluster
>  - run using "bin/flink run", also specifying your jar
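>
> For reference, the "bundled" jar above is produced by a maven-shade-plugin
> execution in the Runner pom, roughly along these lines (a simplified
> sketch, not the exact configuration from my branch):
>
> <plugin>
>   <groupId>org.apache.maven.plugins</groupId>
>   <artifactId>maven-shade-plugin</artifactId>
>   <executions>
>     <execution>
>       <phase>package</phase>
>       <goals><goal>shade</goal></goals>
>       <configuration>
>         <!-- attach the fat jar as an additional artifact with the
>              "bundled" classifier, next to the normal thin jar -->
>         <shadedArtifactAttached>true</shadedArtifactAttached>
>         <shadedClassifierName>bundled</shadedClassifierName>
>       </configuration>
>     </execution>
>   </executions>
> </plugin>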
>
> One caveat is that you have to move the program jar to the lib folder as
> well because of some class loader issues; it doesn't work if you simply
> pass it as an argument to "bin/flink run". Also, the Web Dashboard doesn't
> seem to work with those two jars in the lib folder, probably because there
> is some stuff in those jars that shouldn't really be there.
>
> In the future we should probably provide ready-made packages for this and
> update both Flink and Beam.
>
> Cheers,
> Aljoscha
>
> P.S. While writing this I just saw your second mail. Good that you also
> found a solution! :-)
>
> On Mon, 5 Sep 2016 at 17:42 Pawel Szczur <[email protected]> wrote:
>
>> Thanks.
>>
>> As far as I can tell, Protobuf is used independently in two places in Beam:
>>  - in ProtoCoder, as a dependency declared in
>> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/pom.xml
>>  - in the FlinkRunner code, as a transitive dependency coming in from
>> org.apache.flink
>>
>> Now I'm wondering: would it be possible to shade com.google.protobuf in
>> the whole Flink cluster and in flink-runner.jar while leaving Beam as is
>> (so I could use version 3.0.0-beta1 for my proto)?
>> (I've tried, and I don't think it really works.)
>>
>> I will try your approach of shading the proto in Beam and my program.
>> I'm using jarjar to replace the package name in the jar.
>>
>> 2016-09-05 15:41 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>
>>> This seems to be a bigger problem. I got it to compile and (almost) run
>>> by shading away the protobuf dependency of Beam in the Flink Runner jar:
>>> https://github.com/apache/incubator-beam/blob/c83783209f739fc541cea25f22cfe542b75ffa55/runners/flink/runner/pom.xml#L223
>>> This does not really work, though, since your code will now use a
>>> SerializableCoder instead of a ProtoCoder for your protobuf-generated
>>> class. The problem is that Beam uses reflection to determine whether a
>>> ProtoCoder can be used for user classes: your user class is derived from
>>> com.google.protobuf.MessageOrBuilder, but the Beam code will look for
>>> something like flink.relocated.com.google.protobuf.MessageOrBuilder, so
>>> the check fails.
>>>
>>> The only solution I see for this is to compile both Beam and the user
>>> program, create a fat jar from this, and then create another fat jar in
>>> which Protobuf is relocated in all the code, i.e. in both the Beam code
>>> and the user code. That's not a very nice solution, though; a rough
>>> sketch of how it could be wired up follows.
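>>>
>>> As a minimal sketch (the module layout and coordinates are made up
>>> purely for illustration), the second pass could be a tiny wrapper module
>>> whose only dependency is the fat jar from the first pass, shaded with a
>>> relocation over everything it pulls in:
>>>
>>> <dependencies>
>>>   <dependency>
>>>     <!-- hypothetical coordinates of the fat jar built in pass one -->
>>>     <groupId>com.mycompany</groupId>
>>>     <artifactId>dataflow-test-fat</artifactId>
>>>     <version>1.0-SNAPSHOT</version>
>>>   </dependency>
>>> </dependencies>
>>> <build>
>>>   <plugins>
>>>     <plugin>
>>>       <groupId>org.apache.maven.plugins</groupId>
>>>       <artifactId>maven-shade-plugin</artifactId>
>>>       <executions>
>>>         <execution>
>>>           <phase>package</phase>
>>>           <goals><goal>shade</goal></goals>
>>>           <configuration>
>>>             <relocations>
>>>               <relocation>
>>>                 <!-- rewrites protobuf references in Beam classes and
>>>                      user classes alike, since both are in the fat jar -->
>>>                 <pattern>com.google.protobuf</pattern>
>>>                 <shadedPattern>relocated.com.google.protobuf</shadedPattern>
>>>               </relocation>
>>>             </relocations>
>>>           </configuration>
>>>         </execution>
>>>       </executions>
>>>     </plugin>
>>>   </plugins>
>>> </build>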
>>>
>>> On Sat, 3 Sep 2016 at 15:13 Pawel Szczur <[email protected]> wrote:
>>>
>>>> I've tried version 2.5.0, no difference. I've found that the problem
>>>> is in the Beam coder code: it assumes Protobuf version 3.0 while Flink
>>>> enforces 2.5.0. I'm not sure if it's possible to force a different proto
>>>> version in Flink, or to rename the package, e.g. by prefixing
>>>> 'com.google.protobuf' with something like 'org.apache.flink' wherever it
>>>> is used in Flink.
>>>>
>>>> 2016-09-02 12:06 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>>
>>>>> I imagine that it's caused by the classpath being different when you
>>>>> run it using the Flink command. It might also be that your program
>>>>> fails at a different point once you fix the first problem, due to the
>>>>> protobuf mismatch.
>>>>>
>>>>> On Fri, 2 Sep 2016 at 11:59 Pawel Szczur <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Ok, I will try 2.5.0 (I use proto2 syntax anyway). But then, how is
>>>>>> it that when I run it as a binary it works fine, but when it's sent to
>>>>>> the local cluster it fails?
>>>>>>
>>>>>> 2016-09-02 11:42 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>>>>
>>>>>>> Yep, as I said, the problem is most likely that Flink depends on a
>>>>>>> different version of protobuf, which clashes with the version that
>>>>>>> Beam provides or that you have as a dependency. Have you tried
>>>>>>> setting 2.5.0 as the version, since that's what Flink uses? In the
>>>>>>> end that's not a proper solution, however; both Flink and Beam should
>>>>>>> probably shade their protobuf dependency. I'm not sure if that's
>>>>>>> possible, though.
>>>>>>>
>>>>>>> On Fri, 2 Sep 2016 at 11:31 Pawel Szczur <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thanks for the reply.
>>>>>>>>
>>>>>>>> I've tried that; I don't think it worked. I explicitly tried version
>>>>>>>> 3.0.0-beta1 to be compatible with Beam, and in another test 3.0.0,
>>>>>>>> just out of curiosity. Neither worked.
>>>>>>>>
>>>>>>>> I've explicitly added this to pom.xml:
>>>>>>>>
>>>>>>>> <dependency>
>>>>>>>>     <groupId>com.google.protobuf</groupId>
>>>>>>>>     <artifactId>protobuf-java</artifactId>
>>>>>>>>     <version>3.0.0-beta1</version>
>>>>>>>> </dependency>
>>>>>>>>
>>>>>>>> Did I miss some param?
>>>>>>>>
>>>>>>>> 2016-09-02 11:22 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> I think this is the classic problem of dependency version
>>>>>>>>> conflicts: Beam depends on protobuf 3.0.0-beta1 while Flink depends
>>>>>>>>> on protobuf 2.5.0 (through Akka). When you run your program through
>>>>>>>>> the bin/flink command, the order of entries in the classpath might
>>>>>>>>> be different and you're picking up the wrong version.
>>>>>>>>>
>>>>>>>>> As an immediate fix, I think you could try having your own
>>>>>>>>> dependency on protobuf and shading it, so that you don't run into
>>>>>>>>> the version conflict.
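>>>>>>>>>
>>>>>>>>> Something along these lines in your project's pom.xml could do it
>>>>>>>>> (a sketch only; the shaded package name is an illustrative
>>>>>>>>> placeholder, pick anything that doesn't clash):
>>>>>>>>>
>>>>>>>>> <plugin>
>>>>>>>>>   <groupId>org.apache.maven.plugins</groupId>
>>>>>>>>>   <artifactId>maven-shade-plugin</artifactId>
>>>>>>>>>   <executions>
>>>>>>>>>     <execution>
>>>>>>>>>       <phase>package</phase>
>>>>>>>>>       <goals><goal>shade</goal></goals>
>>>>>>>>>       <configuration>
>>>>>>>>>         <relocations>
>>>>>>>>>           <relocation>
>>>>>>>>>             <!-- moves the bundled protobuf classes and rewrites
>>>>>>>>>                  your references to them -->
>>>>>>>>>             <pattern>com.google.protobuf</pattern>
>>>>>>>>>             <shadedPattern>my.shaded.com.google.protobuf</shadedPattern>
>>>>>>>>>           </relocation>
>>>>>>>>>         </relocations>
>>>>>>>>>       </configuration>
>>>>>>>>>     </execution>
>>>>>>>>>   </executions>
>>>>>>>>> </plugin>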
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Aljoscha
>>>>>>>>>
>>>>>>>>> On Thu, 1 Sep 2016 at 16:15 Pawel Szczur <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I'm trying to run a simple pipeline on a local cluster, using
>>>>>>>>>> Protocol Buffers to pass data between Beam functions.
>>>>>>>>>>
>>>>>>>>>> Everything works fine if I run it through:
>>>>>>>>>> java -jar target/dataflow-test-1.0-SNAPSHOT.jar
>>>>>>>>>> --runner=org.apache.beam.runners.flink.FlinkRunner
>>>>>>>>>> --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt
>>>>>>>>>>
>>>>>>>>>> But it fails when submitted to a Flink cluster:
>>>>>>>>>> flink run target/dataflow-test-1.0-SNAPSHOT.jar
>>>>>>>>>> --runner=org.apache.beam.runners.flink.FlinkRunner
>>>>>>>>>> --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt
>>>>>>>>>>
>>>>>>>>>> A ready-to-run project:
>>>>>>>>>> https://github.com/orian/beam-flink-local-cluster
>>>>>>>>>>
>>>>>>>>>> Any clues?
>>>>>>>>>>
>>>>>>>>>> Cheers, Pawel
>>>>>>>>>>
>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>  The program finished with the following exception:
>>>>>>>>>>
>>>>>>>>>> java.lang.NoSuchMethodError: com.google.protobuf.ExtensionRegistry.getAllImmutableExtensionsByExtendedType(Ljava/lang/String;)Ljava/util/Set;
>>>>>>>>>> at com.mycompany.dataflow.WordCount.main(WordCount.java:109)
>>>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>>>> at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>>>> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
>>>>>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>
>>
