@Aljoscha: I didn't know that Kafka always stores key/value pairs, but I see that we also support setting Kafka keys in Flink.
@JB: I get your point that a sink is simply a DoFn, but a ParDo is not a good match for a sink. A sink doesn't produce a PCollection; it represents the end of a pipeline. Like an UnboundedSource, an UnboundedSink also has special needs, i.e. it needs to provide a checkpointing mechanism. I think we need something along the lines of the existing Write transform for batch.

On Fri, Apr 29, 2016 at 12:27 PM, Jean-Baptiste Onofré <[email protected]> wrote:
> Hi,
>
> KafkaIO uses KafkaRecord which is basically key,value + some metadata
> (topic, partition, offset).
>
> Can you describe the behavior of an UnboundedSink ?
>
> UnboundedSource is obvious: it's still consuming data creating PCollection
> sent into the pipeline.
>
> But UnboundedSink ? Do you mean that the UnboundedSink will write each
> record in the PCollection ? When does it stop ?
>
> Regards
> JB
>
>
> On 04/29/2016 12:07 PM, Aljoscha Krettek wrote:
>>
>> Hi,
>> I think the fact that KafkaIO has a <key, value> model comes from Kafka
>> having a <key, value> model. I imagine most sources will emit the type of
>> values appropriate for them.
>>
>> I agree with Max that the lack of an UnboundedSink seems strange. Do we
>> have any "sinks" implemented as a ParDo already?
>>
>> Cheers,
>> Aljoscha
>>
>> On Fri, 29 Apr 2016 at 11:22 Jean-Baptiste Onofré <[email protected]> wrote:
>>
>>> Hi Max,
>>>
>>> Your four points are valid and we already discussed about that.
>>>
>>> 1. +1, the runner API should bring utils around that
>>> 2. UnboundedSink has been discussed (I don't see strong use case for
>>> now, as it takes a PCollection).
>>> 3. +1, Dan talked about improving the hierarchy.
>>> 4. +1, I'm working on new IO (JMS, MQTT, JDBC, Cassandra, Camel, ...).
>>>
>>> I would add:
>>>
>>> 5. Add a page on the website listing the IO, their usage and
>>> configuration. Something like we have in Camel:
>>> http://camel.apache.org/components.html
>>> 6. Refactor the FileIO to avoid usage of IOChannelFactory and use a
>>> filesystem plugin (file, hdfs, s3, aws, etc).
>>>
>>> Dan planned to create "util" to deal with watermark and timestamps.
>>>
>>> Regards
>>> JB
>>>
>>> On 04/29/2016 11:11 AM, Maximilian Michels wrote:
>>>>
>>>> @Amir: This is the Developer mailing list. Please post your questions
>>>> regarding Beam on the user mailing list.
>>>>
>>>> +1 for portability in general. However, I see some crucial TODOs coming up:
>>>>
>>>> 1) Improving the integration of Runners with the Beam sink/source API
>>>> 2) Providing interfaces to implement new connectors (i.e. still no
>>>> existing UnboundedSink)
>>>> 3) Extending existing interfaces to ease implementation of connectors
>>>> and provide a uniform API (i.e. on top of UnboundedSource)
>>>> 4) Working on a more complete set of connectors in Beam
>>>>
>>>> Looking at the KafkaIO implementation, I wonder shouldn't we extract
>>>> the custom Watermark and Timestamp function into an extra interface?
>>>> All connectors are going to have these methods. It would be
>>>> nice to have a uniform API among the connectors.
>>>>
>>>> Further, the KafkaIO enforces a <key, value> data model which AFAIK is
>>>> not enforced by the Beam model. I don't know the details for this
>>>> design decision but I would like this to be communicated before it is
>>>> merged into the master. Otherwise, it will be harder to achieve
>>>> portability among the Runners. In Flink and Spark we are already
>>>> experienced with all kinds of connectors and user's needs. It would be
>>>> nice to feed that back in the course of adding new connectors to the
>>>> Beam API.
>>>>
>>>> I would expect an active discussion on the Dev mailing list before any
>>>> new connector API gets merged. Furthermore, let us provide better
>>>> interfaces for connector needs. Finally, let us introduce unbounded
>>>> sinks :)
>>>>
>>>> On Fri, Apr 29, 2016 at 7:54 AM, amir bahmanyari
>>>> <[email protected]> wrote:
>>>>>
>>>>> This may help trace it:
>>>>> Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
>>>>>   at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>>>>>   at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>>>>>   at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>>>>>   at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>>>>>   at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
>>>>>   at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
>>>>>   at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
>>>>>   at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
>>>>>   at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
>>>>>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>>>>>   at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
>>>>>
>>>>>
>>>>> From: Jean-Baptiste Onofré <[email protected]>
>>>>> To: [email protected]
>>>>> Sent: Thursday, April 28, 2016 10:08 PM
>>>>> Subject: Re: [DISCUSS] Beam IO &runners native IO
>>>>>
>>>>> I gonna take a look. The DirectPipelineRunner didn't support the
>>>>> unbounded collection (it has been fixed last night AFAIR). It could be
>>>>> related.
>>>>>
>>>>> Regards
>>>>> JB
>>>>>
>>>>> On 04/29/2016 07:00 AM, amir bahmanyari wrote:
>>>>>>
>>>>>> Hi JB, I used the sample KafkaIO usage below.
>>>>>>
>>>>>> p.apply(KafkaIO.read().withBootstrapServers("kafkahost:9092").withTopics(topics));
>>>>>> p.run();
>>>>>>
>>>>>> It threw the following at p.run():
>>>>>> Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
>>>>>>   at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>>>>>>
>>>>>> I am sure I am missing something. Would be great if I could see a sample
>>>>>> code. I appreciate it sir. Cheers
>>>>>>
>>>>>>
>>>>>> From: Jean-Baptiste Onofré <[email protected]>
>>>>>> To: [email protected]
>>>>>> Sent: Thursday, April 28, 2016 5:30 AM
>>>>>> Subject: [DISCUSS] Beam IO &runners native IO
>>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> regarding the recent threads on the mailing list, I would like to start
>>>>>> a format discussion around the IO.
>>>>>> As we can expect the first contributions on this area (I already have
>>>>>> some work in progress around this ;)), I think it's a fair discussion to
>>>>>> have.
>>>>>>
>>>>>> Now, we have two kinds of IO: the one "generic" to Beam, the one "local"
>>>>>> to the runners.
>>>>>>
>>>>>> For example, let's take Kafka: we have the KafkaIO (in IO), and for
>>>>>> instance, we have the spark-streaming kafka connector (in Spark Runner).
>>>>>>
>>>>>> Right now, we have two approaches for the user:
>>>>>> 1. In the pipeline, we use KafkaIO from Beam: it's the preferred
>>>>>> approach for sure. However, the user may want to use the runner specific
>>>>>> IO for two reasons:
>>>>>>     * Beam doesn't provide the IO yet (for instance, spark cassandra
>>>>>> connector is available whereas we don't have yet any CassandraIO (I'm
>>>>>> working on it anyway ;))
>>>>>>     * The runner native IO is optimized or contains more features than the
>>>>>> Beam native IO
>>>>>> 2. So, for the previous reasons, the user could want to use the native
>>>>>> runner IO. The drawback of this approach is that the pipeline will be
>>>>>> tied to a specific runner, which is completely against the Beam design.
>>>>>>
>>>>>> I wonder if it wouldn't make sense to add flag on the IO API (and
>>>>>> related on Runner API) like .useNative().
>>>>>>
>>>>>> For instance, the user would be able to do:
>>>>>>
>>>>>> pipeline.apply(KafkaIO.read().withBootstrapServers("...").withTopics("...").useNative(true));
>>>>>>
>>>>>> then, if the runner has a "native" IO, it will use it, else, if
>>>>>> useNative(false) (the default), it won't use any runner native IO.
>>>>>>
>>>>>> The point there is for the configuration: assuming the Beam IO and the
>>>>>> runner IO can differ, it means that the "Beam IO" would have to populate
>>>>>> all runner specific IO configuration.
>>>>>>
>>>>>> Of course, it's always possible to use a PTransform to wrap the runner
>>>>>> native IO, but we are back on the same concern: the pipeline will be
>>>>>> coupled to a specific runner.
>>>>>>
>>>>>> The purpose of the useNative() flag is to "automatically" inform the
>>>>>> runner to use a specific IO if it has one: the pipeline stays decoupled
>>>>>> from the runners.
>>>>>>
>>>>>> Thoughts ?
>>>>>>
>>>>>> Thanks
>>>>>> Regards
>>>>>> JB
>>>>>>
>>>>>
>>>>> --
>>>>> Jean-Baptiste Onofré
>>>>> [email protected]
>>>>> http://blog.nanthrax.net
>>>>> Talend - http://www.talend.com
>>>>>
>>>>>
>>>>>
>>>
>>> --
>>> Jean-Baptiste Onofré
>>> [email protected]
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>>>
>>
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
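
PS: To make the checkpointing point from the top of this mail a bit more concrete, here is a rough sketch of what an UnboundedSink contract might look like. All names here (UnboundedSink, Writer, CheckpointMark, CountMark, InMemorySink) are hypothetical and only for illustration; none of this is an existing Beam interface. It assumes a writer buffers records and only makes them durable when the runner asks for a checkpoint, and that a restored writer discards anything written after the last checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

public class UnboundedSinkSketch {

    /** Hypothetical marker for "everything up to here is durably written". */
    public interface CheckpointMark {}

    /** Hypothetical writer: buffers records, makes them durable on checkpoint(). */
    public interface Writer<T, C extends CheckpointMark> {
        void write(T record);
        C checkpoint();
    }

    /** Hypothetical sink: creates a writer, optionally resuming from a checkpoint. */
    public interface UnboundedSink<T, C extends CheckpointMark> {
        Writer<T, C> createWriter(C restored); // null means "start fresh"
    }

    /** Toy checkpoint: the number of records committed so far. */
    public static final class CountMark implements CheckpointMark {
        public final int committed;
        public CountMark(int committed) { this.committed = committed; }
    }

    /** Toy sink that "commits" into a list instead of an external system. */
    public static final class InMemorySink implements UnboundedSink<String, CountMark> {
        public final List<String> committed = new ArrayList<>();

        @Override
        public Writer<String, CountMark> createWriter(CountMark restored) {
            // On restore, roll back anything committed after the checkpoint.
            int keep = (restored == null) ? 0 : restored.committed;
            while (committed.size() > keep) {
                committed.remove(committed.size() - 1);
            }
            final List<String> pending = new ArrayList<>();
            return new Writer<String, CountMark>() {
                @Override
                public void write(String record) {
                    pending.add(record); // not durable yet
                }

                @Override
                public CountMark checkpoint() {
                    committed.addAll(pending); // flush the buffer
                    pending.clear();
                    return new CountMark(committed.size());
                }
            };
        }
    }

    public static void main(String[] args) {
        InMemorySink sink = new InMemorySink();
        Writer<String, CountMark> writer = sink.createWriter(null);
        writer.write("a");
        writer.write("b");
        CountMark mark = writer.checkpoint(); // "a" and "b" are now durable
        writer.write("c");                    // never checkpointed, so lost on failure

        // Simulated failure and restart from the last checkpoint.
        Writer<String, CountMark> resumed = sink.createWriter(mark);
        resumed.write("d");
        resumed.checkpoint();
        System.out.println(sink.committed); // prints [a, b, d]
    }
}
```

A plain DoFn has no equivalent of checkpoint() or of restoring from a mark, which is the gap I mean. A real design would also have to say how the runner triggers checkpoints and what delivery guarantee the sink offers; the sketch leaves both open.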
