Hi Max,

Understood. It reminds me of a discussion we had with Dan.

Regards
JB

On 04/29/2016 12:44 PM, Maximilian Michels wrote:
@Aljoscha I didn't know that Kafka always stores Key/Value but I see
that we also have support for setting Kafka keys in Flink.

@JB I get your point that a sink is simply a DoFn, but a ParDo is not
a good match for a sink. A Sink doesn't produce a PCollection but
represents the end of a pipeline. Like an UnboundedSource, an
UnboundedSink also has special needs, i.e. it needs to provide a
checkpointing mechanism. I think we need something along the lines of
the existing Write transform for batch.
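
A minimal sketch of what "a sink with a checkpointing mechanism" could mean, assuming records are buffered until a checkpoint completes. CheckpointedSink and InMemorySink are hypothetical names used only for illustration; they are not Beam interfaces.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical contract (an assumption, not part of the Beam SDK). */
interface CheckpointedSink<T> {
    /** Buffers a record; it is not yet considered durable. */
    void write(T record);

    /** Makes every record written since the last checkpoint durable. */
    void commitCheckpoint();

    /** Drops the records written since the last checkpoint, e.g. after a failure. */
    void rollback();
}

/** Toy in-memory implementation, used only to show the intended behavior. */
final class InMemorySink<T> implements CheckpointedSink<T> {
    private final List<T> pending = new ArrayList<>();
    private final List<T> committed = new ArrayList<>();

    @Override
    public void write(T record) {
        pending.add(record);
    }

    @Override
    public void commitCheckpoint() {
        committed.addAll(pending);
        pending.clear();
    }

    @Override
    public void rollback() {
        pending.clear();
    }

    /** Returns a copy of the records that survived a checkpoint. */
    List<T> committed() {
        return new ArrayList<>(committed);
    }
}
```

Unlike a plain DoFn, such a sink never finishes on its own: it keeps accepting records, and only the checkpoint boundary decides what counts as written.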

On Fri, Apr 29, 2016 at 12:27 PM, Jean-Baptiste Onofré <[email protected]> wrote:
Hi,

KafkaIO uses KafkaRecord which is basically key,value + some metadata
(topic, partition, offset).

Can you describe the behavior of an UnboundedSink?

UnboundedSource is obvious: it keeps consuming data, creating a PCollection
that is sent into the pipeline.

But UnboundedSink? Do you mean that the UnboundedSink will write each
record in the PCollection? When does it stop?

Regards
JB


On 04/29/2016 12:07 PM, Aljoscha Krettek wrote:

Hi,
I think the fact that KafkaIO has a <key, value> model comes from Kafka
having a <key, value> model. I imagine most sources will emit the type of
values appropriate for them.

I agree with Max that the lack of an UnboundedSink seems strange. Do we
have any "sinks" implemented as a ParDo already?

Cheers,
Aljoscha

On Fri, 29 Apr 2016 at 11:22 Jean-Baptiste Onofré <[email protected]> wrote:

Hi Max,

Your four points are valid and we have already discussed them.

1. +1, the runner API should bring utils around that
2. UnboundedSink has been discussed (I don't see a strong use case for
now, as it takes a PCollection).
3. +1, Dan talked about improving the hierarchy.
4. +1, I'm working on new IOs (JMS, MQTT, JDBC, Cassandra, Camel, ...).

I would add:

5. Add a page on the website listing the IOs, their usage and
configuration. Something like we have in Camel:
http://camel.apache.org/components.html
6. Refactor the FileIO to avoid usage of IOChannelFactory and use a
filesystem plugin (file, hdfs, s3, aws, etc).

Dan planned to create a "util" to deal with watermarks and timestamps.

Regards
JB

On 04/29/2016 11:11 AM, Maximilian Michels wrote:

@Amir: This is the Developer mailing list. Please post your questions
regarding Beam on the user mailing list.

+1 for portability in general. However, I see some crucial TODOs coming
up:

1) Improving the integration of Runners with the Beam sink/source API
2) Providing interfaces to implement new connectors (e.g. there is still
no UnboundedSink)
3) Extending existing interfaces to ease implementation of connectors
and provide a uniform API (e.g. on top of UnboundedSource)
4) Working on a more complete set of connectors in Beam

Looking at the KafkaIO implementation, I wonder whether we should extract
the custom Watermark and Timestamp functions into a separate interface.
All connectors are going to have these methods. It would be
nice to have a uniform API among the connectors.
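
A minimal sketch of such a shared interface, assuming each connector needs one function that assigns an event timestamp to a record and one that reports the current watermark. EventTimePolicy and BoundedDelayPolicy are hypothetical names for illustration; they are not existing Beam or KafkaIO types.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical shared contract (an assumption, not a Beam interface). */
interface EventTimePolicy<T> {
    /** Event time to assign to a record read by the connector. */
    Instant timestampOf(T record);

    /** Current watermark: records older than this are not expected anymore. */
    Instant currentWatermark();
}

/**
 * Example policy: the watermark trails the largest timestamp seen so far
 * by a fixed delay. Records are plain epoch-millisecond values here to
 * keep the sketch self-contained.
 */
final class BoundedDelayPolicy implements EventTimePolicy<Long> {
    private final Duration maxDelay;
    private Instant maxSeen = Instant.EPOCH;

    BoundedDelayPolicy(Duration maxDelay) {
        this.maxDelay = maxDelay;
    }

    @Override
    public Instant timestampOf(Long epochMillis) {
        Instant ts = Instant.ofEpochMilli(epochMillis);
        if (ts.isAfter(maxSeen)) {
            maxSeen = ts; // remember the newest event time observed
        }
        return ts;
    }

    @Override
    public Instant currentWatermark() {
        return maxSeen.minus(maxDelay);
    }
}
```

With one contract like this, every connector could expose the same two hooks instead of defining its own watermark and timestamp functions.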

Further, the KafkaIO enforces a <key, value> data model which AFAIK is
not enforced by the Beam model. I don't know the details of this
design decision but I would like this to be communicated before it is
merged into master. Otherwise, it will be harder to achieve
portability among the Runners. In Flink and Spark we already have
experience with all kinds of connectors and users' needs. It would be
nice to feed that back in the course of adding new connectors to the
Beam API.

I would expect an active discussion on the Dev mailing list before any
new connector API gets merged. Furthermore, let us provide better
interfaces for connector needs. Finally, let us introduce unbounded
sinks :)

On Fri, Apr 29, 2016 at 7:54 AM, amir bahmanyari
<[email protected]> wrote:

This may help trace it:

Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
    at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
    at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
    at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
    at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
    at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
    at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
    at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
    at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
    at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)


         From: Jean-Baptiste Onofré <[email protected]>
    To: [email protected]
    Sent: Thursday, April 28, 2016 10:08 PM
    Subject: Re: [DISCUSS] Beam IO &runners native IO

I'm going to take a look. The DirectPipelineRunner didn't support
unbounded collections (it was fixed last night, AFAIR). It could be
related.

Regards
JB

On 04/29/2016 07:00 AM, amir bahmanyari wrote:

Hi JB, I used the sample KafkaIO usage below.

p.apply(KafkaIO.read().withBootstrapServers("kafkahost:9092").withTopics(topics));
p.run();

It threw the following at p.run():

Exception in thread "main" java.lang.IllegalStateException: no evaluator registered for Read(UnboundedKafkaSource)
    at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)

I am sure I am missing something. It would be great if I could see some
sample code. I appreciate it, sir. Cheers


          From: Jean-Baptiste Onofré <[email protected]>
    To: [email protected]
    Sent: Thursday, April 28, 2016 5:30 AM
    Subject: [DISCUSS] Beam IO &runners native IO

Hi all,

regarding the recent threads on the mailing list, I would like to start
a formal discussion around the IO.
As we can expect the first contributions in this area (I already have
some work in progress around this ;)), I think it's a fair discussion to
have.

Now, we have two kinds of IO: the one "generic" to Beam, the one "local"
to the runners.

For example, let's take Kafka: we have the KafkaIO (in IO), and for
instance, we have the spark-streaming kafka connector (in Spark Runner).


Right now, we have two approaches for the user:
1. In the pipeline, we use KafkaIO from Beam: it's the preferred
approach for sure. However, the user may want to use the runner-specific
IO for two reasons:
        * Beam doesn't provide the IO yet (for instance, the spark
cassandra connector is available whereas we don't yet have any
CassandraIO (I'm working on it anyway ;)))
        * The runner native IO is optimized or contains more features
than the Beam IO
2. So, for the previous reasons, the user could want to use the native
runner IO. The drawback of this approach is that the pipeline will be
tied to a specific runner, which is completely against the Beam design.


I wonder if it wouldn't make sense to add a flag on the IO API (and
a related one on the Runner API) like .useNative().

For instance, the user would be able to do:



pipeline.apply(KafkaIO.read().withBootstrapServers("...").withTopics("...").useNative(true));


Then, if the runner has a "native" IO, it will use it. With
useNative(false) (the default), it won't use any runner native IO.
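
A minimal sketch of how a runner could resolve that flag. NativeIoResolver and its Choice enum are hypothetical names, and falling back to the Beam IO when useNative(true) is set but the runner has no native IO is an assumption that the proposal does not spell out.

```java
import java.util.Set;

/** Hypothetical runner-side lookup (illustrative only, not Beam API). */
final class NativeIoResolver {
    enum Choice { BEAM_IO, RUNNER_NATIVE_IO }

    /** Names of the IOs for which this runner ships a native connector. */
    private final Set<String> nativeIos;

    NativeIoResolver(Set<String> nativeIos) {
        this.nativeIos = nativeIos;
    }

    Choice resolve(String ioName, boolean useNative) {
        if (useNative && nativeIos.contains(ioName)) {
            return Choice.RUNNER_NATIVE_IO;
        }
        // Assumed fallback: the pipeline still runs on the generic Beam IO,
        // so it stays portable across runners.
        return Choice.BEAM_IO;
    }
}
```

The pipeline code only ever names the Beam IO; the decision to swap in a native connector stays inside the runner.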

The open point is the configuration: assuming the Beam IO and the
runner IO can differ, the "Beam IO" would have to populate
all of the runner-specific IO configuration.

Of course, it's always possible to use a PTransform to wrap the runner
native IO, but we are back on the same concern: the pipeline will be
coupled to a specific runner.

The purpose of the useNative() flag is to "automatically" inform the
runner to use a specific IO if it has one: the pipeline stays decoupled
from the runners.

Thoughts?

Thanks
Regards
JB


--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com



