I imagine that it's caused by the classpath being different when you run it
using the Flink command. It might also be that your program fails at a
different point once you fix the first problem, due to the protobuf
mismatches.
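
To check which protobuf jar actually wins in each case, a small probe along
these lines might help. This is only a sketch and not part of your project:
the class name ClasspathProbe and the helpers locate and hasMethod are made up
for illustration. The only names taken from your report are the protobuf class
and the method from the stack trace. The idea would be to run it once via
java -jar and once via flink run, then compare the output.

```java
import java.security.CodeSource;
import java.util.Arrays;

// Hypothetical helper (not from the original project): reports which jar a
// class is loaded from and whether it declares a given method.
public class ClasspathProbe {

    /** Returns the location a class was loaded from, or a note if unknown. */
    static String locate(String className) {
        try {
            Class<?> cls = Class.forName(className);
            CodeSource source = cls.getProtectionDomain().getCodeSource();
            // Classes loaded by the bootstrap loader usually have no code source.
            return source == null
                    ? "(no code source, probably a JDK class)"
                    : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not loadable: " + e + ")";
        }
    }

    /** Returns true if the class itself declares a method with this name. */
    static boolean hasMethod(String className, String methodName) {
        try {
            return Arrays.stream(Class.forName(className).getDeclaredMethods())
                    .anyMatch(m -> m.getName().equals(methodName));
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Class and method names come from the NoSuchMethodError in the report.
        String cls = "com.google.protobuf.ExtensionRegistry";
        System.out.println(cls + " loaded from: " + locate(cls));
        System.out.println("declares getAllImmutableExtensionsByExtendedType: "
                + hasMethod(cls, "getAllImmutableExtensionsByExtendedType"));
    }
}
```

If the two runs print different jar locations, or the method only shows up in
one of them, that would point to the classpath ordering rather than to your
pipeline code.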

On Fri, 2 Sep 2016 at 11:59 Pawel Szczur <[email protected]> wrote:

> Ok, I will try 2.5.0 (I use proto2 syntax anyway), but then, how is it
> that it works fine when I run it as a binary, but fails when I submit it
> to the local cluster?
>
> 2016-09-02 11:42 GMT+02:00 Aljoscha Krettek <[email protected]>:
>
>> Yep, as I said, the problem is most likely that Flink depends on a
>> different version of protobuf, which clashes with the version that Beam
>> provides or that you have as a dependency. Have you tried setting 2.5.0 as
>> the version, since that's what Flink uses? In the end that's not a proper
>> solution, however, and both Flink and Beam should likely shade their
>> protobuf dependency. I'm not sure if that's possible, though.
>>
>> On Fri, 2 Sep 2016 at 11:31 Pawel Szczur <[email protected]> wrote:
>>
>>> Thanks for reply.
>>>
>>> I've tried that, but I think it didn't work. I've explicitly tried version
>>> 3.0.0-beta1 to be compatible with Beam and, in another test, 3.0.0 just out
>>> of curiosity. It still didn't work.
>>>
>>> I've added explicitly to pom.xml:
>>>
>>> <dependency>
>>>     <groupId>com.google.protobuf</groupId>
>>>     <artifactId>protobuf-java</artifactId>
>>>     <version>3.0.0-beta1</version>
>>> </dependency>
>>>
>>> Did I miss some param?
>>>
>>> 2016-09-02 11:22 GMT+02:00 Aljoscha Krettek <[email protected]>:
>>>
>>>> Hi,
>>>> I think this is the classic problem of dependency version conflicts.
>>>> Beam has a dependency on protobuf 3.0.0-beta1 while Flink has a dependency
>>>> on protobuf 2.5.0 (through Akka). I think when running your program through
>>>> the bin/flink command the order in the classpath might be different and
>>>> you're getting the wrong version.
>>>>
>>>> As an immediate fix, I think you could try having your own dependency
>>>> on protobuf and shade that, so that you don't have version conflicts.
>>>>
>>>> Cheers,
>>>> Aljoscha
>>>>
>>>> On Thu, 1 Sep 2016 at 16:15 Pawel Szczur <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I'm trying to run a simple pipeline on a local cluster, using Protocol
>>>>> Buffers to pass data between Beam functions.
>>>>>
>>>>> Everything works fine if I run it through:
>>>>> java -jar target/dataflow-test-1.0-SNAPSHOT.jar
>>>>> --runner=org.apache.beam.runners.flink.FlinkRunner
>>>>> --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt
>>>>>
>>>>> But it fails when I try to run it on a Flink cluster:
>>>>> flink run target/dataflow-test-1.0-SNAPSHOT.jar
>>>>> --runner=org.apache.beam.runners.flink.FlinkRunner
>>>>> --input=/tmp/kinglear.txt --output=/tmp/wordcounts.txt
>>>>>
>>>>> The ready to run project:
>>>>> https://github.com/orian/beam-flink-local-cluster
>>>>>
>>>>> Any clues?
>>>>>
>>>>> Cheers, Pawel
>>>>>
>>>>> ------------------------------------------------------------
>>>>>  The program finished with the following exception:
>>>>>
>>>>> java.lang.NoSuchMethodError:
>>>>> com.google.protobuf.ExtensionRegistry.getAllImmutableExtensionsByExtendedType(Ljava/lang/String;)Ljava/util/Set;
>>>>> at com.mycompany.dataflow.WordCount.main(WordCount.java:109)
>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> at
>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>> at
>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>> at
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>> at
>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>> at
>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>> at
>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1192)
>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1243)
>>>>>
>>>>
>>>
>
