This seems to be a bigger problem. I got it to compile and (almost) run by
shading away the protobuf dependency of Beam in the Flink Runner jar:
https://github.com/apache/incubator-beam/blob/c83783209f739fc541cea25f22cfe542b75ffa55/runners/flink/runner/pom.xml#L223.
This does not really work, though, because your code will then no longer use
a ProtoCoder but a SerializableCoder for your protobuf-generated class. The
problem is that Beam uses reflection to determine whether a ProtoCoder can
be used for a user class: your user class is derived from
com.google.protobuf.MessageOrBuilder, but the shaded Beam code looks for
something like flink.relocated.com.google.protobuf.MessageOrBuilder, so the
check fails.
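
To make the failure mode concrete, here is a small self-contained sketch (the interface and class names are stand-ins, not Beam's actual code): a class that implements a relocated copy of an interface is not assignable to the interface under its original name, so a reflection check against the original name fails.

```java
// Stand-in names, not Beam's actual classes: demonstrates why relocation
// breaks a reflection-based check like Beam's coder inference.

interface OriginalMessage {}   // stands in for com.google.protobuf.MessageOrBuilder
interface RelocatedMessage {}  // stands in for the shaded/relocated copy

// The user's generated class ends up implementing the relocated interface.
class UserProto implements RelocatedMessage {}

public class RelocationDemo {
    public static void main(String[] args) {
        // Check against the original name: fails after relocation.
        System.out.println(OriginalMessage.class.isAssignableFrom(UserProto.class));  // false
        // The class only matches the relocated name, which the check never uses.
        System.out.println(RelocatedMessage.class.isAssignableFrom(UserProto.class)); // true
    }
}
```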

The only solution I see for this is to compile both Beam and the user
program, create a fat jar from them, and then create another fat jar in
which protobuf is relocated in all the code, i.e. in both the Beam code and
the user code. That's not a very nice solution, though.
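
For reference, that last step would amount to a maven-shade-plugin relocation applied to the combined fat jar, along these lines (a sketch only; the shaded package name is illustrative, not a tested configuration):

```xml
<!-- Sketch: relocate protobuf in the final fat jar so that both the Beam
     classes and the user classes see the same relocated copy. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <relocation>
            <pattern>com.google.protobuf</pattern>
            <!-- illustrative target package -->
            <shadedPattern>my.relocated.com.google.protobuf</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```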

On Sat, 3 Sep 2016 at 15:13 Pawel Szczur <[email protected]> wrote:

> I've tried version 2.5.0, no difference. I've found that the problem is in
> the Beam Coder code. It assumes version 3.0 of Protobuf while Flink
> enforces 2.5.0. I'm not sure if it's possible to enforce another proto
> version in Flink, or to rename it by adding a prefix like
> 'org.apache.flink' +'com.google.protobuf' where it's used in Flink.
>
> 2016-09-02 12:06 GMT+02:00 Aljoscha Krettek <[email protected]>:
>
>> I imagine that it's caused by the classpath being different when you run
>> it using the Flink command. It might also be that your program fails at a
>> different point once you fix the first problem, due to the protobuf
>> version mismatch.
>>
>> On Fri, 2 Sep 2016 at 11:59 Pawel Szczur <[email protected]> wrote:
>>
>>> Ok, I will try 2.5.0 (I use proto2 syntax anyway), but then how is it
>>> that when I run it as a binary it works fine, but when it's sent to the
>>> local cluster it fails?
>>>
>>> 2016-09-02 11:42 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>
>>>> Yep, as I said, the problem is most likely that Flink has a dependency
>>>> on a different version of protobuf, which clashes with the version that
>>>> Beam provides or that you have as a dependency. Have you tried setting
>>>> 2.5.0 as the version, since that's what Flink uses? In the end that's
>>>> not a proper solution, however, and both Flink and Beam should likely
>>>> shade their protobuf dependency; I'm not sure if that's possible, though.
>>>>
>>>> On Fri, 2 Sep 2016 at 11:31 Pawel Szczur <[email protected]> wrote:
>>>>
>>>>> Thanks for reply.
>>>>>
>>>>> I've tried that; I think it didn't work. I explicitly tried
>>>>> version 3.0.0-beta1 to be compatible with Beam, and in another test
>>>>> 3.0.0 just out of curiosity. It still didn't work.
>>>>>
>>>>> I've added explicitly to pom.xml:
>>>>>
>>>>> <dependency>
>>>>>     <groupId>com.google.protobuf</groupId>
>>>>>     <artifactId>protobuf-java</artifactId>
>>>>>     <version>3.0.0-beta1</version>
>>>>> </dependency>
>>>>>
>>>>> Did I miss some param?
>>>>>
>>>>> 2016-09-02 11:22 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>>>
>>>>>> Hi,
>>>>>> I think this is the classic problem of dependency version conflicts.
>>>>>> Beam has a dependency on protobuf 3.0.0-beta1 while Flink has a
>>>>>> dependency on protobuf 2.5.0 (through Akka). I think when running your
>>>>>> program through the bin/flink command the order in the classpath might
>>>>>> be different and you're getting the wrong version.
>>>>>>
>>>>>> As an immediate fix, I think you could try having your own dependency
>>>>>> on protobuf and shade that, so that you don't have version conflicts.
>>>>>>
>>>>>> Cheers,
>>>>>> Aljoscha
>>>>>>
>>>>>> On Thu, 1 Sep 2016 at 16:15 Pawel Szczur <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I'm trying to run a simple pipeline on local cluster using Protocol
>>>>>>> Buffer to pass data between Beam functions.
>>>>>>>
>>>>>>> Everything works fine if I run it through:
>>>>>>> java -jar target/dataflow-test-1.0-SNAPSHOT.jar
>>>>>>> --runner=org.apache.beam.runners.flink.FlinkRunner
>>>>>>> --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt
>>>>>>>
>>>>>>> But it fails when trying to run on flink cluster:
>>>>>>> flink run target/dataflow-test-1.0-SNAPSHOT.jar
>>>>>>> --runner=org.apache.beam.runners.flink.FlinkRunner
>>>>>>> --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt
>>>>>>>
>>>>>>> The ready to run project:
>>>>>>> https://github.com/orian/beam-flink-local-cluster
>>>>>>>
>>>>>>> Any clues?
>>>>>>>
>>>>>>> Cheers, Pawel
>>>>>>>
>>>>>>> ------------------------------------------------------------
>>>>>>>  The program finished with the following exception:
>>>>>>>
>>>>>>> java.lang.NoSuchMethodError:
>>>>>>> com.google.protobuf.ExtensionRegistry.getAllImmutableExtensionsByExtendedType(Ljava/lang/String;)Ljava/util/Set;
>>>>>>> at com.mycompany.dataflow.WordCount.main(WordCount.java:109)
>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>> at
>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>> at
>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>> at
>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>> at
>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>> at
>>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>> at
>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>> at
>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
>>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
>>>>>>>
>>>>>>
>>>>>
>>>
>
