Hi JB,I used the sample KafkaIO usage 
below.p.apply(KafkaIO.read().withBootstrapServers("kafkahost:9092").withTopics(topics));
p.run();

It threw the following at p.run():Exception in thread "main" 
java.lang.IllegalStateException: no evaluator registered for 
Read(UnboundedKafkaSource) at 
org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
 I am sure I am missing something.Would be great if I could see a sample code.I 
appreciate it sir.Cheers

      From: Jean-Baptiste Onofré <j...@nanthrax.net>
 To: dev@beam.incubator.apache.org 
 Sent: Thursday, April 28, 2016 5:30 AM
 Subject: [DISCUSS] Beam IO &runners native IO
   
Hi all,

regarding the recent threads on the mailing list, I would like to start 
a format discussion around the IO.
As we can expect the first contributions on this area (I already have 
some work in progress around this ;)), I think it's a fair discussion to 
have.

Now, we have two kinds of IO: the one "generic" to Beam, the one "local" 
to the runners.

For example, let's take Kafka: we have the KafkaIO (in IO), and for 
instance, we have the spark-streaming kafka connector (in Spark Runner).

Right now, we have two approaches for the user:
1. In the pipeline, we use KafkaIO from Beam: it's the preferred 
approach for sure. However, the user may want to use the runner specific 
IO for two reasons:
    * Beam doesn't provide the IO yet (for instance, spark cassandra 
connector is available whereas we don't have yet any CassandraIO (I'm 
working on it anyway ;))
    * The runner native IO is optimized or contain more features that the 
Beam native IO
2. So, for the previous reasons, the user could want to use the native 
runner IO. The drawback of this approach is that the pipeline will be 
tight to a specific runner, which is completely against the Beam design.

I wonder if it wouldn't make sense to add flag on the IO API (and 
related on Runner API) like .useNative().

For instance, the user would be able to do:

pipeline.apply(KafkaIO.read().withBootstrapServers("...").withTopics("...").useNative(true);

then, if the runner has a "native" IO, it will use it, else, if 
useNative(false) (the default), it won't use any runner native IO.

The point there is for the configuration: assuming the Beam IO and the 
runner IO can differ, it means that the "Beam IO" would have to populate 
all runner specific IO configuration.

Of course, it's always possible to use a PTransform to wrap the runner 
native IO, but we are back on the same concern: the pipeline will be 
couple to a specific runner.

The purpose of the useNative() flag is to "automatically" inform the 
runner to use a specific IO if it has one: the pipeline stays decoupled 
from the runners.

Thoughts ?

Thanks
Regards
JB
-- 
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com


  

Reply via email to