@Amir: This is the Developer mailing list. Please post your questions regarding Beam on the user mailing list.
+1 for portability in general. However, I see some crucial TODOs coming up: 1) Improving the integration of Runners with the Beam sink/source API 2) Providing interfaces to implement new connectors (i.e. still no existing UnboundedSink) 3) Extending existing interfaces to ease implementation of connectors and provide a uniform API (i.e. on top of UnboundedSource) 4) Working on a more complete set of connectors in Beam Looking at the KafkaIO implementation, I wonder shouldn't we extract the custom Watermark and Timestamp function into an extra interface? All connectors are going to have methods these methods. It would be nice to have a uniform API among the connectors. Further, the KafkaIO enforces a <key, value> data model which AFAIK is not enforced by the Beam model. I don't know the details for this design decision but I would like this to be communicated before it is merged into the master. Otherwise, it will be harder to achieve portability among the Runners. In Flink and Spark we are already experienced with all kinds of connectors and user's needs. It would be nice to feed that back in the course of adding new connectors to the Beam API. I would expect an active discussion on the Dev mailing list before any new connector API gets merged. Furthermore, let us provide better interfaces for connector needs. Finally, let us introduce unbounded sinks :) On Fri, Apr 29, 2016 at 7:54 AM, amir bahmanyari <amirto...@yahoo.com.invalid> wrote: > This may help trace it:Exception in thread "main" > java.lang.IllegalStateException: no evaluator registered for > Read(UnboundedKafkaSource) at > org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898) > at > org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221) > at > org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217) > at > org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217) > at > org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104) > at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261) at > org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860) > at > org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572) > at > org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106) > at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182) at > benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286) > > From: Jean-Baptiste Onofré <j...@nanthrax.net> > To: dev@beam.incubator.apache.org > Sent: Thursday, April 28, 2016 10:08 PM > Subject: Re: [DISCUSS] Beam IO &runners native IO > > I gonna take a look. The DirectPipelineRunner didn't support the > unbounded collection (it has been fixed last night AFAIR). It could be > related. > > Regards > JB > > On 04/29/2016 07:00 AM, amir bahmanyari wrote: >> Hi JB,I used the sample KafkaIO usage >> below.p.apply(KafkaIO.read().withBootstrapServers("kafkahost:9092").withTopics(topics)); >> p.run(); >> >> It threw the following at p.run():Exception in thread "main" >> java.lang.IllegalStateException: no evaluator registered for >> Read(UnboundedKafkaSource) at >> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898) >> I am sure I am missing something.Would be great if I could see a sample >> code.I appreciate it sir.Cheers >> >> From: Jean-Baptiste Onofré <j...@nanthrax.net> >> To: dev@beam.incubator.apache.org >> Sent: Thursday, April 28, 2016 5:30 AM >> Subject: [DISCUSS] Beam IO &runners native IO >> >> Hi all, >> >> regarding the recent threads on the mailing list, I would like to start >> a format discussion around the IO. >> As we can expect the first contributions on this area (I already have >> some work in progress around this ;)), I think it's a fair discussion to >> have. >> >> Now, we have two kinds of IO: the one "generic" to Beam, the one "local" >> to the runners. >> >> For example, let's take Kafka: we have the KafkaIO (in IO), and for >> instance, we have the spark-streaming kafka connector (in Spark Runner). >> >> Right now, we have two approaches for the user: >> 1. In the pipeline, we use KafkaIO from Beam: it's the preferred >> approach for sure. However, the user may want to use the runner specific >> IO for two reasons: >> * Beam doesn't provide the IO yet (for instance, spark cassandra >> connector is available whereas we don't have yet any CassandraIO (I'm >> working on it anyway ;)) >> * The runner native IO is optimized or contain more features that the >> Beam native IO >> 2. So, for the previous reasons, the user could want to use the native >> runner IO. The drawback of this approach is that the pipeline will be >> tight to a specific runner, which is completely against the Beam design. >> >> I wonder if it wouldn't make sense to add flag on the IO API (and >> related on Runner API) like .useNative(). >> >> For instance, the user would be able to do: >> >> pipeline.apply(KafkaIO.read().withBootstrapServers("...").withTopics("...").useNative(true); >> >> then, if the runner has a "native" IO, it will use it, else, if >> useNative(false) (the default), it won't use any runner native IO. >> >> The point there is for the configuration: assuming the Beam IO and the >> runner IO can differ, it means that the "Beam IO" would have to populate >> all runner specific IO configuration. >> >> Of course, it's always possible to use a PTransform to wrap the runner >> native IO, but we are back on the same concern: the pipeline will be >> couple to a specific runner. >> >> The purpose of the useNative() flag is to "automatically" inform the >> runner to use a specific IO if it has one: the pipeline stays decoupled >> from the runners. >> >> Thoughts ? >> >> Thanks >> Regards >> JB >> > > -- > Jean-Baptiste Onofré > jbono...@apache.org > http://blog.nanthrax.net > Talend - http://www.talend.com > > >