Thanks for the update, Max!

Regards
JB

On 03/21/2016 02:39 PM, Maximilian Michels wrote:
FYI: The Runner registration has been fixed. The Flink runner
explicitly registers as of [1]. Also, the SDK tries to look up the
PipelineRunner class in case it has not been registered [2].

[1] https://github.com/apache/incubator-beam/pull/40
[2] https://github.com/apache/incubator-beam/pull/61
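
For readers following along, here is a minimal sketch of the two-step lookup described above: check the registered names first, then try to load the value as a class name. It is not the SDK's actual code. The names RunnerLookup, Runner, DirectRunner and UnregisteredRunner are hypothetical stand-ins, and the real implementation in the linked pull requests may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class RunnerLookup {
    /** Hypothetical stand-in for the SDK's PipelineRunner base class. */
    public abstract static class Runner {}

    /** Hypothetical runner that is registered under its short name. */
    public static class DirectRunner extends Runner {}

    /** Hypothetical runner that is on the classpath but was never registered. */
    public static class UnregisteredRunner extends Runner {}

    // Short names that have been registered explicitly.
    private static final Map<String, Class<? extends Runner>> REGISTERED = new LinkedHashMap<>();

    static {
        REGISTERED.put("DirectRunner", DirectRunner.class);
    }

    /** Resolves a --runner value: registered short name first, then class-name lookup. */
    static Class<? extends Runner> resolve(String name) {
        Class<? extends Runner> registered = REGISTERED.get(name);
        if (registered != null) {
            return registered;
        }
        try {
            // Fallback: treat the value as a fully qualified class name.
            Class<?> candidate = Class.forName(name);
            if (!Runner.class.isAssignableFrom(candidate)) {
                throw new IllegalArgumentException("Class '" + name + "' is not a Runner");
            }
            return candidate.asSubclass(Runner.class);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException(
                "Unknown 'runner' specified '" + name + "', supported pipeline runners "
                    + REGISTERED.keySet(), e);
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve("DirectRunner").getSimpleName());
        System.out.println(resolve(UnregisteredRunner.class.getName()).getSimpleName());
    }
}
```

With a fallback like this, a fully qualified class name can resolve even when the runner has no registered short name, which is the case that failed further down in this thread.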

On Sat, Mar 19, 2016 at 6:43 PM, Maximilian Michels <[email protected]> wrote:
Great to see such a lively discussion here.

I think we'll support sinks through the Write interface (like in
batched execution) and also have a dedicated wrapper for the Flink
sinks. This is a very pressing but easy-to-solve issue with the Flink
runner. Expect it to be in next week.

Also, the proper registration of the runner is about to be merged.
We just need an ok from the contributor to merge the changes.

Best,
Max

On Sat, Mar 19, 2016 at 12:42 AM, Dan Halperin <[email protected]> wrote:
Thanks Bill!

Filed https://issues.apache.org/jira/browse/BEAM-136, but I'm glad it's not
blocking you!

On Fri, Mar 18, 2016 at 4:04 PM, William McCarthy
<[email protected]> wrote:

I tried that, but still no dice. Just to be clear, it’s not a blocker for
me, given that I have my example running, but for your information the
exception is below.

I’ll watch the commit log on the beam incubator and look forward to
deleting my copy of Raghu’s contributions when they’re merged to master.

Thanks again for everyone’s help,

Bill


Command followed by exception:

$ flink run -c com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample \
    target/beam-1.0-SNAPSHOT.jar \
    --runner=org.apache.beam.runners.flink.FlinkPipelineRunner \
    --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out

------------------------------------------------------------
  The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified 'org.apache.beam.runners.flink.FlinkPipelineRunner', supported pipeline runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner, DirectPipelineRunner]
    at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
    at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
    at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
    at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
    at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    ... 6 more

On Mar 18, 2016, at 5:35 PM, Thomas Groh <[email protected]> wrote:

I don't believe the FlinkPipelineRunner is registered the same way the
Dataflow & Direct Pipeline runners are registered; using the fully
qualified class name, org.apache.beam.runners.flink.FlinkPipelineRunner,
should work.

On Fri, Mar 18, 2016 at 2:10 PM, William McCarthy
<[email protected]> wrote:

Thanks Dan,

I tried that, but I'm getting the error below. Note that the jar contains
the FlinkPipelineRunner.



% jar -tf target/beam-1.0-SNAPSHOT.jar | grep FlinkPipeline
org/apache/beam/runners/flink/FlinkPipelineRunner.class
org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.class
org/apache/beam/runners/flink/FlinkPipelineOptions.class
org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.class

% flink run -c com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample \
    target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner \
    --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out

------------------------------------------------------------
  The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified 'FlinkPipelineRunner', supported pipeline runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner, DirectPipelineRunner]
    at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
    at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
    at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
    at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
    at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    ... 6 more



On Mar 18, 2016, at 5:00 PM, Dan Halperin <[email protected]> wrote:

Thanks for catching that, Aljoscha!

Note that the Flink runner should be available via a command-line option
as well: --runner=FlinkPipelineRunner.

The list of valid values for that flag is computed by walking the
classpath at runtime, so as long as the Flink jar is present it'll work.
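
As an illustration of what computing the list from the classpath can look like, here is a minimal sketch that assumes discovery goes through java.util.ServiceLoader. The thread does not say which mechanism the SDK uses, so treat that as an assumption. RunnerDiscovery and RunnerRegistrar are hypothetical names, not SDK types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

final class RunnerDiscovery {
    /**
     * Hypothetical registrar interface. A jar advertises an implementation by
     * listing it in a META-INF/services file named after this interface.
     */
    public interface RunnerRegistrar {
        List<String> runnerNames();
    }

    /** Collects the runner names from every registrar visible on the classpath. */
    static List<String> discoverRunnerNames() {
        List<String> names = new ArrayList<>();
        for (RunnerRegistrar registrar : ServiceLoader.load(RunnerRegistrar.class)) {
            names.addAll(registrar.runnerNames());
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println("Discovered runners: " + discoverRunnerNames());
    }
}
```

Under this kind of scheme, a runner class that is in the jar but has no registrar entry is not discovered, so having the class on the classpath is not enough by itself.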

On Fri, Mar 18, 2016 at 1:21 PM, Aljoscha Krettek <[email protected]>
wrote:

Hi,
looks like the example is being executed with the DirectPipelineRunner
which does not seem to be able to cope with UnboundedSource. You need to set
the runner to the FlinkRunner in the example code as described here:
https://github.com/apache/incubator-beam/tree/master/runners/flink#executing-an-example

The Flink runner should be able to deal with UnboundedSource but has the
limitation that sources are always parallelism=1 (this is being worked on,
however).

Cheers,
Aljoscha
On 18 Mar 2016, at 20:56, Dan Halperin <[email protected]> wrote:

Looks like the Flink runner may not yet support arbitrary code written
with the UnboundedSource API. That is, it looks like the Flink runner
expects the sources to get translated away.

Max?

Dan

On Fri, Mar 18, 2016 at 12:20 PM, William McCarthy
<[email protected]> wrote:
Thanks Raghu,

When I try to run it on flink using the incubator-beam code, i.e.

flink run -c com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample \
    target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092 \
    --topics=test_in --outputTopic=test_out

I get this:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
    at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
    at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
    at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
    ... 6 more

Any ideas?

Bill

On Mar 18, 2016, at 2:47 PM, Raghu Angadi <[email protected]> wrote:

Thanks for trying it.

I fixed the CheckStyle error (not sure why my build is not failing).
Let me know if you see any issues running with Beam. I haven't tried it. I
should. In fact, Daniel Halperin says my patch should be against Beam.

Raghu.

On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy
<[email protected]> wrote:
Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and Raghu for
pointing me to working code.

I’m in the middle of a hack day at the moment, so the speed of your
responses has been very welcome.

In the first instance, I’ll try using your changes, Raghu. I’ve
cloned your repo, switched to the kafka branch and built both contrib/kafka
and contrib/examples/kafka. The contrib/kafka initially failed with a
CheckStyle error
(/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12:
'private' modifier out of order with the JLS suggestions)… I’ve fixed that
in my local clone and now it’s building fine. I hope to be able to run your
contrib unchanged on top of the incubator-beam codebase, which will be what
I attempt to do now.

Thanks again to all, for your swift help.

Bill

On Mar 18, 2016, at 12:55 PM, Raghu Angadi <[email protected]>
wrote:

Hi Bill,

We have a fairly well-tested patch for KafkaIO (pr #121). It will be
merged soon. The example there keeps track of top hashtags in a 10-minute
sliding window and writes the results to another Kafka topic. Please try it
if you can. It is well tested on Google Cloud Dataflow. I have not run it
using the Flink runner.

Raghu.

On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas
<[email protected]> wrote:
Hello Bill,

This is a known limitation of the Flink Runner.
There is a JIRA issue for that
https://issues.apache.org/jira/browse/BEAM-127

A wrapper for Flink sinks will come soon and as Beam evolves,
a more Beam-y solution will come as well.

Kostas
On Mar 18, 2016, at 5:23 PM, William McCarthy
<[email protected]> wrote:

Hi,

I’m trying to write a proof-of-concept which takes messages from
Kafka, transforms them using Beam on Flink, then pushes the results onto a
different Kafka topic.

I’ve used the KafkaWindowedWordCountExample as a starting point,
and that’s doing the first part of what I want to do, but it outputs to text
files as opposed to Kafka. FlinkKafkaProducer08 looks promising, but I can’t
figure out how to plug it into the pipeline. I was thinking that it would be
wrapped with an UnboundedFlinkSink, or some such, but that doesn’t seem to
exist.

Any advice or thoughts on what I’m trying to do?

I’m running the latest incubator-beam (as of last night from
Github), Flink 1.0.0 in cluster mode and Kafka 0.9.0.1, all on Google
Compute Engine (Debian Jessie).

Thanks,

Bill McCarthy

--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com
