Hi Pawel,

https://github.com/apache/incubator-beam/blob/master/runners/flink/pom.xml#L42
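
That's where the Flink version is pinned; bumping it should mostly be a matter
of changing that one value. A sketch, assuming the version is exposed as a
Maven property there (1.1.3 is just an example target, untested):

<properties>
    <flink.version>1.1.3</flink.version>
</properties>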

Dan

On Fri, Nov 4, 2016 at 2:08 PM, Pawel Szczur <[email protected]> wrote:

> Can we bump the version of Flink in Beam? Is there a reason to stick with
> 1.0.3? From my experience it tends to get harder and harder to change
> later on.
>
> 2016-09-05 18:41 GMT+02:00 Aljoscha Krettek <[email protected]>:
>
>> I actually just got it to work by updating the Akka dependency of Flink
>> to 2.4.9 (Akka 2.4.x does not have a Protobuf dependency anymore and
>> Protobuf only came into Flink as a transitive dependency.) I did this on
>> Flink 1.1.2 so I also had to update the Flink Runner to 1.1.2 (I also
>> changed the Runner pom to create a shaded "bundled" jar). Here are the two
>> branches that you can use to get it to work:
>>
>>  - https://github.com/aljoscha/flink/tree/flink-1.1.2-akka-2.4.9
>>  - https://github.com/aljoscha/incubator-beam/tree/flink-1.1.2
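>>
>> For reference, the shaded "bundled" jar is produced by attaching a shaded
>> artifact in the Runner pom. A rough sketch of the relevant config (the
>> exact version is in the branch above):
>>
>> <plugin>
>>   <groupId>org.apache.maven.plugins</groupId>
>>   <artifactId>maven-shade-plugin</artifactId>
>>   <executions>
>>     <execution>
>>       <phase>package</phase>
>>       <goals><goal>shade</goal></goals>
>>       <configuration>
>>         <!-- attach the fat jar as an extra artifact with the "bundled" classifier -->
>>         <shadedArtifactAttached>true</shadedArtifactAttached>
>>         <shadedClassifierName>bundled</shadedClassifierName>
>>       </configuration>
>>     </execution>
>>   </executions>
>> </plugin>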
>>
>> The steps I did:
>>  - checkout Flink
>>  - run "mvn clean install -DskipTests"
>>  - checkout Beam
>>  - run "mvn clean install -DskipTests"
>>  - build your testing project using "mvn clean package"
>>  - copy beam-runners-flink_2.10-bundled-0.3.0-incubating-SNAPSHOT.jar
>> from <beam-root>/runners/flink/runner/target to the lib/ folder of the
>> Flink install
>>  - copy the jar from your testing project to the lib/ folder as well
>> (this is important)
>>  - only now start the cluster
>>  - run using "bin/flink run" while also specifying your jar
>>
>> One caveat is that you have to move the program jar to the lib folder as
>> well, because of some class loader issues; it doesn't work if you simply
>> pass it as an argument to "bin/flink run". Also, the Web Dashboard seems
>> not to work with those two jars in the lib folder, probably because there
>> is some stuff in those jars that shouldn't really be there.
>>
>> In the future we should probably provide ready-made packages for this and
>> update both Flink and Beam.
>>
>> Cheers,
>> Aljoscha
>>
>> P.S. While writing this I just saw your second mail. Good that you also
>> found a solution! :-)
>>
>> On Mon, 5 Sep 2016 at 17:42 Pawel Szczur <[email protected]> wrote:
>>
>>> Thanks.
>>>
>>> As far as I can tell, Protobuf is used in two places independently in Beam:
>>>  - ProtoCoder, dependency in: (https://github.com/apache/incubator-beam/blob/master/sdks/java/core/pom.xml)
>>>  - in the FlinkRunner code, where it's transitive, coming from org.apache.flink
>>>
>>> Now I'm thinking: would it be possible to shade com.google.protobuf
>>> across the whole Flink cluster and the flink-runner.jar, while leaving
>>> Beam's untouched (so I could use version 3.0.0-beta1 for my proto)?
>>> (I've tried, and I don't think it really works.)
>>>
>>> I will try your approach of shading the proto in both Beam and my program.
>>> I'm using jarjar to replace the package names in the jar.
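>>>
>>> For reference, the jarjar rule for such a rename would look roughly like
>>> this (a sketch; the "repackaged" prefix is just my own choice):
>>>
>>> rule com.google.protobuf.** repackaged.com.google.protobuf.@1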
>>>
>>> 2016-09-05 15:41 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>
>>>> This seems to be a bigger problem. I got it to compile and (almost) run
>>>> by shading away the protobuf dependency of Beam in the Flink Runner jar:
>>>> https://github.com/apache/incubator-beam/blob/c83783209f739fc541cea25f22cfe542b75ffa55/runners/flink/runner/pom.xml#L223.
>>>> This does not really work, though, since now your code will not use a
>>>> ProtoCoder but a SerializableCoder for your protobuf-generated class. The
>>>> problem is that Beam uses reflection to determine whether a ProtoCoder can
>>>> be used on user classes. Now, your user class will be derived from
>>>> com.google.protobuf.MessageOrBuilder but the Beam code will look for
>>>> something like flink.relocated.com.google.protobuf.MessageOrBuilder so
>>>> that doesn't work.
>>>>
>>>> The only solution I see for this is to compile both Beam and the user
>>>> program, create a fat-jar from this and then create another fat jar where
>>>> Protobuf is relocated in all the code, i.e. in both the Beam code and the
>>>> user code. That's not a very nice solution, though.
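>>>>
>>>> For the record, the relocation in that second fat jar would look roughly
>>>> like this with the maven-shade-plugin (a sketch; the shaded package
>>>> prefix is arbitrary):
>>>>
>>>> <plugin>
>>>>   <groupId>org.apache.maven.plugins</groupId>
>>>>   <artifactId>maven-shade-plugin</artifactId>
>>>>   <executions>
>>>>     <execution>
>>>>       <phase>package</phase>
>>>>       <goals><goal>shade</goal></goals>
>>>>       <configuration>
>>>>         <relocations>
>>>>           <relocation>
>>>>             <!-- rewrites protobuf references in Beam code and user code alike -->
>>>>             <pattern>com.google.protobuf</pattern>
>>>>             <shadedPattern>relocated.com.google.protobuf</shadedPattern>
>>>>           </relocation>
>>>>         </relocations>
>>>>       </configuration>
>>>>     </execution>
>>>>   </executions>
>>>> </plugin>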
>>>>
>>>> On Sat, 3 Sep 2016 at 15:13 Pawel Szczur <[email protected]> wrote:
>>>>
>>>>> I've tried version 2.5.0, no difference. I've found that the problem is
>>>>> in the Beam Coder code: it assumes Protobuf version 3.0 while Flink
>>>>> enforces 2.5.0. I'm not sure whether it's possible to enforce a different
>>>>> proto version in Flink, or to rename it by adding some prefix like
>>>>> 'org.apache.flink' + 'com.google.protobuf' when used in Flink.
>>>>>
>>>>> 2016-09-02 12:06 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>>>
>>>>>> I imagine that it's caused by the classpath being different when you
>>>>>> run it using the Flink command. It might also be that your program
>>>>>> fails at a different point once you fix the first problem, due to the
>>>>>> protobuf mismatches.
>>>>>>
>>>>>> On Fri, 2 Sep 2016 at 11:59 Pawel Szczur <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Ok, I will try 2.5.0 (I use proto2 syntax anyway), but then, how is
>>>>>>> it that when I run it as a binary it works fine, but when it's sent
>>>>>>> to the local cluster it fails?
>>>>>>>
>>>>>>> 2016-09-02 11:42 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>>>>>
>>>>>>>> Yep, as I said, the problem is most likely that Flink has a
>>>>>>>> dependency on a different version of protobuf, which clashes with the
>>>>>>>> version that Beam provides or that you have as a dependency. Have you
>>>>>>>> tried setting 2.5.0 as the version, since that's what Flink uses? In
>>>>>>>> the end that's not a proper solution, however; both Flink and Beam
>>>>>>>> should likely shade their protobuf dependency. Not sure if that's
>>>>>>>> possible, though.
>>>>>>>>
>>>>>>>> On Fri, 2 Sep 2016 at 11:31 Pawel Szczur <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks for reply.
>>>>>>>>>
>>>>>>>>> I've tried that; I think it didn't work. I explicitly tried
>>>>>>>>> version 3.0.0-beta1 to be compatible with Beam, and in another test
>>>>>>>>> 3.0.0 just out of curiosity. It still didn't work.
>>>>>>>>>
>>>>>>>>> I've added explicitly to pom.xml:
>>>>>>>>>
>>>>>>>>> <dependency>
>>>>>>>>>     <groupId>com.google.protobuf</groupId>
>>>>>>>>>     <artifactId>protobuf-java</artifactId>
>>>>>>>>>     <version>3.0.0-beta1</version>
>>>>>>>>> </dependency>
>>>>>>>>>
>>>>>>>>> Did I miss some param?
>>>>>>>>>
>>>>>>>>> 2016-09-02 11:22 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>> I think this is the classic problem of dependency version
>>>>>>>>>> conflicts. Beam has a dependency on protobuf 3.0.0-beta1 while
>>>>>>>>>> Flink has a dependency on protobuf 2.5.0 (through Akka). I think
>>>>>>>>>> when running your program through the bin/flink command the order
>>>>>>>>>> in the classpath might be different and you're getting the wrong
>>>>>>>>>> version.
>>>>>>>>>>
>>>>>>>>>> As an immediate fix, I think you could try having your own
>>>>>>>>>> dependency on protobuf and shade that, so that you don't have version
>>>>>>>>>> conflicts.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Aljoscha
>>>>>>>>>>
>>>>>>>>>> On Thu, 1 Sep 2016 at 16:15 Pawel Szczur <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I'm trying to run a simple pipeline on a local cluster, using
>>>>>>>>>>> Protocol Buffers to pass data between Beam functions.
>>>>>>>>>>>
>>>>>>>>>>> Everything works fine if I run it through:
>>>>>>>>>>> java -jar target/dataflow-test-1.0-SNAPSHOT.jar
>>>>>>>>>>> --runner=org.apache.beam.runners.flink.FlinkRunner
>>>>>>>>>>> --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt
>>>>>>>>>>>
>>>>>>>>>>> But it fails when run on the Flink cluster:
>>>>>>>>>>> flink run target/dataflow-test-1.0-SNAPSHOT.jar
>>>>>>>>>>> --runner=org.apache.beam.runners.flink.FlinkRunner
>>>>>>>>>>> --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt
>>>>>>>>>>>
>>>>>>>>>>> A ready-to-run project:
>>>>>>>>>>> https://github.com/orian/beam-flink-local-cluster
>>>>>>>>>>>
>>>>>>>>>>> Any clues?
>>>>>>>>>>>
>>>>>>>>>>> Cheers, Pawel
>>>>>>>>>>>
>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>>  The program finished with the following exception:
>>>>>>>>>>>
>>>>>>>>>>> java.lang.NoSuchMethodError: com.google.protobuf.ExtensionRegistry.getAllImmutableExtensionsByExtendedType(Ljava/lang/String;)Ljava/util/Set;
>>>>>>>>>>> at com.mycompany.dataflow.WordCount.main(WordCount.java:109)
>>>>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>>>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>>>>> at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>>>>> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
>>>>>>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>
>>>
>
