Hi Raghu,

Sure, I’ll run this this evening and get back to you.

Bill

> On Mar 21, 2016, at 12:23 PM, Raghu Angadi wrote:
>
> Thanks Max.
>
> Bill McCarthy,
> I know you are unblocked and KafkaWriter is good enough. Please try the KafkaIO source from my branch with the Flink runner if you get a chance.
>
> thanks,
> Raghu.
>
> On Mon, Mar 21, 2016 at 6:54 AM, Jean-Baptiste Onofré wrote:
> Thanks for the update, Max!
>
> Regards
> JB
>
> On 03/21/2016 02:39 PM, Maximilian Michels wrote:
> FYI: The runner registration has been fixed. The Flink runner explicitly registers as of [1]. Also, the SDK tries to look up the PipelineRunner class in case it has not been registered [2].
>
> [1] https://github.com/apache/incubator-beam/pull/40
> [2] https://github.com/apache/incubator-beam/pull/61
>
> On Sat, Mar 19, 2016 at 6:43 PM, Maximilian Michels wrote:
> Great to see such a lively discussion here.
>
> I think we'll support sinks through the Write interface (like in batched execution) and also have a dedicated wrapper for the Flink sinks. This is a very pressing but easy-to-solve issue of the Flink runner. Expect it to be in next week.
>
> Also, the proper registration of the runner is about to be merged. We just need an OK from the contributor to merge the changes.
>
> Best,
> Max
>
> On Sat, Mar 19, 2016 at 12:42 AM, Dan Halperin wrote:
> Thanks Bill!
>
> Filed https://issues.apache.org/jira/browse/BEAM-136, but I'm glad it's not blocking you!
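Max's fixes describe two ways a `--runner` value can be resolved. The first is a short name that the runner registered explicitly [1]. The second is a fallback that loads the named `PipelineRunner` class [2]. The `Unknown 'runner' specified` errors quoted further down in the thread date from before these fixes. At that point neither `FlinkPipelineRunner` nor its fully qualified name resolved.

The sketch below models those two lookup paths in plain Java so it runs without Beam on the classpath. It is a hypothetical illustration, not the SDK's `PipelineOptionsFactory` code. `RunnerLookupSketch`, `Runner`, `DirectRunnerStub` and `FlinkRunnerStub` are made-up names.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch of runner-name resolution (NOT the Dataflow/Beam SDK code).
 * Path 1: short names of explicitly registered runners.
 * Path 2: fallback that loads a fully qualified class name from the classpath.
 */
class RunnerLookupSketch {

  /** Hypothetical stand-in for a pipeline runner type. */
  interface Runner {}

  /** Hypothetical stand-in for a runner that is registered out of the box. */
  static class DirectRunnerStub implements Runner {}

  /** Hypothetical stand-in for a runner shipped in a separate jar, like the Flink runner. */
  static class FlinkRunnerStub implements Runner {}

  // Registered runners keyed by simple class name; TreeMap keeps the error message stable.
  private final Map<String, Class<? extends Runner>> registered = new TreeMap<>();

  /** Explicit registration makes the short name (e.g. "FlinkRunnerStub") usable. */
  void register(Class<? extends Runner> runnerClass) {
    registered.put(runnerClass.getSimpleName(), runnerClass);
  }

  /** Resolves a --runner value: registered short name first, then a classpath lookup. */
  Class<? extends Runner> resolve(String name) {
    Class<? extends Runner> registeredClass = registered.get(name);
    if (registeredClass != null) {
      return registeredClass;
    }
    try {
      // Fallback: treat the value as a fully qualified (binary) class name.
      Class<?> candidate = Class.forName(name);
      if (Runner.class.isAssignableFrom(candidate)) {
        return candidate.asSubclass(Runner.class);
      }
    } catch (ClassNotFoundException e) {
      // Not on the classpath; fall through to the error below.
    }
    // Error text modeled on the exception quoted in the thread.
    throw new IllegalArgumentException("Unknown 'runner' specified '" + name
        + "', supported pipeline runners " + registered.keySet());
  }
}
```

Suppose only `DirectRunnerStub` is registered. Then `resolve("FlinkRunnerStub")` fails with a message listing `[DirectRunnerStub]`. `resolve(RunnerLookupSketch.FlinkRunnerStub.class.getName())` still succeeds through the classpath fallback. After `register(RunnerLookupSketch.FlinkRunnerStub.class)`, the short name works too.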
>
> On Fri, Mar 18, 2016 at 4:04 PM, William McCarthy wrote:
>
> I tried that, but still no dice. Just to be clear, it’s not a blocker for me, given that I have my example running, but for your information the exception is below.
>
> I’ll watch the commit log on the Beam incubator and look forward to deleting my copy of Raghu’s contributions when they’re merged to master.
>
> Thanks again for everyone’s help,
>
> Bill
>
> Command followed by exception:
>
> $ flink run -c com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample target/beam-1.0-SNAPSHOT.jar --runner=org.apache.beam.runners.flink.FlinkPipelineRunner --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>
> ------------------------------------------------------------
> The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified 'org.apache.beam.runners.flink.FlinkPipelineRunner', supported pipeline runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner, DirectPipelineRunner]
>     at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>     at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>     ... 6 more
>
> On Mar 18, 2016, at 5:35 PM, Thomas Groh wrote:
>
> I don't believe the FlinkPipelineRunner is registered the same way the Dataflow & Direct Pipeline runners are registered; using org.apache.beam.runners.flink.FlinkPipelineRunner should work.
>
> On Fri, Mar 18, 2016 at 2:10 PM, William McCarthy wrote:
>
> Thanks Dan,
>
> I tried that, but I'm getting the error below. Note that the jar contains the FlinkPipelineRunner.
>
> % jar -tf target/beam-1.0-SNAPSHOT.jar | grep FlinkPipeline
> org/apache/beam/runners/flink/FlinkPipelineRunner.class
> org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.class
> org/apache/beam/runners/flink/FlinkPipelineOptions.class
> org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.class
>
> % flink run -c com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>
> ------------------------------------------------------------
> The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified 'FlinkPipelineRunner', supported pipeline runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner, DirectPipelineRunner]
>     at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>     at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>     ... 6 more
>
> On Mar 18, 2016, at 5:00 PM, Dan Halperin wrote:
>
> Thanks for catching that, Aljoscha!
>
> Note that the Flink runner should be available via a command-line option as well: --runner=FlinkPipelineRunner.
>
> The list of valid values for that flag is computed by walking the classpath at runtime, so as long as the Flink jar is present it'll work.
>
> On Fri, Mar 18, 2016 at 1:21 PM, Aljoscha Krettek wrote:
>
> Hi,
> it looks like the example is being executed with the DirectPipelineRunner, which does not seem to be able to cope with UnboundedSource. You need to set the runner to the FlinkRunner in the example code as described here:
> https://github.com/apache/incubator-beam/tree/master/runners/flink#executing-an-example
>
> The Flink runner should be able to deal with UnboundedSource but has the limitation that sources are always parallelism=1 (this is being worked on, however).
>
> Cheers,
> Aljoscha
>
> On 18 Mar 2016, at 20:56, Dan Halperin wrote:
>
> Looks like the Flink runner may not yet support arbitrary code written with the UnboundedSource API.
> That is, it looks like the Flink runner expects the sources to get translated away.
>
> Max?
>
> Dan
>
> On Fri, Mar 18, 2016 at 12:20 PM, William McCarthy wrote:
> Thanks Raghu,
>
> When I try to run it on Flink using the incubator-beam code, i.e.
>
> flink run -c com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>
> I get this:
>
> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
> Caused by: java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
>     at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852)
>     at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>     at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>     at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>     at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>     at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>     at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
>     at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
>     at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
>     at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>     at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>     ... 6 more
>
> Any ideas?
>
> Bill
>
> On Mar 18, 2016, at 2:47 PM, Raghu Angadi wrote:
>
> Thanks for trying it.
>
> I fixed the CheckStyle error (not sure why my build is not failing). Let me know if you see any issues running with Beam. I haven't tried it. I should. In fact, Daniel Halperin says my patch should be against Beam.
>
> Raghu.
>
> On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy wrote:
> Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and Raghu for pointing me to working code.
>
> I’m in the middle of a hack day at the moment, so the speed of your responses has been very welcome.
>
> In the first instance, I’ll try using your changes, Raghu. I’ve cloned your repo, switched to the kafka branch and built both contrib/kafka and contrib/examples/kafka.
> The contrib/kafka build initially failed with a CheckStyle error (/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12: 'private' modifier out of order with the JLS suggestions)… I’ve fixed that in my local clone and now it’s building fine. I hope to be able to run your contrib unchanged on top of the incubator-beam codebase, which is what I’ll attempt now.
>
> Thanks again to all for your swift help.
>
> Bill
>
> On Mar 18, 2016, at 12:55 PM, Raghu Angadi wrote:
>
> Hi Bill,
>
> We have a fairly well-tested patch for KafkaIO (PR #121). It will be merged soon. The example there keeps track of the top hashtags in a 10-minute sliding window and writes the results to another Kafka topic. Please try it if you can. It is well tested on Google Cloud Dataflow. I have not run it using the Flink runner.
>
> Raghu.
>
> On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas wrote:
> Hello Bill,
>
> This is a known limitation of the Flink Runner. There is a JIRA issue for that: https://issues.apache.org/jira/browse/BEAM-127
>
> A wrapper for Flink sinks will come soon and, as Beam evolves, a more Beam-y solution will come as well.
>
> Kostas
>
> On Mar 18, 2016, at 5:23 PM, William McCarthy wrote:
>
> Hi,
>
> I’m trying to write a proof of concept that takes messages from Kafka, transforms them using Beam on Flink, then pushes the results onto a different Kafka topic.
>
> I’ve used the KafkaWindowedWordCountExample as a starting point, and that’s doing the first part of what I want to do, but it outputs to text files as opposed to Kafka. FlinkKafkaProducer08 looks promising, but I can’t figure out how to plug it into the pipeline. I was thinking that it would be wrapped with an UnboundedFlinkSink, or some such, but that doesn’t seem to exist.
>
> Any advice or thoughts on what I’m trying to do?
>
> I’m running the latest incubator-beam (as of last night from GitHub), Flink 1.0.0 in cluster mode and Kafka 0.9.0.1, all on Google Compute Engine (Debian Jessie).
>
> Thanks,
>
> Bill McCarthy
>
> --
> Jean-Baptiste Onofré
> http://blog.nanthrax.net
> Talend - http://www.talend.com
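Bill's build failure (`'private' modifier out of order with the JLS suggestions` at KafkaIO.java:683) comes from CheckStyle's modifier-order check. That check expects modifiers in the order suggested by the Java Language Specification. The thread does not show the offending line. The sketch below is therefore a simplified, hypothetical model of the rule that ignores annotations and CheckStyle's other details. `ModifierOrderSketch` and `firstOutOfOrder` are illustrative names, not CheckStyle APIs.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical, simplified model of CheckStyle's modifier-order rule.
 * Not CheckStyle's implementation; annotations are not handled.
 */
class ModifierOrderSketch {

  // Modifier order suggested by the JLS, as listed in CheckStyle's ModifierOrder docs.
  private static final List<String> JLS_ORDER = Arrays.asList(
      "public", "protected", "private", "abstract", "default", "static",
      "sealed", "non-sealed", "final", "transient", "volatile",
      "synchronized", "native", "strictfp");

  /**
   * Returns the first modifier that ranks earlier in JLS_ORDER than a modifier
   * before it (the modifier a message like "'private' modifier out of order"
   * names), or null if the sequence follows the suggested order.
   */
  static String firstOutOfOrder(List<String> modifiers) {
    int previousRank = -1;
    for (String modifier : modifiers) {
      int rank = JLS_ORDER.indexOf(modifier);
      if (rank < 0) {
        throw new IllegalArgumentException("Not a modifier: " + modifier);
      }
      if (rank < previousRank) {
        return modifier;
      }
      previousRank = rank;
    }
    return null;
  }
}
```

Consider a hypothetical declaration such as `static private final int x;`. The sketch reports `private` as out of order, and reordering it to `private static final int x;` passes.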
