Hi Max,
Your four points are valid, and we have already discussed them.
1. +1, the runner API should provide utilities for that.
2. UnboundedSink has been discussed (I don't see a strong use case for
now, as it takes a PCollection).
3. +1, Dan talked about improving the hierarchy.
4. +1, I'm working on new IO (JMS, MQTT, JDBC, Cassandra, Camel, ...).
I would add:
5. Add a page on the website listing the IO, their usage and
configuration. Something like we have in Camel:
http://camel.apache.org/components.html
6. Refactor the FileIO to avoid using IOChannelFactory and use a
filesystem plugin instead (file, hdfs, s3, aws, etc).
Dan planned to create a "util" to deal with watermarks and timestamps.
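
To make point 6 more concrete, here is a minimal sketch of what a scheme-based filesystem plugin lookup could look like. FileSystemPlugin, SimplePlugin and FileSystemRegistry are hypothetical names used for illustration only; they are not existing Beam classes, and a real plugin would also need methods to open, create and match files.

```java
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical plugin contract; only the scheme is modelled here. */
interface FileSystemPlugin {
    /** URI scheme this plugin handles, e.g. "file", "hdfs" or "s3". */
    String scheme();
}

/** Trivial plugin used only to demonstrate the lookup. */
final class SimplePlugin implements FileSystemPlugin {
    private final String scheme;

    SimplePlugin(String scheme) { this.scheme = scheme; }

    @Override
    public String scheme() { return scheme; }
}

/** Hypothetical registry that selects a plugin from the scheme of a location. */
final class FileSystemRegistry {
    private final Map<String, FileSystemPlugin> plugins = new HashMap<>();

    FileSystemRegistry register(FileSystemPlugin plugin) {
        plugins.put(plugin.scheme(), plugin);
        return this;
    }

    /** Assumption: a location without a scheme is treated as a local file. */
    FileSystemPlugin lookup(String location) {
        String scheme = URI.create(location).getScheme();
        if (scheme == null) {
            scheme = "file";
        }
        FileSystemPlugin plugin = plugins.get(scheme);
        if (plugin == null) {
            throw new IllegalArgumentException("No filesystem plugin for scheme: " + scheme);
        }
        return plugin;
    }

    public static void main(String[] args) {
        FileSystemRegistry registry = new FileSystemRegistry()
            .register(new SimplePlugin("file"))
            .register(new SimplePlugin("hdfs"));
        System.out.println(registry.lookup("hdfs://namenode/data/input.txt").scheme());
        System.out.println(registry.lookup("/tmp/input.txt").scheme());
    }
}
```

With something like this, the FileIO would only depend on the registry, and supporting a new storage system would mean registering one more plugin.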
Regards
JB
On 04/29/2016 11:11 AM, Maximilian Michels wrote:
@Amir: This is the Developer mailing list. Please post your questions
regarding Beam on the user mailing list.
+1 for portability in general. However, I see some crucial TODOs coming up:
1) Improving the integration of Runners with the Beam sink/source API
2) Providing interfaces to implement new connectors (i.e. still no
existing UnboundedSink)
3) Extending existing interfaces to ease implementation of connectors
and provide a uniform API (i.e. on top of UnboundedSource)
4) Working on a more complete set of connectors in Beam
Looking at the KafkaIO implementation, I wonder whether we should extract
the custom Watermark and Timestamp functions into a separate interface.
All connectors are going to have these methods. It would be
nice to have a uniform API among the connectors.
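
As a rough illustration of the idea, the sketch below shows one possible shape for such a shared interface. EventTimePolicy and BoundedLagPolicy are hypothetical names and are not part of KafkaIO or the Beam SDK; the example policy (watermark trailing the highest timestamp by a fixed lag) is an assumption chosen only to make the sketch runnable.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;

/** Hypothetical contract a connector could implement; not an existing Beam interface. */
interface EventTimePolicy<T> {
    /** Event time assigned to a single record. */
    Instant timestampOf(T record);

    /** Watermark after seeing a record; it must never move backwards. */
    Instant watermarkAfter(T record, Instant previousWatermark);
}

/** Example policy: records are epoch milliseconds, watermark trails by a fixed lag. */
final class BoundedLagPolicy implements EventTimePolicy<Long> {
    private final Duration lag;

    BoundedLagPolicy(Duration lag) { this.lag = lag; }

    @Override
    public Instant timestampOf(Long epochMillis) {
        return Instant.ofEpochMilli(epochMillis);
    }

    @Override
    public Instant watermarkAfter(Long epochMillis, Instant previousWatermark) {
        Instant candidate = timestampOf(epochMillis).minus(lag);
        // Keep the watermark monotonic even when records arrive out of order.
        return candidate.isAfter(previousWatermark) ? candidate : previousWatermark;
    }
}

final class EventTimePolicyDemo {
    /** Folds the policy over a batch of records and returns the final watermark. */
    static <T> Instant advance(EventTimePolicy<T> policy, Iterable<T> records, Instant start) {
        Instant watermark = start;
        for (T record : records) {
            watermark = policy.watermarkAfter(record, watermark);
        }
        return watermark;
    }

    public static void main(String[] args) {
        EventTimePolicy<Long> policy = new BoundedLagPolicy(Duration.ofSeconds(5));
        Instant watermark = advance(policy, Arrays.asList(10_000L, 30_000L, 20_000L), Instant.EPOCH);
        System.out.println(watermark);
    }
}
```

Any connector could then accept an EventTimePolicy instead of defining its own watermark and timestamp hooks, which is the uniformity being asked for here.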
Further, the KafkaIO enforces a <key, value> data model which AFAIK is
not enforced by the Beam model. I don't know the details for this
design decision but I would like this to be communicated before it is
merged into the master. Otherwise, it will be harder to achieve
portability among the Runners. In Flink and Spark we are already
experienced with all kinds of connectors and users' needs. It would be
nice to feed that back in the course of adding new connectors to the
Beam API.
I would expect an active discussion on the Dev mailing list before any
new connector API gets merged. Furthermore, let us provide better
interfaces for connector needs. Finally, let us introduce unbounded
sinks :)
On Fri, Apr 29, 2016 at 7:54 AM, amir bahmanyari
<[email protected]> wrote:
This may help trace it:
Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
    at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
    at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
    at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
    at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
    at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
    at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
    at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
    at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
From: Jean-Baptiste Onofré <[email protected]>
To: [email protected]
Sent: Thursday, April 28, 2016 10:08 PM
Subject: Re: [DISCUSS] Beam IO &runners native IO
I'm going to take a look. The DirectPipelineRunner didn't support
unbounded collections (it was fixed last night, AFAIR). It could be
related.
Regards
JB
On 04/29/2016 07:00 AM, amir bahmanyari wrote:
Hi JB,
I used the sample KafkaIO usage below.
p.apply(KafkaIO.read().withBootstrapServers("kafkahost:9092").withTopics(topics));
p.run();
It threw the following at p.run():
Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
    at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
I am sure I am missing something. It would be great if I could see some sample code.
I appreciate it, sir.
Cheers
From: Jean-Baptiste Onofré <[email protected]>
To: [email protected]
Sent: Thursday, April 28, 2016 5:30 AM
Subject: [DISCUSS] Beam IO &runners native IO
Hi all,
regarding the recent threads on the mailing list, I would like to start
a formal discussion around the IO.
As we can expect the first contributions in this area (I already have
some work in progress around this ;)), I think it's a fair discussion to
have.
Now, we have two kinds of IO: those "generic" to Beam, and those "local"
to the runners.
For example, let's take Kafka: we have the KafkaIO (in IO), and for
instance, we have the spark-streaming kafka connector (in Spark Runner).
Right now, we have two approaches for the user:
1. In the pipeline, we use KafkaIO from Beam: it's the preferred
approach for sure. However, the user may want to use the runner specific
IO for two reasons:
* Beam doesn't provide the IO yet (for instance, the spark cassandra
connector is available whereas we don't have any CassandraIO yet; I'm
working on it anyway ;))
* The runner native IO is optimized or contains more features than the
Beam IO
2. So, for the previous reasons, the user may want to use the native
runner IO. The drawback of this approach is that the pipeline will be
tied to a specific runner, which is completely against the Beam design.
I wonder if it wouldn't make sense to add a flag on the IO API (and
correspondingly on the Runner API) like .useNative().
For instance, the user would be able to do:
pipeline.apply(KafkaIO.read().withBootstrapServers("...").withTopics("...").useNative(true));
Then, if the runner has a "native" IO, it will use it. With
useNative(false) (the default), it won't use any runner native IO.
The point there is for the configuration: assuming the Beam IO and the
runner IO can differ, it means that the "Beam IO" would have to populate
all runner specific IO configuration.
Of course, it's always possible to use a PTransform to wrap the runner
native IO, but we are back to the same concern: the pipeline will be
coupled to a specific runner.
The purpose of the useNative() flag is to "automatically" inform the
runner to use a specific IO if it has one: the pipeline stays decoupled
from the runners.
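
To illustrate the intent, here is a minimal sketch of how a runner could resolve the flag. ReadSpec and Runner are hypothetical stand-ins, not Beam types, and useNative() does not exist in the SDK today. The fallback to the Beam IO when the flag is set but the runner has no native IO is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

final class UseNativeSketch {
    /** Hypothetical stand-in for a configured read; not a Beam type. */
    static final class ReadSpec {
        final String ioName;      // e.g. "kafka"
        final boolean useNative;  // the proposed flag, false by default

        ReadSpec(String ioName, boolean useNative) {
            this.ioName = ioName;
            this.useNative = useNative;
        }
    }

    /** Hypothetical runner that may register a native implementation per IO name. */
    static final class Runner {
        private final Map<String, String> nativeIos = new HashMap<>();

        Runner withNativeIo(String ioName, String implementation) {
            nativeIos.put(ioName, implementation);
            return this;
        }

        /** Returns a label for the implementation the runner would execute. */
        String resolve(ReadSpec spec) {
            if (spec.useNative && nativeIos.containsKey(spec.ioName)) {
                return nativeIos.get(spec.ioName);
            }
            // Assumption: without the flag, or without a native IO, use the Beam IO.
            return "beam:" + spec.ioName;
        }
    }

    public static void main(String[] args) {
        Runner spark = new Runner().withNativeIo("kafka", "spark-streaming-kafka");
        Runner other = new Runner();

        System.out.println(spark.resolve(new ReadSpec("kafka", true)));
        System.out.println(spark.resolve(new ReadSpec("kafka", false)));
        System.out.println(other.resolve(new ReadSpec("kafka", true)));
    }
}
```

The pipeline code is identical in all three cases; only the runner decides which implementation executes.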
Thoughts?
Thanks
Regards
JB
--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com