+1 to Max's call for an active discussion on the connector API.
I think we can use Spark's and Flink's existing connectors (Spark
supports many) as test cases and take a bottom-up design approach
rather than a top-down one, especially since we're in incubation.

Also +1 to Davor that runners should make SDK IOs a high priority - this
will probably give us more input than any good email thread. I'll do my
best to make this happen for the Spark runner.
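
To make the useNative() idea from JB's original mail (quoted further down)
a bit more concrete, here is a rough, self-contained sketch of how a runner
might choose between a Beam IO and its own connector. None of this is
existing Beam API: ReadSpec, Runner, withNative() and resolve() are made-up
names for illustration, and falling back to the Beam IO when the runner has
no native connector is my assumption, since the proposal doesn't spell that
case out.

```java
import java.util.HashMap;
import java.util.Map;

public class UseNativeSketch {

    /** Hypothetical read spec standing in for something like KafkaIO.read(); not a Beam class. */
    static final class ReadSpec {
        final String ioName;
        final boolean useNative; // false is the default in the proposal

        ReadSpec(String ioName, boolean useNative) {
            this.ioName = ioName;
            this.useNative = useNative;
        }
    }

    /** Hypothetical runner that knows which Beam IOs it can swap for a native connector. */
    static final class Runner {
        private final Map<String, String> nativeConnectors = new HashMap<>();

        Runner withNative(String ioName, String connectorName) {
            nativeConnectors.put(ioName, connectorName);
            return this;
        }

        /** Picks the native connector only when the user opted in and one is registered. */
        String resolve(ReadSpec spec) {
            String nativeConnector = nativeConnectors.get(spec.ioName);
            if (spec.useNative && nativeConnector != null) {
                return "native:" + nativeConnector;
            }
            // Assumed fallback: the portable Beam IO is always available.
            return "beam:" + spec.ioName;
        }
    }

    public static void main(String[] args) {
        Runner spark = new Runner().withNative("kafka", "spark-streaming-kafka");
        Runner other = new Runner();

        // Opted in, runner has a native connector.
        System.out.println(spark.resolve(new ReadSpec("kafka", true)));
        // Not opted in (the default): stay on the Beam IO.
        System.out.println(spark.resolve(new ReadSpec("kafka", false)));
        // Opted in, but this runner has nothing native: assumed fallback to the Beam IO.
        System.out.println(other.resolve(new ReadSpec("kafka", true)));
    }
}
```

The pipeline only ever names the Beam IO, so it stays portable; the part
this sketch glosses over is the configuration mapping between the Beam IO
and each runner's connector, which is the hard bit JB points out.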

On Fri, Apr 29, 2016 at 1:45 PM Maximilian Michels <m...@apache.org> wrote:

> @Aljoscha I didn't know that Kafka always stores Key/Value but I see
> that we also have support for setting Kafka keys in Flink.
>
> @JB I get your point that a sink is simply a DoFn, but a ParDo is not
> a good match for a sink. A Sink doesn't produce a PCollection but
> represents the end of a pipeline. Like an UnboundedSource, an
> UnboundedSink also has special needs, i.e. it needs to provide a
> checkpointing mechanism. I think we need something along the lines of
> the existing Write transform for batch.
>
> On Fri, Apr 29, 2016 at 12:27 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
> > Hi,
> >
> > KafkaIO uses KafkaRecord which is basically key,value + some metadata
> > (topic, partition, offset).
> >
> > Can you describe the behavior of an UnboundedSink ?
> >
> > UnboundedSource is obvious: it's still consuming data creating PCollection
> > sent into the pipeline.
> >
> > But UnboundedSink ? Do you mean that the UnboundedSink will write each
> > record in the PCollection ? When does it stop ?
> >
> > Regards
> > JB
> >
> >
> > On 04/29/2016 12:07 PM, Aljoscha Krettek wrote:
> >>
> >> Hi,
> >> I think the fact that KafkaIO has a <key, value> model comes from Kafka
> >> having a <key, value> model. I imagine most sources will emit the type of
> >> values appropriate for them.
> >>
> >> I agree with Max that the lack of an UnboundedSink seems strange. Do we
> >> have any "sinks" implemented as a ParDo already?
> >>
> >> Cheers,
> >> Aljoscha
> >>
> >> On Fri, 29 Apr 2016 at 11:22 Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> >>
> >>> Hi Max,
> >>>
> >>> Your four points are valid and we have already discussed them.
> >>>
> >>> 1. +1, the runner API should bring utils around that
> >>> 2. UnboundedSink has been discussed (I don't see strong use case for
> >>> now, as it takes a PCollection).
> >>> 3. +1, Dan talked about improving the hierarchy.
> >>> 4. +1, I'm working on new IO (JMS, MQTT, JDBC, Cassandra, Camel, ...).
> >>>
> >>> I would add:
> >>>
> >>> 5. Add a page on the website listing the IO, their usage and
> >>> configuration. Something like we have in Camel:
> >>> http://camel.apache.org/components.html
> >>> 6. Refactor the FileIO to avoid usage of IOChannelFactory and use a
> >>> filesystem plugin (file, hdfs, s3, aws, etc).
> >>>
> >>> Dan planned to create "util" to deal with watermark and timestamps.
> >>>
> >>> Regards
> >>> JB
> >>>
> >>> On 04/29/2016 11:11 AM, Maximilian Michels wrote:
> >>>>
> >>>> @Amir: This is the Developer mailing list. Please post your questions
> >>>> regarding Beam on the user mailing list.
> >>>>
> >>>> +1 for portability in general. However, I see some crucial TODOs coming
> >>>> up:
> >>>>
> >>>> 1) Improving the integration of Runners with the Beam sink/source API
> >>>> 2) Providing interfaces to implement new connectors (i.e. still no
> >>>> existing UnboundedSink)
> >>>> 3) Extending existing interfaces to ease implementation of connectors
> >>>> and provide a uniform API (i.e. on top of UnboundedSource)
> >>>> 4) Working on a more complete set of connectors in Beam
> >>>>
> >>>> Looking at the KafkaIO implementation, I wonder whether we should extract
> >>>> the custom Watermark and Timestamp functions into a separate interface.
> >>>> All connectors are going to have these methods. It would be
> >>>> nice to have a uniform API among the connectors.
> >>>>
> >>>> Further, the KafkaIO enforces a <key, value> data model which AFAIK is
> >>>> not enforced by the Beam model. I don't know the details for this
> >>>> design decision but I would like this to be communicated before it is
> >>>> merged into the master. Otherwise, it will be harder to achieve
> >>>> portability among the Runners. In Flink and Spark we are already
> >>>> experienced with all kinds of connectors and users' needs. It would be
> >>>> nice to feed that back in the course of adding new connectors to the
> >>>> Beam API.
> >>>>
> >>>> I would expect an active discussion on the Dev mailing list before any
> >>>> new connector API gets merged. Furthermore, let us provide better
> >>>> interfaces for connector needs. Finally, let us introduce unbounded
> >>>> sinks :)
> >>>>
> >>>> On Fri, Apr 29, 2016 at 7:54 AM, amir bahmanyari
> >>>> <amirto...@yahoo.com.invalid> wrote:
> >>>>>
> >>>>> This may help trace it:
> >>>>>
> >>>>> Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
> >>>>>     at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
> >>>>>     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
> >>>>>     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> >>>>>     at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> >>>>>     at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
> >>>>>     at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
> >>>>>     at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
> >>>>>     at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
> >>>>>     at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
> >>>>>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
> >>>>>     at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
> >>>>>
> >>>>>
> >>>>>         From: Jean-Baptiste Onofré <j...@nanthrax.net>
> >>>>>    To: dev@beam.incubator.apache.org
> >>>>>    Sent: Thursday, April 28, 2016 10:08 PM
> >>>>>    Subject: Re: [DISCUSS] Beam IO &runners native IO
> >>>>>
> >>>>> I'm going to take a look. The DirectPipelineRunner didn't support
> >>>>> unbounded collections (it was fixed last night AFAIR). It could be
> >>>>> related.
> >>>>>
> >>>>> Regards
> >>>>> JB
> >>>>>
> >>>>> On 04/29/2016 07:00 AM, amir bahmanyari wrote:
> >>>>>>
> >>>>>> Hi JB,
> >>>>>>
> >>>>>> I used the sample KafkaIO usage below.
> >>>>>>
> >>>>>> p.apply(KafkaIO.read().withBootstrapServers("kafkahost:9092").withTopics(topics));
> >>>>>> p.run();
> >>>>>>
> >>>>>> It threw the following at p.run():
> >>>>>>
> >>>>>> Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
> >>>>>>     at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
> >>>>>>
> >>>>>> I am sure I am missing something. Would be great if I could see a sample
> >>>>>> code. I appreciate it sir. Cheers
> >>>>>>
> >>>>>>
> >>>>>>          From: Jean-Baptiste Onofré <j...@nanthrax.net>
> >>>>>>    To: dev@beam.incubator.apache.org
> >>>>>>    Sent: Thursday, April 28, 2016 5:30 AM
> >>>>>>    Subject: [DISCUSS] Beam IO &runners native IO
> >>>>>>
> >>>>>> Hi all,
> >>>>>>
> >>>>>> regarding the recent threads on the mailing list, I would like to start
> >>>>>> a formal discussion around the IO.
> >>>>>> As we can expect the first contributions in this area (I already have
> >>>>>> some work in progress around this ;)), I think it's a fair discussion to
> >>>>>> have.
> >>>>>>
> >>>>>> Now, we have two kinds of IO: the one "generic" to Beam, the one "local"
> >>>>>> to the runners.
> >>>>>>
> >>>>>> For example, let's take Kafka: we have the KafkaIO (in IO), and for
> >>>>>> instance, we have the spark-streaming kafka connector (in Spark Runner).
> >>>>>>
> >>>>>>
> >>>>>> Right now, we have two approaches for the user:
> >>>>>> 1. In the pipeline, we use KafkaIO from Beam: it's the preferred
> >>>>>> approach for sure. However, the user may want to use the runner-specific
> >>>>>> IO for two reasons:
> >>>>>>        * Beam doesn't provide the IO yet (for instance, the Spark
> >>>>>> Cassandra connector is available whereas we don't yet have any
> >>>>>> CassandraIO (I'm working on it anyway ;)))
> >>>>>>        * The runner native IO is optimized or contains more features
> >>>>>> than the Beam native IO
> >>>>>> 2. So, for the previous reasons, the user could want to use the native
> >>>>>> runner IO. The drawback of this approach is that the pipeline will be
> >>>>>> tied to a specific runner, which is completely against the Beam design.
> >>>>>>
> >>>>>>
> >>>>>> I wonder if it wouldn't make sense to add a flag on the IO API (and a
> >>>>>> related one on the Runner API) like .useNative().
> >>>>>>
> >>>>>> For instance, the user would be able to do:
> >>>>>>
> >>>>>> pipeline.apply(KafkaIO.read().withBootstrapServers("...").withTopics("...").useNative(true));
> >>>>>>
> >>>>>>
> >>>>>> then, if the runner has a "native" IO, it will use it, else, if
> >>>>>> useNative(false) (the default), it won't use any runner native IO.
> >>>>>>
> >>>>>> The point there is for the configuration: assuming the Beam IO and the
> >>>>>> runner IO can differ, it means that the "Beam IO" would have to populate
> >>>>>> all runner specific IO configuration.
> >>>>>>
> >>>>>> Of course, it's always possible to use a PTransform to wrap the runner
> >>>>>> native IO, but we are back to the same concern: the pipeline will be
> >>>>>> coupled to a specific runner.
> >>>>>>
> >>>>>> The purpose of the useNative() flag is to "automatically" inform the
> >>>>>> runner to use a specific IO if it has one: the pipeline stays decoupled
> >>>>>> from the runners.
> >>>>>>
> >>>>>> Thoughts ?
> >>>>>>
> >>>>>> Thanks
> >>>>>> Regards
> >>>>>> JB
> >>>>>>
> >>>>>
> >>>>> --
> >>>>> Jean-Baptiste Onofré
> >>>>> jbono...@apache.org
> >>>>> http://blog.nanthrax.net
> >>>>> Talend - http://www.talend.com
> >>>>>
> >>>>>
> >>>>>
> >>>
> >>> --
> >>> Jean-Baptiste Onofré
> >>> jbono...@apache.org
> >>> http://blog.nanthrax.net
> >>> Talend - http://www.talend.com
> >>>
> >>
> >
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
>
