Hi,
I think the fact that KafkaIO has a <key, value> model comes from Kafka
having a <key, value> model. I imagine most sources will emit the type of
values appropriate for them.

I agree with Max that the lack of an UnboundedSink seems strange. Do we
have any "sinks" implemented as a ParDo already?
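
To make the question concrete, here is a minimal sketch of what I mean by a sink implemented as a ParDo: a per-element function that buffers records and flushes them to an external system when the bundle ends. It is plain Java and deliberately does not use the Beam SDK. ElementFn, InMemoryStore, WriteFn and runBundle are hypothetical names made up for this illustration, not Beam APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ParDoSinkSketch {

    /** Hypothetical stand-in for a per-element function with bundle hooks; not Beam's DoFn. */
    interface ElementFn<T> {
        void startBundle();
        void processElement(T element);
        void finishBundle();
    }

    /** Hypothetical external system; a real sink would talk to Kafka, JDBC, etc. */
    static class InMemoryStore {
        final List<String> rows = new ArrayList<>();
        int batchesWritten = 0;

        void writeBatch(List<String> batch) {
            rows.addAll(batch);
            batchesWritten++;
        }
    }

    /** The "sink": buffers elements and writes them out in batches. */
    static class WriteFn implements ElementFn<String> {
        private final InMemoryStore store;
        private final int batchSize;
        private final List<String> buffer = new ArrayList<>();

        WriteFn(InMemoryStore store, int batchSize) {
            this.store = store;
            this.batchSize = batchSize;
        }

        @Override
        public void startBundle() {
            buffer.clear();
        }

        @Override
        public void processElement(String element) {
            buffer.add(element);
            if (buffer.size() >= batchSize) {
                flush();
            }
        }

        @Override
        public void finishBundle() {
            // Flush whatever is left so no element is lost at the bundle boundary.
            flush();
        }

        private void flush() {
            if (buffer.isEmpty()) {
                return;
            }
            store.writeBatch(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    /** Drives one bundle through the function, the way a runner would. */
    static <T> void runBundle(ElementFn<T> fn, Iterable<T> bundle) {
        fn.startBundle();
        for (T element : bundle) {
            fn.processElement(element);
        }
        fn.finishBundle();
    }

    public static void main(String[] args) {
        InMemoryStore store = new InMemoryStore();
        runBundle(new WriteFn(store, 2), Arrays.asList("a", "b", "c"));
        System.out.println(store.rows + " in " + store.batchesWritten + " batches");
    }
}
```

Nothing in this pattern depends on the input being bounded, which is why I wonder how much a dedicated UnboundedSink would add on top of it.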

Cheers,
Aljoscha

On Fri, 29 Apr 2016 at 11:22 Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

> Hi Max,
>
> Your four points are valid, and we have already discussed them.
>
> 1. +1, the runner API should bring utils around that
> 2. UnboundedSink has been discussed (I don't see a strong use case for
> now, as it takes a PCollection).
> 3. +1, Dan talked about improving the hierarchy.
> 4. +1, I'm working on new IO (JMS, MQTT, JDBC, Cassandra, Camel, ...).
>
> I would add:
>
> 5. Add a page on the website listing the IO, their usage and
> configuration. Something like we have in Camel:
> http://camel.apache.org/components.html
> 6. Refactor the FileIO to avoid usage of IOChannelFactory and use a
> filesystem plugin (file, hdfs, s3, aws, etc).
>
> Dan planned to create "util" to deal with watermark and timestamps.
>
> Regards
> JB
>
> On 04/29/2016 11:11 AM, Maximilian Michels wrote:
> > @Amir: This is the Developer mailing list. Please post your questions
> > regarding Beam on the user mailing list.
> >
> > +1 for portability in general. However, I see some crucial TODOs
> > coming up:
> >
> > 1) Improving the integration of Runners with the Beam sink/source API
> > 2) Providing interfaces to implement new connectors (i.e. still no
> > existing UnboundedSink)
> > 3) Extending existing interfaces to ease implementation of connectors
> > and provide a uniform API (i.e. on top of UnboundedSource)
> > 4) Working on a more complete set of connectors in Beam
> >
> > Looking at the KafkaIO implementation, I wonder whether we should
> > extract the custom Watermark and Timestamp functions into a separate
> > interface. All connectors are going to have these methods. It would be
> > nice to have a uniform API among the connectors.
> >
> > Further, the KafkaIO enforces a <key, value> data model which AFAIK is
> > not enforced by the Beam model. I don't know the details for this
> > design decision but I would like this to be communicated before it is
> > merged into the master. Otherwise, it will be harder to achieve
> > portability among the Runners. In Flink and Spark we are already
> > experienced with all kinds of connectors and users' needs. It would be
> > nice to feed that back in the course of adding new connectors to the
> > Beam API.
> >
> > I would expect an active discussion on the Dev mailing list before any
> > new connector API gets merged. Furthermore, let us provide better
> > interfaces for connector needs. Finally, let us introduce unbounded
> > sinks :)
> >
> > On Fri, Apr 29, 2016 at 7:54 AM, amir bahmanyari
> > <amirto...@yahoo.com.invalid> wrote:
> >> This may help trace it:
> >> Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
> >>   at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
> >>   at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
> >>   at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> >>   at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> >>   at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
> >>   at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
> >>   at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
> >>   at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
> >>   at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
> >>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
> >>   at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
> >>
> >>        From: Jean-Baptiste Onofré <j...@nanthrax.net>
> >>   To: dev@beam.incubator.apache.org
> >>   Sent: Thursday, April 28, 2016 10:08 PM
> >>   Subject: Re: [DISCUSS] Beam IO &runners native IO
> >>
> >> I'm going to take a look. The DirectPipelineRunner didn't support
> >> unbounded collections (it was fixed last night, AFAIR). It could be
> >> related.
> >>
> >> Regards
> >> JB
> >>
> >> On 04/29/2016 07:00 AM, amir bahmanyari wrote:
> >>> Hi JB,
> >>> I used the sample KafkaIO usage below:
> >>>
> >>> p.apply(KafkaIO.read().withBootstrapServers("kafkahost:9092").withTopics(topics));
> >>> p.run();
> >>>
> >>> It threw the following at p.run():
> >>> Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
> >>>   at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
> >>>
> >>> I am sure I am missing something. It would be great if I could see
> >>> some sample code. I appreciate it, sir. Cheers
> >>>
> >>>         From: Jean-Baptiste Onofré <j...@nanthrax.net>
> >>>   To: dev@beam.incubator.apache.org
> >>>   Sent: Thursday, April 28, 2016 5:30 AM
> >>>   Subject: [DISCUSS] Beam IO &runners native IO
> >>>
> >>> Hi all,
> >>>
> >>> regarding the recent threads on the mailing list, I would like to start
> >>> a formal discussion around the IO.
> >>> As we can expect the first contributions in this area (I already have
> >>> some work in progress around this ;)), I think it's a fair discussion
> >>> to have.
> >>>
> >>> Now, we have two kinds of IO: the one "generic" to Beam, and the one
> >>> "local" to the runners.
> >>>
> >>> For example, let's take Kafka: we have the KafkaIO (in IO), and for
> >>> instance, we have the spark-streaming kafka connector (in the Spark
> >>> Runner).
> >>>
> >>> Right now, we have two approaches for the user:
> >>> 1. In the pipeline, we use KafkaIO from Beam: it's the preferred
> >>> approach for sure. However, the user may want to use the
> >>> runner-specific IO for two reasons:
> >>>       * Beam doesn't provide the IO yet (for instance, the spark cassandra
> >>> connector is available whereas we don't have any CassandraIO yet (I'm
> >>> working on it anyway ;)))
> >>>       * The runner native IO is optimized or contains more features
> >>> than the Beam native IO
> >>> 2. So, for the previous reasons, the user may want to use the native
> >>> runner IO. The drawback of this approach is that the pipeline will be
> >>> tied to a specific runner, which is completely against the Beam
> >>> design.
> >>>
> >>> I wonder if it wouldn't make sense to add a flag on the IO API (and
> >>> a related one on the Runner API) like .useNative().
> >>>
> >>> For instance, the user would be able to do:
> >>>
> >>> pipeline.apply(KafkaIO.read().withBootstrapServers("...").withTopics("...").useNative(true));
> >>>
> >>> then, if the runner has a "native" IO, it will use it, else, if
> >>> useNative(false) (the default), it won't use any runner native IO.
> >>>
> >>> The open point is the configuration: assuming the Beam IO and the
> >>> runner IO can differ, the "Beam IO" would have to populate all
> >>> runner-specific IO configuration.
> >>>
> >>> Of course, it's always possible to use a PTransform to wrap the runner
> >>> native IO, but we are back to the same concern: the pipeline will be
> >>> coupled to a specific runner.
> >>>
> >>> The purpose of the useNative() flag is to "automatically" inform the
> >>> runner to use a specific IO if it has one: the pipeline stays decoupled
> >>> from the runners.
> >>>
> >>> Thoughts ?
> >>>
> >>> Thanks
> >>> Regards
> >>> JB
> >>>
> >>
> >> --
> >> Jean-Baptiste Onofré
> >> jbono...@apache.org
> >> http://blog.nanthrax.net
> >> Talend - http://www.talend.com
> >>
> >>
> >>
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
