Thanks Max.

Bill McCarthy,
I know you are unblocked and KafkaWriter is good enough. Please try KafkaIO
source from my branch with Flink runner if you get a chance.

thanks,
Raghu.

On Mon, Mar 21, 2016 at 6:54 AM, Jean-Baptiste Onofré wrote:

> Thanks for the update, Max!
>
> Regards
> JB
>
>
> On 03/21/2016 02:39 PM, Maximilian Michels wrote:
>
>> FYI: The Runner registration has been fixed. The Flink runner
>> explicitly registers as of [1]. Also, the SDK tries to look up the
>> PipelineRunner class in case it has not been registered [2].
>>
>> [1] https://github.com/apache/incubator-beam/pull/40
>> [2] https://github.com/apache/incubator-beam/pull/61
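To make the two lookup paths Max describes concrete, here is a minimal, stdlib-only Java sketch of how a --runner value might be resolved: first against the registered runners by simple name, then, as the fallback from [2], as a fully-qualified class name. All names in it (RunnerResolver, Runner, DirectRunner, UnregisteredRunner) are hypothetical; this is not the SDK's actual PipelineOptionsFactory code, and only the error message imitates the one in the traces further down this thread.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of runner-name resolution; not the SDK's real code.
class RunnerResolver {
    // Stand-in for a pipeline runner type.
    interface Runner {}

    // A runner we register explicitly, like the built-in runners.
    static class DirectRunner implements Runner {}

    // A runner that is on the classpath but never registered.
    static class UnregisteredRunner implements Runner {}

    // Registered runners, keyed by simple class name.
    private final Map<String, Class<? extends Runner>> registered = new LinkedHashMap<>();

    void register(Class<? extends Runner> runnerClass) {
        registered.put(runnerClass.getSimpleName(), runnerClass);
    }

    Class<? extends Runner> resolve(String name) {
        // Path 1: a registered runner can be selected by its simple name.
        Class<? extends Runner> byName = registered.get(name);
        if (byName != null) {
            return byName;
        }
        // Path 2 (fallback): treat the value as a fully-qualified class name.
        Class<?> candidate;
        try {
            candidate = Class.forName(name);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException(
                "Unknown 'runner' specified '" + name
                    + "', supported pipeline runners " + registered.keySet(), e);
        }
        // The class exists, but it must actually be a runner.
        if (!Runner.class.isAssignableFrom(candidate)) {
            throw new IllegalArgumentException("Class '" + name + "' is not a runner");
        }
        return candidate.asSubclass(Runner.class);
    }
}
```

With only the first path, an unregistered runner is rejected whether it is named by its simple name or its full class name, which would be consistent with both failures Bill reports before [1] and [2] were merged.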
>>
>> On Sat, Mar 19, 2016 at 6:43 PM, Maximilian Michels wrote:
>>
>>> Great to see such a lively discussion here.
>>>
>>> I think we'll support sinks through the Write interface (like in
>>> batched execution) and also have a dedicated wrapper for the Flink
>>> sinks. This is a very pressing but easy-to-solve issue for the Flink
>>> runner. Expect it to land next week.
>>>
>>> Also, the proper registration of the runner is about to be merged.
>>> We just need an OK from the contributor to merge the changes.
>>>
>>> Best,
>>> Max
>>>
>>> On Sat, Mar 19, 2016 at 12:42 AM, Dan Halperin wrote:
>>>
>>>> Thanks Bill!
>>>>
>>>> Filed https://issues.apache.org/jira/browse/BEAM-136, but I'm glad it's
>>>> not blocking you!
>>>>
>>>> On Fri, Mar 18, 2016 at 4:04 PM, William McCarthy wrote:
>>>>
>>>>>
>>>>> I tried that, but still no dice. Just to be clear, it’s not a blocker for
>>>>> me, given that I have my example running, but for your information the
>>>>> exception is below.
>>>>>
>>>>> I’ll watch the commit log on the Beam incubator and look forward to
>>>>> deleting my copy of Raghu’s contributions when they’re merged to
>>>>> master.
>>>>>
>>>>> Thanks again for everyone’s help,
>>>>>
>>>>> Bill
>>>>>
>>>>>
>>>>> Command followed by exception:
>>>>>
>>>>> $ flink run -c
>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>> target/beam-1.0-SNAPSHOT.jar
>>>>> --runner=org.apache.beam.runners.flink.FlinkPipelineRunner
>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in --outputTopic=test_out
>>>>>
>>>>> ------------------------------------------------------------
>>>>>   The program finished with the following exception:
>>>>>
>>>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified 'org.apache.beam.runners.flink.FlinkPipelineRunner', supported pipeline runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner, DirectPipelineRunner]
>>>>>     at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>>>>>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>>>>>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>>>>>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>>>>>     at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>     ... 6 more
>>>>>
>>>>> On Mar 18, 2016, at 5:35 PM, Thomas Groh wrote:
>>>>>
>>>>> I don't believe the FlinkPipelineRunner is registered the same way the
>>>>> Dataflow and Direct pipeline runners are; using the fully-qualified name
>>>>> org.apache.beam.runners.flink.FlinkPipelineRunner should work.
>>>>>
>>>>> On Fri, Mar 18, 2016 at 2:10 PM, William McCarthy wrote:
>>>>>
>>>>>>
>>>>>> Thanks Dan,
>>>>>>
>>>>>> I tried that, but I'm getting the error below. Note that the jar contains
>>>>>> the FlinkPipelineRunner.
>>>>>>
>>>>>>
>>>>>>
>>>>>> % jar -tf target/beam-1.0-SNAPSHOT.jar | grep FlinkPipeline
>>>>>> org/apache/beam/runners/flink/FlinkPipelineRunner.class
>>>>>> org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.class
>>>>>> org/apache/beam/runners/flink/FlinkPipelineOptions.class
>>>>>> org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.class
>>>>>>
>>>>>> % flink run -c
>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>>> target/beam-1.0-SNAPSHOT.jar --runner=FlinkPipelineRunner
>>>>>> --bootstrapServers=cl-pu4p:9092 --topics=test_in
>>>>>> --outputTopic=test_out
>>>>>>
>>>>>> ------------------------------------------------------------
>>>>>>   The program finished with the following exception:
>>>>>>
>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>> Caused by: java.lang.IllegalArgumentException: Unknown 'runner' specified 'FlinkPipelineRunner', supported pipeline runners [BlockingDataflowPipelineRunner, DataflowPipelineRunner, DirectPipelineRunner]
>>>>>>     at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:146)
>>>>>>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.parseObjects(PipelineOptionsFactory.java:1445)
>>>>>>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory.access$400(PipelineOptionsFactory.java:99)
>>>>>>     at com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory$Builder.as(PipelineOptionsFactory.java:284)
>>>>>>     at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:117)
>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>     ... 6 more
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mar 18, 2016, at 5:00 PM, Dan Halperin wrote:
>>>>>>
>>>>>> Thanks for catching that, Aljoscha!
>>>>>>
>>>>>> Note that the Flink runner should be available via a command-line option
>>>>>> as well: --runner=FlinkPipelineRunner.
>>>>>>
>>>>>> The list of valid values for that flag is computed by walking the
>>>>>> classpath at runtime, so as long as the Flink jar is present, it'll work.
>>>>>>
>>>>>> On Fri, Mar 18, 2016 at 1:21 PM, Aljoscha Krettek wrote:
>>>>>>
>>>>>>>
>>>>>>> Hi,
>>>>>>> it looks like the example is being executed with the DirectPipelineRunner,
>>>>>>> which does not seem to be able to cope with UnboundedSource. You need to
>>>>>>> set the runner to the FlinkPipelineRunner in the example code, as
>>>>>>> described here:
>>>>>>>
>>>>>>> https://github.com/apache/incubator-beam/tree/master/runners/flink#executing-an-example
>>>>>>>
>>>>>>> The Flink runner should be able to deal with UnboundedSource, but it has
>>>>>>> the limitation that sources always run with parallelism 1 (this is being
>>>>>>> worked on, however).
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Aljoscha
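Aljoscha's diagnosis can be illustrated with a small hypothetical model: a runner that dispatches each transform to an evaluator looked up by the transform's class, and fails when none is registered. That is the shape of the "no evaluator registered for Read(UnboundedKafkaSource)" error in the trace further down this thread. EvaluatorDispatch, BoundedRead and UnboundedRead are illustrative names only, not DirectPipelineRunner internals.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of per-transform evaluator dispatch; not DirectPipelineRunner's code.
class EvaluatorDispatch {
    // Illustrative transform types.
    static class BoundedRead {}
    static class UnboundedRead {}

    // Evaluators keyed by the exact class of the transform they handle.
    private final Map<Class<?>, Function<Object, String>> evaluators = new HashMap<>();

    <T> void register(Class<T> type, Function<? super T, String> evaluator) {
        // Wrap so the evaluator receives a correctly typed transform.
        evaluators.put(type, transform -> evaluator.apply(type.cast(transform)));
    }

    String visit(Object transform) {
        Function<Object, String> evaluator = evaluators.get(transform.getClass());
        if (evaluator == null) {
            // No evaluator for this transform type: the whole run fails here.
            throw new IllegalStateException(
                "no evaluator registered for " + transform.getClass().getSimpleName());
        }
        return evaluator.apply(transform);
    }
}
```

In this model, a runner with no entry for unbounded reads can never execute such a pipeline, no matter how correct the source itself is; the fix is to pick a runner that has one.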
>>>>>>>
>>>>>>>> On 18 Mar 2016, at 20:56, Dan Halperin wrote:
>>>>>>>>
>>>>>>>> Looks like the Flink runner may not yet support arbitrary code written
>>>>>>>> with the UnboundedSource API. That is, it looks like the Flink runner
>>>>>>>> expects the sources to get translated away.
>>>>>>>>
>>>>>>>> Max?
>>>>>>>>
>>>>>>>> Dan
>>>>>>>>
>>>>>>>> On Fri, Mar 18, 2016 at 12:20 PM, William McCarthy wrote:
>>>>>>>> Thanks Raghu,
>>>>>>>>
>>>>>>>> When I try to run it on Flink using the incubator-beam code, i.e.
>>>>>>>>
>>>>>>>> flink run -c
>>>>>>>> com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample
>>>>>>>> target/beam-1.0-SNAPSHOT.jar --bootstrapServers=cl-pu4p:9092
>>>>>>>> --topics=test_in --outputTopic=test_out
>>>>>>>>
>>>>>>>> I get this:
>>>>>>>>
>>>>>>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>>>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:520)
>>>>>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>>>>>>>>     at org.apache.flink.client.program.Client.runBlocking(Client.java:248)
>>>>>>>>     at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:866)
>>>>>>>>     at org.apache.flink.client.CliFrontend.run(CliFrontend.java:333)
>>>>>>>>     at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1189)
>>>>>>>>     at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1239)
>>>>>>>> Caused by: java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
>>>>>>>>     at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:852)
>>>>>>>>     at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219)
>>>>>>>>     at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>>>>     at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215)
>>>>>>>>     at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102)
>>>>>>>>     at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259)
>>>>>>>>     at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814)
>>>>>>>>     at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526)
>>>>>>>>     at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96)
>>>>>>>>     at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180)
>>>>>>>>     at com.google.cloud.dataflow.contrib.kafka.examples.TopHashtagsExample.main(TopHashtagsExample.java:140)
>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:505)
>>>>>>>>     ... 6 more
>>>>>>>>
>>>>>>>> Any ideas?
>>>>>>>>
>>>>>>>> Bill
>>>>>>>>
>>>>>>>> On Mar 18, 2016, at 2:47 PM, Raghu Angadi wrote:
>>>>>>>>>
>>>>>>>>> Thanks for trying it.
>>>>>>>>>
>>>>>>>>> I fixed the CheckStyle error (not sure why my build is not failing).
>>>>>>>>> Let me know if you see any issues running with Beam. I haven't tried
>>>>>>>>> it, but I should. In fact, Daniel Halperin says my patch should be
>>>>>>>>> against Beam.
>>>>>>>>>
>>>>>>>>> Raghu.
>>>>>>>>>
>>>>>>>>> On Fri, Mar 18, 2016 at 11:22 AM, William McCarthy wrote:
>>>>>>>>> Thanks JB, Emanuele, Kostas & Raghu, especially Kostas and Raghu for
>>>>>>>>> pointing me to working code.
>>>>>>>>>
>>>>>>>>> I’m in the middle of a hack day at the moment, so the speed of your
>>>>>>>>> responses has been very welcome.
>>>>>>>>>
>>>>>>>>> In the first instance, I’ll try using your changes, Raghu. I’ve cloned
>>>>>>>>> your repo, switched to the kafka branch and built both contrib/kafka and
>>>>>>>>> contrib/examples/kafka. The contrib/kafka build initially failed with a
>>>>>>>>> CheckStyle error
>>>>>>>>> (/Users/bill/dev/DataflowJavaSDK/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java:683:12:
>>>>>>>>> 'private' modifier out of order with the JLS suggestions)… I’ve fixed
>>>>>>>>> that in my local clone and now it’s building fine. I hope to be able to
>>>>>>>>> run your contrib unchanged on top of the incubator-beam codebase, which
>>>>>>>>> is what I’ll attempt now.
>>>>>>>>>
>>>>>>>>> Thanks again to all, for your swift help.
>>>>>>>>>
>>>>>>>>> Bill
>>>>>>>>>
>>>>>>>>> On Mar 18, 2016, at 12:55 PM, Raghu Angadi wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Bill,
>>>>>>>>>>
>>>>>>>>>> We have a fairly well-tested patch for KafkaIO (PR #121). It will be
>>>>>>>>>> merged soon. The example there keeps track of the top hashtags in a
>>>>>>>>>> 10-minute sliding window and writes the results to another Kafka topic.
>>>>>>>>>> Please try it if you can. It is well tested on Google Cloud Dataflow; I
>>>>>>>>>> have not run it using the Flink runner.
>>>>>>>>>>
>>>>>>>>>> Raghu.
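For readers new to sliding windows, here is a minimal in-memory Java sketch of what "top hashtags in a 10-minute sliding window" computes: each event is assigned to every overlapping window that contains its timestamp, hashtags are counted per window, and the k most frequent are kept. The 1-minute slide period used below, the top-k parameter, and the names SlidingTopHashtags, Event, windowStarts and topHashtags are assumptions for illustration; this is not the code from PR #121, which runs as a Beam pipeline reading from and writing to Kafka.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical in-memory model of "top hashtags per sliding window".
class SlidingTopHashtags {
    static final class Event {
        final long timestampMillis;
        final String hashtag;

        Event(long timestampMillis, String hashtag) {
            this.timestampMillis = timestampMillis;
            this.hashtag = hashtag;
        }
    }

    // Start times of every window [start, start + size) that contains ts.
    // Each element lands in size / period overlapping windows.
    static List<Long> windowStarts(long ts, long sizeMillis, long periodMillis) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, periodMillis);
        for (long start = lastStart; start > ts - sizeMillis; start -= periodMillis) {
            starts.add(start);
        }
        return starts;
    }

    // Counts hashtags per window, then keeps the k most frequent (ties broken alphabetically).
    static Map<Long, List<String>> topHashtags(
            List<Event> events, long sizeMillis, long periodMillis, int k) {
        Map<Long, Map<String, Long>> counts = new TreeMap<>();
        for (Event e : events) {
            for (long start : windowStarts(e.timestampMillis, sizeMillis, periodMillis)) {
                counts.computeIfAbsent(start, s -> new HashMap<>()).merge(e.hashtag, 1L, Long::sum);
            }
        }
        Map<Long, List<String>> top = new TreeMap<>();
        for (Map.Entry<Long, Map<String, Long>> window : counts.entrySet()) {
            List<String> best = window.getValue().entrySet().stream()
                .sorted((a, b) -> {
                    int byCount = Long.compare(b.getValue(), a.getValue());
                    return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
                })
                .limit(k)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
            top.put(window.getKey(), best);
        }
        return top;
    }
}
```

With a 10-minute size and an assumed 1-minute period, each hashtag occurrence is counted in ten overlapping windows. In a real Beam pipeline the window assignment and per-window aggregation would come from the SDK's windowing and combining transforms rather than hand-written loops like these.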
>>>>>>>>>>
>>>>>>>>>> On Fri, Mar 18, 2016 at 9:46 AM, Kostas Kloudas wrote:
>>>>>>>>>> Hello Bill,
>>>>>>>>>>
>>>>>>>>>> This is a known limitation of the Flink Runner.
>>>>>>>>>> There is a JIRA issue for that:
>>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-127
>>>>>>>>>>
>>>>>>>>>> A wrapper for Flink sinks will come soon, and as Beam evolves,
>>>>>>>>>> a more Beam-y solution will come as well.
>>>>>>>>>>
>>>>>>>>>> Kostas
>>>>>>>>>>
>>>>>>>>>>> On Mar 18, 2016, at 5:23 PM, William McCarthy wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I’m trying to write a proof-of-concept which takes messages from Kafka,
>>>>>>>>>>> transforms them using Beam on Flink, then pushes the results onto a
>>>>>>>>>>> different Kafka topic.
>>>>>>>>>>>
>>>>>>>>>>> I’ve used the KafkaWindowedWordCountExample as a starting point, and
>>>>>>>>>>> that’s doing the first part of what I want to do, but it outputs to
>>>>>>>>>>> text files as opposed to Kafka. FlinkKafkaProducer08 looks promising,
>>>>>>>>>>> but I can’t figure out how to plug it into the pipeline. I was thinking
>>>>>>>>>>> that it would be wrapped with an UnboundedFlinkSink, or some such, but
>>>>>>>>>>> that doesn’t seem to exist.
>>>>>>>>>>>
>>>>>>>>>>> Any advice or thoughts on what I’m trying to do?
>>>>>>>>>>>
>>>>>>>>>>> I’m running the latest incubator-beam (as of last night from GitHub),
>>>>>>>>>>> Flink 1.0.0 in cluster mode and Kafka 0.9.0.1, all on Google Compute
>>>>>>>>>>> Engine (Debian Jessie).
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>>
>>>>>>>>>>> Bill McCarthy
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
> --
> Jean-Baptiste Onofré
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
