I've tried version 2.5.0, no difference. I've found that problem is in a
Beam Coder code. It assumes 3.0 version of ProtoBuf and Flink enforces
2.5.0. I'm not sure if it's possible to enforce other proto version in
Flink or rename it by adding some prefix like 'org.apache.flink'
+'com.google.protobuf' when used in Flink.

2016-09-02 12:06 GMT+02:00 Aljoscha Krettek <[email protected]>:

> I imagine that it's caused by the classpath being different when you run
> it using the Flink command. It might also be that your program fails at a
> different point once you fix the first problem, due to the protobuf
> mismatches.
>
> On Fri, 2 Sep 2016 at 11:59 Pawel Szczur <[email protected]> wrote:
>
>> Ok, I will try 2.5.0 (I use proto2 syntax anyway), but then, how is it
>> that when I run it as a binary it works fine, but sent to local cluster it
>> fails?
>>
>> 2016-09-02 11:42 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>
>>> Yep, as I said the problem is most likely that Flink has a dependency on
>>> a different version of protobuf so that clashes with the version that Beam
>>> provides or that you have as a dependency. Have you tried setting 2.5.0 as
>>> the version, since that's what Flink uses. In the end that's not a proper
>>> solution, however, and both Flink and Beam should likely shade their
>>> protobuf dependency, not sure of that's possible, though.
>>>
>>> On Fri, 2 Sep 2016 at 11:31 Pawel Szczur <[email protected]> wrote:
>>>
>>>> Thanks for reply.
>>>>
>>>> I've tried that, I think it didn't work . I've explicitly tried version
>>>> 3.0.0-beta1 to be compatible with Beam and in another test 3.0.0 just for
>>>> curiosity. It still didn't work.
>>>>
>>>> I've added explicitly to pom.xml:
>>>>
>>>> <dependency>
>>>>     <groupId>com.google.protobuf</groupId>
>>>>     <artifactId>protobuf-java</artifactId>
>>>>     <version>3.0.0-beta1</version>
>>>> </dependency>
>>>>
>>>> Did I miss some param?
>>>>
>>>> 2016-09-02 11:22 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>>
>>>>> Hi,
>>>>> I think this is the classic problem of dependency version conflicts.
>>>>> Beam has a dependency on protobuf 3.0.0-beta1 while Flink has a dependency
>>>>> on protobuf 2.5.0 (through Akka). I think when running your program 
>>>>> through
>>>>> the bin/flink command the order in the classpath might be different and
>>>>> you're getting the wrong version.
>>>>>
>>>>> As an immediate fix, I think you could try having your own dependency
>>>>> on protobuf and shade that, so that you don't have version conflicts.
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>
>>>>> On Thu, 1 Sep 2016 at 16:15 Pawel Szczur <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I'm trying to run a simple pipeline on local cluster using Protocol
>>>>>> Buffer to pass data between Beam functions.
>>>>>>
>>>>>> Everything works fine if I run it through:
>>>>>> java -jar target/dataflow-test-1.0-SNAPSHOT.jar
>>>>>> --runner=org.apache.beam.runners.flink.FlinkRunner
>>>>>> --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt
>>>>>>
>>>>>> But it fails when trying to run on flink cluster:
>>>>>> flink run target/dataflow-test-1.0-SNAPSHOT.jar
>>>>>> --runner=org.apache.beam.runners.flink.FlinkRunner
>>>>>> --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt
>>>>>>
>>>>>> The ready to run project:
>>>>>> https://github.com/orian/beam-flink-local-cluster
>>>>>>
>>>>>> Any clues?
>>>>>>
>>>>>> Cheers, Pawel
>>>>>>
>>>>>> ------------------------------------------------------------
>>>>>>  The program finished with the following exception:
>>>>>>
>>>>>> java.lang.NoSuchMethodError: com.google.protobuf.ExtensionRegistry.
>>>>>> getAllImmutableExtensionsByExtendedType(Ljava/lang/String;)
>>>>>> Ljava/util/Set;
>>>>>> at com.mycompany.dataflow.WordCount.main(WordCount.java:109)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(
>>>>>> NativeMethodAccessorImpl.java:62)
>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(
>>>>>> DelegatingMethodAccessorImpl.java:43)
>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(
>>>>>> PackagedProgram.java:505)
>>>>>> at org.apache.flink.client.program.PackagedProgram.
>>>>>> invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>> at org.apache.flink.client.program.Client.runBlocking(
>>>>>> Client.java:248)
>>>>>> at org.apache.flink.client.CliFrontend.executeProgramBlocking(
>>>>>> CliFrontend.java:866)
>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>> at org.apache.flink.client.CliFrontend.parseParameters(
>>>>>> CliFrontend.java:1192)
>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
>>>>>>
>>>>>
>>>>
>>

Reply via email to