+Dan Halperin (who is OOO for a couple of days) Yes, there are plans for unbounded sinks. But unlike sources, sinks don't add any additional functionality beyond a ParDo (they just make it more obvious how to use a ParDo appropriately to get the right fault tolerance). So they haven't been prioritized yet.
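To make the "a sink is just a ParDo" point concrete, here is a minimal, framework-free sketch in plain Java (no Beam SDK types; all names are illustrative, not Beam API). The fault-tolerance concern mentioned above boils down to making per-element writes idempotent, since a runner may re-execute a bundle on failure:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of a "sink as ParDo": each element is written for its side effect
 * only, and nothing is emitted downstream. Writes are keyed by a stable id
 * so that a retried bundle overwrites instead of duplicating: the "right
 * fault tolerance" referred to above.
 */
final class IdempotentSinkFn {
    private final Map<String, String> externalStore = new HashMap<>();

    /** Stand-in for DoFn.processElement: write the element, keyed by a stable id. */
    void processElement(String id, String value) {
        // Re-executing this call with the same id and value is harmless.
        externalStore.put(id, value);
    }

    int recordCount() {
        return externalStore.size();
    }
}
```

Replaying a bundle against such a sink leaves the external store unchanged, which is why no machinery beyond ParDo is strictly required.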
On Fri, Apr 29, 2016 at 4:43 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> Hi Amit,
>
> yes, definitely, the highest priority is that the existing runners have to
> fully work with Beam IO. I will work with you on the Spark runner about that.
>
> Regards
> JB
>
> On 04/29/2016 01:06 PM, Amit Sela wrote:
>> +1 on Max's comment on an active discussion for the connector API.
>> I think that we can use Spark and Flink's existing connectors (Spark
>> supports many) as test cases and consider a bottom-up design approach
>> rather than top-down, especially since we're in incubation.
>>
>> Also +1 for Davor that runners should make SDK IOs a high priority - this
>> will probably give us more input than any good email thread. I'll do my
>> best to make this happen for the Spark runner.
>>
>> On Fri, Apr 29, 2016 at 1:45 PM Maximilian Michels <m...@apache.org> wrote:
>>> @Aljoscha I didn't know that Kafka always stores key/value, but I see
>>> that we also have support for setting Kafka keys in Flink.
>>>
>>> @JB I get your point that a sink is simply a DoFn, but a ParDo is not
>>> a good match for a sink. A Sink doesn't produce a PCollection but
>>> represents the end of a pipeline. Like an UnboundedSource, an
>>> UnboundedSink also has special needs, i.e. it needs to provide a
>>> checkpointing mechanism. I think we need something along the lines of
>>> the existing Write transform for batch.
>>>
>>> On Fri, Apr 29, 2016 at 12:27 PM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>> Hi,
>>>>
>>>> KafkaIO uses KafkaRecord, which is basically key/value plus some metadata
>>>> (topic, partition, offset).
>>>>
>>>> Can you describe the behavior of an UnboundedSink?
>>>>
>>>> UnboundedSource is obvious: it's still consuming data, creating a
>>>> PCollection sent into the pipeline.
>>>>
>>>> But UnboundedSink? Do you mean that the UnboundedSink will write each
>>>> record in the PCollection? When does it stop?
>>>>
>>>> Regards
>>>> JB
>>>>
>>>> On 04/29/2016 12:07 PM, Aljoscha Krettek wrote:
>>>>> Hi,
>>>>> I think the fact that KafkaIO has a <key, value> model comes from Kafka
>>>>> having a <key, value> model. I imagine most sources will emit the type
>>>>> of values appropriate for them.
>>>>>
>>>>> I agree with Max that the lack of an UnboundedSink seems strange. Do we
>>>>> have any "sinks" implemented as a ParDo already?
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>
>>>>> On Fri, 29 Apr 2016 at 11:22 Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>>>> Hi Max,
>>>>>>
>>>>>> Your four points are valid and we already discussed them.
>>>>>>
>>>>>> 1. +1, the runner API should bring utils around that.
>>>>>> 2. UnboundedSink has been discussed (I don't see a strong use case for
>>>>>> now, as it takes a PCollection).
>>>>>> 3. +1, Dan talked about improving the hierarchy.
>>>>>> 4. +1, I'm working on new IOs (JMS, MQTT, JDBC, Cassandra, Camel, ...).
>>>>>>
>>>>>> I would add:
>>>>>>
>>>>>> 5. Add a page on the website listing the IOs, their usage and
>>>>>> configuration. Something like we have in Camel:
>>>>>> http://camel.apache.org/components.html
>>>>>> 6. Refactor the FileIO to avoid usage of IOChannelFactory and use a
>>>>>> filesystem plugin (file, hdfs, s3, aws, etc.).
>>>>>>
>>>>>> Dan planned to create "utils" to deal with watermarks and timestamps.
>>>>>>
>>>>>> Regards
>>>>>> JB
>>>>>>
>>>>>> On 04/29/2016 11:11 AM, Maximilian Michels wrote:
>>>>>>> @Amir: This is the developer mailing list. Please post your questions
>>>>>>> regarding Beam on the user mailing list.
>>>>>>>
>>>>>>> +1 for portability in general. However, I see some crucial TODOs
>>>>>>> coming up:
>>>>>>>
>>>>>>> 1) Improving the integration of runners with the Beam sink/source API
>>>>>>> 2) Providing interfaces to implement new connectors (i.e. still no
>>>>>>> existing UnboundedSink)
>>>>>>> 3) Extending existing interfaces to ease implementation of connectors
>>>>>>> and provide a uniform API (i.e. on top of UnboundedSource)
>>>>>>> 4) Working on a more complete set of connectors in Beam
>>>>>>>
>>>>>>> Looking at the KafkaIO implementation, I wonder whether we shouldn't
>>>>>>> extract the custom watermark and timestamp functions into an extra
>>>>>>> interface. All connectors are going to have these methods. It would
>>>>>>> be nice to have a uniform API among the connectors.
>>>>>>>
>>>>>>> Further, KafkaIO enforces a <key, value> data model which, AFAIK, is
>>>>>>> not enforced by the Beam model. I don't know the details of this
>>>>>>> design decision, but I would like it to be communicated before it is
>>>>>>> merged into master. Otherwise, it will be harder to achieve
>>>>>>> portability among the runners. In Flink and Spark we already have
>>>>>>> experience with all kinds of connectors and users' needs. It would be
>>>>>>> nice to feed that back in the course of adding new connectors to the
>>>>>>> Beam API.
>>>>>>>
>>>>>>> I would expect an active discussion on the dev mailing list before
>>>>>>> any new connector API gets merged. Furthermore, let us provide better
>>>>>>> interfaces for connector needs.
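Max's suggestion above (extracting the per-connector watermark and timestamp functions into a shared interface) might look roughly like the following. This is only a sketch: `TimestampPolicy` and its methods are hypothetical names, not part of the Beam SDK at the time of this thread:

```java
import java.time.Instant;

/**
 * Hypothetical uniform hook that every unbounded connector could accept,
 * instead of each IO defining its own watermark/timestamp setters.
 */
interface TimestampPolicy<T> {
    /** Event-time timestamp to assign to a record. */
    Instant timestampOf(T record);

    /** Watermark: no later record should carry a timestamp before this. */
    Instant currentWatermark();
}

/** Example policy: processing-time stamps, watermark trailing by 5 seconds. */
final class ProcessingTimePolicy<T> implements TimestampPolicy<T> {
    @Override
    public Instant timestampOf(T record) {
        return Instant.now();
    }

    @Override
    public Instant currentWatermark() {
        return Instant.now().minusSeconds(5);
    }
}
```

A connector would then take any `TimestampPolicy<T>` in its builder, giving all IOs the uniform API Max asks for.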
>>>>>>> Finally, let us introduce unbounded sinks :)
>>>>>>>
>>>>>>> On Fri, Apr 29, 2016 at 7:54 AM, amir bahmanyari
>>>>>>> <amirto...@yahoo.com.invalid> wrote:
>>>>>>>> This may help trace it:
>>>>>>>>
>>>>>>>> Exception in thread "main" java.lang.IllegalStateException: no
>>>>>>>> evaluator registered for Read(UnboundedKafkaSource)
>>>>>>>>   at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>>>>>>>>   at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>>>>>>>>   at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>>>>>>>>   at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>>>>>>>>   at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
>>>>>>>>   at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
>>>>>>>>   at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
>>>>>>>>   at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
>>>>>>>>   at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
>>>>>>>>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>>>>>>>>   at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
>>>>>>>>
>>>>>>>> From: Jean-Baptiste Onofré <j...@nanthrax.net>
>>>>>>>> To: dev@beam.incubator.apache.org
>>>>>>>> Sent: Thursday, April 28, 2016 10:08 PM
>>>>>>>> Subject: Re: [DISCUSS] Beam IO & runners native IO
>>>>>>>>
>>>>>>>> I'll take a look. The DirectPipelineRunner didn't support unbounded
>>>>>>>> collections (it has been fixed last night AFAIR). It could be related.
>>>>>>>>
>>>>>>>> Regards
>>>>>>>> JB
>>>>>>>>
>>>>>>>> On 04/29/2016 07:00 AM, amir bahmanyari wrote:
>>>>>>>>> Hi JB, I used the sample KafkaIO usage below:
>>>>>>>>>
>>>>>>>>> p.apply(KafkaIO.read().withBootstrapServers("kafkahost:9092").withTopics(topics));
>>>>>>>>> p.run();
>>>>>>>>>
>>>>>>>>> It threw the following at p.run():
>>>>>>>>>
>>>>>>>>> Exception in thread "main" java.lang.IllegalStateException: no
>>>>>>>>> evaluator registered for Read(UnboundedKafkaSource)
>>>>>>>>>   at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>>>>>>>>>
>>>>>>>>> I am sure I am missing something. Would be great if I could see some
>>>>>>>>> sample code. I appreciate it, sir. Cheers
>>>>>>>>>
>>>>>>>>> From: Jean-Baptiste Onofré <j...@nanthrax.net>
>>>>>>>>> To: dev@beam.incubator.apache.org
>>>>>>>>> Sent: Thursday, April 28, 2016 5:30 AM
>>>>>>>>> Subject: [DISCUSS] Beam IO & runners native IO
>>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> regarding the recent threads on the mailing list, I would like to
>>>>>>>>> start a formal discussion around the IO.
>>>>>>>>> As we can expect the first contributions in this area (I already
>>>>>>>>> have some work in progress around this ;)), I think it's a fair
>>>>>>>>> discussion to have.
>>>>>>>>>
>>>>>>>>> Now, we have two kinds of IO: the ones "generic" to Beam, and the
>>>>>>>>> ones "local" to the runners.
>>>>>>>>>
>>>>>>>>> For example, let's take Kafka: we have the KafkaIO (in IO), and, for
>>>>>>>>> instance, we have the spark-streaming Kafka connector (in the Spark
>>>>>>>>> runner).
>>>>>>>>>
>>>>>>>>> Right now, we have two approaches for the user:
>>>>>>>>> 1. In the pipeline, we use KafkaIO from Beam: it's the preferred
>>>>>>>>> approach for sure. However, the user may want to use the
>>>>>>>>> runner-specific IO for two reasons:
>>>>>>>>>   * Beam doesn't provide the IO yet (for instance, the spark-cassandra
>>>>>>>>>     connector is available whereas we don't yet have any CassandraIO
>>>>>>>>>     (I'm working on it anyway ;))
>>>>>>>>>   * The runner native IO is optimized or contains more features than
>>>>>>>>>     the Beam native IO
>>>>>>>>> 2. So, for the previous reasons, the user could want to use the
>>>>>>>>> native runner IO. The drawback of this approach is that the pipeline
>>>>>>>>> will be tied to a specific runner, which is completely against the
>>>>>>>>> Beam design.
>>>>>>>>>
>>>>>>>>> I wonder if it wouldn't make sense to add a flag on the IO API (and
>>>>>>>>> a related one on the Runner API) like .useNative().
>>>>>>>>>
>>>>>>>>> For instance, the user would be able to do:
>>>>>>>>>
>>>>>>>>> pipeline.apply(KafkaIO.read().withBootstrapServers("...").withTopics("...").useNative(true));
>>>>>>>>>
>>>>>>>>> Then, if the runner has a "native" IO, it will use it; else, with
>>>>>>>>> useNative(false) (the default), it won't use any runner native IO.
>>>>>>>>>
>>>>>>>>> The tricky point there is the configuration: assuming the Beam IO
>>>>>>>>> and the runner IO can differ, it means that the "Beam IO" would have
>>>>>>>>> to populate all runner-specific IO configuration.
>>>>>>>>>
>>>>>>>>> Of course, it's always possible to use a PTransform to wrap the
>>>>>>>>> runner native IO, but we are back to the same concern: the pipeline
>>>>>>>>> will be coupled to a specific runner.
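JB's .useNative() proposal could be modeled as a simple hint on the IO builder that a runner inspects at translation time. The sketch below is plain Java with hypothetical names (KafkaReadBuilder is not Beam API); it only illustrates that the pipeline code stays runner-agnostic while the flag defaults to the portable Beam IO:

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical builder modeling the proposed useNative() hint. */
final class KafkaReadBuilder {
    private String bootstrapServers;
    private List<String> topics;
    private boolean useNative = false; // default: portable Beam KafkaIO

    KafkaReadBuilder withBootstrapServers(String servers) {
        this.bootstrapServers = servers;
        return this;
    }

    KafkaReadBuilder withTopics(String... topics) {
        this.topics = Arrays.asList(topics);
        return this;
    }

    KafkaReadBuilder useNative(boolean flag) {
        this.useNative = flag;
        return this;
    }

    /**
     * A runner would call this during pipeline translation and, if true and
     * a native connector exists, substitute it for the generic Beam IO.
     */
    boolean prefersNativeIO() {
        return useNative;
    }
}
```

A Spark runner translating this read could check prefersNativeIO() and swap in its spark-streaming Kafka connector, populating the native configuration from the builder's bootstrapServers and topics; otherwise the generic KafkaIO path is used, so the same pipeline runs unchanged on any runner.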
>>>>>>>>> The purpose of the useNative() flag is to "automatically" inform
>>>>>>>>> the runner to use a specific IO if it has one: the pipeline stays
>>>>>>>>> decoupled from the runners.
>>>>>>>>>
>>>>>>>>> Thoughts?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Regards
>>>>>>>>> JB
>>>>>>>>>
>>>>>>>> --
>>>>>>>> Jean-Baptiste Onofré
>>>>>>>> jbono...@apache.org
>>>>>>>> http://blog.nanthrax.net
>>>>>>>> Talend - http://www.talend.com