It should work. Do you have logs from one of the workers, by any chance? I
will send you another Kafka endpoint you could try running against.

Raghu.

On Wed, Mar 23, 2016 at 3:12 PM, William McCarthy <[email protected]> wrote:

> Raghu,
>
> Sorry, I may have spoken too soon when I said that it “worked”.
>
> The code did not throw any exceptions, and seemed to start up happily. I
> then tried injecting a number of “tweets” (with hashtags) into my test_in
> Kafka topic. I’ve been waiting for more than 10 minutes now, and I see
> nothing on my test_out Kafka topic. I expected the top hashtags to show up
> there within a 10-minute window.
>
> Am I misunderstanding something?
>
> To help debug the issue: Previously, I noticed that if I injected a random
> string into the test_in topic, the Beam job fell over with a Jackson
> JsonMappingException, so it would appear that the job is getting my
> messages. But when I restarted the Beam job and injected 3 correctly formed
> tweet messages, something is being silently dropped between there and the
> output Kafka topic.
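>
> For concreteness, the injection is roughly along the lines of the sketch
> below, using the plain Kafka 0.9 producer API. The JSON payload is only my
> guess at what the example expects (a Twitter-style message with a "text"
> field), so treat it as hypothetical:
>
> import java.util.Properties;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> public class InjectTestTweet {
>   public static void main(String[] args) {
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "cl-pu4p:9092");
>     props.put("key.serializer",
>         "org.apache.kafka.common.serialization.StringSerializer");
>     props.put("value.serializer",
>         "org.apache.kafka.common.serialization.StringSerializer");
>     // try-with-resources closes the producer, which also flushes pending sends
>     try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
>       // hypothetical tweet payload; the real example may expect more fields
>       producer.send(new ProducerRecord<>("test_in",
>           "{\"text\": \"trying out #beam on #flink\"}"));
>     }
>   }
> }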
>
> Bill
>
> On Mar 23, 2016, at 5:53 PM, Raghu Angadi <[email protected]> wrote:
>
> Great news! Thanks for trying it, and thanks to Max and Aljoscha for the
> multiple fixes.
>
> Raghu
>
> On Wed, Mar 23, 2016 at 2:51 PM, William McCarthy <
> [email protected]> wrote:
>
>> Thanks Max,
>>
>> This command now works, thanks:
>>
>> $ flink run -c
>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>> target/beam-1.0-SNAPSHOT.jar
>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner
>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>> --streaming=true
>>
>> Note that I’m still unable to use --runner=FlinkPipelineRunner (i.e. using
>> just the simple class name, without the package prefix). That behaviour is unexpected to you, given the
>> test that you put in a few days ago, right? i.e.
>>
>> $ flink run -c
>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner
>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>
>> ------------------------------------------------------------
>>  The program finished with the following exception:
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main
>> method caused an error.
>>         at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>         at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>         at
>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>         at
>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>         at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>         at
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>         at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified
>> 'FlinkPipelineRunner', supported pipeline runners
>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
>> DirectPipelineRunner]
>>         at
>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1486)
>>         at
>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:101)
>>         at
>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:286)
>>         at
>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>         at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:498)
>>         at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>         ... 6 more
>> Caused by: java.lang.ClassNotFoundException: FlinkPipelineRunner
>>         at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>         at java.lang.Class.forName0(Native Method)
>>         at java.lang.Class.forName(Class.java:264)
>>         at
>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1473)
>>         ... 14 more
>>
>>
>> > On Mar 23, 2016, at 2:42 PM, Maximilian Michels <[email protected]> wrote:
>> >
>> > Hi William,
>> >
>> > It has been merged. Feel free to try again.
>> >
>> > Cheers,
>> > Max
>> >
>> > On Wed, Mar 23, 2016 at 5:09 PM, Maximilian Michels <[email protected]>
>> wrote:
>> >> The pull request is here:
>> https://github.com/apache/incubator-beam/pull/69
>> >>
>> >> I'll merge it after the tests have passed and it has been reviewed.
>> >>
>> >> On Wed, Mar 23, 2016 at 3:12 PM, Maximilian Michels <[email protected]>
>> wrote:
>> >>> Hi William,
>> >>>
>> >>> I started working on a fix and better testing of the
>> >>> UnboundedSourceWrapper. I'll get back to you shortly with a pull
>> >>> request that we should be able to merge soon.
>> >>>
>> >>> - Max
>> >>>
>> >>> On Wed, Mar 23, 2016 at 1:33 PM, Aljoscha Krettek <
>> [email protected]> wrote:
>> >>>> Hi,
>> >>>> as far as I can see from a quick glance the problem is that
>> UnboundedSourceWrapper stores an instance of Reader that it gets from the
>> Source. The Reader is not Serializable while the UnboundedSource is. I
>> think the Reader should be initialized when actually running the source, in
>> the run() method.
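>>
>> Very roughly, what I have in mind is something like the sketch below. The
>> class and field names are made up for illustration (this is not the actual
>> UnboundedSourceWrapper), but it shows the shape of the fix: only the
>> Serializable UnboundedSource is shipped to the workers, and the
>> non-Serializable Reader is created inside run():
>>
>> import com.google.cloud.dataflow.sdk.io.UnboundedSource;
>> import com.google.cloud.dataflow.sdk.options.PipelineOptions;
>> import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
>> import org.apache.flink.streaming.api.functions.source.SourceFunction;
>>
>> public class LazyReaderSource<T> implements SourceFunction<T> {
>>   private final UnboundedSource<T, ?> source;  // Serializable, safe to ship
>>   private transient UnboundedSource.UnboundedReader<T> reader;
>>   private volatile boolean running = true;
>>
>>   public LazyReaderSource(UnboundedSource<T, ?> source) {
>>     this.source = source;  // note: no createReader() call here
>>   }
>>
>>   @Override
>>   public void run(SourceContext<T> ctx) throws Exception {
>>     // The real wrapper would carry the user's PipelineOptions along;
>>     // default options are used here purely to keep the sketch self-contained.
>>     PipelineOptions options = PipelineOptionsFactory.create();
>>     reader = source.createReader(options, null);  // created where it is used
>>     boolean available = reader.start();
>>     while (running) {
>>       if (available) {
>>         ctx.collect(reader.getCurrent());
>>       }
>>       // a real implementation would back off when advance() returns false
>>       available = reader.advance();
>>     }
>>   }
>>
>>   @Override
>>   public void cancel() {
>>     running = false;
>>   }
>> }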
>> >>>>
>> >>>> Cheers,
>> >>>> Aljoscha
>> >>>>> On 23 Mar 2016, at 13:07, William McCarthy <
>> [email protected]> wrote:
>> >>>>>
>> >>>>> The first stack trace was from the latest runner, yes. I’ve pulled
>> the very latest, just now, and still get the same thing.
>> >>>>>
>> >>>>> When I write ‘pulled the very latest’, here’s what I mean (all of
>> the following commands, except the last, finished with success):
>> >>>>>
>> >>>>> $ cd incubator-beam
>> >>>>> $ git pull
>> >>>>> …some output, then success
>> >>>>> $ git branch
>> >>>>> * master
>> >>>>> $ mvn -DskipTests clean install
>> >>>>> …some output, then success
>> >>>>> $ cd <my project>
>> >>>>> $ mvn -DskipTests clean install
>> >>>>> …some output, then success
>> >>>>> $ <command as output below>
>> >>>>>
>> >>>>> Bill
>> >>>>>
>> >>>>>> On Mar 22, 2016, at 5:29 AM, Maximilian Michels <[email protected]>
>> wrote:
>> >>>>>>
>> >>>>>> Hi William,
>> >>>>>>
>> >>>>>> Is the first stack trace from the latest master? Using only the
>> simple
>> >>>>>> class name of the Runner should actually work now. I've also added
>> a
>> >>>>>> test to explicitly test that.
>> >>>>>>
>> >>>>>> The latter error, as Aljoscha pointed out, is due to batch
>> >>>>>> execution. Perhaps we could make it explicit during startup which mode
>> >>>>>> we're executing in. I filed a JIRA issue for that:
>> >>>>>> https://issues.apache.org/jira/browse/BEAM-139
>> >>>>>>
>> >>>>>> - Max
>> >>>>>>
>> >>>>>> On Tue, Mar 22, 2016 at 10:00 AM, Aljoscha Krettek <
>> [email protected]> wrote:
>> >>>>>>> Hi,
>> >>>>>>> for Flink the runner can internally either translate to a batch
>> job or a streaming job. Unbounded sources are not supported when running in
>> batch mode so you have to somehow specify that you want to have streaming
>> mode. StreamingOptions has a setStreaming method; maybe you can specify
>> "--streaming true" on the command line to set that flag.
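>>
>> Programmatically, the relevant part of the example's main() might look
>> roughly like the sketch below (the class name is made up; I'm assuming the
>> options are parsed with PipelineOptionsFactory as in the other examples):
>>
>> import com.google.cloud.dataflow.sdk.Pipeline;
>> import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
>> import com.google.cloud.dataflow.sdk.options.StreamingOptions;
>> import org.apache.beam.runners.flink.FlinkPipelineRunner;
>>
>> public class StreamingModeSetup {
>>   public static void main(String[] args) {
>>     StreamingOptions options =
>>         PipelineOptionsFactory.fromArgs(args).as(StreamingOptions.class);
>>     options.setStreaming(true);                    // force the streaming translation path
>>     options.setRunner(FlinkPipelineRunner.class);  // avoids the --runner string lookup
>>     Pipeline p = Pipeline.create(options);
>>     // ... build the Kafka pipeline as before, then call p.run()
>>   }
>> }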
>> >>>>>>>
>> >>>>>>> Cheers,
>> >>>>>>> Aljoscha
>> >>>>>>>> On 21 Mar 2016, at 23:39, William McCarthy <
>> [email protected]> wrote:
>> >>>>>>>>
>> >>>>>>>> I’ve attempted to run the TopHashtagsExample again, using both
>> ‘FlinkPipelineRunner’ and its fully qualified name. Both bomb out, though
>> the latter gets further. I hope this helps; here is the output of both:
>> >>>>>>>>
>> >>>>>>>> $ flink run -c
>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner
>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>> >>>>>>>>
>> >>>>>>>> ------------------------------------------------------------
>> >>>>>>>> The program finished with the following exception:
>> >>>>>>>>
>> >>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The
>> main method caused an error.
>> >>>>>>>>    at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>> >>>>>>>>    at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>> >>>>>>>>    at
>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>> >>>>>>>>    at
>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>> >>>>>>>>    at
>> org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>> >>>>>>>>    at
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>> >>>>>>>>    at
>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>> >>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner'
>> specified 'FlinkPipelineRunner', supported pipeline runners
>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
>> DirectPipelineRunner]
>> >>>>>>>>    at
>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1486)
>> >>>>>>>>    at
>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:101)
>> >>>>>>>>    at
>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:286)
>> >>>>>>>>    at
>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>> >>>>>>>>    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >>>>>>>>    at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> >>>>>>>>    at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >>>>>>>>    at java.lang.reflect.Method.invoke(Method.java:498)
>> >>>>>>>>    at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>> >>>>>>>>    ... 6 more
>> >>>>>>>> Caused by: java.lang.ClassNotFoundException: FlinkPipelineRunner
>> >>>>>>>>    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> >>>>>>>>    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> >>>>>>>>    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> >>>>>>>>    at java.lang.Class.forName0(Native Method)
>> >>>>>>>>    at java.lang.Class.forName(Class.java:264)
>> >>>>>>>>    at
>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1473)
>> >>>>>>>>    ... 14 more
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> $ flink run -c
>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>> target/beam-1.0-SNAPSHOT.jar
>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner
>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>> >>>>>>>>
>> >>>>>>>> ------------------------------------------------------------
>> >>>>>>>> The program finished with the following exception:
>> >>>>>>>>
>> >>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The
>> main method caused an error.
>> >>>>>>>>    at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>> >>>>>>>>    at
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>> >>>>>>>>    at
>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>> >>>>>>>>    at
>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>> >>>>>>>>    at
>> org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>> >>>>>>>>    at
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>> >>>>>>>>    at
>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>> >>>>>>>> Caused by: java.lang.UnsupportedOperationException: The
>> transform Read(UnboundedKafkaSource) is currently not supported.
>> >>>>>>>>    at
>> org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator.visitTransform(FlinkBatchPipelineTranslator.java:111)
>> >>>>>>>>    at
>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>> >>>>>>>>    at
>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>> >>>>>>>>    at
>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>> >>>>>>>>    at
>> com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>> >>>>>>>>    at
>> com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>> >>>>>>>>    at
>> org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:34)
>> >>>>>>>>    at
>> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:130)
>> >>>>>>>>    at
>> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:109)
>> >>>>>>>>    at
>> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:50)
>> >>>>>>>>    at
>> com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>> >>>>>>>>    at
>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>> >>>>>>>>    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >>>>>>>>    at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> >>>>>>>>    at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >>>>>>>>    at java.lang.reflect.Method.invoke(Method.java:498)
>> >>>>>>>>    at
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>> >>>>>>>>    ... 6 more
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>> On Mar 21, 2016, at 12:23 PM, Raghu Angadi <[email protected]>
>> wrote:
>> >>>>>>>>>
>> >>>>>>>>> Thanks Max.
>> >>>>>>>>>
>> >>>>>>>>> Bill McCarthy,
>> >>>>>>>>> I know you are unblocked and KafkaWriter is good enough. Please
>> try the KafkaIO source from my branch with the Flink runner if you get a chance.
>> >>>>>>>>>
>> >>>>>>>>> thanks,
>> >>>>>>>>> Raghu.
>> >>>>>>>>>
>> >>>>>>>>> On Mon, Mar 21, 2016 at 6:54 AM, Jean-Baptiste Onofré <
>> [email protected]> wrote:
>> >>>>>>>>> Thanks for the update Max !
>> >>>>>>>>>
>> >>>>>>>>> Regards
>> >>>>>>>>> JB
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> On 03/21/2016 02:39 PM, Maximilian Michels wrote:
>> >>>>>>>>> FYI: The Runner registration has been fixed. The Flink runner
>> >>>>>>>>> explicitly registers as of [1]. Also, the SDK tries to look up
>> the
>> >>>>>>>>> PipelineRunner class in case it has not been registered [2].
>> >>>>>>>>>
>> >>>>>>>>> [1] https://github.com/apache/incubator-beam/pull/40
>> >>>>>>>>> [2] https://github.com/apache/incubator-beam/pull/61
>> >>>>>>>>>
>> >>>>>>>>> On Sat, Mar 19, 2016 at 6:43 PM, Maximilian Michels <
>> [email protected]> wrote:
>> >>>>>>>>> Great to see such a lively discussion here.
>> >>>>>>>>>
>> >>>>>>>>> I think we'll support sinks through the Write interface (like in
>> >>>>>>>>> batched execution) and also have a dedicated wrapper for the
>> Flink
>> >>>>>>>>> sinks. This is a very pressing but easy-to-solve issue in the Flink
>> >>>>>>>>> runner. Expect it to land next week.
>> >>>>>>>>>
>> >>>>>>>>> Also, the proper registration of the runner is about to be merged.
>> >>>>>>>>> We just need an ok from the contributor to merge the changes.
>> >>>>>>>>>
>> >>>>>>>>> Best,
>> >>>>>>>>> Max
>> >>>>>>>>>
>> >>>>>>>>> On Sat, Mar 19, 2016 at 12:42 AM, Dan Halperin <
>> [email protected]> wrote:
>> >>>>>>>>> Thanks Bill!
>> >>>>>>>>>
>> >>>>>>>>> Filed https://issues.apache.org/jira/browse/BEAM-136, but I'm
>> glad it's not
>> >>>>>>>>> blocking you!
>> >>>>>>>>>
>> >>>>>>>>> On Fri, Mar 18, 2016 at 4:04 PM, William McCarthy
>> >>>>>>>>> <[email protected]> wrote:
>> >>>>>>>>>
>> >>>>>>>>> I tried that, but still no dice: Just to be clear, it’s not a
>> blocker for
>> >>>>>>>>> me, given that I have my example running, but for your
>> information the
>> >>>>>>>>> exception is below.
>> >>>>>>>>>
>> >>>>>>>>> I’ll watch the commit log on the Beam incubator and look forward to
>> >>>>>>>>> deleting my copy of Raghu’s contributions when they’re merged to master.
>> >>>>>>>>>
>> >>>>>>>>> Thanks again for everyone’s help,
>> >>>>>>>>>
>> >>>>>>>>> Bill
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> Command followed by exception:
>> >>>>>>>>>
>> >>>>>>>>> $ flink run -c
>> >>>>>>>>>
>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>> >>>>>>>>> target/beam-1.0-SNAPSHOT.jar
>> >>>>>>>>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner
>> >>>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in
>> --outputTopic=test_out
>> >>>>>>>>>
>> >>>>>>>>> ------------------------------------------------------------
>> >>>>>>>>> The program finished with the following exception:
>> >>>>>>>>>
>> >>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The
>> main
>> >>>>>>>>> method caused an error.
>> >>>>>>>>> at
>> >>>>>>>>>
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>> >>>>>>>>> at
>> >>>>>>>>>
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>> >>>>>>>>> at
>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>> >>>>>>>>> at
>> >>>>>>>>>
>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>> >>>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>> >>>>>>>>> at
>> >>>>>>>>>
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>> >>>>>>>>> at
>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>> >>>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner'
>> specified
>> >>>>>>>>> 'org.apache.beam.runners.flink.FlinkPipelineRunner', supported
>> pipeline
>> >>>>>>>>> runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
>> >>>>>>>>> DirectPipelineRunner]
>> >>>>>>>>> at
>> >>>>>>>>>
>> com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>> >>>>>>>>> at
>> >>>>>>>>>
>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>> >>>>>>>>> at
>> >>>>>>>>>
>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>> >>>>>>>>> at
>> >>>>>>>>>
>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>> >>>>>>>>> at
>> >>>>>>>>>
>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>> >>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >>>>>>>>> at
>> >>>>>>>>>
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> >>>>>>>>> at
>> >>>>>>>>>
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>> >>>>>>>>> at
>> >>>>>>>>>
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>> >>>>>>>>> ... 6 more
>> >>>>>>>>>
>> >>>>>>>>> On Mar 18, 2016, at 5:35 PM, Thomas Groh <[email protected]>
>> wrote:
>> >>>>>>>>>
>> >>>>>>>>> I don't believe the FlinkPipelineRunner is registered the same
>> way the
>> >>>>>>>>> Dataflow & Direct Pipeline runners are registered; using
>> >>>>>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner should work
>> >>>>>>>>>
>> >>>>>>>>> On Fri, Mar 18, 2016 at 2:10 PM, William McCarthy
>> >>>>>>>>> <[email protected]> wrote:
>> >>>>>>>>>
>> >>>>>>>>> Thanks Dan,
>> >>>>>>>>>
>> >>>>>>>>> I tried that, but getting the below. Note that the jar contains
>> the
>> >>>>>>>>> FlinkPipelineRunner.
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> % jar -tf target/beam-1.0-SNAPSHOT.jar | grep FlinkPipeline
>> >>>>>>>>> org/apache/beam/runners/flink/FlinkPipelineRunner.class
>> >>>>>>>>>
>> org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.class
>> >>>>>>>>> org/apache/beam/runners/flink/FlinkPipelineOptions.class
>> >>>>>>>>>
>> org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.class
>> >>>>>>>>>
>> >>>>>>>>> % flink run -c
>> >>>>>>>>>
>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>> >>>>>>>>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner
>> >>>>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in
>> --outputTopic=test_out
>> >>>>>>>>>
>> >>>>>>>>> ------------------------------------------------------------
>> >>>>>>>>> The program finished with the following exception:
>> >>>>>>>>>
>> >>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The
>> main
>> >>>>>>>>> method caused an error.
>> >>>>>>>>> at
>> >>>>>>>>>
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>> >>>>>>>>> at
>> >>>>>>>>>
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>> >>>>>>>>> at
>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>> >>>>>>>>> at
>> >>>>>>>>>
>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>> >>>>>>>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>> >>>>>>>>> at
>> >>>>>>>>>
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>> >>>>>>>>> at
>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>> >>>>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner'
>> specified
>> >>>>>>>>> 'FlinkPipelineRunner', supported pipeline runners
>> >>>>>>>>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
>> >>>>>>>>> DirectPipelineRunner]
>> >>>>>>>>> at
>> >>>>>>>>>
>> com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>> >>>>>>>>> at
>> >>>>>>>>>
>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>> >>>>>>>>> at
>> >>>>>>>>>
>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>> >>>>>>>>> at
>> >>>>>>>>>
>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>> >>>>>>>>> at
>> >>>>>>>>>
>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>> >>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >>>>>>>>> at
>> >>>>>>>>>
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> >>>>>>>>> at
>> >>>>>>>>>
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>> >>>>>>>>> at
>> >>>>>>>>>
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>> >>>>>>>>> ... 6 more
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> On Mar 18, 2016, at 5:00 PM, Dan Halperin <[email protected]>
>> wrote:
>> >>>>>>>>>
>> >>>>>>>>> Thanks for catching that, Aljoscha!
>> >>>>>>>>>
>> >>>>>>>>> Note that the Flink runner should be available via a
>> command-line option
>> >>>>>>>>> as well: --runner=FlinkPipelineRunner.
>> >>>>>>>>>
>> >>>>>>>>> The list of valid values for that flag is computed by walking
>> the
>> >>>>>>>>> classpath at runtime, so as long as the Flink jar is present
>> it'll work.
>> >>>>>>>>>
>> >>>>>>>>> On Fri, Mar 18, 2016 at 1:21 PM, Aljoscha Krettek <
>> [email protected]>
>> >>>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>> Hi,
>> >>>>>>>>> looks like the example is being executed with the
>> DirectPipelineRunner
>> >>>>>>>>> which does not seem to be able to cope with UnboundedSource.
>> You need to set
>> >>>>>>>>> the runner to the FlinkRunner in the example code as described
>> here:
>> >>>>>>>>>
>> https://github.com/apache/incubator-beam/tree/master/runners/flink#executing-an-example
>> >>>>>>>>>
>> >>>>>>>>> The Flink runner should be able to deal with UnboundedSource
>> but has the
>> >>>>>>>>> limitation that sources are always parallelism=1 (this is being
>> worked on,
>> >>>>>>>>> however).
>> >>>>>>>>>
>> >>>>>>>>> Cheers,
>> >>>>>>>>> Aljoscha
>> >>>>>>>>> On 18 Mar 2016, at 20:56, Dan Halperin <[email protected]>
>> wrote:
>> >>>>>>>>>
>> >>>>>>>>> Looks like the Flink runner may not yet support arbitrary code
>> written
>> >>>>>>>>> with the UnboundedSource API. That is, it looks like the Flink
>> runner
>> >>>>>>>>> expects the sources to get translated away.
>> >>>>>>>>>
>> >>>>>>>>> Max?
>> >>>>>>>>>
>> >>>>>>>>> Dan
>> >>>>>>>>>
>> >>>>>>>>> On Fri, Mar 18, 2016 at 12:20 PM, William McCarthy
>> >>>>>>>>> <[email protected]> wrote:
>> >>>>>>>>> Thanks Raghu,
>> >>>>>>>>>
>> >>>>>>>>> When I try to run it on flink using the incubator-beam code,
>> i.e.
>> >>>>>>>>>
>> >>>>>>>>> flink run -c
>> >>>>>>>>>
>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>> >>>>>>>>> target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092
>> >>>>>>>>> --topics=test_in --outputTopic=test_out
>> >>>>>>>>>
>> >>>>>>>>> I get this:
>> >>>>>>>>>
>> >>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The
>> main
>> >>>>>>>>> method caused an error.
>> >>>>>>>>>     at
>> >>>>>>>>>
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>> >>>>>>>>>     at
>> >>>>>>>>>
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>> >>>>>>>>>     at
>> >>>>>>>>>
>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>> >>>>>>>>>     at
>> >>>>>>>>>
>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>> >>>>>>>>>     at
>> org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>> >>>>>>>>>     at
>> >>>>>>>>>
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>> >>>>>>>>>     at
>> >>>>>>>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>> >>>>>>>>> Caused by: java.lang.IllegalStateException: no evaluator
>> registered
>> >>>>>>>>> for Read(UnboundedKafkaSource)
>> >>>>>>>>>     at
>> >>>>>>>>>
>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852)
>> >>>>>>>>>     at
>> >>>>>>>>>
>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>> >>>>>>>>>     at
>> >>>>>>>>>
>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>> >>>>>>>>>     at
>> >>>>>>>>>
>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>> >>>>>>>>>     at
>> >>>>>>>>>
>> com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>> >>>>>>>>>     at
>> >>>>>>>>>
>> com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>> >>>>>>>>>     at
>> >>>>>>>>>
>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
>> >>>>>>>>>     at
>> >>>>>>>>>
>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
>> >>>>>>>>>     at
>> >>>>>>>>>
>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
>> >>>>>>>>>     at
>> com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>> >>>>>>>>>     at
>> >>>>>>>>>
>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>> >>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
>> Method)
>> >>>>>>>>>     at
>> >>>>>>>>>
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> >>>>>>>>>     at
>> >>>>>>>>>
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >>>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>> >>>>>>>>>     at
>> >>>>>>>>>
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>> >>>>>>>>>     ... 6 more
>> >>>>>>>>>
>> >>>>>>>>> Any ideas?
>> >>>>>>>>>
>> >>>>>>>>> Bill
>> >>>>>>>>>
>> >>>>>>>>> On Mar 18, 2016, at 2:47 PM, Raghu Angadi <[email protected]>
>> wrote:
>> >>>>>>>>>
>> >>>>>>>>> Thanks for trying it.
>> >>>>>>>>>
>> >>>>>>>>> I fixed the CheckStyle error (not sure why my build is not failing).
>> >>>>>>>>> Let me know if you see any issues running with Beam. I haven't tried
>> >>>>>>>>> it, but I should. In fact, Daniel Halperin says my patch should be
>> >>>>>>>>> against Beam.
>> >>>>>>>>>
>> >>>>>>>>> Raghu.
>> >>>>>>>>>
>> >>>>>>>>> On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy
>> >>>>>>>>> <[email protected]> wrote:
>> >>>>>>>>> Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and
>> Raghu for
>> >>>>>>>>> pointing me to working code.
>> >>>>>>>>>
>> >>>>>>>>> I’m in the middle of a hack day at the moment, so the speed of
>> your
>> >>>>>>>>> responses has been very welcome.
>> >>>>>>>>>
>> >>>>>>>>> In the first instance, I’ll try using your changes, Raghu. I’ve
>> >>>>>>>>> cloned your repo, switched to the kafka branch and built both
>> contrib/kafka
>> >>>>>>>>> and contrib/examples/kafka. The contrib/kafka initially failed
>> with a
>> >>>>>>>>> CheckStyle error
>> >>>>>>>>>
>> (/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12:
>> >>>>>>>>> 'private' modifier out of order with the JLS suggestions)… I’ve
>> fixed that
>> >>>>>>>>> in my local clone and now it’s building fine. I hope to be able
>> to run your
>> >>>>>>>>> contrib unchanged on top of the incubator-beam codebase, which
>> will be what
>> >>>>>>>>> I attempt to do now.
>> >>>>>>>>>
>> >>>>>>>>> Thanks again to all, for your swift help.
>> >>>>>>>>>
>> >>>>>>>>> Bill
>> >>>>>>>>>
>> >>>>>>>>> On Mar 18, 2016, at 12:55 PM, Raghu Angadi <[email protected]>
>> >>>>>>>>> wrote:
>> >>>>>>>>>
>> >>>>>>>>> Hi Bill,
>> >>>>>>>>>
>> >>>>>>>>> We have a fairly well-tested patch for KafkaIO (pr #121). It will be
>> >>>>>>>>> merged soon. The example there keeps track of top hashtags in a
>> >>>>>>>>> 10-minute sliding window and writes the results to another Kafka
>> >>>>>>>>> topic. Please try it if you can. It is well tested on Google Cloud
>> >>>>>>>>> Dataflow. I have not run it using the Flink runner.
>> >>>>>>>>>
>> >>>>>>>>> Raghu.
>> >>>>>>>>>
>> >>>>>>>>> On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas
>> >>>>>>>>> <[email protected]> wrote:
>> >>>>>>>>> Hello Bill,
>> >>>>>>>>>
>> >>>>>>>>> This is a known limitation of the Flink Runner.
>> >>>>>>>>> There is a JIRA issue for that
>> >>>>>>>>> https://issues.apache.org/jira/browse/BEAM-127
>> >>>>>>>>>
>> >>>>>>>>> A wrapper for Flink sinks will come soon and as Beam evolves,
>> >>>>>>>>> a more Beam-y solution will come as well.
>> >>>>>>>>>
>> >>>>>>>>> Kostas
>> >>>>>>>>> On Mar 18, 2016, at 5:23 PM, William McCarthy
>> >>>>>>>>> <[email protected]> wrote:
>> >>>>>>>>>
>> >>>>>>>>> Hi,
>> >>>>>>>>>
>> >>>>>>>>> I’m trying to write a proof-of-concept which takes messages from
>> >>>>>>>>> Kafka, transforms them using Beam on Flink, then pushes the
>> results onto a
>> >>>>>>>>> different Kafka topic.
>> >>>>>>>>>
>> >>>>>>>>> I’ve used the KafkaWindowedWordCountExample as a starting point,
>> >>>>>>>>> and that’s doing the first part of what I want to do, but it
>> outputs to text
>> >>>>>>>>> files as opposed to Kafka. FlinkKafkaProducer08 looks
>> promising, but I can’t
>> >>>>>>>>> figure out how to plug it into the pipeline. I was thinking
>> that it would be
>> >>>>>>>>> wrapped with an UnboundedFlinkSink, or some such, but that
>> doesn’t seem to
>> >>>>>>>>> exist.
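>>
>> To make the question concrete, the only thing I can think of is writing to
>> Kafka from a plain DoFn, roughly like the sketch below (just my guess using
>> the Kafka 0.9 producer API directly; I don't know whether this is the
>> intended way):
>>
>> import java.util.Properties;
>> import com.google.cloud.dataflow.sdk.transforms.DoFn;
>> import org.apache.kafka.clients.producer.KafkaProducer;
>> import org.apache.kafka.clients.producer.ProducerRecord;
>>
>> // Writes each element to a Kafka topic, creating the (non-serializable)
>> // producer lazily on the worker rather than at pipeline construction time.
>> public class KafkaWriteFn extends DoFn<String, Void> {
>>   private final String bootstrapServers;
>>   private final String topic;
>>   private transient KafkaProducer<String, String> producer;
>>
>>   public KafkaWriteFn(String bootstrapServers, String topic) {
>>     this.bootstrapServers = bootstrapServers;
>>     this.topic = topic;
>>   }
>>
>>   @Override
>>   public void startBundle(Context c) {
>>     Properties props = new Properties();
>>     props.put("bootstrap.servers", bootstrapServers);
>>     props.put("key.serializer",
>>         "org.apache.kafka.common.serialization.StringSerializer");
>>     props.put("value.serializer",
>>         "org.apache.kafka.common.serialization.StringSerializer");
>>     producer = new KafkaProducer<>(props);
>>   }
>>
>>   @Override
>>   public void processElement(ProcessContext c) {
>>     producer.send(new ProducerRecord<String, String>(topic, c.element()));
>>   }
>>
>>   @Override
>>   public void finishBundle(Context c) {
>>     producer.close();  // close() flushes any pending sends
>>   }
>> }
>>
>> which I would then attach at the end of the pipeline with something like
>> .apply(ParDo.of(new KafkaWriteFn(bootstrapServers, "test_out"))).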
>> >>>>>>>>>
>> >>>>>>>>> Any advice or thoughts on what I’m trying to do?
>> >>>>>>>>>
>> >>>>>>>>> I’m running the latest incubator-beam (as of last night from
>> >>>>>>>>> Github), Flink 1.0.0 in cluster mode and Kafka 0.9.0.1, all on
>> Google
>> >>>>>>>>> Compute Engine (Debian Jessie).
>> >>>>>>>>>
>> >>>>>>>>> Thanks,
>> >>>>>>>>>
>> >>>>>>>>> Bill McCarthy
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> --
>> >>>>>>>>> Jean-Baptiste Onofré
>> >>>>>>>>> [email protected]
>> >>>>>>>>> http://blog.nanthrax.net
>> >>>>>>>>> Talend - http://www.talend.com
>> >>>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>
>> >>>>
>>
>>
>
>
