Hi William, It has been merged. Feel free to try again.
Cheers, Max

On Wed, Mar 23, 2016 at 5:09 PM, Maximilian Michels <[email protected]> wrote: > The pull request is here: https://github.com/apache/incubator-beam/pull/69 > > I'll merge it after the tests have passed and it has been reviewed. > > On Wed, Mar 23, 2016 at 3:12 PM, Maximilian Michels <[email protected]> wrote: >> Hi William, >> >> I started working on a fix and better testing of the >> UnboundedSourceWrapper. I'll get back to you shortly with a pull >> request that we should be able to merge soon. >> >> - Max >> >> On Wed, Mar 23, 2016 at 1:33 PM, Aljoscha Krettek <[email protected]> >> wrote: >>> Hi, >>> as far as I can see from a quick glance the problem is that >>> UnboundedSourceWrapper stores an instance of Reader that it gets from the >>> Source. The Reader is not Serializable while the UnboundedSource is. I >>> think the Reader should be initialized when actually running the source, in >>> the run() method. >>> >>> Cheers, >>> Aljoscha >>>> On 23 Mar 2016, at 13:07, William McCarthy <[email protected]> >>>> wrote: >>>> >>>> The first stack trace was from the latest runner, yes. I’ve pulled the >>>> very latest, just now, and still get the same thing. >>>> >>>> When I write ‘pulled the very latest’, here’s what I mean (all of the >>>> following commands, except the last, finished with success): >>>> >>>> $ cd incubator-beam >>>> $ git pull >>>> …some output, then success >>>> $ git branch >>>> * master >>>> $ mvn -DskipTests clean install >>>> …some output, then success >>>> $ cd <my project> >>>> $ mvn -DskipTests clean install >>>> …some output, then success >>>> $ <command as output below> >>>> >>>> Bill >>>> >>>>> On Mar 22, 2016, at 5:29 AM, Maximilian Michels <[email protected]> wrote: >>>>> >>>>> Hi William, >>>>> >>>>> Is the first stack trace from the latest master? Using only the simple >>>>> class name of the Runner should actually work now. I've also added a >>>>> test to explicitly test that. >>>>> >>>>> The latter error, as Aljoscha pointed out, is due to batch >>>>> execution. Perhaps we could make it explicit during startup which mode >>>>> we're executing in. Filed a JIRA issue for that: >>>>> https://issues.apache.org/jira/browse/BEAM-139 >>>>> >>>>> - Max >>>>> >>>>> On Tue, Mar 22, 2016 at 10:00 AM, Aljoscha Krettek <[email protected]> >>>>> wrote: >>>>>> Hi, >>>>>> for Flink the runner can internally either translate to a batch job or a >>>>>> streaming job. Unbounded sources are not supported when running in batch >>>>>> mode so you have to somehow specify that you want to have streaming >>>>>> mode. StreamingOptions has a method setStreaming, maybe you can specify >>>>>> “--streaming true” on the command line to set that flag. >>>>>> >>>>>> Cheers, >>>>>> Aljoscha >>>>>>> On 21 Mar 2016, at 23:39, William McCarthy <[email protected]> >>>>>>> wrote: >>>>>>> >>>>>>> I’ve attempted to run the TopHashtagsExample again, using both >>>>>>> ‘FlinkPipelineRunner’ and its fully qualified name. Both bomb out, >>>>>>> though the latter gets further.
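(An aside for readers of this thread: a rough sketch of the change Aljoscha describes above, keeping only the Serializable UnboundedSource in the wrapper and creating the non-Serializable Reader lazily inside run(), might look like the following. The class shape and names are illustrative only and simplified from the real UnboundedSourceWrapper; checkpointing, watermarks and shutdown are omitted.)

    import java.io.IOException;
    import java.io.Serializable;

    import com.google.cloud.dataflow.sdk.io.UnboundedSource;
    import com.google.cloud.dataflow.sdk.options.PipelineOptions;

    // Illustrative sketch, not the actual wrapper: the source is the only field that is
    // serialized and shipped to the cluster; the reader is created on the worker in run().
    class UnboundedSourceWrapperSketch<T> implements Serializable {

      private final UnboundedSource<T, ?> source;                  // Serializable
      private transient UnboundedSource.UnboundedReader<T> reader; // not Serializable

      UnboundedSourceWrapperSketch(UnboundedSource<T, ?> source) {
        this.source = source; // only the source is captured at graph-construction time
      }

      public void run(PipelineOptions options) throws IOException {
        // The reader is initialized here, when the source actually starts running,
        // so it never has to survive Java serialization.
        reader = source.createReader(options, null);
        boolean available = reader.start();
        while (true) {
          if (available) {
            T element = reader.getCurrent();
            // ... emit 'element' to the downstream operator ...
          }
          available = reader.advance();
        }
      }
    }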
I hope this helps, here is the output >>>>>>> of both: >>>>>>> >>>>>>> $ flink run -c >>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample >>>>>>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner >>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out >>>>>>> >>>>>>> ------------------------------------------------------------ >>>>>>> The program finished with the following exception: >>>>>>> >>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main >>>>>>> method caused an error. >>>>>>> at >>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520) >>>>>>> at >>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) >>>>>>> at >>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248) >>>>>>> at >>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866) >>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333) >>>>>>> at >>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189) >>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239) >>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' >>>>>>> specified 'FlinkPipelineRunner', supported pipeline runners >>>>>>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner, >>>>>>> DirectPipelineRunner] >>>>>>> at >>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1486) >>>>>>> at >>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:101) >>>>>>> at >>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:286) >>>>>>> at >>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117) >>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>>>>>> at >>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>>>>>> at >>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>>>>>> at java.lang.reflect.Method.invoke(Method.java:498) >>>>>>> at >>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) >>>>>>> ... 6 more >>>>>>> Caused by: java.lang.ClassNotFoundException: FlinkPipelineRunner >>>>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381) >>>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) >>>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) >>>>>>> at java.lang.Class.forName0(Native Method) >>>>>>> at java.lang.Class.forName(Class.java:264) >>>>>>> at >>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1473) >>>>>>> ... 14 more >>>>>>> >>>>>>> >>>>>>> $ flink run -c >>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample >>>>>>> target/beam-1.0-SNAPSHOT.jar >>>>>>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner >>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out >>>>>>> >>>>>>> ------------------------------------------------------------ >>>>>>> The program finished with the following exception: >>>>>>> >>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main >>>>>>> method caused an error. 
>>>>>>> at >>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520) >>>>>>> at >>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) >>>>>>> at >>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248) >>>>>>> at >>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866) >>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333) >>>>>>> at >>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189) >>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239) >>>>>>> Caused by: java.lang.UnsupportedOperationException: The transform >>>>>>> Read(UnboundedKafkaSource) is currently not supported. >>>>>>> at >>>>>>> org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator.visitTransform(FlinkBatchPipelineTranslator.java:111) >>>>>>> at >>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219) >>>>>>> at >>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) >>>>>>> at >>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) >>>>>>> at >>>>>>> com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102) >>>>>>> at >>>>>>> com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259) >>>>>>> at >>>>>>> org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:34) >>>>>>> at >>>>>>> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:130) >>>>>>> at >>>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:109) >>>>>>> at >>>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:50) >>>>>>> at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180) >>>>>>> at >>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140) >>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>>>>>> at >>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>>>>>> at >>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>>>>>> at java.lang.reflect.Method.invoke(Method.java:498) >>>>>>> at >>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) >>>>>>> ... 6 more >>>>>>> >>>>>>> >>>>>>>> On Mar 21, 2016, at 12:23 PM, Raghu Angadi <[email protected]> wrote: >>>>>>>> >>>>>>>> Thanks Max. >>>>>>>> >>>>>>>> Bill McCarthy, >>>>>>>> I know you are unblocked and KafkaWriter is good enough. Please try >>>>>>>> KafkaIO source from my branch with Flink runner if you get a chance. >>>>>>>> >>>>>>>> thanks, >>>>>>>> Raghu. >>>>>>>> >>>>>>>> On Mon, Mar 21, 2016 at 6:54 AM, Jean-Baptiste Onofré >>>>>>>> <[email protected]> wrote: >>>>>>>> Thanks for the update Max ! >>>>>>>> >>>>>>>> Regards >>>>>>>> JB >>>>>>>> >>>>>>>> >>>>>>>> On 03/21/2016 02:39 PM, Maximilian Michels wrote: >>>>>>>> FYI: The Runner registration has been fixed. The Flink runner >>>>>>>> explicitly registers as of [1]. Also, the SDK tries to look up the >>>>>>>> PipelineRunner class in case it has not been registered [2]. 
>>>>>>>> >>>>>>>> [1] https://github.com/apache/incubator-beam/pull/40 >>>>>>>> [2] https://github.com/apache/incubator-beam/pull/61 >>>>>>>> >>>>>>>> On Sat, Mar 19, 2016 at 6:43 PM, Maximilian Michels <[email protected]> >>>>>>>> wrote: >>>>>>>> Great to see such a lively discussion here. >>>>>>>> >>>>>>>> I think we'll support sinks through the Write interface (like in >>>>>>>> batched execution) and also have a dedicated wrapper for the Flink >>>>>>>> sinks. This is a very pressing but easy-to-solve issue of the Flink >>>>>>>> runner. Expect it to be in next week. >>>>>>>> >>>>>>>> Also, the proper registration of the runner is about to be merged. >>>>>>>> We just need an ok from the contributor to merge the changes. >>>>>>>> >>>>>>>> Best, >>>>>>>> Max >>>>>>>> >>>>>>>> On Sat, Mar 19, 2016 at 12:42 AM, Dan Halperin <[email protected]> >>>>>>>> wrote: >>>>>>>> Thanks Bill! >>>>>>>> >>>>>>>> Filed https://issues.apache.org/jira/browse/BEAM-136, but I'm glad >>>>>>>> it's not >>>>>>>> blocking you! >>>>>>>> >>>>>>>> On Fri, Mar 18, 2016 at 4:04 PM, William McCarthy >>>>>>>> <[email protected]> wrote: >>>>>>>> >>>>>>>> I tried that, but still no dice. Just to be clear, it’s not a blocker >>>>>>>> for >>>>>>>> me, given that I have my example running, but for your information the >>>>>>>> exception is below. >>>>>>>> >>>>>>>> I’ll watch the commit log on the beam incubator and look forward to >>>>>>>> deleting my copy of Raghu’s contributions when they’re merged to >>>>>>>> master. >>>>>>>> >>>>>>>> Thanks again for everyone’s help, >>>>>>>> >>>>>>>> Bill >>>>>>>> >>>>>>>> >>>>>>>> Command followed by exception: >>>>>>>> >>>>>>>> $ flink run -c >>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample >>>>>>>> target/beam-1.0-SNAPSHOT.jar >>>>>>>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner >>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out >>>>>>>> >>>>>>>> ------------------------------------------------------------ >>>>>>>> The program finished with the following exception: >>>>>>>> >>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main >>>>>>>> method caused an error.
>>>>>>>> at >>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520) >>>>>>>> at >>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) >>>>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248) >>>>>>>> at >>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866) >>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333) >>>>>>>> at >>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189) >>>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239) >>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' >>>>>>>> specified >>>>>>>> 'org.apache.beam.runners.flink.FlinkPipelineRunner', supported pipeline >>>>>>>> runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner, >>>>>>>> DirectPipelineRunner] >>>>>>>> at >>>>>>>> com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146) >>>>>>>> at >>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445) >>>>>>>> at >>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99) >>>>>>>> at >>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284) >>>>>>>> at >>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117) >>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>>>>>>> at >>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>>>>>>> at >>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498) >>>>>>>> at >>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) >>>>>>>> ... 6 more >>>>>>>> >>>>>>>> On Mar 18, 2016, at 5:35 PM, Thomas Groh <[email protected]> wrote: >>>>>>>> >>>>>>>> I don't believe the FlinkPipelineRunner is registered the same way the >>>>>>>> Dataflow & Direct Pipeline runners are registered; using >>>>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner should work >>>>>>>> >>>>>>>> On Fri, Mar 18, 2016 at 2:10 PM, William McCarthy >>>>>>>> <[email protected]> wrote: >>>>>>>> >>>>>>>> Thanks Dan, >>>>>>>> >>>>>>>> I tried that, but getting the below. Note that the jar contains the >>>>>>>> FlinkPipelineRunner. >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> % jar -tf target/beam-1.0-SNAPSHOT.jar | grep FlinkPipeline >>>>>>>> org/apache/beam/runners/flink/FlinkPipelineRunner.class >>>>>>>> org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.class >>>>>>>> org/apache/beam/runners/flink/FlinkPipelineOptions.class >>>>>>>> org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.class >>>>>>>> >>>>>>>> % flink run -c >>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample >>>>>>>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner >>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out >>>>>>>> >>>>>>>> ------------------------------------------------------------ >>>>>>>> The program finished with the following exception: >>>>>>>> >>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main >>>>>>>> method caused an error. 
>>>>>>>> at >>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520) >>>>>>>> at >>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) >>>>>>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248) >>>>>>>> at >>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866) >>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333) >>>>>>>> at >>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189) >>>>>>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239) >>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' >>>>>>>> specified >>>>>>>> 'FlinkPipelineRunner', supported pipeline runners >>>>>>>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner, >>>>>>>> DirectPipelineRunner] >>>>>>>> at >>>>>>>> com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146) >>>>>>>> at >>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445) >>>>>>>> at >>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99) >>>>>>>> at >>>>>>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284) >>>>>>>> at >>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117) >>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>>>>>>> at >>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>>>>>>> at >>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498) >>>>>>>> at >>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) >>>>>>>> ... 6 more >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Mar 18, 2016, at 5:00 PM, Dan Halperin <[email protected]> wrote: >>>>>>>> >>>>>>>> Thanks for catching that, Aljoscha! >>>>>>>> >>>>>>>> Note that the Flink runner should be available via a command-line >>>>>>>> option >>>>>>>> as well: --runner=FlinkPipelineRunner. >>>>>>>> >>>>>>>> The list of valid values for that flag is computed by walking the >>>>>>>> classpath at runtime, so as long as the Flink jar is present it'll >>>>>>>> work. >>>>>>>> >>>>>>>> On Fri, Mar 18, 2016 at 1:21 PM, Aljoscha Krettek <[email protected]> >>>>>>>> wrote: >>>>>>>> >>>>>>>> Hi, >>>>>>>> looks like the example is being executed with the DirectPipelineRunner >>>>>>>> which does not seem to be able to cope with UnboundedSource. You need >>>>>>>> to set >>>>>>>> the runner to the FlinkRunner in the example code as described here: >>>>>>>> https://github.com/apache/incubator-beam/tree/master/runners/flink#executing-an-example >>>>>>>> >>>>>>>> The Flink runner should be able to deal with UnboundedSource but has >>>>>>>> the >>>>>>>> limitation that sources are always parallelism=1 (this is being worked >>>>>>>> on, >>>>>>>> however). >>>>>>>> >>>>>>>> Cheers, >>>>>>>> Aljoscha >>>>>>>> On 18 Mar 2016, at 20:56, Dan Halperin <[email protected]> wrote: >>>>>>>> >>>>>>>> Looks like the Flink runner may not yet support arbitrary code written >>>>>>>> with the UnboundedSource API. That is, it looks like the Flink runner >>>>>>>> expects the sources to get translated away. 
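(A note for readers following the runner discussion above: the failures in this thread combine two separate problems, the runner name not being recognised and the pipeline being translated in batch mode. A minimal sketch of sidestepping both from the example's main(), by setting the runner class and the streaming flag directly on the options instead of via command-line flags, might look like the following. It is written against the Dataflow SDK and Flink runner classes referenced in the stack traces above; the example's own Kafka flags and options interface are not shown and would still be parsed as before.)

    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
    import com.google.cloud.dataflow.sdk.options.StreamingOptions;
    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.runners.flink.FlinkPipelineRunner;

    public class RunnerSetupSketch {
      public static void main(String[] args) {
        // Create Flink-specific options in code rather than parsing --runner from args.
        FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);

        // Setting the runner class directly avoids the "Unknown 'runner' specified" lookup.
        options.setRunner(FlinkPipelineRunner.class);

        // Roughly what the --streaming flag discussed earlier would set: forces the
        // streaming translation path, which is required for unbounded sources like Kafka.
        options.as(StreamingOptions.class).setStreaming(true);

        Pipeline pipeline = Pipeline.create(options);
        // ... apply the Kafka read, windowing, and output transforms here ...
        pipeline.run();
      }
    }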
>>>>>>>> >>>>>>>> Max? >>>>>>>> >>>>>>>> Dan >>>>>>>> >>>>>>>> On Fri, Mar 18, 2016 at 12:20 PM, William McCarthy >>>>>>>> <[email protected]> wrote: >>>>>>>> Thanks Raghu, >>>>>>>> >>>>>>>> When I try to run it on flink using the incubator-beam code, i.e. >>>>>>>> >>>>>>>> flink run -c >>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample >>>>>>>> target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092 >>>>>>>> --topics=test_in --outputTopic=test_out >>>>>>>> >>>>>>>> I get this: >>>>>>>> >>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main >>>>>>>> method caused an error. >>>>>>>> at >>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520) >>>>>>>> at >>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) >>>>>>>> at >>>>>>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248) >>>>>>>> at >>>>>>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866) >>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333) >>>>>>>> at >>>>>>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189) >>>>>>>> at >>>>>>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239) >>>>>>>> Caused by: java.lang.IllegalStateException: no evaluator registered >>>>>>>> for Read(UnboundedKafkaSource) >>>>>>>> at >>>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852) >>>>>>>> at >>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219) >>>>>>>> at >>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) >>>>>>>> at >>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) >>>>>>>> at >>>>>>>> com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102) >>>>>>>> at >>>>>>>> com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259) >>>>>>>> at >>>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814) >>>>>>>> at >>>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526) >>>>>>>> at >>>>>>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96) >>>>>>>> at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180) >>>>>>>> at >>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140) >>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>>>>>>> at >>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>>>>>>> at >>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498) >>>>>>>> at >>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) >>>>>>>> ... 6 more >>>>>>>> >>>>>>>> Any ideas? >>>>>>>> >>>>>>>> Bill >>>>>>>> >>>>>>>> On Mar 18, 2016, at 2:47 PM, Raghu Angadi <[email protected]> wrote: >>>>>>>> >>>>>>>> Thanks for trying it. >>>>>>>> >>>>>>>> I fixed the CheckStyle error (not sure why my build is not failing). >>>>>>>> Let me know if you see any issues running with Beam. I haven't tried >>>>>>>> it. I >>>>>>>> should. 
In fact Daniel Halperin says my patch should be against Beam. >>>>>>>> >>>>>>>> Raghu. >>>>>>>> >>>>>>>> On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy >>>>>>>> <[email protected]> wrote: >>>>>>>> Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and Raghu for >>>>>>>> pointing me to working code. >>>>>>>> >>>>>>>> I’m in the middle of a hack day at the moment, so the speed of your >>>>>>>> responses has been very welcome. >>>>>>>> >>>>>>>> In the first instance, I’ll try using your changes, Raghu. I’ve >>>>>>>> cloned your repo, switched to the kafka branch and built both >>>>>>>> contrib/kafka >>>>>>>> and contrib/examples/kafka. The contrib/kafka initially failed with a >>>>>>>> CheckStyle error >>>>>>>> (/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12: >>>>>>>> 'private' modifier out of order with the JLS suggestions)… I’ve fixed >>>>>>>> that >>>>>>>> in my local clone and now it’s building fine. I hope to be able to run >>>>>>>> your >>>>>>>> contrib unchanged on top of the incubator-beam codebase, which will be >>>>>>>> what >>>>>>>> I attempt to do now. >>>>>>>> >>>>>>>> Thanks again to all for your swift help. >>>>>>>> >>>>>>>> Bill >>>>>>>> >>>>>>>> On Mar 18, 2016, at 12:55 PM, Raghu Angadi <[email protected]> >>>>>>>> wrote: >>>>>>>> >>>>>>>> Hi Bill, >>>>>>>> >>>>>>>> We have a fairly well tested patch for KafkaIO (PR #121). It will be >>>>>>>> merged soon. The example there keeps track of top hashtags in a 10-minute >>>>>>>> sliding window and writes the results to another Kafka topic. Please >>>>>>>> try it >>>>>>>> if you can. It is well tested on Google Cloud Dataflow. I have not run >>>>>>>> it >>>>>>>> using the Flink runner. >>>>>>>> >>>>>>>> Raghu. >>>>>>>> >>>>>>>> On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas >>>>>>>> <[email protected]> wrote: >>>>>>>> Hello Bill, >>>>>>>> >>>>>>>> This is a known limitation of the Flink Runner. >>>>>>>> There is a JIRA issue for that: >>>>>>>> https://issues.apache.org/jira/browse/BEAM-127 >>>>>>>> >>>>>>>> A wrapper for Flink sinks will come soon and as Beam evolves, >>>>>>>> a more Beam-y solution will come as well. >>>>>>>> >>>>>>>> Kostas >>>>>>>> On Mar 18, 2016, at 5:23 PM, William McCarthy >>>>>>>> <[email protected]> wrote: >>>>>>>> >>>>>>>> Hi, >>>>>>>> >>>>>>>> I’m trying to write a proof-of-concept which takes messages from >>>>>>>> Kafka, transforms them using Beam on Flink, then pushes the results >>>>>>>> onto a >>>>>>>> different Kafka topic. >>>>>>>> >>>>>>>> I’ve used the KafkaWindowedWordCountExample as a starting point, >>>>>>>> and that’s doing the first part of what I want to do, but it outputs >>>>>>>> to text >>>>>>>> files as opposed to Kafka. FlinkKafkaProducer08 looks promising, but I >>>>>>>> can’t >>>>>>>> figure out how to plug it into the pipeline. I was thinking that it >>>>>>>> would be >>>>>>>> wrapped with an UnboundedFlinkSink, or some such, but that doesn’t >>>>>>>> seem to >>>>>>>> exist. >>>>>>>> >>>>>>>> Any advice or thoughts on what I’m trying to do? >>>>>>>> >>>>>>>> I’m running the latest incubator-beam (as of last night from >>>>>>>> GitHub), Flink 1.0.0 in cluster mode and Kafka 0.9.0.1, all on Google >>>>>>>> Compute Engine (Debian Jessie).
>>>>>>>> >>>>>>>> Thanks, >>>>>>>> >>>>>>>> Bill McCarthy >>>>>>>> >>>>>>>> -- >>>>>>>> Jean-Baptiste Onofré >>>>>>>> [email protected] >>>>>>>> http://blog.nanthrax.net >>>>>>>> Talend - http://www.talend.com
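For anyone landing on this thread with the same question as Bill's first message (getting the windowed results back out onto a Kafka topic when there is no UnboundedFlinkSink-style wrapper yet), one interim approach is to do the write inside a DoFn at the end of the pipeline. The sketch below is illustrative only; it is not the KafkaIO/KafkaWriter code from Raghu's patch, and it assumes the Kafka 0.9 producer client plus the Dataflow SDK DoFn style seen elsewhere in the thread.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import com.google.cloud.dataflow.sdk.transforms.DoFn;

    // Writes each incoming String to a Kafka topic. The producer is transient and created
    // per bundle on the worker, so the DoFn itself stays serializable.
    public class KafkaWriteFn extends DoFn<String, Void> {

      private final String bootstrapServers;
      private final String topic;
      private transient KafkaProducer<String, String> producer;

      public KafkaWriteFn(String bootstrapServers, String topic) {
        this.bootstrapServers = bootstrapServers;
        this.topic = topic;
      }

      @Override
      public void startBundle(Context c) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
      }

      @Override
      public void processElement(ProcessContext c) {
        producer.send(new ProducerRecord<String, String>(topic, c.element()));
      }

      @Override
      public void finishBundle(Context c) {
        producer.flush();
        producer.close();
      }
    }

It would be applied at the end of the pipeline with something like results.apply(ParDo.of(new KafkaWriteFn("cl-pu4p:9092", "test_out"))), mirroring the --bootstrapServers and --outputTopic values used in the commands above; "results" stands in for whatever PCollection<String> the example produces.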
