Thanks Dan,

I tried that, but I'm getting the error below. Note that the jar does contain 
the FlinkPipelineRunner.



% jar -tf target/beam-1.0-SNAPSHOT.jar | grep FlinkPipeline
org/apache/beam/runners/flink/FlinkPipelineRunner.class
org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.class
org/apache/beam/runners/flink/FlinkPipelineOptions.class
org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.class

% flink run -c com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample \
    target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner \
    --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified 'FlinkPipelineRunner', supported pipeline runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner, DirectPipelineRunner]
        at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
        at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
        at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
        at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
        at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
        ... 6 more
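
The exception lists only three supported runners, so the runner lookup does 
not appear to see a Flink registration in this jar. Below is a minimal sketch 
for checking that. It assumes (this is not confirmed anywhere in this thread) 
that runners are discovered through java.util.ServiceLoader entries under 
META-INF/services. The class name ServiceFileCheck and the default interface 
name are illustrative assumptions, not part of Beam or Flink.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

// Hypothetical helper (not part of Beam, Flink, or the Dataflow SDK).
final class ServiceFileCheck {

    // Standard location where java.util.ServiceLoader looks for provider lists.
    static String resourceName(String serviceInterface) {
        return "META-INF/services/" + serviceInterface;
    }

    // Returns every classpath location that carries a registration file for
    // the given service interface; an empty list means none was found.
    static List<URL> findServiceFiles(String serviceInterface) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            loader = ClassLoader.getSystemClassLoader();
        }
        try {
            return Collections.list(loader.getResources(resourceName(serviceInterface)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Assumed interface name; pass a different one as the first argument.
        String service = args.length > 0
                ? args[0]
                : "com.google.cloud.dataflow.sdk.runners.PipelineRunnerRegistrar";
        List<URL> files = findServiceFiles(service);
        if (files.isEmpty()) {
            System.out.println("No registration file found for " + service);
        }
        for (URL url : files) {
            System.out.println(url);
        }
    }
}
```

Run it with the jar on the classpath, for example 
`java -cp target/beam-1.0-SNAPSHOT.jar:. ServiceFileCheck`. Each printed URL 
is a registration file; opening it shows which implementation classes it names.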



> On Mar 18, 2016, at 5:00 PM, Dan Halperin <[email protected]> wrote:
> 
> Thanks for catching that, Aljoscha!
> 
> Note that the Flink runner should be available via a command-line option as 
> well: --runner=FlinkPipelineRunner.
> 
> The list of valid values for that flag is computed by walking the classpath 
> at runtime, so as long as the Flink jar is present it'll work.
> 
> On Fri, Mar 18, 2016 at 1:21 PM, Aljoscha Krettek <[email protected]> wrote:
> Hi,
> looks like the example is being executed with the DirectPipelineRunner which 
> does not seem to be able to cope with UnboundedSource. You need to set the 
> runner to the FlinkRunner in the example code as described here: 
> https://github.com/apache/incubator-beam/tree/master/runners/flink#executing-an-example
> 
> The Flink runner should be able to deal with UnboundedSource but has the 
> limitation that sources are always parallelism=1 (this is being worked on, 
> however).
> 
> Cheers,
> Aljoscha
> > On 18 Mar 2016, at 20:56, Dan Halperin <[email protected]> wrote:
> >
> > Looks like the Flink runner may not yet support arbitrary code written with 
> > the UnboundedSource API. That is, it looks like the Flink runner expects 
> > the sources to get translated away.
> >
> > Max?
> >
> > Dan
> >
> > On Fri, Mar 18, 2016 at 12:20 PM, William McCarthy <[email protected]> wrote:
> > Thanks Raghu,
> >
> > When I try to run it on flink using the incubator-beam code, i.e.
> >
> > flink run -c com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample \
> >     target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092 \
> >     --topics=test_in --outputTopic=test_out
> >
> > I get this:
> >
> > org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
> >       at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
> >       at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> >       at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
> >       at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
> >       at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
> >       at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
> >       at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> > Caused by: java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
> >       at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852)
> >       at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
> >       at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
> >       at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
> >       at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
> >       at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
> >       at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
> >       at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
> >       at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
> >       at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
> >       at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
> >       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >       at java.lang.reflect.Method.invoke(Method.java:498)
> >       at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
> >       ... 6 more
> >
> > Any ideas?
> >
> > Bill
> >
> >> On Mar 18, 2016, at 2:47 PM, Raghu Angadi <[email protected]> wrote:
> >>
> >> Thanks for trying it.
> >>
> >> I fixed the CheckStyle error (not sure why my build is not failing). Let 
> >> me know if you see any issues running with Beam. I haven't tried it. I 
> >> should. In fact, Daniel Halperin says my patch should be against Beam.
> >>
> >> Raghu.
> >>
> >> On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy <[email protected]> wrote:
> >> Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and Raghu for 
> >> pointing me to working code.
> >>
> >> I’m in the middle of a hack day at the moment, so the speed of your 
> >> responses has been very welcome.
> >>
> >> In the first instance, I’ll try using your changes, Raghu. I’ve cloned 
> >> your repo, switched to the kafka branch and built both contrib/kafka and 
> >> contrib/examples/kafka. The contrib/kafka initially failed with a 
> >> CheckStyle error 
> >> (/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12:
> >>  'private' modifier out of order with the JLS suggestions)… I’ve fixed 
> >> that in my local clone and now it’s building fine. I hope to be able to 
> >> run your contrib unchanged on top of the incubator-beam codebase, which 
> >> will be what I attempt to do now.
> >>
> >> Thanks again to all, for your swift help.
> >>
> >> Bill
> >>
> >>> On Mar 18, 2016, at 12:55 PM, Raghu Angadi <[email protected]> wrote:
> >>>
> >>> Hi Bill,
> >>>
> >>> We have a fairly well-tested patch for KafkaIO (pr #121). It will be 
> >>> merged soon. The example there keeps track of top hashtags in a 
> >>> 10-minute sliding window and writes the results to another Kafka topic. 
> >>> Please try it if you can. It is well tested on Google Cloud Dataflow. I 
> >>> have not run it using the Flink runner.
> >>>
> >>> Raghu.
> >>>
> >>> On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas <[email protected]> wrote:
> >>> Hello Bill,
> >>>
> >>> This is a known limitation of the Flink Runner.
> >>> There is a JIRA issue for that: 
> >>> https://issues.apache.org/jira/browse/BEAM-127
> >>>
> >>> A wrapper for Flink sinks will come soon and as Beam evolves,
> >>> a more Beam-y solution will come as well.
> >>>
> >>> Kostas
> >>>> On Mar 18, 2016, at 5:23 PM, William McCarthy <[email protected]> wrote:
> >>>>
> >>>> Hi,
> >>>>
> >>>> I’m trying to write a proof-of-concept which takes messages from Kafka, 
> >>>> transforms them using Beam on Flink, then pushes the results onto a 
> >>>> different Kafka topic.
> >>>>
> >>>> I’ve used the KafkaWindowedWordCountExample as a starting point, and 
> >>>> that’s doing the first part of what I want to do, but it outputs to text 
> >>>> files as opposed to Kafka. FlinkKafkaProducer08 looks promising, but I 
> >>>> can’t figure out how to plug it into the pipeline. I was thinking that 
> >>>> it would be wrapped with an UnboundedFlinkSink, or some such, but that 
> >>>> doesn’t seem to exist.
> >>>>
> >>>> Any advice or thoughts on what I’m trying to do?
> >>>>
> >>>> I’m running the latest incubator-beam (as of last night from Github), 
> >>>> Flink 1.0.0 in cluster mode and Kafka 0.9.0.1, all on Google Compute 
> >>>> Engine (Debian Jessie).
> >>>>
> >>>> Thanks,
> >>>>
> >>>> Bill McCarthy
> >>>
> >>>
> >>
> >>
> >
> >
> 
> 
