Thanks, Max. Bill McCarthy, I know you are unblocked and KafkaWriter is good enough. Please try the KafkaIO source from my branch with the Flink runner if you get a chance.
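
If the --runner flag keeps fighting you before the registration fix reaches your build, you can also set the runner directly on the options object in the example's main() and skip the classpath lookup entirely (this is essentially what the runners/flink README that Aljoscha linked describes). A rough, untested sketch; the class name here is only illustrative, and FlinkPipelineOptions / FlinkPipelineRunner are the classes from your jar listing further down:

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkPipelineRunner;

    public class FlinkRunnerSetup {
      public static void main(String[] args) {
        // Bind the command-line flags to the Flink options interface.
        // Leave --runner off the command line; it is set in code below,
        // so the "Unknown 'runner'" check never has to resolve the class.
        FlinkPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
        options.setRunner(FlinkPipelineRunner.class);

        Pipeline pipeline = Pipeline.create(options);
        // ... apply the KafkaIO read and the rest of the transforms here ...
        pipeline.run();
      }
    }

The example's own flags (--bootstrapServers, --topics, --outputTopic) would still be parsed off its own options interface as today; only the runner choice moves into code.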
Thanks,
Raghu.

On Mon, Mar 21, 2016 at 6:54 AM, Jean-Baptiste Onofré <[email protected]> wrote:
> Thanks for the update, Max!
>
> Regards
> JB
>
> On 03/21/2016 02:39 PM, Maximilian Michels wrote:
>
>> FYI: The Runner registration has been fixed. The Flink runner explicitly registers as of [1]. Also, the SDK tries to look up the PipelineRunner class in case it has not been registered [2].
>>
>> [1] https://github.com/apache/incubator-beam/pull/40
>> [2] https://github.com/apache/incubator-beam/pull/61
>>
>> On Sat, Mar 19, 2016 at 6:43 PM, Maximilian Michels <[email protected]> wrote:
>>
>>> Great to see such a lively discussion here.
>>>
>>> I think we'll support sinks through the Write interface (like in batched execution) and also have a dedicated wrapper for the Flink sinks. This is a very pressing but easy-to-solve issue for the Flink runner. Expect it to be in next week.
>>>
>>> Also, the proper registration of the runner is about to be merged. We just need an OK from the contributor to merge the changes.
>>>
>>> Best,
>>> Max
>>>
>>> On Sat, Mar 19, 2016 at 12:42 AM, Dan Halperin <[email protected]> wrote:
>>>
>>>> Thanks Bill!
>>>>
>>>> Filed https://issues.apache.org/jira/browse/BEAM-136, but I'm glad it's not blocking you!
>>>>
>>>> On Fri, Mar 18, 2016 at 4:04 PM, William McCarthy <[email protected]> wrote:
>>>>
>>>>> I tried that, but still no dice. Just to be clear, it's not a blocker for me, given that I have my example running, but for your information the exception is below.
>>>>>
>>>>> I'll watch the commit log on the Beam incubator and look forward to deleting my copy of Raghu's contributions when they're merged to master.
>>>>>
>>>>> Thanks again for everyone's help,
>>>>>
>>>>> Bill
>>>>>
>>>>> Command followed by exception:
>>>>>
>>>>> $ flink run -c com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample target/beam-1.0-SNAPSHOT.jar --runner=org.apache.beam.runners.flink.FlinkPipelineRunner --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>>
>>>>> ------------------------------------------------------------
>>>>> The program finished with the following exception:
>>>>>
>>>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified 'org.apache.beam.runners.flink.FlinkPipelineRunner', supported pipeline runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner, DirectPipelineRunner]
>>>>>     at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>>>>>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>>>>>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>>>>>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>>>>>     at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>     ... 6 more
>>>>>
>>>>> On Mar 18, 2016, at 5:35 PM, Thomas Groh <[email protected]> wrote:
>>>>>
>>>>> I don't believe the FlinkPipelineRunner is registered the same way the Dataflow & Direct pipeline runners are registered; using org.apache.beam.runners.flink.FlinkPipelineRunner should work.
>>>>>
>>>>> On Fri, Mar 18, 2016 at 2:10 PM, William McCarthy <[email protected]> wrote:
>>>>>>
>>>>>> Thanks Dan,
>>>>>>
>>>>>> I tried that, but I'm getting the error below. Note that the jar contains the FlinkPipelineRunner:
>>>>>>
>>>>>> % jar -tf target/beam-1.0-SNAPSHOT.jar | grep FlinkPipeline
>>>>>> org/apache/beam/runners/flink/FlinkPipelineRunner.class
>>>>>> org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.class
>>>>>> org/apache/beam/runners/flink/FlinkPipelineOptions.class
>>>>>> org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.class
>>>>>>
>>>>>> % flink run -c com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>>>
>>>>>> ------------------------------------------------------------
>>>>>> The program finished with the following exception:
>>>>>>
>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified 'FlinkPipelineRunner', supported pipeline runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner, DirectPipelineRunner]
>>>>>>     at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>>>>>>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>>>>>>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>>>>>>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>>>>>>     at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>     ... 6 more
>>>>>>
>>>>>> On Mar 18, 2016, at 5:00 PM, Dan Halperin <[email protected]> wrote:
>>>>>>
>>>>>> Thanks for catching that, Aljoscha!
>>>>>>
>>>>>> Note that the Flink runner should be available via a command-line option as well: --runner=FlinkPipelineRunner.
>>>>>>
>>>>>> The list of valid values for that flag is computed by walking the classpath at runtime, so as long as the Flink jar is present it'll work.
>>>>>>
>>>>>> On Fri, Mar 18, 2016 at 1:21 PM, Aljoscha Krettek <[email protected]> wrote:
>>>>>>>
>>>>>>> Hi,
>>>>>>> looks like the example is being executed with the DirectPipelineRunner which does not seem to be able to cope with UnboundedSource. You need to set the runner to the FlinkRunner in the example code as described here:
>>>>>>> https://github.com/apache/incubator-beam/tree/master/runners/flink#executing-an-example
>>>>>>>
>>>>>>> The Flink runner should be able to deal with UnboundedSource but has the limitation that sources are always parallelism=1 (this is being worked on, however).
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Aljoscha
>>>>>>>
>>>>>>>> On 18 Mar 2016, at 20:56, Dan Halperin <[email protected]> wrote:
>>>>>>>>
>>>>>>>> Looks like the Flink runner may not yet support arbitrary code written with the UnboundedSource API.
>>>>>>>> That is, it looks like the Flink runner expects the sources to get translated away.
>>>>>>>>
>>>>>>>> Max?
>>>>>>>>
>>>>>>>> Dan
>>>>>>>>
>>>>>>>> On Fri, Mar 18, 2016 at 12:20 PM, William McCarthy <[email protected]> wrote:
>>>>>>>>
>>>>>>>> Thanks Raghu,
>>>>>>>>
>>>>>>>> When I try to run it on Flink using the incubator-beam code, i.e.
>>>>>>>>
>>>>>>>> flink run -c com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>>>>>
>>>>>>>> I get this:
>>>>>>>>
>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>>>> Caused by: java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
>>>>>>>>     at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852)
>>>>>>>>     at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>>>>>>>>     at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>>>>     at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>>>>     at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>>>>>>>>     at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>>>>>>>>     at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
>>>>>>>>     at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
>>>>>>>>     at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
>>>>>>>>     at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>>>>>>>>     at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>>     ... 6 more
>>>>>>>>
>>>>>>>> Any ideas?
>>>>>>>>
>>>>>>>> Bill
>>>>>>>>
>>>>>>>> On Mar 18, 2016, at 2:47 PM, Raghu Angadi <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>> Thanks for trying it.
>>>>>>>>>
>>>>>>>>> I fixed the CheckStyle error (not sure why my build is not failing). Let me know if you see any issues running with Beam. I haven't tried it; I should. In fact, Daniel Halperin says my patch should be against Beam.
>>>>>>>>>
>>>>>>>>> Raghu.
>>>>>>>>>
>>>>>>>>> On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>> Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and Raghu for pointing me to working code.
>>>>>>>>>
>>>>>>>>> I'm in the middle of a hack day at the moment, so the speed of your responses has been very welcome.
>>>>>>>>>
>>>>>>>>> In the first instance, I'll try using your changes, Raghu. I've cloned your repo, switched to the kafka branch and built both contrib/kafka and contrib/examples/kafka. The contrib/kafka build initially failed with a CheckStyle error (/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12: 'private' modifier out of order with the JLS suggestions)… I've fixed that in my local clone and now it's building fine. I hope to be able to run your contrib unchanged on top of the incubator-beam codebase, which is what I'll attempt to do now.
>>>>>>>>>
>>>>>>>>> Thanks again to all for your swift help.
>>>>>>>>>
>>>>>>>>> Bill
>>>>>>>>>
>>>>>>>>> On Mar 18, 2016, at 12:55 PM, Raghu Angadi <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Bill,
>>>>>>>>>>
>>>>>>>>>> We have a fairly well-tested patch for KafkaIO (PR #121). It will be merged soon. The example there keeps track of top hashtags in a 10-minute sliding window and writes the results to another Kafka topic. Please try it if you can. It is well tested on Google Cloud Dataflow; I have not run it with the Flink runner.
>>>>>>>>>>
>>>>>>>>>> Raghu.
>>>>>>>>>>
>>>>>>>>>> On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>> Hello Bill,
>>>>>>>>>>
>>>>>>>>>> This is a known limitation of the Flink runner. There is a JIRA issue for it: https://issues.apache.org/jira/browse/BEAM-127
>>>>>>>>>>
>>>>>>>>>> A wrapper for Flink sinks will come soon, and as Beam evolves, a more Beam-y solution will come as well.
>>>>>>>>>>
>>>>>>>>>> Kostas
>>>>>>>>>>
>>>>>>>>>>> On Mar 18, 2016, at 5:23 PM, William McCarthy <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I'm trying to write a proof-of-concept which takes messages from Kafka, transforms them using Beam on Flink, then pushes the results onto a different Kafka topic.
>>>>>>>>>>>
>>>>>>>>>>> I've used the KafkaWindowedWordCountExample as a starting point, and that's doing the first part of what I want to do, but it outputs to text files as opposed to Kafka. FlinkKafkaProducer08 looks promising, but I can't figure out how to plug it into the pipeline.
I was thinking that >>>>>>>>>>> it would be >>>>>>>>>>> wrapped with an UnboundedFlinkSink, or some such, but that >>>>>>>>>>> doesn’t seem to >>>>>>>>>>> exist. >>>>>>>>>>> >>>>>>>>>>> Any advice or thoughts on what I’m trying to do? >>>>>>>>>>> >>>>>>>>>>> I’m running the latest incubator-beam (as of last night from >>>>>>>>>>> Github), Flink 1.0.0 in cluster mode and Kafka 0.9.0.1, all on >>>>>>>>>>> Google >>>>>>>>>>> Compute Engine (Debian Jessie). >>>>>>>>>>> >>>>>>>>>>> Thanks, >>>>>>>>>>> >>>>>>>>>>> Bill McCarthy >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>>> >>>>> >>>>> >>>> > -- > Jean-Baptiste Onofré > [email protected] > http://blog.nanthrax.net > Talend - http://www.talend.com >
