@Aljoscha I didn't know that Kafka always stores Key/Value but I see
that we also have support for setting Kafka keys in Flink.
@JB I get your point that a sink is simply a DoFn, but a ParDo is not
a good match for a sink. A Sink doesn't produce a PCollection but
represents the end of a pipeline. Like an UnboundedSource, an
UnboundedSink also has special needs, i.e. it needs to provide a
checkpointing mechanism. I think we need something along the lines of
the existing Write transform for batch.
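To make the checkpointing requirement concrete, here is a minimal sketch in plain Java with no Beam dependency. The names CheckpointableSink, BufferingSink, snapshot and restore are hypothetical and are not part of the Beam SDK; the sketch only illustrates that a streaming sink has to expose state that a runner can persist and hand back after a failure.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical interface: not part of the Beam SDK.
interface CheckpointableSink<T, C> {
    void write(T record);        // called for every element; the sink never "finishes"
    C snapshot();                // state the runner would persist at a checkpoint
    void restore(C checkpoint);  // state handed back to the sink after a failure
}

// Toy sink that buffers records in memory and commits them at checkpoints.
class BufferingSink implements CheckpointableSink<String, List<String>> {
    private final List<String> committed = new ArrayList<>();
    private final List<String> pending = new ArrayList<>();

    @Override
    public void write(String record) {
        pending.add(record);
    }

    @Override
    public List<String> snapshot() {
        // Pending records count as durable once the checkpoint is taken.
        committed.addAll(pending);
        pending.clear();
        return new ArrayList<>(committed);
    }

    @Override
    public void restore(List<String> checkpoint) {
        // Drop everything written after the checkpoint; the runner would replay it.
        committed.clear();
        committed.addAll(checkpoint);
        pending.clear();
    }

    List<String> committed() {
        return new ArrayList<>(committed);
    }
}

class SinkDemo {
    public static void main(String[] args) {
        BufferingSink sink = new BufferingSink();
        sink.write("a");
        sink.write("b");
        List<String> checkpoint = sink.snapshot();
        sink.write("c");           // written after the checkpoint, lost on failure
        sink.restore(checkpoint);  // simulate recovery
        System.out.println(sink.committed());
    }
}
```

A plain DoFn has no equivalent of snapshot/restore here, which is the gap an UnboundedSink abstraction would have to fill.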
On Fri, Apr 29, 2016 at 12:27 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:
Hi,
KafkaIO uses KafkaRecord which is basically key,value + some metadata
(topic, partition, offset).
Can you describe the behavior of an UnboundedSink ?
UnboundedSource is obvious: it keeps consuming data and creating a
PCollection that is sent into the pipeline.
But UnboundedSink ? Do you mean that the UnboundedSink will write each
record in the PCollection ? When does it stop ?
Regards
JB
On 04/29/2016 12:07 PM, Aljoscha Krettek wrote:
Hi,
I think the fact that KafkaIO has a <key, value> model comes from Kafka
having a <key, value> model. I imagine most sources will emit the type
of values appropriate for them.
I agree with Max that the lack of an UnboundedSink seems strange. Do we
have any "sinks" implemented as a ParDo already?
Cheers,
Aljoscha
On Fri, 29 Apr 2016 at 11:22 Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:
Hi Max,
Your four points are valid and we have already discussed them.
1. +1, the runner API should bring utils around that
2. UnboundedSink has been discussed (I don't see a strong use case for
now, as it takes a PCollection).
3. +1, Dan talked about improving the hierarchy.
4. +1, I'm working on new IO (JMS, MQTT, JDBC, Cassandra, Camel, ...).
I would add:
5. Add a page on the website listing the IO, their usage and
configuration. Something like we have in Camel:
http://camel.apache.org/components.html
6. Refactor the FileIO to avoid usage of IOChannelFactory and use a
filesystem plugin (file, hdfs, s3, aws, etc).
Dan planned to create a "util" to deal with watermarks and timestamps.
Regards
JB
On 04/29/2016 11:11 AM, Maximilian Michels wrote:
@Amir: This is the Developer mailing list. Please post your questions
regarding Beam on the user mailing list.
+1 for portability in general. However, I see some crucial TODOs
coming up:
1) Improving the integration of Runners with the Beam sink/source API
2) Providing interfaces to implement new connectors (i.e. still no
existing UnboundedSink)
3) Extending existing interfaces to ease implementation of connectors
and provide a uniform API (i.e. on top of UnboundedSource)
4) Working on a more complete set of connectors in Beam
Looking at the KafkaIO implementation, I wonder whether we should
extract the custom Watermark and Timestamp functions into an extra
interface. All connectors are going to have these methods. It would be
nice to have a uniform API among the connectors.
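As a rough illustration of what such a shared interface could look like, here is a sketch in plain Java. EventTimePolicy, BoundedDelayPolicy and Event are hypothetical names, not existing Beam or KafkaIO types, and java.time.Instant is used only to keep the example self-contained. The assumption is that each connector supplies one function for the record timestamp and one for the watermark.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical shared contract; KafkaIO does not implement this interface.
interface EventTimePolicy<T> {
    Instant timestampOf(T record);     // event time of a single record
    Instant watermarkAfter(T record);  // watermark after observing this record
}

// Toy record type carrying its own event time.
final class Event {
    final String payload;
    final long epochMillis;

    Event(String payload, long epochMillis) {
        this.payload = payload;
        this.epochMillis = epochMillis;
    }
}

// Example policy: the watermark trails the largest timestamp seen by a fixed delay.
final class BoundedDelayPolicy implements EventTimePolicy<Event> {
    private final Duration maxDelay;
    private Instant maxSeen = Instant.EPOCH;

    BoundedDelayPolicy(Duration maxDelay) {
        this.maxDelay = maxDelay;
    }

    @Override
    public Instant timestampOf(Event record) {
        return Instant.ofEpochMilli(record.epochMillis);
    }

    @Override
    public Instant watermarkAfter(Event record) {
        Instant ts = timestampOf(record);
        if (ts.isAfter(maxSeen)) {
            maxSeen = ts;  // maxSeen only moves forward, so the watermark never regresses
        }
        return maxSeen.minus(maxDelay);
    }
}

class WatermarkDemo {
    public static void main(String[] args) {
        EventTimePolicy<Event> policy = new BoundedDelayPolicy(Duration.ofSeconds(5));
        System.out.println(policy.watermarkAfter(new Event("on time", 10_000L)));
        System.out.println(policy.watermarkAfter(new Event("late", 8_000L)));
    }
}
```

With a contract like this, a runner could treat every connector's time handling the same way, regardless of whether the records come from Kafka, JMS or something else.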
Further, the KafkaIO enforces a <key, value> data model which AFAIK is
not enforced by the Beam model. I don't know the details for this
design decision but I would like this to be communicated before it is
merged into the master. Otherwise, it will be harder to achieve
portability among the Runners. In Flink and Spark we are already
experienced with all kinds of connectors and users' needs. It would be
nice to feed that back in the course of adding new connectors to the
Beam API.
I would expect an active discussion on the Dev mailing list before any
new connector API gets merged. Furthermore, let us provide better
interfaces for connector needs. Finally, let us introduce unbounded
sinks :)
On Fri, Apr 29, 2016 at 7:54 AM, amir bahmanyari
<amirto...@yahoo.com.invalid> wrote:
This may help trace it:
Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
    at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
    at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
    at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
    at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
    at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
    at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
    at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
    at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
From: Jean-Baptiste Onofré <j...@nanthrax.net>
To: dev@beam.incubator.apache.org
Sent: Thursday, April 28, 2016 10:08 PM
Subject: Re: [DISCUSS] Beam IO &runners native IO
I'm going to take a look. The DirectPipelineRunner didn't support
unbounded collections (it was fixed last night, AFAIR). It could be
related.
Regards
JB
On 04/29/2016 07:00 AM, amir bahmanyari wrote:
Hi JB,
I used the sample KafkaIO usage below.
p.apply(KafkaIO.read().withBootstrapServers("kafkahost:9092").withTopics(topics));
p.run();
It threw the following at p.run():
Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
    at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
I am sure I am missing something. It would be great if I could see some
sample code. I appreciate it, sir.
Cheers
From: Jean-Baptiste Onofré <j...@nanthrax.net>
To: dev@beam.incubator.apache.org
Sent: Thursday, April 28, 2016 5:30 AM
Subject: [DISCUSS] Beam IO &runners native IO
Hi all,
regarding the recent threads on the mailing list, I would like to
start a formal discussion around the IO.
As we can expect the first contributions in this area (I already have
some work in progress around this ;)), I think it's a fair discussion
to have.
Now, we have two kinds of IO: the ones "generic" to Beam and the ones
"local" to the runners.
For example, let's take Kafka: we have the KafkaIO (in IO), and we
also have the spark-streaming kafka connector (in the Spark Runner).
Right now, we have two approaches for the user:
1. In the pipeline, we use KafkaIO from Beam: it's the preferred
approach for sure. However, the user may want to use the runner
specific IO for two reasons:
* Beam doesn't provide the IO yet (for instance, the spark cassandra
connector is available whereas we don't have any CassandraIO yet;
I'm working on it anyway ;))
* The runner native IO is optimized or contains more features than
the Beam native IO
2. So, for the previous reasons, the user could want to use the
native runner IO. The drawback of this approach is that the pipeline
will be tied to a specific runner, which is completely against the
Beam design.
I wonder if it wouldn't make sense to add a flag on the IO API (and a
related one on the Runner API) like .useNative().
For instance, the user would be able to do:
pipeline.apply(KafkaIO.read().withBootstrapServers("...").withTopics("...").useNative(true));
then, if the runner has a "native" IO, it will use it; else, or if
useNative(false) (the default), it won't use any runner native IO.
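The resolution rule proposed here can be sketched in plain Java as follows. This is only an illustration of the idea: IoRequest and NativeIoResolver are hypothetical names, useNative() is the proposed flag rather than an existing KafkaIO method, and the connector names are placeholder strings.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the proposal; none of these types exist in Beam.
final class IoRequest {
    final String ioName;      // e.g. "kafka"
    final boolean useNative;  // the proposed flag, false by default

    IoRequest(String ioName, boolean useNative) {
        this.ioName = ioName;
        this.useNative = useNative;
    }
}

// Held by a runner: maps a Beam IO name to the runner's own connector, if any.
final class NativeIoResolver {
    private final Map<String, String> nativeConnectors = new HashMap<>();

    void registerNative(String ioName, String connector) {
        nativeConnectors.put(ioName, connector);
    }

    // Returns the connector the runner would execute for this request.
    String resolve(IoRequest request) {
        if (request.useNative && nativeConnectors.containsKey(request.ioName)) {
            return nativeConnectors.get(request.ioName);
        }
        // No flag, or no native connector: fall back to the portable Beam IO.
        return "beam:" + request.ioName;
    }
}

class UseNativeDemo {
    public static void main(String[] args) {
        NativeIoResolver sparkRunner = new NativeIoResolver();
        sparkRunner.registerNative("kafka", "spark-streaming-kafka");

        System.out.println(sparkRunner.resolve(new IoRequest("kafka", true)));
        System.out.println(sparkRunner.resolve(new IoRequest("kafka", false)));
        System.out.println(sparkRunner.resolve(new IoRequest("cassandra", true)));
    }
}
```

The pipeline code only ever names the Beam IO; the choice of a native connector stays inside the runner, which is what keeps the pipeline portable.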
The point there is the configuration: assuming the Beam IO and the
runner IO can differ, it means that the "Beam IO" would have to
populate all the runner-specific IO configuration.
Of course, it's always possible to use a PTransform to wrap the
runner native IO, but we are back to the same concern: the pipeline
will be coupled to a specific runner.
The purpose of the useNative() flag is to "automatically" inform the
runner to use a specific IO if it has one: the pipeline stays
decoupled from the runners.
Thoughts ?
Thanks
Regards
JB
--
Jean-Baptiste Onofré
jbono...@apache.org
http://blog.nanthrax.net
Talend - http://www.talend.com