And clearly we should fix this!

On Fri, Mar 18, 2016 at 14:36, Thomas Groh <[email protected]> wrote:
> I don't believe the FlinkPipelineRunner is registered the same way the
> Dataflow & Direct Pipeline runners are registered; using
> org.apache.beam.runners.flink.FlinkPipelineRunner should work.
>
> On Fri, Mar 18, 2016 at 2:10 PM, William McCarthy <[email protected]> wrote:
>
>> Thanks Dan,
>>
>> I tried that, but I'm getting the error below. Note that the jar contains
>> the FlinkPipelineRunner:
>>
>> % jar -tf target/beam-1.0-SNAPSHOT.jar | grep FlinkPipeline
>> org/apache/beam/runners/flink/FlinkPipelineRunner.class
>> org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.class
>> org/apache/beam/runners/flink/FlinkPipelineOptions.class
>> org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.class
>>
>> % flink run -c com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>
>> ------------------------------------------------------------
>>  The program finished with the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>   at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>   at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>   at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>   at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified 'FlinkPipelineRunner', supported pipeline runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner, DirectPipelineRunner]
>>   at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>>   at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>>   at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>>   at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>>   at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>   at java.lang.reflect.Method.invoke(Method.java:498)
>>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>   ... 6 more
>>
>> On Mar 18, 2016, at 5:00 PM, Dan Halperin <[email protected]> wrote:
>>
>> Thanks for catching that, Aljoscha!
>>
>> Note that the Flink runner should be available via a command-line option
>> as well: --runner=FlinkPipelineRunner.
>>
>> The list of valid values for that flag is computed by walking the
>> classpath at runtime, so as long as the Flink jar is present it'll work.
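A minimal sketch of what the two suggestions above amount to, assuming the example parses its flags with PipelineOptionsFactory (as the stack trace indicates). The option names are the ones from the command line above, and per Thomas's note the fully-qualified runner class name is passed because the short name "FlinkPipelineRunner" is not in the SDK's registered-runner list:

    // Hypothetical invocation, using the fully-qualified runner name:
    //   flink run -c com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample \
    //       target/beam-1.0-SNAPSHOT.jar \
    //       --runner=org.apache.beam.runners.flink.FlinkPipelineRunner \
    //       --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
    import com.google.cloud.dataflow.sdk.options.PipelineOptions;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

    public class RunnerFlagSketch {
      public static void main(String[] args) {
        // --runner is resolved while the flags are parsed; a short name only
        // works for runners registered with the SDK, which is why
        // "--runner=FlinkPipelineRunner" fails with the error shown above.
        // Per the suggestion in this thread, the fully-qualified class name
        // should resolve instead.
        PipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().create();
        System.out.println("Configured runner: " + options.getRunner());
      }
    }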
>> On Fri, Mar 18, 2016 at 1:21 PM, Aljoscha Krettek <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> It looks like the example is being executed with the DirectPipelineRunner,
>>> which does not seem to be able to cope with UnboundedSource. You need to
>>> set the runner to the Flink runner in the example code, as described here:
>>> https://github.com/apache/incubator-beam/tree/master/runners/flink#executing-an-example
>>>
>>> The Flink runner should be able to deal with UnboundedSource, but it has
>>> the limitation that sources always run with parallelism=1 (this is being
>>> worked on, however).
>>>
>>> Cheers,
>>> Aljoscha
>>>
>>> On 18 Mar 2016, at 20:56, Dan Halperin <[email protected]> wrote:
>>>
>>>> Looks like the Flink runner may not yet support arbitrary code written
>>>> with the UnboundedSource API. That is, it looks like the Flink runner
>>>> expects the sources to get translated away.
>>>>
>>>> Max?
>>>>
>>>> Dan
>>>>
>>>> On Fri, Mar 18, 2016 at 12:20 PM, William McCarthy <[email protected]> wrote:
>>>>
>>>> Thanks Raghu,
>>>>
>>>> When I try to run it on Flink using the incubator-beam code, i.e.
>>>>
>>>> flink run -c com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>
>>>> I get this:
>>>>
>>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>   at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>   at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>   at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>   at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>   at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>   at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>> Caused by: java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
>>>>   at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852)
>>>>   at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>>>>   at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>   at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>   at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>>>>   at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>>>>   at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
>>>>   at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
>>>>   at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
>>>>   at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>>>>   at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>   at java.lang.reflect.Method.invoke(Method.java:498)
>>>>   at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>   ... 6 more
>>>>
>>>> Any ideas?
>>>>
>>>> Bill
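A minimal sketch of Aljoscha's suggestion of pinning the runner in the example code itself, so the DirectPipelineRunner (which has no evaluator for the unbounded Kafka read, as the trace above shows) can never be selected. The Flink class names come from the jar listing earlier in the thread; whether this snapshot's FlinkPipelineOptions exposes setStreaming is an assumption:

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkPipelineRunner;

    public class FlinkRunnerInCodeSketch {
      public static void main(String[] args) {
        FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(FlinkPipelineOptions.class);
        // Pin the runner explicitly instead of relying on the --runner flag.
        options.setRunner(FlinkPipelineRunner.class);
        // Assumption: streaming mode must be switched on for unbounded sources.
        options.setStreaming(true);

        Pipeline pipeline = Pipeline.create(options);
        // ... add the Kafka read / transform / write steps here ...
        pipeline.run();
      }
    }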
>>>> On Mar 18, 2016, at 2:47 PM, Raghu Angadi <[email protected]> wrote:
>>>>
>>>>> Thanks for trying it.
>>>>>
>>>>> I fixed the CheckStyle error (not sure why my build is not failing).
>>>>> Let me know if you see any issues running with Beam. I haven't tried it
>>>>> myself, but I should. In fact, Daniel Halperin says my patch should be
>>>>> against Beam.
>>>>>
>>>>> Raghu.
>>>>>
>>>>> On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy <[email protected]> wrote:
>>>>>
>>>>> Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and Raghu for
>>>>> pointing me to working code.
>>>>>
>>>>> I'm in the middle of a hack day at the moment, so the speed of your
>>>>> responses has been very welcome.
>>>>>
>>>>> In the first instance, I'll try using your changes, Raghu. I've cloned
>>>>> your repo, switched to the kafka branch and built both contrib/kafka
>>>>> and contrib/examples/kafka. contrib/kafka initially failed with a
>>>>> CheckStyle error
>>>>> (/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12:
>>>>> 'private' modifier out of order with the JLS suggestions)… I've fixed
>>>>> that in my local clone and now it's building fine. I hope to be able to
>>>>> run your contrib unchanged on top of the incubator-beam codebase, which
>>>>> is what I'll attempt to do now.
>>>>>
>>>>> Thanks again to all for your swift help.
>>>>>
>>>>> Bill
>>>>>
>>>>> On Mar 18, 2016, at 12:55 PM, Raghu Angadi <[email protected]> wrote:
>>>>>
>>>>>> Hi Bill,
>>>>>>
>>>>>> We have a fairly well-tested patch for KafkaIO (PR #121). It will be
>>>>>> merged soon. The example there keeps track of the top hashtags in a
>>>>>> 10-minute sliding window and writes the results to another Kafka topic.
>>>>>> Please try it if you can. It is well tested on Google Cloud Dataflow;
>>>>>> I have not run it using the Flink runner.
>>>>>>
>>>>>> Raghu.
>>>>>>
>>>>>> On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas <[email protected]> wrote:
>>>>>>
>>>>>> Hello Bill,
>>>>>>
>>>>>> This is a known limitation of the Flink runner; there is a JIRA issue
>>>>>> for it: https://issues.apache.org/jira/browse/BEAM-127
>>>>>>
>>>>>> A wrapper for Flink sinks will come soon, and as Beam evolves, a more
>>>>>> Beam-y solution will come as well.
>>>>>>
>>>>>> Kostas
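Until that wrapper lands, one interim workaround (a sketch only, not the planned Beam-y solution, and not part of any patch discussed in this thread) is to push results to Kafka from inside a ParDo using the plain Kafka producer client. The class name KafkaRecordWriteFn and the producer settings below are illustrative:

    import java.util.Properties;
    import com.google.cloud.dataflow.sdk.transforms.DoFn;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    /** Illustrative only: pushes each element to a Kafka topic from inside a ParDo. */
    public class KafkaRecordWriteFn extends DoFn<String, Void> {
      private final String bootstrapServers;
      private final String topic;
      private transient KafkaProducer<String, String> producer;

      public KafkaRecordWriteFn(String bootstrapServers, String topic) {
        this.bootstrapServers = bootstrapServers;
        this.topic = topic;
      }

      @Override
      public void startBundle(Context c) {
        // One producer per bundle; created lazily because the DoFn is serialized.
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        producer = new KafkaProducer<String, String>(props);
      }

      @Override
      public void processElement(ProcessContext c) {
        // Fire-and-forget send; no delivery guarantees beyond what the producer gives.
        producer.send(new ProducerRecord<String, String>(topic, c.element()));
      }

      @Override
      public void finishBundle(Context c) {
        producer.close();
      }
    }

It would be applied as the last step of the pipeline, e.g. formattedResults.apply(ParDo.of(new KafkaRecordWriteFn(bootstrapServers, outputTopic))).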
>>>>>> On Mar 18, 2016, at 5:23 PM, William McCarthy <[email protected]> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I'm trying to write a proof of concept that takes messages from Kafka,
>>>>>>> transforms them using Beam on Flink, then pushes the results onto a
>>>>>>> different Kafka topic.
>>>>>>>
>>>>>>> I've used the KafkaWindowedWordCountExample as a starting point, and
>>>>>>> it does the first part of what I want, but it outputs to text files
>>>>>>> rather than to Kafka. FlinkKafkaProducer08 looks promising, but I can't
>>>>>>> figure out how to plug it into the pipeline. I was thinking it would be
>>>>>>> wrapped with an UnboundedFlinkSink, or some such, but that doesn't seem
>>>>>>> to exist.
>>>>>>>
>>>>>>> Any advice or thoughts on what I'm trying to do?
>>>>>>>
>>>>>>> I'm running the latest incubator-beam (as of last night from GitHub),
>>>>>>> Flink 1.0.0 in cluster mode and Kafka 0.9.0.1, all on Google Compute
>>>>>>> Engine (Debian Jessie).
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Bill McCarthy
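To tie the thread together: a sketch of the middle of the proof of concept described above, assuming the messages have already been read into a PCollection<String> by the contrib KafkaIO from Raghu's patch, and reusing the illustrative KafkaRecordWriteFn from the earlier sketch as the interim output step. The hashtag-extraction and formatting DoFns are assumptions, not code from the patch:

    import com.google.cloud.dataflow.sdk.transforms.Count;
    import com.google.cloud.dataflow.sdk.transforms.DoFn;
    import com.google.cloud.dataflow.sdk.transforms.ParDo;
    import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
    import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
    import com.google.cloud.dataflow.sdk.values.KV;
    import com.google.cloud.dataflow.sdk.values.PCollection;
    import org.joda.time.Duration;

    public class HashtagCountsSketch {
      /** Emits every "#..." token found in a message. */
      static class ExtractHashtagsFn extends DoFn<String, String> {
        @Override
        public void processElement(ProcessContext c) {
          for (String token : c.element().split("\\s+")) {
            if (token.startsWith("#") && token.length() > 1) {
              c.output(token.toLowerCase());
            }
          }
        }
      }

      /** Formats a (hashtag, count) pair as a single output record. */
      static class FormatCountsFn extends DoFn<KV<String, Long>, String> {
        @Override
        public void processElement(ProcessContext c) {
          c.output(c.element().getKey() + ":" + c.element().getValue());
        }
      }

      /** Counts hashtags over a 10-minute sliding window and pushes results to Kafka. */
      static void addHashtagCounts(PCollection<String> messages,
                                   String bootstrapServers, String outputTopic) {
        messages
            .apply(ParDo.of(new ExtractHashtagsFn()))
            .apply(Window.<String>into(
                SlidingWindows.of(Duration.standardMinutes(10))
                    .every(Duration.standardMinutes(1))))
            .apply(Count.<String>perElement())
            .apply(ParDo.of(new FormatCountsFn()))
            // Interim Kafka output from the earlier sketch, pending BEAM-127.
            .apply(ParDo.of(new KafkaRecordWriteFn(bootstrapServers, outputTopic)));
      }
    }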
