Thanks Max. I'm happy to move forward with the fully qualified class name for the runner.
Bill

> On Mar 24, 2016, at 5:19 AM, Maximilian Michels <[email protected]> wrote:
>
> Cool. Great to hear it is up and running :)
>
> @William: I suspect the automatic detection doesn't work because the
> registered runners, which are put in a META-INF file, are not copied
> over to your jar. The fully qualified class name always works because
> the Beam code performs a lookup of the class in your jar. For the
> simple name, it needs to know the available runners in advance. I'll
> check if there is an easy way to make this work with your custom jar.
>
>> On Thu, Mar 24, 2016 at 1:53 AM, Raghu Angadi <[email protected]> wrote:
>> Good to hear that.
>>
>> Oh, that JSON is 5 years old. But it should not cause parsing errors. Might
>> be some string-escaping issue with the generator. Note that my app tolerates
>> missing fields (wrong schema), but does not catch JSON parsing errors.
>>
>> Raghu.
>>
>> On Wed, Mar 23, 2016 at 5:45 PM, William McCarthy <[email protected]> wrote:
>>>
>>> I'm unable to send the logs due to confidentiality constraints.
>>>
>>> However, when I look at the taskmanager log, I see some JSON parsing
>>> issues. So I suspect that the tweets I'm sending through (borrowed from
>>> here, with an added hashtag: https://gist.github.com/hrp/900964) do not
>>> conform to the schema you're expecting.
>>>
>>> You sent through Kafka topic information for your tweet stream out of
>>> band, and I'm connected to that now. It appears that the output is now
>>> getting printed to the taskmanager log file, which is what's expected. So I
>>> think that this issue is now explained.
>>>
>>> Thanks,
>>>
>>> Bill
>>>
>>> On Mar 23, 2016, at 6:33 PM, Raghu Angadi <[email protected]> wrote:
>>>
>>> It should work. Do you have a log from one of the workers, by any chance? I
>>> will send you another Kafka endpoint you could try running against.
>>>
>>> Raghu.
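[Editor's note] Max's diagnosis above — runner registrations live in META-INF/services files that don't survive into the fat jar — has a common build-side fix when the jar is assembled with the Maven Shade plugin: the ServicesResourceTransformer merges service registration files from all dependencies into the shaded jar. A sketch only; the thread does not say which plugin Bill's build actually uses, so this assumes maven-shade-plugin:

```xml
<!-- Hypothetical pom.xml fragment: merge META-INF/services entries from all
     dependencies so runner registrations survive into the shaded jar. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <transformers>
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
```

With the service files merged, simple runner names discovered via the registration mechanism should resolve without the package qualifier.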
>>>
>>> On Wed, Mar 23, 2016 at 3:12 PM, William McCarthy <[email protected]> wrote:
>>>>
>>>> Raghu,
>>>>
>>>> Sorry, I may have spoken too soon when I said that it "worked".
>>>>
>>>> The code did not throw any exceptions and seemed to start up happily. I
>>>> then tried injecting a number of "tweets" (with hashtags) into my test_in
>>>> Kafka topic. I've now been waiting for more than 10 minutes, and I see
>>>> nothing on my test_out Kafka topic. I expected the top hashtags to show up
>>>> there within a 10-minute window.
>>>>
>>>> Am I misunderstanding something?
>>>>
>>>> To help debug the issue: previously, I noticed that if I injected a
>>>> random string into the test_in topic, the Beam job fell over with a Jackson
>>>> JsonMappingException, so it would appear that the job is getting my
>>>> messages. But when I restarted the Beam job and injected 3 correctly formed
>>>> tweet messages, something was silently dropped between there and the
>>>> output Kafka topic.
>>>>
>>>> Bill
>>>>
>>>> On Mar 23, 2016, at 5:53 PM, Raghu Angadi <[email protected]> wrote:
>>>>
>>>> Great news! Thanks for trying multiple fixes, and thanks to Max and
>>>> Aljoscha for their fixes.
>>>>
>>>> Raghu
>>>>
>>>> On Wed, Mar 23, 2016 at 2:51 PM, William McCarthy <[email protected]> wrote:
>>>>>
>>>>> Thanks Max,
>>>>>
>>>>> This command now works, thanks:
>>>>>
>>>>> $ flink run -c com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample target/beam-1.0-SNAPSHOT.jar --runner=org.apache.beam.runners.flink.FlinkPipelineRunner --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out --streaming=true
>>>>>
>>>>> Note that I'm still unable to use --runner=FlinkPipelineRunner (i.e.
>>>>> without the package scope). That behaviour is unexpected to you, given the
>>>>> test that you put in a few days ago, right? i.e.
>>>>>
>>>>> $ flink run -c com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>>
>>>>> ------------------------------------------------------------
>>>>>  The program finished with the following exception:
>>>>>
>>>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified 'FlinkPipelineRunner', supported pipeline runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner, DirectPipelineRunner]
>>>>>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1486)
>>>>>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:101)
>>>>>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:286)
>>>>>     at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>     ... 6 more
>>>>> Caused by: java.lang.ClassNotFoundException: FlinkPipelineRunner
>>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>     at java.lang.Class.forName0(Native Method)
>>>>>     at java.lang.Class.forName(Class.java:264)
>>>>>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1473)
>>>>>     ... 14 more
>>>>>
>>>>>> On Mar 23, 2016, at 2:42 PM, Maximilian Michels <[email protected]> wrote:
>>>>>>
>>>>>> Hi William,
>>>>>>
>>>>>> It has been merged. Feel free to try again.
>>>>>>
>>>>>> Cheers,
>>>>>> Max
>>>>>>
>>>>>> On Wed, Mar 23, 2016 at 5:09 PM, Maximilian Michels <[email protected]> wrote:
>>>>>>> The pull request is here: https://github.com/apache/incubator-beam/pull/69
>>>>>>>
>>>>>>> I'll merge it after the tests have passed and it has been reviewed.
>>>>>>>
>>>>>>> On Wed, Mar 23, 2016 at 3:12 PM, Maximilian Michels <[email protected]> wrote:
>>>>>>>> Hi William,
>>>>>>>>
>>>>>>>> I started working on a fix and better testing of the
>>>>>>>> UnboundedSourceWrapper. I'll get back to you shortly with a pull
>>>>>>>> request that we should be able to merge soon.
>>>>>>>>
>>>>>>>> - Max
>>>>>>>>
>>>>>>>> On Wed, Mar 23, 2016 at 1:33 PM, Aljoscha Krettek <[email protected]> wrote:
>>>>>>>>> Hi,
>>>>>>>>> as far as I can see from a quick glance, the problem is that
>>>>>>>>> UnboundedSourceWrapper stores an instance of Reader that it gets from the
>>>>>>>>> Source. The Reader is not Serializable while the UnboundedSource is.
>>>>>>>>> I think the Reader should be initialized when actually running the
>>>>>>>>> source, in the run() method.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Aljoscha
>>>>>>>>>> On 23 Mar 2016, at 13:07, William McCarthy <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>> The first stack trace was from the latest runner, yes. I've pulled
>>>>>>>>>> the very latest, just now, and still get the same thing.
>>>>>>>>>>
>>>>>>>>>> When I write 'pulled the very latest', here's what I mean (all of
>>>>>>>>>> the following commands, except the last, finished with success):
>>>>>>>>>>
>>>>>>>>>> $ cd incubator-beam
>>>>>>>>>> $ git pull
>>>>>>>>>> ...some output, then success
>>>>>>>>>> $ git branch
>>>>>>>>>> * master
>>>>>>>>>> $ mvn -DskipTests clean install
>>>>>>>>>> ...some output, then success
>>>>>>>>>> $ cd <my project>
>>>>>>>>>> $ mvn -DskipTests clean install
>>>>>>>>>> ...some output, then success
>>>>>>>>>> $ <command as output below>
>>>>>>>>>>
>>>>>>>>>> Bill
>>>>>>>>>>
>>>>>>>>>>> On Mar 22, 2016, at 5:29 AM, Maximilian Michels <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi William,
>>>>>>>>>>>
>>>>>>>>>>> Is the first stack trace from the latest master? Using only the simple
>>>>>>>>>>> class name of the Runner should actually work now. I've also added a
>>>>>>>>>>> test to explicitly test that.
>>>>>>>>>>>
>>>>>>>>>>> The latter error, as Aljoscha pointed out, is due to batch
>>>>>>>>>>> execution. Perhaps we could make it explicit during startup which mode
>>>>>>>>>>> we're executing in. Filed a JIRA issue for that:
>>>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-139
>>>>>>>>>>>
>>>>>>>>>>> - Max
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Mar 22, 2016 at 10:00 AM, Aljoscha Krettek <[email protected]> wrote:
>>>>>>>>>>>> Hi,
>>>>>>>>>>>> for Flink, the runner can internally translate to either a batch
>>>>>>>>>>>> job or a streaming job.
>>>>>>>>>>>> Unbounded sources are not supported when running in batch mode,
>>>>>>>>>>>> so you have to somehow specify that you want streaming mode.
>>>>>>>>>>>> StreamingOptions has the method setStreaming; maybe you can
>>>>>>>>>>>> specify "--streaming true" on the command line to set that flag.
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>> On 21 Mar 2016, at 23:39, William McCarthy <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> I've attempted to run the TopHashtagsExample again, using both
>>>>>>>>>>>>> 'FlinkPipelineRunner' and its fully qualified name. Both bomb out, though
>>>>>>>>>>>>> the latter gets further. I hope this helps; here is the output of both:
>>>>>>>>>>>>>
>>>>>>>>>>>>> $ flink run -c com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>>>>>>>>>>
>>>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>>>>  The program finished with the following exception:
>>>>>>>>>>>>>
>>>>>>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>>>>>>>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>>>>>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>>>>>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>>>>>>>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>>>>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>>>>>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>>>>>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>>>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified 'FlinkPipelineRunner', supported pipeline runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner, DirectPipelineRunner]
>>>>>>>>>>>>>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1486)
>>>>>>>>>>>>>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:101)
>>>>>>>>>>>>>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:286)
>>>>>>>>>>>>>     at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>>>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>>>>>>>     ... 6 more
>>>>>>>>>>>>> Caused by: java.lang.ClassNotFoundException: FlinkPipelineRunner
>>>>>>>>>>>>>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>>>>>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>>>>>>>>>>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>>>>>>>>>     at java.lang.Class.forName0(Native Method)
>>>>>>>>>>>>>     at java.lang.Class.forName(Class.java:264)
>>>>>>>>>>>>>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1473)
>>>>>>>>>>>>>     ... 14 more
>>>>>>>>>>>>>
>>>>>>>>>>>>> $ flink run -c com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample target/beam-1.0-SNAPSHOT.jar --runner=org.apache.beam.runners.flink.FlinkPipelineRunner --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>>>>>>>>>>
>>>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>>>>  The program finished with the following exception:
>>>>>>>>>>>>>
>>>>>>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>>>>>>>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>>>>>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>>>>>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>>>>>>>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>>>>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>>>>>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>>>>>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>>>>>>>>> Caused by: java.lang.UnsupportedOperationException: The transform Read(UnboundedKafkaSource) is currently not supported.
>>>>>>>>>>>>>     at org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator.visitTransform(FlinkBatchPipelineTranslator.java:111)
>>>>>>>>>>>>>     at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>>>>>>>>>>>>>     at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>>>>>>>>>     at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>>>>>>>>>     at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>>>>>>>>>>>>>     at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>>>>>>>>>>>>>     at org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:34)
>>>>>>>>>>>>>     at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:130)
>>>>>>>>>>>>>     at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:109)
>>>>>>>>>>>>>     at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:50)
>>>>>>>>>>>>>     at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>>>>>>>>>>>>>     at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>>>>>>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>>>>>>>     ... 6 more
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mar 21, 2016, at 12:23 PM, Raghu Angadi <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks Max.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Bill McCarthy,
>>>>>>>>>>>>>> I know you are unblocked and KafkaWriter is good enough.
>>>>>>>>>>>>>> Please try the KafkaIO source from my branch with the Flink runner
>>>>>>>>>>>>>> if you get a chance.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Raghu.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Mar 21, 2016 at 6:54 AM, Jean-Baptiste Onofré <[email protected]> wrote:
>>>>>>>>>>>>>> Thanks for the update, Max!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards
>>>>>>>>>>>>>> JB
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 03/21/2016 02:39 PM, Maximilian Michels wrote:
>>>>>>>>>>>>>> FYI: The runner registration has been fixed. The Flink runner
>>>>>>>>>>>>>> explicitly registers as of [1]. Also, the SDK tries to look up the
>>>>>>>>>>>>>> PipelineRunner class in case it has not been registered [2].
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1] https://github.com/apache/incubator-beam/pull/40
>>>>>>>>>>>>>> [2] https://github.com/apache/incubator-beam/pull/61
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, Mar 19, 2016 at 6:43 PM, Maximilian Michels <[email protected]> wrote:
>>>>>>>>>>>>>> Great to see such a lively discussion here.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think we'll support sinks through the Write interface (like in
>>>>>>>>>>>>>> batched execution) and also have a dedicated wrapper for the Flink
>>>>>>>>>>>>>> sinks. This is a very pressing but easy-to-solve issue of the Flink
>>>>>>>>>>>>>> runner. Expect it to be in next week.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Also, the proper registration of the runner is about to be merged.
>>>>>>>>>>>>>> We just need an OK from the contributor to merge the changes.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Max
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, Mar 19, 2016 at 12:42 AM, Dan Halperin <[email protected]> wrote:
>>>>>>>>>>>>>> Thanks Bill!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Filed https://issues.apache.org/jira/browse/BEAM-136, but I'm glad it's not
>>>>>>>>>>>>>> blocking you!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Mar 18, 2016 at 4:04 PM, William McCarthy <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I tried that, but still no dice. Just to be clear, it's not a blocker for
>>>>>>>>>>>>>> me, given that I have my example running, but for your information the
>>>>>>>>>>>>>> exception is below.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'll watch the commit log on the Beam incubator and look forward to
>>>>>>>>>>>>>> deleting my copy of Raghu's contributions when they're merged to master.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks again for everyone's help,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Command followed by exception:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> $ flink run -c com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample target/beam-1.0-SNAPSHOT.jar --runner=org.apache.beam.runners.flink.FlinkPipelineRunner --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>>>>>  The program finished with the following exception:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>>>>>>>>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>>>>>>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>>>>>>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>>>>>>>>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>>>>>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>>>>>>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>>>>>>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>>>>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified 'org.apache.beam.runners.flink.FlinkPipelineRunner', supported pipeline runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner, DirectPipelineRunner]
>>>>>>>>>>>>>>     at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>>>>>>>>>>>>>>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>>>>>>>>>>>>>>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>>>>>>>>>>>>>>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>>>>>>>>>>>>>>     at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>>>>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>>>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>>>>>>>>     ... 6 more
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mar 18, 2016, at 5:35 PM, Thomas Groh <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I don't believe the FlinkPipelineRunner is registered the same way the
>>>>>>>>>>>>>> Dataflow & Direct pipeline runners are registered; using
>>>>>>>>>>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner should work.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Mar 18, 2016 at 2:10 PM, William McCarthy <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks Dan,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I tried that, but am getting the below.
>>>>>>>>>>>>>> Note that the jar contains the FlinkPipelineRunner:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> % jar -tf target/beam-1.0-SNAPSHOT.jar | grep FlinkPipeline
>>>>>>>>>>>>>> org/apache/beam/runners/flink/FlinkPipelineRunner.class
>>>>>>>>>>>>>> org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.class
>>>>>>>>>>>>>> org/apache/beam/runners/flink/FlinkPipelineOptions.class
>>>>>>>>>>>>>> org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.class
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> % flink run -c com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ------------------------------------------------------------
>>>>>>>>>>>>>>  The program finished with the following exception:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>>>>>>>>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>>>>>>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>>>>>>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>>>>>>>>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>>>>>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>>>>>>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>>>>>>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>>>>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified 'FlinkPipelineRunner', supported pipeline runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner, DirectPipelineRunner]
>>>>>>>>>>>>>>     at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>>>>>>>>>>>>>>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>>>>>>>>>>>>>>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>>>>>>>>>>>>>>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>>>>>>>>>>>>>>     at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>>>>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>>>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>>>>>>>>     ... 6 more
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mar 18, 2016, at 5:00 PM, Dan Halperin <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for catching that, Aljoscha!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Note that the Flink runner should be available via a command-line option
>>>>>>>>>>>>>> as well: --runner=FlinkPipelineRunner.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The list of valid values for that flag is computed by walking the
>>>>>>>>>>>>>> classpath at runtime, so as long as the Flink jar is present it'll work.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Mar 18, 2016 at 1:21 PM, Aljoscha Krettek <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> looks like the example is being executed with the DirectPipelineRunner,
>>>>>>>>>>>>>> which does not seem to be able to cope with UnboundedSource. You need to set
>>>>>>>>>>>>>> the runner to the FlinkRunner in the example code as described here:
>>>>>>>>>>>>>> https://github.com/apache/incubator-beam/tree/master/runners/flink#executing-an-example
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The Flink runner should be able to deal with UnboundedSource, but has the
>>>>>>>>>>>>>> limitation that sources always run with parallelism=1 (this is being
>>>>>>>>>>>>>> worked on, however).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>>> On 18 Mar 2016, at 20:56, Dan Halperin <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Looks like the Flink runner may not yet support arbitrary code written
>>>>>>>>>>>>>> with the UnboundedSource API. That is, it looks like the Flink runner
>>>>>>>>>>>>>> expects the sources to get translated away.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Max?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Dan
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Mar 18, 2016 at 12:20 PM, William McCarthy <[email protected]> wrote:
>>>>>>>>>>>>>> Thanks Raghu,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> When I try to run it on Flink using the incubator-beam code, i.e.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> flink run -c com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I get this:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>>>>>>>>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>>>>>>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>>>>>>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>>>>>>>>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>>>>>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>>>>>>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>>>>>>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>>>>>>>>>> Caused by: java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
>>>>>>>>>>>>>>     at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852)
>>>>>>>>>>>>>>     at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>>>>>>>>>>>>>>     at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>>>>>>>>>>     at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>>>>>>>>>>     at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>>>>>>>>>>>>>>     at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>>>>>>>>>>>>>>     at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
>>>>>>>>>>>>>>     at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
>>>>>>>>>>>>>>     at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
>>>>>>>>>>>>>>     at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>>>>>>>>>>>>>>     at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>>>>>>>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>>>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>>>>>>>>     ... 6 more
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Any ideas?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mar 18, 2016, at 2:47 PM, Raghu Angadi <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for trying it.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I fixed the CheckStyle error (not sure why my build is not failing). Let me know if you see any issues running with Beam. I haven't tried it myself; I should. In fact, Daniel Halperin says my patch should be against Beam.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Raghu.
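[Editor's sketch] The "Caused by" line in the trace above is thrown inside DirectPipelineRunner, which shows the pipeline fell back to the default in-process runner rather than the Flink runner (note the flink run command above passes no --runner option). Conceptually, the failing dispatch works like this simplified stand-in, with hypothetical names, not the SDK's actual code: the runner walks the transform tree and looks each transform up in a registry of evaluators, and the unbounded Kafka Read has no entry.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for the evaluator dispatch behind the error above:
// the runner visits each transform and looks it up in a registry; an
// unregistered transform makes the visit throw IllegalStateException.
// Illustrative only -- not the SDK's actual code.
public class EvaluatorDispatch {
    private final Map<String, Runnable> evaluators = new HashMap<>();

    public void register(String transformName, Runnable evaluator) {
        evaluators.put(transformName, evaluator);
    }

    public void visitTransform(String transformName) {
        Runnable evaluator = evaluators.get(transformName);
        if (evaluator == null) {
            throw new IllegalStateException(
                "no evaluator registered for " + transformName);
        }
        evaluator.run();
    }
}
```

As the rest of the thread shows, passing the Flink runner explicitly avoids this code path entirely, because the Flink runner translates the pipeline instead of evaluating it in-process.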
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy <[email protected]> wrote:
>>>>>>>>>>>>>> Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and Raghu for pointing me to working code.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm in the middle of a hack day at the moment, so the speed of your responses has been very welcome.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In the first instance, I'll try using your changes, Raghu. I've cloned your repo, switched to the kafka branch and built both contrib/kafka and contrib/examples/kafka. The contrib/kafka build initially failed with a CheckStyle error (/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12: 'private' modifier out of order with the JLS suggestions)… I've fixed that in my local clone and now it's building fine. I hope to be able to run your contrib unchanged on top of the incubator-beam codebase, which is what I'll attempt now.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks again to all for your swift help.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Bill
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mar 18, 2016, at 12:55 PM, Raghu Angadi <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Bill,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We have a fairly well-tested patch for KafkaIO (pr #121). It will be merged soon. The example there keeps track of top hashtags in a 10-minute sliding window and writes the results to another Kafka topic. Please try it if you can. It is well tested on Google Cloud Dataflow. I have not run it using the Flink runner.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Raghu.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas <[email protected]> wrote:
>>>>>>>>>>>>>> Hello Bill,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is a known limitation of the Flink Runner. There is a JIRA issue for it: https://issues.apache.org/jira/browse/BEAM-127
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> A wrapper for Flink sinks will come soon, and as Beam evolves, a more Beam-y solution will come as well.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Kostas
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mar 18, 2016, at 5:23 PM, William McCarthy <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm trying to write a proof-of-concept which takes messages from Kafka, transforms them using Beam on Flink, then pushes the results onto a different Kafka topic.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I've used the KafkaWindowedWordCountExample as a starting point, and that's doing the first part of what I want to do, but it outputs to text files as opposed to Kafka. FlinkKafkaProducer08 looks promising, but I can't figure out how to plug it into the pipeline. I was thinking that it would be wrapped with an UnboundedFlinkSink, or some such, but that doesn't seem to exist.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Any advice or thoughts on what I'm trying to do?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm running the latest incubator-beam (as of last night from GitHub), Flink 1.0.0 in cluster mode and Kafka 0.9.0.1, all on Google Compute Engine (Debian Jessie).
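[Editor's sketch] Raghu's example above ranks hashtags per window. The per-tweet extraction step it implies can be sketched in plain Java; this is a hypothetical helper, not code taken from the actual pr #121 example:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper sketching the extraction step of a top-hashtags
// pipeline: pull hashtags out of a tweet's text so a downstream windowed
// count/top transform can rank them. Tags are lowercased so "#Beam" and
// "#beam" count as one hashtag.
public class Hashtags {
    private static final Pattern HASHTAG = Pattern.compile("#(\\w+)");

    public static List<String> extract(String tweetText) {
        List<String> tags = new ArrayList<>();
        Matcher m = HASHTAG.matcher(tweetText);
        while (m.find()) {
            tags.add(m.group(1).toLowerCase(Locale.ROOT));
        }
        return tags;
    }
}
```

In a Beam pipeline this kind of logic would live inside a DoFn applied to the decoded tweets, with the windowing and counting handled by the SDK's transforms.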
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Bill McCarthy
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Jean-Baptiste Onofré
>>>>>>>>>>>>>> [email protected]
>>>>>>>>>>>>>> http://blog.nanthrax.net
>>>>>>>>>>>>>> Talend - http://www.talend.com
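[Editor's sketch] For readers unfamiliar with the "10 minute sliding window" mentioned in the thread: with sliding windows, each element belongs to every window whose span covers its timestamp, so a hashtag is counted in several overlapping windows. The assignment arithmetic can be sketched as follows; the size and period values in the usage note are assumptions for illustration, not the example's actual configuration:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of sliding-window assignment: an element with a given
// timestamp belongs to every window of length sizeMs whose start lies in
// (timestampMs - sizeMs, timestampMs], stepping backwards by periodMs.
public class SlidingWindowAssign {
    public static List<Long> windowStartsFor(long timestampMs, long sizeMs, long periodMs) {
        List<Long> starts = new ArrayList<>();
        // Most recent window start at or before the timestamp.
        long lastStart = timestampMs - (timestampMs % periodMs);
        for (long s = lastStart; s > timestampMs - sizeMs; s -= periodMs) {
            starts.add(s);
        }
        return starts;
    }
}
```

For example, with a 10-minute size and a 5-minute period, an element at minute 12 falls into the windows starting at minutes 10 and 5, so its hashtags contribute to both windows' counts.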
