Thanks Aljoscha,

I’ve pulled the latest master and rerun with --streaming=true. Below is the 
output that I now get.

Bill

$ flink run -c 
com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample 
target/beam-1.0-SNAPSHOT.jar 
--runner=org.apache.beam.runners.flink.FlinkPipelineRunner 
--bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out 
--streaming=true

------------------------------------------------------------
 The program finished with the following exception:

Object org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSourceWrapper@2dbd803f not serializable
        org.apache.flink.api.java.ClosureCleaner.ensureSerializable(ClosureCleaner.java:99)
        org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:61)
        org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1219)
        org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1131)
        org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1075)
        org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1057)
        org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:186)
        org.apache.beam.runners.flink.translation.FlinkStreamingTransformTranslators$UnboundedReadSourceTranslator.translateNode(FlinkStreamingTransformTranslators.java:167)
        org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator.applyStreamingTransform(FlinkStreamingPipelineTranslator.java:127)
        org.apache.beam.runners.flink.translation.FlinkStreamingPipelineTranslator.visitTransform(FlinkStreamingPipelineTranslator.java:108)
        com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
        com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
        com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
        com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
        com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
        org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:34)
        org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:130)
        org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:109)
        org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:50)
        com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
        com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
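
In case it helps anyone reading along, here is a minimal sketch of the kind of failure that "not serializable" message usually points to, assuming the check boils down to attempting plain Java serialization of the source object. I have not confirmed which field of UnboundedSourceWrapper is responsible. The class names below (Connection, BadSource, GoodSource) are made up for illustration and are not part of Beam or Flink.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializableCheckSketch {

    /** Hypothetical stand-in for a live resource (e.g. a client) that is not Serializable. */
    static class Connection {
    }

    /** Declares Serializable but holds a non-serializable, non-transient field. */
    static class BadSource implements Serializable {
        private static final long serialVersionUID = 1L;
        private final Connection connection = new Connection();
    }

    /** Keeps only serializable configuration; the live resource is transient. */
    static class GoodSource implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String topic;
        private transient Connection connection; // would be re-created after deserialization

        GoodSource(String topic) {
            this.topic = topic;
        }
    }

    /** Returns true if the object graph can be written with Java serialization. */
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            // Some object reachable from 'o' does not implement Serializable.
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("BadSource serializable: " + isSerializable(new BadSource()));
        System.out.println("GoodSource serializable: " + isSerializable(new GoodSource("test_in")));
    }
}
```

Running main prints false for BadSource and true for GoodSource. If the wrapper (or the source it wraps) holds something like the hypothetical Connection above in a non-transient field, that would explain the exception, but that is a guess on my part.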

> On Mar 22, 2016, at 5:00 AM, Aljoscha Krettek <[email protected]> wrote:
> 
> Hi,
> for Flink the runner can internally either translate to a batch job or a 
> streaming job. Unbounded sources are not supported when running in batch mode 
> so you have to somehow specify that you want to have streaming mode. 
> StreamingOptions has a method setStreaming, so maybe you can specify 
> “--streaming=true” on the command line to set that flag.
> 
> Cheers,
> Aljoscha
>> On 21 Mar 2016, at 23:39, William McCarthy <[email protected]> wrote:
>> 
>> I’ve attempted to run the TopHashtagsExample again, using both 
>> ‘FlinkPipelineRunner’ and its fully qualified name. Both bomb out, though 
>> the latter gets further. I hope this helps. Here is the output of both:
>> 
>> $ flink run -c 
>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample 
>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner 
>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>> 
>> ------------------------------------------------------------
>> The program finished with the following exception:
>> 
>> org.apache.flink.client.program.ProgramInvocationException: The main method 
>> caused an error.
>>      at 
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>      at 
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>      at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>      at 
>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>      at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>      at 
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>      at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified 
>> 'FlinkPipelineRunner', supported pipeline runners 
>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner, 
>> DirectPipelineRunner]
>>      at 
>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1486)
>>      at 
>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:101)
>>      at 
>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:286)
>>      at 
>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>      at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>      at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>      at java.lang.reflect.Method.invoke(Method.java:498)
>>      at 
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>      ... 6 more
>> Caused by: java.lang.ClassNotFoundException: FlinkPipelineRunner
>>      at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>      at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>      at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>      at java.lang.Class.forName0(Native Method)
>>      at java.lang.Class.forName(Class.java:264)
>>      at 
>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1473)
>>      ... 14 more
>> 
>> 
>> $ flink run -c 
>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample 
>> target/beam-1.0-SNAPSHOT.jar 
>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner 
>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>> 
>> ------------------------------------------------------------
>> The program finished with the following exception:
>> 
>> org.apache.flink.client.program.ProgramInvocationException: The main method 
>> caused an error.
>>      at 
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>      at 
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>      at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>      at 
>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>      at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>      at 
>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>      at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>> Caused by: java.lang.UnsupportedOperationException: The transform 
>> Read(UnboundedKafkaSource) is currently not supported.
>>      at 
>> org.apache.beam.runners.flink.translation.FlinkBatchPipelineTranslator.visitTransform(FlinkBatchPipelineTranslator.java:111)
>>      at 
>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>>      at 
>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>      at 
>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>      at 
>> com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>>      at 
>> com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>>      at 
>> org.apache.beam.runners.flink.translation.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:34)
>>      at 
>> org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:130)
>>      at 
>> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:109)
>>      at 
>> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:50)
>>      at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>>      at 
>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>>      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>      at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>      at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>      at java.lang.reflect.Method.invoke(Method.java:498)
>>      at 
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>      ... 6 more
>> 
>> 
>>> On Mar 21, 2016, at 12:23 PM, Raghu Angadi <[email protected]> wrote:
>>> 
>>> Thanks Max.
>>> 
>>> Bill McCarthy, 
>>> I know you are unblocked and KafkaWriter is good enough. Please try KafkaIO 
>>> source from my branch with Flink runner if you get a chance.
>>> 
>>> thanks,
>>> Raghu.
>>> 
>>> On Mon, Mar 21, 2016 at 6:54 AM, Jean-Baptiste Onofré <[email protected]> 
>>> wrote:
>>> Thanks for the update Max !
>>> 
>>> Regards
>>> JB
>>> 
>>> 
>>> On 03/21/2016 02:39 PM, Maximilian Michels wrote:
>>> FYI: The Runner registration has been fixed. The Flink runner
>>> explicitly registers as of [1]. Also, the SDK tries to look up the
>>> PipelineRunner class in case it has not been registered [2].
>>> 
>>> [1] https://github.com/apache/incubator-beam/pull/40
>>> [2] https://github.com/apache/incubator-beam/pull/61
>>> 
>>> On Sat, Mar 19, 2016 at 6:43 PM, Maximilian Michels <[email protected]> wrote:
>>> Great to see such a lively discussion here.
>>> 
>>> I think we'll support sinks through the Write interface (like in
>>> batched execution) and also have a dedicated wrapper for the Flink
>>> sinks. This is a very pressing but easy to solve issue of the Flink
>>> runner. Expect it to be in next week.
>>> 
>>> Also, the proper registration of the runner is about to be merged.
>>> We just need an ok from the contributor to merge the changes.
>>> 
>>> Best,
>>> Max
>>> 
>>> On Sat, Mar 19, 2016 at 12:42 AM, Dan Halperin <[email protected]> wrote:
>>> Thanks Bill!
>>> 
>>> Filed https://issues.apache.org/jira/browse/BEAM-136, but I'm glad it's not
>>> blocking you!
>>> 
>>> On Fri, Mar 18, 2016 at 4:04 PM, William McCarthy
>>> <[email protected]> wrote:
>>> 
>>> I tried that, but still no dice. Just to be clear, it’s not a blocker for
>>> me, given that I have my example running, but for your information the
>>> exception is below.
>>> 
>>> I’ll watch the commit log on the beam incubator and look forward to
>>> deleting my copy of Raghu’s contributions when they’re merged to master.
>>> 
>>> Thanks again for everyone’s help,
>>> 
>>> Bill
>>> 
>>> 
>>> Command followed by exception:
>>> 
>>> $ flink run -c
>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>> target/beam-1.0-SNAPSHOT.jar
>>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner
>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>> 
>>> ------------------------------------------------------------
>>>  The program finished with the following exception:
>>> 
>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error.
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>> at
>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>> at
>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified
>>> 'org.apache.beam.runners.flink.FlinkPipelineRunner', supported pipeline
>>> runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
>>> DirectPipelineRunner]
>>> at
>>> com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>>> at
>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>>> at
>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>>> at
>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>>> at
>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>> ... 6 more
>>> 
>>> On Mar 18, 2016, at 5:35 PM, Thomas Groh <[email protected]> wrote:
>>> 
>>> I don't believe the FlinkPipelineRunner is registered the same way the
>>> Dataflow & Direct Pipeline runners are registered; using
>>> org.apache.beam.runners.flink.FlinkPipelineRunner should work
>>> 
>>> On Fri, Mar 18, 2016 at 2:10 PM, William McCarthy
>>> <[email protected]> wrote:
>>> 
>>> Thanks Dan,
>>> 
>>> I tried that, but getting the below. Note that the jar contains the
>>> FlinkPipelineRunner.
>>> 
>>> 
>>> 
>>> % jar -tf target/beam-1.0-SNAPSHOT.jar | grep FlinkPipeline
>>> org/apache/beam/runners/flink/FlinkPipelineRunner.class
>>> org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.class
>>> org/apache/beam/runners/flink/FlinkPipelineOptions.class
>>> org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.class
>>> 
>>> % flink run -c
>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner
>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>> 
>>> ------------------------------------------------------------
>>>  The program finished with the following exception:
>>> 
>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error.
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>> at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>> at
>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>> at
>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified
>>> 'FlinkPipelineRunner', supported pipeline runners
>>> [BlockingDataflowPipelineRunner, DataflowPipelineRunner,
>>> DirectPipelineRunner]
>>> at
>>> com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>>> at
>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>>> at
>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>>> at
>>> com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>>> at
>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>> ... 6 more
>>> 
>>> 
>>> 
>>> On Mar 18, 2016, at 5:00 PM, Dan Halperin <[email protected]> wrote:
>>> 
>>> Thanks for catching that, Aljoscha!
>>> 
>>> Note that the Flink runner should be available via a command-line option
>>> as well: --runner=FlinkPipelineRunner.
>>> 
>>> The list of valid values for that flag is computed by walking the
>>> classpath at runtime, so as long as the Flink jar is present it'll work.
>>> 
>>> On Fri, Mar 18, 2016 at 1:21 PM, Aljoscha Krettek <[email protected]>
>>> wrote:
>>> 
>>> Hi,
>>> looks like the example is being executed with the DirectPipelineRunner
>>> which does not seem to be able to cope with UnboundedSource. You need to set
>>> the runner to the FlinkRunner in the example code as described here:
>>> https://github.com/apache/incubator-beam/tree/master/runners/flink#executing-an-example
>>> 
>>> The Flink runner should be able to deal with UnboundedSource but has the
>>> limitation that sources are always parallelism=1 (this is being worked on,
>>> however).
>>> 
>>> Cheers,
>>> Aljoscha
>>> On 18 Mar 2016, at 20:56, Dan Halperin <[email protected]> wrote:
>>> 
>>> Looks like the Flink runner may not yet support arbitrary code written
>>> with the UnboundedSource API. That is, it looks like the Flink runner
>>> expects the sources to get translated away.
>>> 
>>> Max?
>>> 
>>> Dan
>>> 
>>> On Fri, Mar 18, 2016 at 12:20 PM, William McCarthy
>>> <[email protected]> wrote:
>>> Thanks Raghu,
>>> 
>>> When I try to run it on flink using the incubator-beam code, i.e.
>>> 
>>> flink run -c
>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>> target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092
>>> --topics=test_in --outputTopic=test_out
>>> 
>>> I get this:
>>> 
>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error.
>>>       at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>       at
>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>       at
>>> org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>       at
>>> org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>       at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>       at
>>> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>       at
>>> org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>> Caused by: java.lang.IllegalStateException: no evaluator registered
>>> for Read(UnboundedKafkaSource)
>>>       at
>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852)
>>>       at
>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>>>       at
>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>       at
>>> com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>       at
>>> com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>>>       at
>>> com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>>>       at
>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
>>>       at
>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
>>>       at
>>> com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
>>>       at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>>>       at
>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>>>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>       at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>       at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>       at java.lang.reflect.Method.invoke(Method.java:498)
>>>       at
>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>       ... 6 more
>>> 
>>> Any ideas?
>>> 
>>> Bill
>>> 
>>> On Mar 18, 2016, at 2:47 PM, Raghu Angadi <[email protected]> wrote:
>>> 
>>> Thanks for trying it.
>>> 
>>> I fixed the CheckStyle error (not sure why my build is not failing).
>>> Let me know if you see any issues running with Beam. I haven't tried it. I
>>> should. In fact Daniel Halperin says my patch should be against Beam.
>>> 
>>> Raghu.
>>> 
>>> On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy
>>> <[email protected]> wrote:
>>> Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and Raghu for
>>> pointing me to working code.
>>> 
>>> I’m in the middle of a hack day at the moment, so the speed of your
>>> responses has been very welcome.
>>> 
>>> In the first instance, I’ll try using your changes, Raghu. I’ve
>>> cloned your repo, switched to the kafka branch and built both contrib/kafka
>>> and contrib/examples/kafka. The contrib/kafka initially failed with a
>>> CheckStyle error
>>> (/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12:
>>> 'private' modifier out of order with the JLS suggestions)… I’ve fixed that
>>> in my local clone and now it’s building fine. I hope to be able to run your
>>> contrib unchanged on top of the incubator-beam codebase, which will be what
>>> I attempt to do now.
>>> 
>>> Thanks again to all, for your swift help.
>>> 
>>> Bill
>>> 
>>> On Mar 18, 2016, at 12:55 PM, Raghu Angadi <[email protected]>
>>> wrote:
>>> 
>>> Hi Bill,
>>> 
>>> We have a fairly well-tested patch for KafkaIO (pr #121). It will be
>>> merged soon. The example there keeps track of top hashtags in 10 minute
>>> sliding window and writes the results to another Kafka topic. Please try it
>>> if you can. It is well tested on Google Cloud Dataflow. I have not run it
>>> using Flink runner.
>>> 
>>> Raghu.
>>> 
>>> On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas
>>> <[email protected]> wrote:
>>> Hello Bill,
>>> 
>>> This is a known limitation of the Flink Runner.
>>> There is a JIRA issue for that
>>> https://issues.apache.org/jira/browse/BEAM-127
>>> 
>>> A wrapper for Flink sinks will come soon and as Beam evolves,
>>> a more Beam-y solution will come as well.
>>> 
>>> Kostas
>>> On Mar 18, 2016, at 5:23 PM, William McCarthy
>>> <[email protected]> wrote:
>>> 
>>> Hi,
>>> 
>>> I’m trying to write a proof-of-concept which takes messages from
>>> Kafka, transforms them using Beam on Flink, then pushes the results onto a
>>> different Kafka topic.
>>> 
>>> I’ve used the KafkaWindowedWordCountExample as a starting point,
>>> and that’s doing the first part of what I want to do, but it outputs to text
>>> files as opposed to Kafka. FlinkKafkaProducer08 looks promising, but I can’t
>>> figure out how to plug it into the pipeline. I was thinking that it would be
>>> wrapped with an UnboundedFlinkSink, or some such, but that doesn’t seem to
>>> exist.
>>> 
>>> Any advice or thoughts on what I’m trying to do?
>>> 
>>> I’m running the latest incubator-beam (as of last night from
>>> Github), Flink 1.0.0 in cluster mode and Kafka 0.9.0.1, all on Google
>>> Compute Engine (Debian Jessie).
>>> 
>>> Thanks,
>>> 
>>> Bill McCarthy
>>> 
>>> 
>>> -- 
>>> Jean-Baptiste Onofré
>>> [email protected]
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>>> 
>> 
> 
