Hi, I think the fact that KafkaIO has a <key, value> model comes from Kafka itself having a <key, value> model. I imagine most sources will emit the types of values appropriate for them.
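(For illustration, this is roughly what the <key, value> model looks like from the user's side. The builder calls are the ones used later in this thread; the byte[] defaults and withoutMetadata() are my reading of KafkaIO, so treat this as a sketch rather than a reference:)

    // Sketch only: the byte[] defaults and withoutMetadata() reflect my
    // reading of KafkaIO and may not match every revision.
    PCollection<KV<byte[], byte[]>> records = p
        .apply(KafkaIO.read()
            .withBootstrapServers("kafkahost:9092")
            .withTopics(topics)
            .withoutMetadata()); // drop offsets/partitions, keep the KV pairs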
I agree with Max that the lack of an UnboundedSink seems strange. Do we
have any "sinks" implemented as a ParDo already? (A rough sketch of what
that looks like is inline below.)

Cheers,
Aljoscha

On Fri, 29 Apr 2016 at 11:22 Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

> Hi Max,
>
> Your four points are valid and we already discussed them.
>
> 1. +1, the runner API should bring utils around that
> 2. UnboundedSink has been discussed (I don't see a strong use case for
> now, as it takes a PCollection).
> 3. +1, Dan talked about improving the hierarchy.
> 4. +1, I'm working on new IO (JMS, MQTT, JDBC, Cassandra, Camel, ...).
>
> I would add:
>
> 5. Add a page on the website listing the IO, their usage and
> configuration. Something like we have in Camel:
> http://camel.apache.org/components.html
> 6. Refactor the FileIO to avoid usage of IOChannelFactory and use a
> filesystem plugin (file, hdfs, s3, aws, etc).
>
> Dan planned to create "util" to deal with watermarks and timestamps.
>
> Regards
> JB
>
> On 04/29/2016 11:11 AM, Maximilian Michels wrote:
> > @Amir: This is the developer mailing list. Please post your questions
> > regarding Beam on the user mailing list.
> >
> > +1 for portability in general. However, I see some crucial TODOs
> > coming up:
> >
> > 1) Improving the integration of runners with the Beam sink/source API
> > 2) Providing interfaces to implement new connectors (e.g. there is
> > still no UnboundedSink)
> > 3) Extending existing interfaces to ease implementation of connectors
> > and provide a uniform API (e.g. on top of UnboundedSource)
> > 4) Working on a more complete set of connectors in Beam
> >
> > Looking at the KafkaIO implementation, I wonder whether we shouldn't
> > extract the custom watermark and timestamp functions into an extra
> > interface. All connectors are going to have these methods, and it
> > would be nice to have a uniform API among the connectors.
> >
> > Further, KafkaIO enforces a <key, value> data model which, AFAIK, is
> > not enforced by the Beam model. I don't know the details of this
> > design decision, but I would like it to be communicated before it is
> > merged into master. Otherwise, it will be harder to achieve
> > portability among the runners. In Flink and Spark we already have
> > experience with all kinds of connectors and users' needs. It would be
> > nice to feed that back in the course of adding new connectors to the
> > Beam API.
> >
> > I would expect an active discussion on the dev mailing list before
> > any new connector API gets merged. Furthermore, let us provide better
> > interfaces for connector needs. Finally, let us introduce unbounded
> > sinks :)
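(Re: my ParDo question above — a "sink" as a ParDo is essentially just a DoFn that writes out in processElement. A rough sketch against the current DoFn API; MyClient is invented to stand in for any external system's client:)

    // Sketch of a write-as-ParDo "sink". MyClient is invented for
    // illustration; the bundle hooks give a place to open and flush
    // connections.
    input.apply(ParDo.of(new DoFn<String, Void>() {
      private transient MyClient client;

      @Override
      public void startBundle(Context c) {
        client = MyClient.connect("host:1234"); // hypothetical endpoint
      }

      @Override
      public void processElement(ProcessContext c) {
        client.send(c.element()); // write each element as it arrives
      }

      @Override
      public void finishBundle(Context c) {
        client.close(); // flush once per bundle
      }
    }));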
> > On Fri, Apr 29, 2016 at 7:54 AM, amir bahmanyari
> > <amirto...@yahoo.com.invalid> wrote:
> >> This may help trace it:
> >>
> >> Exception in thread "main" java.lang.IllegalStateException: no
> >> evaluator registered for Read(UnboundedKafkaSource)
> >>   at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
> >>   at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
> >>   at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> >>   at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
> >>   at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
> >>   at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
> >>   at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
> >>   at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
> >>   at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
> >>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
> >>   at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
> >>
> >> From: Jean-Baptiste Onofré <j...@nanthrax.net>
> >> To: dev@beam.incubator.apache.org
> >> Sent: Thursday, April 28, 2016 10:08 PM
> >> Subject: Re: [DISCUSS] Beam IO & runners native IO
> >>
> >> I'm going to take a look. The DirectPipelineRunner didn't support
> >> unbounded collections (it was fixed last night, AFAIR). It could be
> >> related.
> >>
> >> Regards
> >> JB
> >>
> >> On 04/29/2016 07:00 AM, amir bahmanyari wrote:
> >>> Hi JB, I used the sample KafkaIO usage below:
> >>>
> >>> p.apply(KafkaIO.read().withBootstrapServers("kafkahost:9092").withTopics(topics));
> >>> p.run();
> >>>
> >>> It threw the following at p.run():
> >>>
> >>> Exception in thread "main" java.lang.IllegalStateException: no
> >>> evaluator registered for Read(UnboundedKafkaSource)
> >>>   at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
> >>>
> >>> I am sure I am missing something. Would be great if I could see some
> >>> sample code. I appreciate it, sir. Cheers
> >>>
> >>> From: Jean-Baptiste Onofré <j...@nanthrax.net>
> >>> To: dev@beam.incubator.apache.org
> >>> Sent: Thursday, April 28, 2016 5:30 AM
> >>> Subject: [DISCUSS] Beam IO & runners native IO
> >>>
> >>> Hi all,
> >>>
> >>> regarding the recent threads on the mailing list, I would like to
> >>> start a formal discussion around the IO.
> >>> As we can expect the first contributions in this area (I already
> >>> have some work in progress around this ;)), I think it's a fair
> >>> discussion to have.
> >>>
> >>> Now, we have two kinds of IO: the ones "generic" to Beam, and the
> >>> ones "local" to the runners.
> >>>
> >>> For example, let's take Kafka: we have the KafkaIO (in IO), and, for
> >>> instance, we have the spark-streaming Kafka connector (in the Spark
> >>> runner).
> >>>
> >>> Right now, we have two approaches for the user:
> >>> 1. In the pipeline, use KafkaIO from Beam: it's the preferred
> >>> approach for sure. However, the user may want to use the runner
> >>> specific IO for two reasons:
> >>>    * Beam doesn't provide the IO yet (for instance, the spark
> >>>      cassandra connector is available whereas we don't have any
> >>>      CassandraIO yet (I'm working on it anyway ;))
> >>>    * The runner native IO is optimized or contains more features
> >>>      than the Beam native IO
> >>> 2. So, for the previous reasons, the user could want to use the
> >>> native runner IO. The drawback of this approach is that the pipeline
> >>> will be tied to a specific runner, which is completely against the
> >>> Beam design (see the sketch just below).
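(To make the coupling in point 2 concrete — a hypothetical contrast; the second transform's class name is invented purely for illustration:)

    // Portable: Beam's own IO, translatable by any runner.
    p.apply(KafkaIO.read().withBootstrapServers("...").withTopics("..."));

    // Coupled: a hypothetical runner-provided connector; any direct use
    // of a runner's own IO means the pipeline runs on that runner only.
    p.apply(new SparkStreamingKafkaRead("...", "...")); // invented class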
> >>> I wonder if it wouldn't make sense to add a flag on the IO API (and
> >>> a related one on the runner API), like .useNative().
> >>>
> >>> For instance, the user would be able to do:
> >>>
> >>> pipeline.apply(KafkaIO.read()
> >>>     .withBootstrapServers("...")
> >>>     .withTopics("...")
> >>>     .useNative(true));
> >>>
> >>> Then, if the runner has a "native" IO, it will use it; with
> >>> useNative(false) (the default), it won't use any runner native IO.
> >>>
> >>> The tricky point is the configuration: assuming the Beam IO and the
> >>> runner IO can differ, the "Beam IO" would have to populate all of
> >>> the runner-specific IO configuration.
> >>>
> >>> Of course, it's always possible to use a PTransform to wrap the
> >>> runner native IO, but we are back to the same concern: the pipeline
> >>> will be coupled to a specific runner.
> >>>
> >>> The purpose of the useNative() flag is to "automatically" inform the
> >>> runner to use a specific IO if it has one: the pipeline stays
> >>> decoupled from the runners.
> >>>
> >>> Thoughts?
> >>>
> >>> Thanks
> >>> Regards
> >>> JB
> >>
> >> --
> >> Jean-Baptiste Onofré
> >> jbono...@apache.org
> >> http://blog.nanthrax.net
> >> Talend - http://www.talend.com
>
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
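(For concreteness, a rough sketch of how a runner could honor the proposed flag during translation — the isNative() accessor, the translate* helpers, and the override hook are all hypothetical:)

    // Hypothetical runner-side handling of the proposed useNative() flag.
    // None of these hooks exist today; this only illustrates the idea.
    if (transform instanceof KafkaIO.Read
        && ((KafkaIO.Read) transform).isNative()) {
      // Substitute the runner's own Kafka connector for the Beam source.
      return translateWithNativeKafkaConnector((KafkaIO.Read) transform);
    }
    return translateWithBeamSource(transform); // default: regular KafkaIO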