Thanks Aljoscha, your solution works.
For future reference: after building a fat jar, one must relocate the protobuf package:
jarjar --mode process --rules rules.txt \
  target/dataflow-test-1.0-SNAPSHOT.jar \
  --output target/dataflow-test-1.0-SNAPSHOT.shade.jar
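
Here rules.txt contains the relocation rule, along these lines (the
"relocated" prefix is an arbitrary choice; any package prefix that no other
jar on the classpath uses will do):

rule com.google.protobuf.** relocated.com.google.protobuf.@1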

flink run --class com.mycompany.dataflow.WordCount \
  target/dataflow-test-1.0-SNAPSHOT.shade.jar \
  --runner=org.apache.beam.runners.flink.FlinkRunner \
  --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt

I will update the example repo with the solution in case someone has the
same problem.

2016-09-05 17:41 GMT+02:00 Pawel Szczur <[email protected]>:

> Thanks.
>
> As far as I can tell, Protobuf is used in two places independently in Beam:
>  - ProtoCoder, a dependency in
> https://github.com/apache/incubator-beam/blob/master/sdks/java/core/pom.xml
>  - in the FlinkRunner code, as a transitive dependency coming from
> org.apache.flink
>
> Now I'm wondering: would it be possible to shade com.google.protobuf in the
> whole Flink cluster and in flink-runner.jar, while leaving Beam untouched
> (I could use version 3.0.0-beta1 for my proto)?
> (I've tried, and I don't think it really works.)
>
> I will try your approach of shading the proto in both Beam and my program.
> I'm using jarjar to replace the package name in the jar.
>
> 2016-09-05 15:41 GMT+02:00 Aljoscha Krettek <[email protected]>:
>
>> This seems to be a bigger problem. I got it to compile and (almost) run
>> by shading away the protobuf dependency of Beam in the Flink Runner jar:
>> https://github.com/apache/incubator-beam/blob/c83783209f739fc541cea25f22cfe542b75ffa55/runners/flink/runner/pom.xml#L223.
>> This does not really work, though, since now your code will not use a
>> ProtoCoder
>> but a SerializableCoder for your protobuf-generated class. The problem is
>> that Beam uses reflection to determine whether a ProtoCoder can be used on
>> user classes. Now, your user class will be derived from
>> com.google.protobuf.MessageOrBuilder but the Beam code will look for
>> something like flink.relocated.com.google.protobuf.MessageOrBuilder so
>> that doesn't work.
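>>
>> To make that concrete, here is a minimal sketch (hypothetical and
>> simplified, not Beam's actual code) of what such a reflective check
>> amounts to:
>>
>> // Hypothetical simplification of reflection-based coder inference.
>> static boolean protoCoderApplies(Class<?> userClass) throws ClassNotFoundException {
>>     // Resolve protobuf's message interface by its (unrelocated) name.
>>     Class<?> message = Class.forName("com.google.protobuf.Message");
>>     return message.isAssignableFrom(userClass);
>> }
>> // After relocation, Beam's side of this check refers to
>> // flink.relocated.com.google.protobuf.Message, while the user class still
>> // implements plain com.google.protobuf.Message, so the check cannot match
>> // and Beam falls back to SerializableCoder.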
>>
>> The only solution I see for this is to compile both Beam and the user
>> program, create a fat jar from this, and then create another fat jar in
>> which Protobuf is relocated in all the code, i.e. in both the Beam code
>> and the user code. That's not a very nice solution, though.
>>
>> On Sat, 3 Sep 2016 at 15:13 Pawel Szczur <[email protected]> wrote:
>>
>>> I've tried version 2.5.0, no difference. I've found that the problem is
>>> in the Beam Coder code: it assumes version 3.0 of Protobuf, while Flink
>>> enforces 2.5.0. I'm not sure whether it's possible to enforce another
>>> proto version in Flink, or to rename it by adding a prefix like
>>> 'org.apache.flink' + 'com.google.protobuf' where it is used in Flink.
>>>
>>> 2016-09-02 12:06 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>
>>>> I imagine that it's caused by the classpath being different when you
>>>> run it using the Flink command. It might also be that your program fails at
>>>> a different point once you fix the first problem, due to the protobuf
>>>> mismatches.
>>>>
>>>> On Fri, 2 Sep 2016 at 11:59 Pawel Szczur <[email protected]> wrote:
>>>>
>>>>> Ok, I will try 2.5.0 (I use proto2 syntax anyway), but then how is it
>>>>> that when I run it as a binary it works fine, yet when it's submitted
>>>>> to the local cluster it fails?
>>>>>
>>>>> 2016-09-02 11:42 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>>>
>>>>>> Yep, as I said, the problem is most likely that Flink has a dependency
>>>>>> on a different version of protobuf, which clashes with the version that
>>>>>> Beam provides or that you have as a dependency. Have you tried setting
>>>>>> 2.5.0 as the version, since that's what Flink uses? In the end that's
>>>>>> not a proper solution, however, and both Flink and Beam should likely
>>>>>> shade their protobuf dependency; not sure if that's possible, though.
>>>>>>
>>>>>> On Fri, 2 Sep 2016 at 11:31 Pawel Szczur <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Thanks for the reply.
>>>>>>>
>>>>>>> I've tried that; I don't think it worked. I explicitly tried version
>>>>>>> 3.0.0-beta1 to be compatible with Beam, and in another test 3.0.0 just
>>>>>>> out of curiosity. It still didn't work.
>>>>>>>
>>>>>>> I've added this explicitly to pom.xml:
>>>>>>>
>>>>>>> <dependency>
>>>>>>>     <groupId>com.google.protobuf</groupId>
>>>>>>>     <artifactId>protobuf-java</artifactId>
>>>>>>>     <version>3.0.0-beta1</version>
>>>>>>> </dependency>
>>>>>>>
>>>>>>> Did I miss some param?
>>>>>>>
>>>>>>> 2016-09-02 11:22 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> I think this is the classic problem of dependency version conflicts.
>>>>>>>> Beam has a dependency on protobuf 3.0.0-beta1, while Flink has a
>>>>>>>> dependency on protobuf 2.5.0 (through Akka). I think that when you
>>>>>>>> run your program through the bin/flink command, the classpath order
>>>>>>>> might be different and you're picking up the wrong version.
>>>>>>>>
>>>>>>>> As an immediate fix, I think you could try having your own
>>>>>>>> dependency on protobuf and shade that, so that you don't have version
>>>>>>>> conflicts.
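>>>>>>>>
>>>>>>>> A relocation with the maven-shade-plugin would look roughly like
>>>>>>>> this (a sketch; the shadedPattern prefix is an arbitrary choice):
>>>>>>>>
>>>>>>>> <plugin>
>>>>>>>>   <groupId>org.apache.maven.plugins</groupId>
>>>>>>>>   <artifactId>maven-shade-plugin</artifactId>
>>>>>>>>   <configuration>
>>>>>>>>     <relocations>
>>>>>>>>       <!-- Rewrite protobuf classes, and all references to them,
>>>>>>>>            into a private namespace so they cannot clash with the
>>>>>>>>            protobuf 2.5.0 that Flink brings along. -->
>>>>>>>>       <relocation>
>>>>>>>>         <pattern>com.google.protobuf</pattern>
>>>>>>>>         <shadedPattern>my.shaded.com.google.protobuf</shadedPattern>
>>>>>>>>       </relocation>
>>>>>>>>     </relocations>
>>>>>>>>   </configuration>
>>>>>>>> </plugin>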
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Aljoscha
>>>>>>>>
>>>>>>>> On Thu, 1 Sep 2016 at 16:15 Pawel Szczur <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I'm trying to run a simple pipeline on a local cluster, using
>>>>>>>>> Protocol Buffers to pass data between Beam functions.
>>>>>>>>>
>>>>>>>>> Everything works fine if I run it through:
>>>>>>>>> java -jar target/dataflow-test-1.0-SNAPSHOT.jar \
>>>>>>>>>   --runner=org.apache.beam.runners.flink.FlinkRunner \
>>>>>>>>>   --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt
>>>>>>>>>
>>>>>>>>> But it fails when I try to run it on a Flink cluster:
>>>>>>>>> flink run target/dataflow-test-1.0-SNAPSHOT.jar \
>>>>>>>>>   --runner=org.apache.beam.runners.flink.FlinkRunner \
>>>>>>>>>   --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt
>>>>>>>>>
>>>>>>>>> The ready-to-run project:
>>>>>>>>> https://github.com/orian/beam-flink-local-cluster
>>>>>>>>>
>>>>>>>>> Any clues?
>>>>>>>>>
>>>>>>>>> Cheers, Pawel
>>>>>>>>>
>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>  The program finished with the following exception:
>>>>>>>>>
>>>>>>>>> java.lang.NoSuchMethodError: com.google.protobuf.ExtensionRegistry.getAllImmutableExtensionsByExtendedType(Ljava/lang/String;)Ljava/util/Set;
>>>>>>>>> at com.mycompany.dataflow.WordCount.main(WordCount.java:109)
>>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>>> at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>>> at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
>>>>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>
>>>
>
