+Dan Halperin (who is OOO for a couple of days)

Yes, there are plans for unbounded sinks. But unlike sources, sinks don't
add any additional functionality beyond a ParDo (they just make it more
obvious how to use a ParDo appropriately to get the right fault tolerance).
So they haven't been prioritized yet.
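The "a sink is just a ParDo" point above can be sketched in plain Java. This is deliberately written without the Beam SDK, so the class and method names below (BufferingWriterFn, externalStore) are illustrative stand-ins, not Beam API: a writer DoFn typically buffers records in processElement and flushes in finishBundle, so a retried bundle simply re-writes its records.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for a DoFn-based sink: buffer per bundle, flush on finishBundle.
// In real Beam code this would extend DoFn; "externalStore" stands in for
// an external system such as a Kafka producer.
class BufferingWriterFn {
    private final List<String> bundleBuffer = new ArrayList<>();
    private final List<String> externalStore;

    BufferingWriterFn(List<String> externalStore) {
        this.externalStore = externalStore;
    }

    // Analogue of DoFn.processElement: just buffer the record.
    void processElement(String record) {
        bundleBuffer.add(record);
    }

    // Analogue of DoFn.finishBundle: flush once per bundle. If the runner
    // retries a failed bundle, the whole bundle is re-written, which yields
    // at-least-once semantics (exactly-once needs idempotent writes).
    void finishBundle() {
        externalStore.addAll(bundleBuffer);
        bundleBuffer.clear();
    }
}
```

The fault-tolerance contract lives entirely in where the flush happens, which is why no new primitive beyond ParDo is strictly required.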

On Fri, Apr 29, 2016 at 4:43 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
wrote:

> Hi Amit,
>
> yes, definitely; the highest priority is that the existing runners fully
> work with the Beam IO. I will work with you on the Spark runner for
> that.
>
> Regards
> JB
>
>
> On 04/29/2016 01:06 PM, Amit Sela wrote:
>
>> +1 on Max's comment on active discussion for connector API.
>> I think that we can use Spark and Flink's existing connectors (Spark
>> supports many) as test cases and consider a bottom-up design approach
>> rather than top-down, especially since we're in incubation.
>>
>> Also +1 for Davor's point that runners should make SDK IOs a high
>> priority - this will probably give us more input than any email
>> thread. I'll do my best to make this happen for the Spark runner.
>>
>> On Fri, Apr 29, 2016 at 1:45 PM Maximilian Michels <m...@apache.org>
>> wrote:
>>
>> @Aljoscha I didn't know that Kafka always stores Key/Value but I see
>>> that we also have support for setting Kafka keys in Flink.
>>>
>>> @JB I get your point that a sink is simply a DoFn, but a ParDo is not
>>> a good match for a sink. A Sink doesn't produce a PCollection but
>>> represents the end of a pipeline. Like an UnboundedSource, an
>>> UnboundedSink also has special needs, e.g. it needs to provide a
>>> checkpointing mechanism. I think we need something along the lines of
>>> the existing Write transform for batch.
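Max's reference to the existing Write transform can be illustrated with a minimal lifecycle sketch in plain Java. The names here (WriteLifecycle, initialize, writeBundle, finalizeWrite) are hypothetical, not the actual Beam Write/Sink API: one global initialize, one temporary result per bundle, and a single finalize that commits exactly one result per bundle.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a Write-style lifecycle: initialize once, write
// each bundle to a temporary result, then commit all results in finalize.
class WriteLifecycle {
    final List<String> committed = new ArrayList<>();

    // Global setup, e.g. creating the output location.
    void initialize() { }

    // Each bundle produces an independent temporary result; a retried
    // bundle just produces a fresh one, giving fault tolerance.
    List<String> writeBundle(List<String> records) {
        return new ArrayList<>(records);
    }

    // Commit exactly one temporary result per bundle.
    void finalizeWrite(List<List<String>> bundleResults) {
        for (List<String> result : bundleResults) {
            committed.addAll(result);
        }
    }
}
```

An unbounded variant would replace the single finalize with a per-checkpoint commit, which is the checkpointing need mentioned above.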
>>>
>>> On Fri, Apr 29, 2016 at 12:27 PM, Jean-Baptiste Onofré <j...@nanthrax.net>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> KafkaIO uses KafkaRecord which is basically key,value + some metadata
>>>> (topic, partition, offset).
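As a rough picture of the shape JB describes, a plain-Java stand-in (the class and field names are assumptions for illustration, not the actual Beam KafkaRecord class):

```java
// Illustrative stand-in for the KafkaRecord shape JB describes:
// key/value plus topic, partition and offset metadata.
class KafkaRecordSketch<K, V> {
    final String topic;
    final int partition;
    final long offset;
    final K key;
    final V value;

    KafkaRecordSketch(String topic, int partition, long offset, K key, V value) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.key = key;
        this.value = value;
    }
}
```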
>>>>
>>>> Can you describe the behavior of an UnboundedSink ?
>>>>
>>>> UnboundedSource is obvious: it's still consuming data, creating a
>>>> PCollection sent into the pipeline.
>>>>
>>>> But UnboundedSink ? Do you mean that the UnboundedSink will write each
>>>> record in the PCollection ? When does it stop ?
>>>>
>>>> Regards
>>>> JB
>>>>
>>>>
>>>> On 04/29/2016 12:07 PM, Aljoscha Krettek wrote:
>>>>
>>>>>
>>>>> Hi,
>>>>> I think the fact that KafkaIO has a <key, value> model comes from Kafka
>>>>> having a <key, value> model. I imagine most sources will emit the
>>>>> type of values appropriate for them.
>>>>>
>>>>> I agree with Max that the lack of an UnboundedSink seems strange. Do we
>>>>> have any "sinks" implemented as a ParDo already?
>>>>>
>>>>> Cheers,
>>>>> Aljoscha
>>>>>
>>>>> On Fri, 29 Apr 2016 at 11:22 Jean-Baptiste Onofré <j...@nanthrax.net>
>>>>> wrote:
>>>>>
>>>>> Hi Max,
>>>>>>
>>>>>> Your four points are valid and we already discussed that.
>>>>>>
>>>>>> 1. +1, the runner API should bring utils around that
>>>>>> 2. UnboundedSink has been discussed (I don't see a strong use case
>>>>>> for now, as it takes a PCollection).
>>>>>> 3. +1, Dan talked about improving the hierarchy.
>>>>>> 4. +1, I'm working on new IO (JMS, MQTT, JDBC, Cassandra, Camel, ...).
>>>>>>
>>>>>> I would add:
>>>>>>
>>>>>> 5. Add a page on the website listing the IO, their usage and
>>>>>> configuration. Something like we have in Camel:
>>>>>> http://camel.apache.org/components.html
>>>>>> 6. Refactor the FileIO to avoid usage of IOChannelFactory and use a
>>>>>> filesystem plugin (file, hdfs, s3, aws, etc).
>>>>>>
>>>>>> Dan planned to create utils to deal with watermarks and timestamps.
>>>>>>
>>>>>> Regards
>>>>>> JB
>>>>>>
>>>>>> On 04/29/2016 11:11 AM, Maximilian Michels wrote:
>>>>>>
>>>>>>>
>>>>>>> @Amir: This is the Developer mailing list. Please post your questions
>>>>>>> regarding Beam on the user mailing list.
>>>>>>>
>>>>>>> +1 for portability in general. However, I see some crucial TODOs
>>>>>>> coming up:
>>>>>>>
>>>>>>> 1) Improving the integration of Runners with the Beam sink/source API
>>>>>>> 2) Providing interfaces to implement new connectors (i.e. still no
>>>>>>> existing UnboundedSink)
>>>>>>> 3) Extending existing interfaces to ease implementation of connectors
>>>>>>> and provide a uniform API (i.e. on top of UnboundedSource)
>>>>>>> 4) Working on a more complete set of connectors in Beam
>>>>>>>
>>>>>>> Looking at the KafkaIO implementation, I wonder whether we should
>>>>>>> extract the custom Watermark and Timestamp functions into a separate
>>>>>>> interface? All connectors are going to have these methods. It would
>>>>>>> be nice to have a uniform API among the connectors.
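The extracted interface Max suggests could look roughly like the following plain-Java sketch (the names TimestampPolicy, timestampOf, currentWatermark and LastSeenPolicy are assumptions, not actual Beam API): every unbounded connector would plug in the same timestamp/watermark hooks instead of redefining them.

```java
import java.time.Instant;

// Hypothetical uniform interface for connector timestamp/watermark hooks.
interface TimestampPolicy<T> {
    Instant timestampOf(T record);   // event-time timestamp of a record
    Instant currentWatermark();      // lower bound on future timestamps
}

// A trivial policy: the watermark simply tracks the last timestamp seen.
class LastSeenPolicy<T> implements TimestampPolicy<T> {
    private Instant last = Instant.EPOCH;

    @Override
    public Instant timestampOf(T record) {
        last = Instant.now();
        return last;
    }

    @Override
    public Instant currentWatermark() {
        return last;
    }
}
```

A connector like KafkaIO could then accept any TimestampPolicy implementation instead of its own ad-hoc functions.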
>>>>>>>
>>>>>>> Further, the KafkaIO enforces a <key, value> data model which AFAIK
>>>>>>> is not enforced by the Beam model. I don't know the details of this
>>>>>>> design decision, but I would like this to be communicated before it
>>>>>>> is merged into the master. Otherwise, it will be harder to achieve
>>>>>>> portability among the Runners. In Flink and Spark we already have
>>>>>>> experience with all kinds of connectors and users' needs. It would
>>>>>>> be nice to feed that back in the course of adding new connectors to
>>>>>>> the Beam API.
>>>>>>>
>>>>>>> I would expect an active discussion on the Dev mailing list before
>>>>>>> any new connector API gets merged. Furthermore, let us provide
>>>>>>> better interfaces for connector needs. Finally, let us introduce
>>>>>>> unbounded sinks :)
>>>>>>>
>>>>>>> On Fri, Apr 29, 2016 at 7:54 AM, amir bahmanyari
>>>>>>> <amirto...@yahoo.com.invalid> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> This may help trace it:
>>>>>>>>
>>>>>>>> Exception in thread "main" java.lang.IllegalStateException: no
>>>>>>>> evaluator registered for Read(UnboundedKafkaSource)
>>>>>>>>   at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>>>>>>>>   at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:221)
>>>>>>>>   at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>>>>>>>>   at org.apache.beam.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:217)
>>>>>>>>   at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:104)
>>>>>>>>   at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:261)
>>>>>>>>   at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:860)
>>>>>>>>   at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:572)
>>>>>>>>   at org.apache.beam.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:106)
>>>>>>>>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:182)
>>>>>>>>   at benchmark.flinkspark.flink.ReadFromKafka2.main(ReadFromKafka2.java:286)
>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>>          From: Jean-Baptiste Onofré <j...@nanthrax.net>
>>>>>>>>     To: dev@beam.incubator.apache.org
>>>>>>>>     Sent: Thursday, April 28, 2016 10:08 PM
>>>>>>>>     Subject: Re: [DISCUSS] Beam IO &runners native IO
>>>>>>>>
>>>>>>>> I'm gonna take a look. The DirectPipelineRunner didn't support
>>>>>>>> unbounded collections (it has been fixed last night AFAIR). It
>>>>>>>> could be related.
>>>>>>>>
>>>>>>>> Regards
>>>>>>>> JB
>>>>>>>>
>>>>>>>> On 04/29/2016 07:00 AM, amir bahmanyari wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Hi JB, I used the sample KafkaIO usage below:
>>>>>>>>>
>>>>>>>>> p.apply(KafkaIO.read().withBootstrapServers("kafkahost:9092").withTopics(topics));
>>>>>>>>> p.run();
>>>>>>>>>
>>>>>>>>> It threw the following at p.run():
>>>>>>>>>
>>>>>>>>> Exception in thread "main" java.lang.IllegalStateException: no
>>>>>>>>> evaluator registered for Read(UnboundedKafkaSource)
>>>>>>>>>   at org.apache.beam.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:898)
>>>>>>>>>
>>>>>>>>> I am sure I am missing something. It would be great if I could see
>>>>>>>>> some sample code. I appreciate it, sir. Cheers
>>>>>>
>>>>>>>
>>>>>>>>>
>>>>>>>>>           From: Jean-Baptiste Onofré <j...@nanthrax.net>
>>>>>>>>>     To: dev@beam.incubator.apache.org
>>>>>>>>>     Sent: Thursday, April 28, 2016 5:30 AM
>>>>>>>>>     Subject: [DISCUSS] Beam IO &runners native IO
>>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> regarding the recent threads on the mailing list, I would like to
>>>>>>>>> start a formal discussion around the IO.
>>>>>>>>> As we can expect the first contributions in this area (I already
>>>>>>>>> have some work in progress around this ;)), I think it's a fair
>>>>>>>>> discussion to have.
>>>>>>>>>
>>>>>>>>> Now, we have two kinds of IO: the ones "generic" to Beam, and the
>>>>>>>>> ones "local" to the runners.
>>>>>>>>>
>>>>>>>>> For example, let's take Kafka: we have the KafkaIO (in IO), and,
>>>>>>>>> for instance, the spark-streaming kafka connector (in the Spark
>>>>>>>>> Runner).
>>>>>>>>>
>>>>>>>>> Right now, we have two approaches for the user:
>>>>>>>>> 1. In the pipeline, we use KafkaIO from Beam: it's the preferred
>>>>>>>>> approach for sure. However, the user may want to use the runner
>>>>>>>>> specific IO for two reasons:
>>>>>>>>>         * Beam doesn't provide the IO yet (for instance, the spark
>>>>>>>>> cassandra connector is available whereas we don't yet have any
>>>>>>>>> CassandraIO (I'm working on it anyway ;))
>>>>>>>>>         * The runner native IO is optimized or contains more
>>>>>>>>> features than the Beam native IO
>>>>>>>>> 2. So, for the previous reasons, the user could want to use the
>>>>>>>>> native runner IO. The drawback of this approach is that the
>>>>>>>>> pipeline will be tied to a specific runner, which is completely
>>>>>>>>> against the Beam design.
>>>>>>>>>
>>>>>>>>> I wonder if it wouldn't make sense to add a flag on the IO API
>>>>>>>>> (and a related one on the Runner API) like .useNative().
>>>>>>>>>
>>>>>>>>> For instance, the user would be able to do:
>>>>>>>>>
>>>>>>>>> pipeline.apply(KafkaIO.read().withBootstrapServers("...").withTopics("...").useNative(true));
>>>>>>>>>
>>>>>>>>> Then, if the runner has a "native" IO, it will use it; with
>>>>>>>>> useNative(false) (the default), it won't use any runner native IO.
>>>>>>>>>
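JB's proposed flag boils down to a small dispatch decision in the runner. A minimal sketch in plain Java (useNative is a proposal in this thread, not an existing Beam API; NativeIoDispatch and chooseImplementation are hypothetical names):

```java
// Sketch of how a runner might honor the proposed useNative flag: fall
// back to the portable Beam IO unless the user opted in AND the runner
// actually ships a native equivalent.
class NativeIoDispatch {
    static String chooseImplementation(boolean useNative, boolean runnerHasNativeIo) {
        return (useNative && runnerHasNativeIo) ? "runner-native" : "beam";
    }
}
```

This keeps the pipeline decoupled: with the default useNative(false), behavior is identical on every runner.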
>>>>>>>>> The point there is the configuration: assuming the Beam IO and
>>>>>>>>> the runner IO can differ, it means that the "Beam IO" would have
>>>>>>>>> to populate all runner specific IO configuration.
>>>>>>>>>
>>>>>>>>> Of course, it's always possible to use a PTransform to wrap the
>>>>>>>>> runner native IO, but we are back to the same concern: the
>>>>>>>>> pipeline will be coupled to a specific runner.
>>>>>>>>> couple to a specific runner.
>>>>>>>>>
>>>>>>>>> The purpose of the useNative() flag is to "automatically" inform
>>>>>>>>> the
>>>>>>>>> runner to use a specific IO if it has one: the pipeline stays
>>>>>>>>> decoupled
>>>>>>>>> from the runners.
>>>>>>>>>
>>>>>>>>> Thoughts ?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Regards
>>>>>>>>> JB
>>>>>>>>>
>>>>>>>>>
>>>>>>>> --
>>>>>>>> Jean-Baptiste Onofré
>>>>>>>> jbono...@apache.org
>>>>>>>> http://blog.nanthrax.net
>>>>>>>> Talend - http://www.talend.com
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>
>
