On Wed, Oct 24, 2018 at 10:47 AM Raghu Angadi <rang...@google.com> wrote:

> My bad Alexey, I will review today. I had skimmed through the patch on my
> phone. You are right, exactly-once sink support is not required for now.
>



> It is quite a different beast and is necessarily coupled with transactions
> on specific topic-partitions for correctness.
>
Actually, I take this back. I don't think it is coupled with the output
topics and partitions. It might just work (assuming Kafka can handle
individual transactions spanning many topics well). As you mentioned, we
would still need to plumb it through. That said, we don't know how much the
exactly-once sink is actually used... (I would love to hear about it if
anyone is using it).


>
> The primary concern is with the API. The user provides a function to map
> an output record to its topic. We have found that such an API is usually
> problematic. E.g. what if the record does not encode enough information
> about the topic? Say we want to select the topic name based on the
> aggregation window. It might be a bit more code, but it is simpler to let
> the user decide the topic for each record _before_ writing to the sink.
> E.g. it could be KafkaIO.Writer<KV<topic, KV<key, value>>>.
> I wanted to think a little bit more about this, but didn't get around to
> it. I will comment on the PR today.
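For illustration, a rough sketch of that per-record-topic idea (the DoFn and
the element shape are illustrative; KafkaIO does not expose such a writer
today, so the final sink call is left hypothetical):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Illustrative only: elements become KV<topic, KV<key, value>>, with the topic
// chosen per record before the sink ever sees it.
class AssignTopicFn extends DoFn<KV<String, String>, KV<String, KV<String, String>>> {
  @ProcessElement
  public void process(ProcessContext c) {
    // Anything visible here (element contents, side inputs, or the window via a
    // BoundedWindow parameter) can drive the topic choice.
    String topic = "data_feed_" + c.element().getKey();
    c.output(KV.of(topic, c.element()));
  }
}

// PCollection<KV<String, KV<String, String>>> addressed =
//     records.apply("AssignTopic", ParDo.of(new AssignTopicFn()));
// addressed.apply(/* hypothetical KafkaIO writer that reads the topic from the outer key */);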
>
> thanks for the initiative and the PR.
> Raghu.
> On Wed, Oct 24, 2018 at 7:03 AM Alexey Romanenko <aromanenko....@gmail.com>
> wrote:
>
>> I added simple support for this for the usual type of Kafka sink (PR:
>> https://github.com/apache/beam/pull/6776 , reviews welcome, btw :) )
>>
>> At the same time, there is another, more complicated type of sink: EOS
>> (Exactly-Once Sink). In this case the data is partitioned among a fixed
>> number of shards, and one ShardWriter is created per shard. In turn, each
>> ShardWriter is tied to a Kafka topic. So it seems that, with multiple and
>> dynamic sink topics, we would need to create a new ShardWriter for every
>> new topic per shard.
>>
>> Is my assumption correct, or did I miss/misunderstand something?
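For illustration, a rough sketch of what that could look like inside a single
shard, assuming a small cache of writers keyed by topic (ShardWriter and the
other names here are stand-ins, not Beam's actual EOS internals):

import java.util.HashMap;
import java.util.Map;

// Stand-in sketch: one writer per (shard, topic), created lazily as new topics appear.
class DynamicTopicShard<K, V> {
  private final int shardId;
  private final Map<String, ShardWriter<K, V>> writersByTopic = new HashMap<>();

  DynamicTopicShard(int shardId) {
    this.shardId = shardId;
  }

  void write(String topic, K key, V value) {
    writersByTopic
        .computeIfAbsent(topic, t -> new ShardWriter<>(shardId, t))
        .write(key, value);
  }

  // Placeholder for the per-topic EOS writer that owns a transactional producer.
  static class ShardWriter<K, V> {
    ShardWriter(int shardId, String topic) { /* open a transactional producer for this topic */ }
    void write(K key, V value) { /* send within the shard's current transaction */ }
  }
}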
>>
>> On 20 Oct 2018, at 01:21, Lukasz Cwik <lc...@google.com> wrote:
>>
>> Thanks Raghu, added starter and newbie labels to the issue.
>>
>> On Fri, Oct 19, 2018 at 4:20 PM Raghu Angadi <rang...@google.com> wrote:
>>
>>> It will be a good starter feature for someone interested in Beam &
>>> Kafka. The writer is very simple in Beam; it is little more than a ParDo.
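As a rough illustration of that shape (not KafkaIO's actual code; the broker
address and topic below are placeholders), a minimal Kafka-writing DoFn could
look like this:

import java.util.Properties;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Bare-bones shape of a Kafka sink as a DoFn.
class WriteToKafkaFn extends DoFn<KV<String, String>, Void> {
  private transient KafkaProducer<String, String> producer;

  @Setup
  public void setup() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker:9092");
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());
    producer = new KafkaProducer<>(props);
  }

  @ProcessElement
  public void process(@Element KV<String, String> kv) {
    // The topic could just as easily be derived from the element itself.
    producer.send(new ProducerRecord<>("data_feed_1", kv.getKey(), kv.getValue()));
  }

  @FinishBundle
  public void finishBundle() {
    producer.flush();  // make the bundle's writes durable before the bundle is committed
  }

  @Teardown
  public void teardown() {
    producer.close();
  }
}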
>>>
>>> On Fri, Oct 19, 2018 at 3:37 PM Dmitry Minaev <mina...@gmail.com> wrote:
>>>
>>>> Lukasz, I appreciate the quick response and filing the JIRA ticket.
>>>> Thanks for the suggestion, unfortunately, I don't have a fixed number of
>>>> topics. Still, we'll probably use your approach for a limited number of
>>>> topics until the functionality is added, thank you!
>>>>
>>>> Thanks,
>>>> Dmitry
>>>>
>>>> On Fri, Oct 19, 2018 at 2:53 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> If there are a fixed number of topics, you could partition your write
>>>>> by structuring your pipeline as such:
>>>>> ParDo(PartitionByTopic) ----> KafkaIO.write(topicA)
>>>>>                         \---> KafkaIO.write(topicB)
>>>>>                         \---> KafkaIO.write(...)
>>>>>
>>>>> There is no support currently for writing to Kafka dynamically based
>>>>> upon a destination that is part of the data.
>>>>> I filed https://issues.apache.org/jira/browse/BEAM-5798 for the issue.
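A concrete version of the fixed-set-of-topics branching sketched above might
look like the following; topicFor(...) stands in for whatever routing logic
the pipeline already has, and the broker address and topic list are
placeholders:

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.kafka.common.serialization.StringSerializer;

// One branch per known topic: keep only that topic's records, then write them.
List<String> topics = Arrays.asList("data_feed_1", "data_feed_999");  // fixed, known up front
for (String topic : topics) {
  records  // assumed to be a PCollection<KV<String, String>>
      .apply("FilterFor_" + topic,
          Filter.by((KV<String, String> kv) -> topic.equals(topicFor(kv))))  // topicFor(..) is hypothetical
      .apply("WriteTo_" + topic,
          KafkaIO.<String, String>write()
              .withBootstrapServers("broker:9092")
              .withTopic(topic)
              .withKeySerializer(StringSerializer.class)
              .withValueSerializer(StringSerializer.class));
}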
>>>>>
>>>>> On Fri, Oct 19, 2018 at 2:05 PM mina...@gmail.com <mina...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi guys!!
>>>>>>
>>>>>> I'm trying to find a way to write to a Kafka topic using
>>>>>> KafkaIO.write(), but I need to be able to determine the topic name
>>>>>> dynamically based on the data received. For example, I would like to
>>>>>> send data for one tenant to topic "data_feed_1" and for another tenant
>>>>>> to topic "data_feed_999".
>>>>>> I'm coming from Flink, where this is possible via
>>>>>> KeyedSerializationSchema.getTargetTopic().
>>>>>> Is there anything similar in KafkaIO?
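For reference, the Flink mechanism mentioned above looks roughly like this
(package names vary across Flink versions, and TenantRecord is a made-up
element type):

import java.nio.charset.StandardCharsets;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

// Made-up element type carrying a tenant id and a payload.
class TenantRecord {
  final String tenantId;
  final String payload;
  TenantRecord(String tenantId, String payload) {
    this.tenantId = tenantId;
    this.payload = payload;
  }
}

// getTargetTopic() lets each record override the producer's default topic.
class TenantTopicSchema implements KeyedSerializationSchema<TenantRecord> {
  @Override
  public byte[] serializeKey(TenantRecord r) {
    return r.tenantId.getBytes(StandardCharsets.UTF_8);
  }

  @Override
  public byte[] serializeValue(TenantRecord r) {
    return r.payload.getBytes(StandardCharsets.UTF_8);
  }

  @Override
  public String getTargetTopic(TenantRecord r) {
    return "data_feed_" + r.tenantId;  // per-record topic selection
  }
}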
>>>>>>
>>>>>> Thanks,
>>>>>> Dmitry
>>>>>>
>>>>> --
>>>>
>>>> --
>>>> Dmitry
>>>>
>>>
>>
