@Aljoscha I didn't know that Kafka always stores Key/Value but I see
that we also have support for setting Kafka keys in Flink.

@JB I get your point that a sink is simply a DoFn, but a ParDo is not
a good match for a sink. A Sink doesn't produce a PCollection but
represents the end of a pipeline. Like an UnboundedSource, an
UnboundedSink has also special needs, i.e. it needs to provide a
checkpointing mechanism. I think we need something along the lines of
the existing Write transform for batch.

On Fri, Apr 29, 2016 at 12:27 PM, Jean-Baptiste Onofré <[email protected]> 
wrote:
> Hi,
>
> KafkaIO uses KafkaRecord which is basically key,value + some metadata
> (topic, partition, offset).
>
> Can you describe the behavior of an UnboundedSink ?
>
> UnboundedSource is obvious: it's still consuming data creating PCollection
> sent into the pipeline.
>
> But UnboundedSink ? Do you mean that the UnboundedSink will write each
> record in the PCollection ? When does it stop ?
>
> Regards
> JB
>
>
> On 04/29/2016 12:07 PM, Aljoscha Krettek wrote:
>>
>> Hi,
>> I think the fact that KafkaIO has a <key, value> model comes from Kafka
>> having a <key, value> model. I imagine most sources will emit the type of
>> values appropriate for them.
>>
>> I agree with Max that the lack of an UnboundedSink seems strange. Do we
>> have any "sinks" implemented as a ParDo already?
>>
>> Cheers,
>> Aljoscha
>>
>> On Fri, 29 Apr 2016 at 11:22 Jean-Baptiste Onofré <[email protected]> wrote:
>>
>>> Hi Max,
>>>
>>> Your four points are valid and we already discussed about that.
>>>
>>> 1. +1, the runner API should bring utils around that
>>> 2. UnboundedSink has been discussed (I don't see strong use case for
>>> now, as it takes a PCollection).
>>> 3. +1, Dan talked about improving the hierarchy.
>>> 4. +1, I'm working on new IO (JMS, MQTT, JDBC, Cassandra, Camel, ...).
>>>
>>> I would add:
>>>
>>> 5. Add a page on the website listing the IO, their usage and
>>> configuration. Something like we have in Camel:
>>> http://camel.apache.org/components.html
>>> 6. Refactore the FileIO to avoid usage of IOChannelFactory and use a
>>> filesystem plugin (file, hdfs, s3, aws, etc).
>>>
>>> Dan planned to create "util" to deal with watermark and timestamps.
>>>
>>> Regards
>>> JB
>>>
>>> On 04/29/2016 11:11 AM, Maximilian Michels wrote:
>>>>
>>>> @Amir: This is the Developer mailing list. Please post your questions
>>>> regarding Beam on the user mailing list.
>>>>
>>>> +1 for portability in general. However, I see some crucial TODOs coming
>>>
>>> up:
>>>>
>>>>
>>>> 1) Improving the integration of Runners with the Beam sink/source API
>>>> 2) Providing interfaces to implement new connectors (i.e. still no
>>>> existing UnboundedSink)
>>>> 3) Extending existing interfaces to ease implementation of connectors
>>>> and provide a uniform API (i.e. on top of UnboundedSource)
>>>> 4) Working on a more complete set of connectors in Beam
>>>>
>>>> Looking at the KafkaIO implementation, I wonder shouldn't we extract
>>>> the custom Watermark and Timestamp function into an extra interface?
>>>> All connectors are going to have methods these methods. It would be
>>>> nice to have a uniform API among the connectors.
>>>>
>>>> Further, the KafkaIO enforces a <key, value> data model which AFAIK is
>>>> not enforced by the Beam model. I don't know the details for this
>>>> design decision but I would like this to be communicated before it is
>>>> merged into the master. Otherwise, it will be harder to achieve
>>>> portability among the Runners. In Flink and Spark we are already
>>>> experienced with all kinds of connectors and user's needs. It would be
>>>> nice to feed that back in the course of adding new connectors to the
>>>> Beam API.
>>>>
>>>> I would expect an active discussion on the Dev mailing list before any
>>>> new connector API gets merged. Furthermore, let us provide better
>>>> interfaces for connector needs. Finally, let us introduce unbounded
>>>> sinks :)
>>>>
>>>> On Fri, Apr 29, 2016 at 7:54 AM, amir bahmanyari
>>>> <[email protected]> wrote:
>>>>>
>>>>> This may help trace it:Exception in thread "main"
>>>
>>> java.lang.IllegalStateException: no evaluator registered for
>>> Read(UnboundedKafkaSource) at
>>>
>>> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>>> at
>>>
>>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>>> at
>>>
>>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>>> at
>>>
>>> org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>>> at
>>>
>>> org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
>>> at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
>>> at
>>>
>>> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
>>> at
>>>
>>> org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
>>> at
>>>
>>> org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
>>> at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182) at benchmark.fli
>>>   n
>>> kspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
>>>>>
>>>>>
>>>>>         From: Jean-Baptiste Onofré <[email protected]>
>>>>>    To: [email protected]
>>>>>    Sent: Thursday, April 28, 2016 10:08 PM
>>>>>    Subject: Re: [DISCUSS] Beam IO &runners native IO
>>>>>
>>>>> I gonna take a look. The DirectPipelineRunner didn't support the
>>>>> unbounded collection (it has been fixed last night AFAIR). It could be
>>>>> related.
>>>>>
>>>>> Regards
>>>>> JB
>>>>>
>>>>> On 04/29/2016 07:00 AM, amir bahmanyari wrote:
>>>>>>
>>>>>> Hi JB,I used the sample KafkaIO usage
>>>
>>>
>>> below.p.apply(KafkaIO.read().withBootstrapServers("kafkahost:9092").withTopics(topics));
>>>>>>
>>>>>> p.run();
>>>>>>
>>>>>> It threw the following at p.run():Exception in thread "main"
>>>
>>> java.lang.IllegalStateException: no evaluator registered for
>>> Read(UnboundedKafkaSource) at
>>>
>>> org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>>> I am sure I am missing something.Would be great if I could see a sample
>>> code.I appreciate it sir.Cheers
>>>>>>
>>>>>>
>>>>>>          From: Jean-Baptiste Onofré <[email protected]>
>>>>>>    To: [email protected]
>>>>>>    Sent: Thursday, April 28, 2016 5:30 AM
>>>>>>    Subject: [DISCUSS] Beam IO &runners native IO
>>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> regarding the recent threads on the mailing list, I would like to
>>>>>> start
>>>>>> a format discussion around the IO.
>>>>>> As we can expect the first contributions on this area (I already have
>>>>>> some work in progress around this ;)), I think it's a fair discussion
>>>
>>> to
>>>>>>
>>>>>> have.
>>>>>>
>>>>>> Now, we have two kinds of IO: the one "generic" to Beam, the one
>>>
>>> "local"
>>>>>>
>>>>>> to the runners.
>>>>>>
>>>>>> For example, let's take Kafka: we have the KafkaIO (in IO), and for
>>>>>> instance, we have the spark-streaming kafka connector (in Spark
>>>
>>> Runner).
>>>>>>
>>>>>>
>>>>>> Right now, we have two approaches for the user:
>>>>>> 1. In the pipeline, we use KafkaIO from Beam: it's the preferred
>>>>>> approach for sure. However, the user may want to use the runner
>>>
>>> specific
>>>>>>
>>>>>> IO for two reasons:
>>>>>>        * Beam doesn't provide the IO yet (for instance, spark
>>>>>> cassandra
>>>>>> connector is available whereas we don't have yet any CassandraIO (I'm
>>>>>> working on it anyway ;))
>>>>>>        * The runner native IO is optimized or contain more features
>>>
>>> that the
>>>>>>
>>>>>> Beam native IO
>>>>>> 2. So, for the previous reasons, the user could want to use the native
>>>>>> runner IO. The drawback of this approach is that the pipeline will be
>>>>>> tight to a specific runner, which is completely against the Beam
>>>
>>> design.
>>>>>>
>>>>>>
>>>>>> I wonder if it wouldn't make sense to add flag on the IO API (and
>>>>>> related on Runner API) like .useNative().
>>>>>>
>>>>>> For instance, the user would be able to do:
>>>>>>
>>>>>>
>>>
>>> pipeline.apply(KafkaIO.read().withBootstrapServers("...").withTopics("...").useNative(true);
>>>>>>
>>>>>>
>>>>>> then, if the runner has a "native" IO, it will use it, else, if
>>>>>> useNative(false) (the default), it won't use any runner native IO.
>>>>>>
>>>>>> The point there is for the configuration: assuming the Beam IO and the
>>>>>> runner IO can differ, it means that the "Beam IO" would have to
>>>
>>> populate
>>>>>>
>>>>>> all runner specific IO configuration.
>>>>>>
>>>>>> Of course, it's always possible to use a PTransform to wrap the runner
>>>>>> native IO, but we are back on the same concern: the pipeline will be
>>>>>> couple to a specific runner.
>>>>>>
>>>>>> The purpose of the useNative() flag is to "automatically" inform the
>>>>>> runner to use a specific IO if it has one: the pipeline stays
>>>>>> decoupled
>>>>>> from the runners.
>>>>>>
>>>>>> Thoughts ?
>>>>>>
>>>>>> Thanks
>>>>>> Regards
>>>>>> JB
>>>>>>
>>>>>
>>>>> --
>>>>> Jean-Baptiste Onofré
>>>>> [email protected]
>>>>> http://blog.nanthrax.net
>>>>> Talend - http://www.talend.com
>>>>>
>>>>>
>>>>>
>>>
>>> --
>>> Jean-Baptiste Onofré
>>> [email protected]
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>>>
>>
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com

Reply via email to