+1 on Max's comment about the active discussion on the connector API. I think we can use Spark's and Flink's existing connectors (Spark supports many) as test cases and consider a bottom-up design approach rather than top-down, especially since we're in incubation.
Also +1 for Davor that runners should make SDK IOs a high priority - this will probably give us more input than any good email thread. I'll do my best to make this happen for the Spark runner.

On Fri, Apr 29, 2016 at 1:45 PM Maximilian Michels <m...@apache.org> wrote:
> @Aljoscha I didn't know that Kafka always stores Key/Value but I see
> that we also have support for setting Kafka keys in Flink.
>
> @JB I get your point that a sink is simply a DoFn, but a ParDo is not
> a good match for a sink. A Sink doesn't produce a PCollection but
> represents the end of a pipeline. Like an UnboundedSource, an
> UnboundedSink also has special needs, i.e. it needs to provide a
> checkpointing mechanism. I think we need something along the lines of
> the existing Write transform for batch.
>
> On Fri, Apr 29, 2016 at 12:27 PM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> > Hi,
> >
> > KafkaIO uses KafkaRecord, which is basically key/value plus some metadata
> > (topic, partition, offset).
> >
> > Can you describe the behavior of an UnboundedSink?
> >
> > UnboundedSource is obvious: it keeps consuming data, creating a PCollection
> > sent into the pipeline.
> >
> > But UnboundedSink? Do you mean that the UnboundedSink will write each
> > record in the PCollection? When does it stop?
> >
> > Regards
> > JB
> >
> > On 04/29/2016 12:07 PM, Aljoscha Krettek wrote:
> >> Hi,
> >> I think the fact that KafkaIO has a <key, value> model comes from Kafka
> >> having a <key, value> model. I imagine most sources will emit the type
> >> of values appropriate for them.
> >>
> >> I agree with Max that the lack of an UnboundedSink seems strange. Do we
> >> have any "sinks" implemented as a ParDo already?
> >>
> >> Cheers,
> >> Aljoscha
> >>
> >> On Fri, 29 Apr 2016 at 11:22 Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> >>> Hi Max,
> >>>
> >>> Your four points are valid and we have already discussed them.
> >>>
> >>> 1. +1, the runner API should bring utils around that
> >>> 2. UnboundedSink has been discussed (I don't see a strong use case for
> >>> now, as it takes a PCollection).
> >>> 3. +1, Dan talked about improving the hierarchy.
> >>> 4. +1, I'm working on new IO (JMS, MQTT, JDBC, Cassandra, Camel, ...).
> >>>
> >>> I would add:
> >>>
> >>> 5. Add a page on the website listing the IOs, their usage and
> >>> configuration. Something like we have in Camel:
> >>> http://camel.apache.org/components.html
> >>> 6. Refactor the FileIO to avoid usage of IOChannelFactory and use a
> >>> filesystem plugin (file, hdfs, s3, aws, etc.).
> >>>
> >>> Dan planned to create "util" to deal with watermarks and timestamps.
> >>>
> >>> Regards
> >>> JB
> >>>
> >>> On 04/29/2016 11:11 AM, Maximilian Michels wrote:
> >>>> @Amir: This is the Developer mailing list. Please post your questions
> >>>> regarding Beam on the user mailing list.
> >>>>
> >>>> +1 for portability in general. However, I see some crucial TODOs coming up:
> >>>>
> >>>> 1) Improving the integration of Runners with the Beam sink/source API
> >>>> 2) Providing interfaces to implement new connectors (i.e. still no
> >>>> existing UnboundedSink)
> >>>> 3) Extending existing interfaces to ease implementation of connectors
> >>>> and provide a uniform API (i.e. on top of UnboundedSource)
> >>>> 4) Working on a more complete set of connectors in Beam
> >>>>
> >>>> Looking at the KafkaIO implementation, I wonder whether we should extract
> >>>> the custom Watermark and Timestamp functions into an extra interface.
> >>>> All connectors are going to have these methods. It would be
> >>>> nice to have a uniform API among the connectors.
> >>>>
> >>>> Further, KafkaIO enforces a <key, value> data model which AFAIK is
> >>>> not enforced by the Beam model. I don't know the details of this
> >>>> design decision but I would like it to be communicated before it is
> >>>> merged into master.
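The extraction Max describes above could be sketched roughly as follows. This is a hypothetical illustration, not actual Beam API: the names `TimestampFn`, `WatermarkFn`, and `LogRecord` are invented here; the point is only that every unbounded connector would implement the same two small interfaces instead of each carrying its own ad hoc timestamp/watermark methods.

```java
import java.time.Instant;

// Hypothetical sketch (not actual Beam API): pull the per-record timestamp
// and watermark logic out of individual connectors into shared interfaces,
// so every unbounded connector exposes the same uniform API.
public class ConnectorFnSketch {

    /** Extracts the event-time timestamp from a record of type T. */
    interface TimestampFn<T> {
        Instant timestampOf(T record);
    }

    /** Derives the current watermark from the latest record and its timestamp. */
    interface WatermarkFn<T> {
        Instant watermarkFor(T record, Instant recordTimestamp);
    }

    // Placeholder record type standing in for e.g. KafkaIO's KafkaRecord.
    static class LogRecord {
        final long eventMillis;
        final String payload;
        LogRecord(long eventMillis, String payload) {
            this.eventMillis = eventMillis;
            this.payload = payload;
        }
    }

    public static void main(String[] args) {
        TimestampFn<LogRecord> ts = r -> Instant.ofEpochMilli(r.eventMillis);
        // Simple heuristic: the watermark trails the newest timestamp by 1 second.
        WatermarkFn<LogRecord> wm = (r, t) -> t.minusMillis(1000);

        LogRecord rec = new LogRecord(5000L, "hello");
        Instant t = ts.timestampOf(rec);
        System.out.println(t.toEpochMilli());                        // 5000
        System.out.println(wm.watermarkFor(rec, t).toEpochMilli());  // 4000
    }
}
```

A connector would then accept these functions in its builder, giving all sources a uniform surface for event-time configuration.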
> >>>> Otherwise, it will be harder to achieve portability among the
> >>>> Runners. In Flink and Spark we already have experience with all kinds
> >>>> of connectors and users' needs. It would be nice to feed that back in
> >>>> the course of adding new connectors to the Beam API.
> >>>>
> >>>> I would expect an active discussion on the Dev mailing list before any
> >>>> new connector API gets merged. Furthermore, let us provide better
> >>>> interfaces for connector needs. Finally, let us introduce unbounded
> >>>> sinks :)
> >>>>
> >>>> On Fri, Apr 29, 2016 at 7:54 AM, amir bahmanyari
> >>>> <amirto...@yahoo.com.invalid> wrote:
> >>>>> This may help trace it:
> >>>>>
> >>>>> Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
> >>>>>   at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
> >>>>>   at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
> >>>>>   at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> >>>>>   at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> >>>>>   at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
> >>>>>   at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
> >>>>>   at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
> >>>>>   at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
> >>>>>   at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
> >>>>>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
> >>>>>   at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
> >>>>>
> >>>>> From: Jean-Baptiste Onofré <j...@nanthrax.net>
> >>>>> To: dev@beam.incubator.apache.org
> >>>>> Sent: Thursday, April 28, 2016 10:08 PM
> >>>>> Subject: Re: [DISCUSS] Beam IO & runners native IO
> >>>>>
> >>>>> I'm going to take a look. The DirectPipelineRunner didn't support
> >>>>> unbounded collections (it was fixed last night AFAIR). It could be
> >>>>> related.
> >>>>>
> >>>>> Regards
> >>>>> JB
> >>>>>
> >>>>> On 04/29/2016 07:00 AM, amir bahmanyari wrote:
> >>>>>> Hi JB, I used the sample KafkaIO usage below.
> >>>>>>
> >>>>>> p.apply(KafkaIO.read().withBootstrapServers("kafkahost:9092").withTopics(topics));
> >>>>>> p.run();
> >>>>>>
> >>>>>> It threw the following at p.run():
> >>>>>>
> >>>>>> Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
> >>>>>>   at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
> >>>>>>
> >>>>>> I am sure I am missing something. Would be great if I could see some
> >>>>>> sample code. I appreciate it, sir. Cheers
> >>>>>>
> >>>>>> From: Jean-Baptiste Onofré <j...@nanthrax.net>
> >>>>>> To: dev@beam.incubator.apache.org
> >>>>>> Sent: Thursday, April 28, 2016 5:30 AM
> >>>>>> Subject: [DISCUSS] Beam IO & runners native IO
> >>>>>>
> >>>>>> Hi all,
> >>>>>>
> >>>>>> regarding the recent threads on the mailing list, I would like to start
> >>>>>> a formal discussion around the IO.
> >>>>>> As we can expect the first contributions in this area (I already have
> >>>>>> some work in progress around this ;)), I think it's a fair discussion
> >>>>>> to have.
> >>>>>>
> >>>>>> Now, we have two kinds of IO: the ones "generic" to Beam, and the ones
> >>>>>> "local" to the runners.
> >>>>>>
> >>>>>> For example, let's take Kafka: we have the KafkaIO (in IO), and, for
> >>>>>> instance, we have the spark-streaming kafka connector (in the Spark
> >>>>>> Runner).
> >>>>>>
> >>>>>> Right now, we have two approaches for the user:
> >>>>>>
> >>>>>> 1. In the pipeline, we use KafkaIO from Beam: it's the preferred
> >>>>>> approach for sure. However, the user may want to use the
> >>>>>> runner-specific IO for two reasons:
> >>>>>>     * Beam doesn't provide the IO yet (for instance, a Spark Cassandra
> >>>>>> connector is available whereas we don't yet have any CassandraIO (I'm
> >>>>>> working on it anyway ;))
> >>>>>>     * The runner's native IO is optimized or contains more features
> >>>>>> than the Beam native IO
> >>>>>> 2. So, for the previous reasons, the user could want to use the native
> >>>>>> runner IO. The drawback of this approach is that the pipeline will be
> >>>>>> tied to a specific runner, which is completely against the Beam design.
> >>>>>>
> >>>>>> I wonder if it wouldn't make sense to add a flag on the IO API (and a
> >>>>>> related one on the Runner API) like .useNative().
> >>>>>>
> >>>>>> For instance, the user would be able to do:
> >>>>>>
> >>>>>> pipeline.apply(KafkaIO.read().withBootstrapServers("...").withTopics("...").useNative(true));
> >>>>>>
> >>>>>> Then, if the runner has a "native" IO, it will use it; otherwise, with
> >>>>>> useNative(false) (the default), it won't use any runner native IO.
> >>>>>>
> >>>>>> The point there is the configuration: assuming the Beam IO and the
> >>>>>> runner IO can differ, it means that the Beam IO would have to populate
> >>>>>> all runner-specific IO configuration.
> >>>>>>
> >>>>>> Of course, it's always possible to use a PTransform to wrap the
> >>>>>> runner's native IO, but we are back to the same concern: the pipeline
> >>>>>> will be coupled to a specific runner.
> >>>>>>
> >>>>>> The purpose of the useNative() flag is to "automatically" inform the
> >>>>>> runner to use a specific IO if it has one: the pipeline stays
> >>>>>> decoupled from the runners.
> >>>>>>
> >>>>>> Thoughts?
> >>>>>>
> >>>>>> Thanks
> >>>>>> Regards
> >>>>>> JB
> >>>>>
> >>>>> --
> >>>>> Jean-Baptiste Onofré
> >>>>> jbono...@apache.org
> >>>>> http://blog.nanthrax.net
> >>>>> Talend - http://www.talend.com
> >>>
> >>> --
> >>> Jean-Baptiste Onofré
> >>> jbono...@apache.org
> >>> http://blog.nanthrax.net
> >>> Talend - http://www.talend.com
> >
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
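To make JB's useNative() proposal concrete, here is a minimal self-contained sketch of how the flag might interact with a runner's translation step. Everything here is hypothetical: `KafkaRead`, `translate`, and the capability check are illustrative stand-ins, not actual Beam or runner API. The idea is that the transform only carries a preference; the runner substitutes its own connector when it has one and silently falls back to the portable Beam IO when it doesn't, so the pipeline code never names a runner.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the useNative() proposal (not actual Beam API).
public class NativeIoSketch {

    // Minimal stand-in for an IO read transform carrying a native-IO preference.
    static class KafkaRead {
        String bootstrapServers;
        List<String> topics;
        boolean useNative = false; // default: portable Beam IO

        KafkaRead withBootstrapServers(String servers) { this.bootstrapServers = servers; return this; }
        KafkaRead withTopics(List<String> topics) { this.topics = topics; return this; }
        KafkaRead useNative(boolean flag) { this.useNative = flag; return this; }
    }

    // Stand-in for a runner's translation step: honor the flag only when a
    // native connector actually exists, otherwise fall back to the Beam IO.
    static String translate(KafkaRead read, boolean runnerHasNativeKafka) {
        if (read.useNative && runnerHasNativeKafka) {
            return "native-kafka";
        }
        return "beam-kafka";
    }

    public static void main(String[] args) {
        KafkaRead read = new KafkaRead()
            .withBootstrapServers("kafkahost:9092")
            .withTopics(Arrays.asList("topic1"))
            .useNative(true);

        System.out.println(translate(read, true));  // runner has a native connector
        System.out.println(translate(read, false)); // fallback: portable Beam IO
    }
}
```

The design choice worth noting is the fallback: because useNative() expresses a preference rather than a requirement, the same pipeline runs unchanged on a runner without a native connector, which addresses the runner-coupling concern raised in the thread.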