Great news! Thanks for trying multiple fixes, and thanks to Max and Aljoscha for their multiple fixes.
Raghu

On Wed, Mar 23, 2016 at 2:51 PM, William McCarthy <[email protected]> wrote:

> Thanks Max,
>
> This command now works, thanks:
>
> $ flink run -c
> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
> target/beam-1.0-SNAPSHOT.jar
> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner
> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
> --streaming=true
>
> Note that I’m still unable to use --runner=FlinkPipelineRunner (i.e.
> without the package scope). That behaviour is unexpected to you, given the
> test that you put in a few days ago, right? i.e.
>
> $ flink run -c
> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner
> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>
> ------------------------------------------------------------
> The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error.
> at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) > at > org.apache.flink.client.program.Client.runBlocking(Client.java:248) > at > org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239) > Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified > 'FlinkPipelineRunner', supported pipeline runners > [BlockingDataflowPipelineRunner, DataflowPipelineRunner, > DirectPipelineRunner] > at > com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1486) > at > com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:101) > at > com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:286) > at > com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) > ... 
6 more > Caused by: java.lang.ClassNotFoundException: FlinkPipelineRunner > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Class.java:264) > at > com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1473) > ... 14 more > > > > On Mar 23, 2016, at 2:42 PM, Maximilian Michels <[email protected]> wrote: > > > > Hi William, > > > > It has been merged. Feel free to try again. > > > > Cheers, > > Max > > > > On Wed, Mar 23, 2016 at 5:09 PM, Maximilian Michels <[email protected]> > wrote: > >> The pull request is here: > https://github.com/apache/incubator-beam/pull/69 > >> > >> I'll merge it after the tests have passed and it has been reviewed. > >> > >> On Wed, Mar 23, 2016 at 3:12 PM, Maximilian Michels <[email protected]> > wrote: > >>> Hi William, > >>> > >>> I started working on a fix and better testing of the > >>> UnboundedSourceWrapper. I'll get back to you shortly with a pull > >>> request that we should be able to merge soon. > >>> > >>> - Max > >>> > >>> On Wed, Mar 23, 2016 at 1:33 PM, Aljoscha Krettek <[email protected]> > wrote: > >>>> Hi, > >>>> as far as I can see from a quick glance the problem is that > UnboundedSourceWrapper stores an instance of Reader that it gets from the > Source. The Reader is not Serializable while the UnboundedSource is. I > think the Reader should be initialized when actually running the source, in > the run() method. > >>>> > >>>> Cheers, > >>>> Aljoscha > >>>>> On 23 Mar 2016, at 13:07, William McCarthy < > [email protected]> wrote: > >>>>> > >>>>> The first stack trace was from the latest runner, yes. I’ve pulled > the very latest, just now, and still get the same thing. 
> >>>>> > >>>>> When I write ‘pulled the very latest’, here’s what I mean (all of > the following commands, except the latest, finished with success): > >>>>> > >>>>> $ cd incubator-beam > >>>>> $ git pull > >>>>> …some output, then success > >>>>> $ git branch > >>>>> * master > >>>>> $ mvn -DskipTests clean install > >>>>> …some output, then success > >>>>> $ cd <my project> > >>>>> $ mvn -DskipTests clean install > >>>>> …some output, then success > >>>>> $ <command as output below> > >>>>> > >>>>> Bill > >>>>> > >>>>>> On Mar 22, 2016, at 5:29 AM, Maximilian Michels <[email protected]> > wrote: > >>>>>> > >>>>>> Hi William, > >>>>>> > >>>>>> Is the first stack trace from the latest master? Using only the > simple > >>>>>> class name of the Runner should actually work now. I've also added a > >>>>>> test to explicitly test that. > >>>>>> > >>>>>> The latter error, as Aljoscha pointed out, this due to batch > >>>>>> execution. Perhaps we could make it explicit during startup which > mode > >>>>>> we're executing in. File a JIRA issue for that: > >>>>>> https://issues.apache.org/jira/browse/BEAM-139 > >>>>>> > >>>>>> - Max > >>>>>> > >>>>>> On Tue, Mar 22, 2016 at 10:00 AM, Aljoscha Krettek < > [email protected]> wrote: > >>>>>>> Hi, > >>>>>>> for Flink the runner can internally either translate to a batch > job or a streaming job. Unbounded sources are not supported when running in > batch mode so you have to somehow specify that you want to have streaming > mode. StreamingOptions has method setStreaming, maybe you can specify > “—streaming true” on the command line to set that flag. > >>>>>>> > >>>>>>> Cheers, > >>>>>>> Aljoscha > >>>>>>>> On 21 Mar 2016, at 23:39, William McCarthy < > [email protected]> wrote: > >>>>>>>> > >>>>>>>> I’ve attempted to run the TopHashtagsExample again, using both > ‘FlinkPipelineRunner’ and it's fully qualified name. Both bomb out, though > the latter gets further. 
I hope this helps, here is the output of both: > >>>>>>>> > >>>>>>>> $ flink run -c > com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample > target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner > --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out > >>>>>>>> > >>>>>>>> ------------------------------------------------------------ > >>>>>>>> The program finished with the following exception: > >>>>>>>> > >>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The > main method caused an error. > >>>>>>>> at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520) > >>>>>>>> at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) > >>>>>>>> at > org.apache.flink.client.program.Client.runBlocking(Client.java:248) > >>>>>>>> at > org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866) > >>>>>>>> at > org.apache.flink.client.CliFrontend.run(CliFrontend.java:333) > >>>>>>>> at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189) > >>>>>>>> at > org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239) > >>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' > specified 'FlinkPipelineRunner', supported pipeline runners > [BlockingDataflowPipelineRunner, DataflowPipelineRunner, > DirectPipelineRunner] > >>>>>>>> at > com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1486) > >>>>>>>> at > com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:101) > >>>>>>>> at > com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:286) > >>>>>>>> at > com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117) > >>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > >>>>>>>> 
at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > >>>>>>>> at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > >>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498) > >>>>>>>> at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) > >>>>>>>> ... 6 more > >>>>>>>> Caused by: java.lang.ClassNotFoundException: FlinkPipelineRunner > >>>>>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > >>>>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > >>>>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > >>>>>>>> at java.lang.Class.forName0(Native Method) > >>>>>>>> at java.lang.Class.forName(Class.java:264) > >>>>>>>> at > com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1473) > >>>>>>>> ... 14 more > >>>>>>>> > >>>>>>>> > >>>>>>>> $ flink run -c > com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample > target/beam-1.0-SNAPSHOT.jar > --runner=org.apache.beam.runners.flink.FlinkPipelineRunner > --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out > >>>>>>>> > >>>>>>>> ------------------------------------------------------------ > >>>>>>>> The program finished with the following exception: > >>>>>>>> > >>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The > main method caused an error. 
> >>>>>>>> at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520) > >>>>>>>> at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) > >>>>>>>> at > org.apache.flink.client.program.Client.runBlocking(Client.java:248) > >>>>>>>> at > org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866) > >>>>>>>> at > org.apache.flink.client.CliFrontend.run(CliFrontend.java:333) > >>>>>>>> at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189) > >>>>>>>> at > org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239) > >>>>>>>> Caused by: java.lang.UnsupportedOperationException: The transform > Read(UnboundedKafkaSource) is currently not supported. > >>>>>>>> at > org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator.visitTransform(FlinkBatchPipelineTranslator.java:111) > >>>>>>>> at > com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219) > >>>>>>>> at > com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) > >>>>>>>> at > com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) > >>>>>>>> at > com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102) > >>>>>>>> at > com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259) > >>>>>>>> at > org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:34) > >>>>>>>> at > org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:130) > >>>>>>>> at > org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:109) > >>>>>>>> at > org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:50) > >>>>>>>> at > com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180) > 
>>>>>>>> at > com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140) > >>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > >>>>>>>> at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > >>>>>>>> at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > >>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498) > >>>>>>>> at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) > >>>>>>>> ... 6 more > >>>>>>>> > >>>>>>>> > >>>>>>>>> On Mar 21, 2016, at 12:23 PM, Raghu Angadi <[email protected]> > wrote: > >>>>>>>>> > >>>>>>>>> Thanks Max. > >>>>>>>>> > >>>>>>>>> Bill McCarthy, > >>>>>>>>> I know you are unblocked and KafkaWriter is good enough. Please > try KafkaIO source from my branch with Flink runner if you get a chance. > >>>>>>>>> > >>>>>>>>> thanks, > >>>>>>>>> Raghu. > >>>>>>>>> > >>>>>>>>> On Mon, Mar 21, 2016 at 6:54 AM, Jean-Baptiste Onofré < > [email protected]> wrote: > >>>>>>>>> Thanks for the update Max ! > >>>>>>>>> > >>>>>>>>> Regards > >>>>>>>>> JB > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> On 03/21/2016 02:39 PM, Maximilian Michels wrote: > >>>>>>>>> FYI: The Runner registration has been fixed. The Flink runner > >>>>>>>>> explicitly registers as of [1]. Also, the SDK tries to look up > the > >>>>>>>>> PipelineRunner class in case it has not been registered [2]. > >>>>>>>>> > >>>>>>>>> [1] https://github.com/apache/incubator-beam/pull/40 > >>>>>>>>> [2] https://github.com/apache/incubator-beam/pull/61 > >>>>>>>>> > >>>>>>>>> On Sat, Mar 19, 2016 at 6:43 PM, Maximilian Michels < > [email protected]> wrote: > >>>>>>>>> Great to see such a lively discussion here. > >>>>>>>>> > >>>>>>>>> I think we'll support sinks through the Write interface (like in > >>>>>>>>> batched execution) and also have a dedicated wrapper for the > Flink > >>>>>>>>> sinks. 
This is a very pressing but easy to solve issue of the > Flink > >>>>>>>>> runner. Expect it to be in next week. > >>>>>>>>> > >>>>>>>>> Also, the proper registration of the runner is about to to be > merged. > >>>>>>>>> We just need an ok from the contributor to merge the changes. > >>>>>>>>> > >>>>>>>>> Best, > >>>>>>>>> Max > >>>>>>>>> > >>>>>>>>> On Sat, Mar 19, 2016 at 12:42 AM, Dan Halperin < > [email protected]> wrote: > >>>>>>>>> Thanks Bill! > >>>>>>>>> > >>>>>>>>> Filed https://issues.apache.org/jira/browse/BEAM-136, but I'm > glad it's not > >>>>>>>>> blocking you! > >>>>>>>>> > >>>>>>>>> On Fri, Mar 18, 2016 at 4:04 PM, William McCarthy > >>>>>>>>> <[email protected]> wrote: > >>>>>>>>> > >>>>>>>>> I tried that, but still no dice: Just to be clear, it’s not a > blocker for > >>>>>>>>> me, given that I have my example running, but for your > information the > >>>>>>>>> exception is below. > >>>>>>>>> > >>>>>>>>> I’ll watch the commit log on the beam incubator and look forward > to > >>>>>>>>> deleting my copy of Raghu’s contributions when they’re merger to > master. > >>>>>>>>> > >>>>>>>>> Thanks again for everyone’s help, > >>>>>>>>> > >>>>>>>>> Bill > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> Command followed by exception: > >>>>>>>>> > >>>>>>>>> $ flink run -c > >>>>>>>>> > com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample > >>>>>>>>> target/beam-1.0-SNAPSHOT.jar > >>>>>>>>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner > >>>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in > --outputTopic=test_out > >>>>>>>>> > >>>>>>>>> ------------------------------------------------------------ > >>>>>>>>> The program finished with the following exception: > >>>>>>>>> > >>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The > main > >>>>>>>>> method caused an error. 
> >>>>>>>>> at > >>>>>>>>> > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520) > >>>>>>>>> at > >>>>>>>>> > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) > >>>>>>>>> at > org.apache.flink.client.program.Client.runBlocking(Client.java:248) > >>>>>>>>> at > >>>>>>>>> > org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866) > >>>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333) > >>>>>>>>> at > >>>>>>>>> > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189) > >>>>>>>>> at > org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239) > >>>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' > specified > >>>>>>>>> 'org.apache.beam.runners.flink.FlinkPipelineRunner', supported > pipeline > >>>>>>>>> runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner, > >>>>>>>>> DirectPipelineRunner] > >>>>>>>>> at > >>>>>>>>> > com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146) > >>>>>>>>> at > >>>>>>>>> > com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445) > >>>>>>>>> at > >>>>>>>>> > com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99) > >>>>>>>>> at > >>>>>>>>> > com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284) > >>>>>>>>> at > >>>>>>>>> > com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117) > >>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > >>>>>>>>> at > >>>>>>>>> > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > >>>>>>>>> at > >>>>>>>>> > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > >>>>>>>>> at 
java.lang.reflect.Method.invoke(Method.java:498) > >>>>>>>>> at > >>>>>>>>> > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) > >>>>>>>>> ... 6 more > >>>>>>>>> > >>>>>>>>> On Mar 18, 2016, at 5:35 PM, Thomas Groh <[email protected]> > wrote: > >>>>>>>>> > >>>>>>>>> I don't believe the FlinkPipelineRunner is registered the same > way the > >>>>>>>>> Dataflow & Direct Pipeline runners are registered; using > >>>>>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner should work > >>>>>>>>> > >>>>>>>>> On Fri, Mar 18, 2016 at 2:10 PM, William McCarthy > >>>>>>>>> <[email protected]> wrote: > >>>>>>>>> > >>>>>>>>> Thanks Dan, > >>>>>>>>> > >>>>>>>>> I tried that, but getting the below. Note that the jar contains > the > >>>>>>>>> FlinkPipelineRunner. > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> % jar -tf target/beam-1.0-SNAPSHOT.jar | grep FlinkPipeline > >>>>>>>>> org/apache/beam/runners/flink/FlinkPipelineRunner.class > >>>>>>>>> > org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.class > >>>>>>>>> org/apache/beam/runners/flink/FlinkPipelineOptions.class > >>>>>>>>> > org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.class > >>>>>>>>> > >>>>>>>>> % flink run -c > >>>>>>>>> > com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample > >>>>>>>>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner > >>>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in > --outputTopic=test_out > >>>>>>>>> > >>>>>>>>> ------------------------------------------------------------ > >>>>>>>>> The program finished with the following exception: > >>>>>>>>> > >>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The > main > >>>>>>>>> method caused an error. 
> >>>>>>>>> at > >>>>>>>>> > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520) > >>>>>>>>> at > >>>>>>>>> > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) > >>>>>>>>> at > org.apache.flink.client.program.Client.runBlocking(Client.java:248) > >>>>>>>>> at > >>>>>>>>> > org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866) > >>>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333) > >>>>>>>>> at > >>>>>>>>> > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189) > >>>>>>>>> at > org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239) > >>>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' > specified > >>>>>>>>> 'FlinkPipelineRunner', supported pipeline runners > >>>>>>>>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner, > >>>>>>>>> DirectPipelineRunner] > >>>>>>>>> at > >>>>>>>>> > com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146) > >>>>>>>>> at > >>>>>>>>> > com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445) > >>>>>>>>> at > >>>>>>>>> > com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99) > >>>>>>>>> at > >>>>>>>>> > com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284) > >>>>>>>>> at > >>>>>>>>> > com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117) > >>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > >>>>>>>>> at > >>>>>>>>> > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > >>>>>>>>> at > >>>>>>>>> > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > >>>>>>>>> at 
java.lang.reflect.Method.invoke(Method.java:498) > >>>>>>>>> at > >>>>>>>>> > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) > >>>>>>>>> ... 6 more > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> On Mar 18, 2016, at 5:00 PM, Dan Halperin <[email protected]> > wrote: > >>>>>>>>> > >>>>>>>>> Thanks for catching that, Aljoscha! > >>>>>>>>> > >>>>>>>>> Note that the Flink runner should be available via a > command-line option > >>>>>>>>> as well: --runner=FlinkPipelineRunner. > >>>>>>>>> > >>>>>>>>> The list of valid values for that flag is computed by walking the > >>>>>>>>> classpath at runtime, so as long as the Flink jar is present > it'll work. > >>>>>>>>> > >>>>>>>>> On Fri, Mar 18, 2016 at 1:21 PM, Aljoscha Krettek < > [email protected]> > >>>>>>>>> wrote: > >>>>>>>>> > >>>>>>>>> Hi, > >>>>>>>>> looks like the example is being executed with the > DirectPipelineRunner > >>>>>>>>> which does not seem to be able to cope with UnboundedSource. You > need to set > >>>>>>>>> the runner to the FlinkRunner in the example code as described > here: > >>>>>>>>> > https://github.com/apache/incubator-beam/tree/master/runners/flink#executing-an-example > >>>>>>>>> > >>>>>>>>> The Flink runner should be able to deal with UnboundedSource but > has the > >>>>>>>>> limitation that sources are always parallelism=1 (this is being > worked on, > >>>>>>>>> however). > >>>>>>>>> > >>>>>>>>> Cheers, > >>>>>>>>> Aljoscha > >>>>>>>>> On 18 Mar 2016, at 20:56, Dan Halperin <[email protected]> > wrote: > >>>>>>>>> > >>>>>>>>> Looks like the Flink runner may not yet support arbitrary code > written > >>>>>>>>> with the UnboundedSource API. That is, it looks like the Flink > runner > >>>>>>>>> expects the sources to get translated away. > >>>>>>>>> > >>>>>>>>> Max? 
> >>>>>>>>> > >>>>>>>>> Dan > >>>>>>>>> > >>>>>>>>> On Fri, Mar 18, 2016 at 12:20 PM, William McCarthy > >>>>>>>>> <[email protected]> wrote: > >>>>>>>>> Thanks Raghu, > >>>>>>>>> > >>>>>>>>> When I try to run it on flink using the incubator-beam code, i.e. > >>>>>>>>> > >>>>>>>>> flink run -c > >>>>>>>>> > com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample > >>>>>>>>> target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092 > >>>>>>>>> --topics=test_in --outputTopic=test_out > >>>>>>>>> > >>>>>>>>> I get this: > >>>>>>>>> > >>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The > main > >>>>>>>>> method caused an error. > >>>>>>>>> at > >>>>>>>>> > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520) > >>>>>>>>> at > >>>>>>>>> > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403) > >>>>>>>>> at > >>>>>>>>> > org.apache.flink.client.program.Client.runBlocking(Client.java:248) > >>>>>>>>> at > >>>>>>>>> > org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866) > >>>>>>>>> at > org.apache.flink.client.CliFrontend.run(CliFrontend.java:333) > >>>>>>>>> at > >>>>>>>>> > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189) > >>>>>>>>> at > >>>>>>>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239) > >>>>>>>>> Caused by: java.lang.IllegalStateException: no evaluator > registered > >>>>>>>>> for Read(UnboundedKafkaSource) > >>>>>>>>> at > >>>>>>>>> > com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852) > >>>>>>>>> at > >>>>>>>>> > com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219) > >>>>>>>>> at > >>>>>>>>> > com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) > >>>>>>>>> at > >>>>>>>>> > 
com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) > >>>>>>>>> at > >>>>>>>>> > com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102) > >>>>>>>>> at > >>>>>>>>> > com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259) > >>>>>>>>> at > >>>>>>>>> > com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814) > >>>>>>>>> at > >>>>>>>>> > com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526) > >>>>>>>>> at > >>>>>>>>> > com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96) > >>>>>>>>> at > com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180) > >>>>>>>>> at > >>>>>>>>> > com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140) > >>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) > >>>>>>>>> at > >>>>>>>>> > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > >>>>>>>>> at > >>>>>>>>> > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > >>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498) > >>>>>>>>> at > >>>>>>>>> > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505) > >>>>>>>>> ... 6 more > >>>>>>>>> > >>>>>>>>> Any ideas? > >>>>>>>>> > >>>>>>>>> Bill > >>>>>>>>> > >>>>>>>>> On Mar 18, 2016, at 2:47 PM, Raghu Angadi <[email protected]> > wrote: > >>>>>>>>> > >>>>>>>>> Thanks for trying it. > >>>>>>>>> > >>>>>>>>> I fixed the CheckStyle error (not sure why my build is not > failing). > >>>>>>>>> Let me know if you see any issues running with Beam. I haven't > tried it. I > >>>>>>>>> should. In fact Daniel Halperin says my patch should be against > Beam.. > >>>>>>>>> > >>>>>>>>> Raghu. 
> >>>>>>>>> > >>>>>>>>> On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy > >>>>>>>>> <[email protected]> wrote: > >>>>>>>>> Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and Raghu > for > >>>>>>>>> pointing me to working code. > >>>>>>>>> > >>>>>>>>> I’m in the middle of a hack day at the moment, so the speed of > your > >>>>>>>>> responses has been very welcome. > >>>>>>>>> > >>>>>>>>> In the first instance, I’ll try using your changes, Raghu. I’ve > >>>>>>>>> cloned your repo, switched to the kafka branch and built both > contrib/kafka > >>>>>>>>> and contrib/examples/kafka. The contrib/kafka initially failed > with a > >>>>>>>>> CheckStyle error > >>>>>>>>> > (/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12: > >>>>>>>>> 'private' modifier out of order with the JLS suggestions)… I’ve > fixed that > >>>>>>>>> in my local clone and now it’s building fine. I hope to be able > to run your > >>>>>>>>> contrib unchanged on top of the incubator-beam codebase, which > will be what > >>>>>>>>> I attempt to do now. > >>>>>>>>> > >>>>>>>>> Thanks again to all, for your swift help. > >>>>>>>>> > >>>>>>>>> Bill > >>>>>>>>> > >>>>>>>>> On Mar 18, 2016, at 12:55 PM, Raghu Angadi <[email protected]> > >>>>>>>>> wrote: > >>>>>>>>> > >>>>>>>>> Hi Bill, > >>>>>>>>> > >>>>>>>>> We have fairly well tested patch for KafkaIO (pr #121). It will > be > >>>>>>>>> merged soon. The example there keeps track of top hashtags in 10 > minute > >>>>>>>>> sliding window and writes the results to another Kafka topic. > Please try it > >>>>>>>>> if you can. It is well tested on Google Cloud Dataflow. I have > not run it > >>>>>>>>> using Flink runner. > >>>>>>>>> > >>>>>>>>> Raghu. > >>>>>>>>> > >>>>>>>>> On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas > >>>>>>>>> <[email protected]> wrote: > >>>>>>>>> Hello Bill, > >>>>>>>>> > >>>>>>>>> This is a known limitation of the Flink Runner. 
> >>>>>>>>> There is a JIRA issue for that > >>>>>>>>> https://issues.apache.org/jira/browse/BEAM-127 > >>>>>>>>> > >>>>>>>>> A wrapper for Flink sinks will come soon and as Beam evolves, > >>>>>>>>> a more Beam-y solution will come as well. > >>>>>>>>> > >>>>>>>>> Kostas > >>>>>>>>> On Mar 18, 2016, at 5:23 PM, William McCarthy > >>>>>>>>> <[email protected]> wrote: > >>>>>>>>> > >>>>>>>>> Hi, > >>>>>>>>> > >>>>>>>>> I’m trying to write a proof-of-concept which takes messages from > >>>>>>>>> Kafka, transforms them using Beam on Flink, then pushes the > results onto a > >>>>>>>>> different Kafka topic. > >>>>>>>>> > >>>>>>>>> I’ve used the KafkaWindowedWordCountExample as a starting point, > >>>>>>>>> and that’s doing the first part of what I want to do, but it > outputs to text > >>>>>>>>> files as opposed to Kafka. FlinkKafkaProducer08 looks promising, > but I can’t > >>>>>>>>> figure out how to plug it into the pipeline. I was thinking that > it would be > >>>>>>>>> wrapped with an UnboundedFlinkSink, or some such, but that > doesn’t seem to > >>>>>>>>> exist. > >>>>>>>>> > >>>>>>>>> Any advice or thoughts on what I’m trying to do? > >>>>>>>>> > >>>>>>>>> I’m running the latest incubator-beam (as of last night from > >>>>>>>>> Github), Flink 1.0.0 in cluster mode and Kafka 0.9.0.1, all on > Google > >>>>>>>>> Compute Engine (Debian Jessie). > >>>>>>>>> > >>>>>>>>> Thanks, > >>>>>>>>> > >>>>>>>>> Bill McCarthy > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> > >>>>>>>>> -- > >>>>>>>>> Jean-Baptiste Onofré > >>>>>>>>> [email protected] > >>>>>>>>> http://blog.nanthrax.net > >>>>>>>>> Talend - http://www.talend.com > >>>>>>>>> > >>>>>>>> > >>>>>>> > >>>>> > >>>> > >
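
Aljoscha's diagnosis earlier in this thread (the wrapper stored a Reader that is not Serializable, and the Reader should instead be initialized in run()) can be illustrated with a small, self-contained Java sketch. Everything in it is hypothetical: DemoSource, DemoReader and DemoSourceWrapper are stand-ins invented for illustration, not Beam or Flink classes, and this is not the code from the merged pull request. It only shows the general pattern of keeping the serializable source in a field and creating the non-serializable reader lazily, after the wrapper has been shipped.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class LazyReaderSketch {

    /** Hypothetical stand-in for a serializable source description. */
    static class DemoSource implements Serializable {
        private static final long serialVersionUID = 1L;
        final int count;

        DemoSource(int count) {
            this.count = count;
        }

        DemoReader createReader() {
            return new DemoReader(count);
        }
    }

    /** Hypothetical stand-in for a reader; deliberately NOT Serializable. */
    static class DemoReader {
        private final int limit;
        private int next = 0;

        DemoReader(int limit) {
            this.limit = limit;
        }

        /** Moves to the next element; returns false once the reader is exhausted. */
        boolean advance() {
            return next++ < limit;
        }

        int current() {
            return next - 1;
        }
    }

    /** Wrapper that is shipped to workers: only the source is serialized. */
    static class DemoSourceWrapper implements Serializable {
        private static final long serialVersionUID = 1L;
        private final DemoSource source;
        // transient: never written by Java serialization; created in run().
        private transient DemoReader reader;

        DemoSourceWrapper(DemoSource source) {
            this.source = source;
        }

        List<Integer> run() {
            reader = source.createReader(); // lazy initialization at execution time
            List<Integer> out = new ArrayList<>();
            while (reader.advance()) {
                out.add(reader.current());
            }
            return out;
        }
    }

    /** Serializes and deserializes the wrapper, as a cluster submission would. */
    static DemoSourceWrapper roundTrip(DemoSourceWrapper wrapper) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(wrapper);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (DemoSourceWrapper) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        DemoSourceWrapper shipped = roundTrip(new DemoSourceWrapper(new DemoSource(3)));
        System.out.println(shipped.run()); // prints [0, 1, 2]
    }
}
```

If the reader field were a regular (non-transient) field assigned in the constructor, writeObject would throw java.io.NotSerializableException, which is the general kind of failure the diagnosis points at.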
