My bad Alexey, I will review today. I had skimmed through the patch on my
phone. You are right, exactly-once sink support is not required for now. It
is quite a different beast and is necessarily coupled with transactions on
specific topic-partitions for correctness.

The primary concern is with the API. The user provides a function to map an
output record to its topic. We have found that such an API is usually
problematic. E.g. what if the record does not encode enough information
about the topic? Say we want to select the topic name based on the
aggregation window. It might be a bit more code, but it is simpler to let
the user decide the topic for each record _before_ writing to the sink.
E.g. it could be KafkaIO.Writer<KV<topic, KV<key, value>>>.
I wanted to think a little bit more about this, but didn't get around to
it. I will comment on the PR today.
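
A rough sketch of the difference between the two shapes, in plain Java
rather than the Beam API. Everything here (TopicRoutingSketch, Event,
Routed, the "agg_" prefix, the window label) is a hypothetical stand-in made
up for illustration; it is not KafkaIO code and not what the PR does. The
point is only that a record-to-topic function sees nothing but the record,
while an upstream step can use context such as the window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

public class TopicRoutingSketch {

    /** An output record that carries a key and a value, but no window information. */
    static final class Event {
        final String key;
        final long value;
        Event(String key, long value) { this.key = key; this.value = value; }
    }

    /** Hypothetical stand-in for an element shaped like KV<topic, KV<key, value>>. */
    static final class Routed {
        final String topic;
        final Event event;
        Routed(String topic, Event event) { this.topic = topic; this.event = event; }
    }

    /** Shape A: the sink derives the topic from the record alone. */
    static Map<String, List<Long>> writeWithTopicFn(
            List<Event> events, Function<Event, String> topicFn) {
        Map<String, List<Long>> written = new TreeMap<>();
        for (Event e : events) {
            written.computeIfAbsent(topicFn.apply(e), t -> new ArrayList<>()).add(e.value);
        }
        return written;
    }

    /** Upstream step: the caller knows the window, so it can pick the topic here. */
    static List<Routed> assignTopic(List<Event> events, String windowLabel) {
        List<Routed> out = new ArrayList<>();
        for (Event e : events) {
            out.add(new Routed("agg_" + windowLabel, e));
        }
        return out;
    }

    /** Shape B: the sink writes each element to the topic it already carries. */
    static Map<String, List<Long>> writeRouted(List<Routed> elements) {
        Map<String, List<Long>> written = new TreeMap<>();
        for (Routed r : elements) {
            written.computeIfAbsent(r.topic, t -> new ArrayList<>()).add(r.event.value);
        }
        return written;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(new Event("tenant1", 10L), new Event("tenant2", 20L));
        // Shape A can only use fields of the record, e.g. the key.
        System.out.println(writeWithTopicFn(events, e -> "data_feed_" + e.key));
        // Shape B can use context the record lacks, e.g. a window label.
        System.out.println(writeRouted(assignTopic(events, "2018-10-24T07")));
    }
}
```

With the second shape the sink itself stays trivial, and any topic-selection
logic lives in an ordinary step that the user controls.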

Thanks for the initiative and the PR.
Raghu.
On Wed, Oct 24, 2018 at 7:03 AM Alexey Romanenko <aromanenko....@gmail.com>
wrote:

> I added simple support for this for the usual type of Kafka sink (PR:
> https://github.com/apache/beam/pull/6776 , reviews welcome, btw :) )
>
> At the same time, there is another, more complicated type of sink: EOS
> (Exactly Once Sink). In this case the data is partitioned among a fixed
> number of shards, and one ShardWriter is created per shard. In turn,
> ShardWriter depends on the Kafka topic. So it seems that, in the case of
> multiple and dynamic sink topics, we need to create a new ShardWriter for
> every new topic per shard.
>
> Is my assumption correct, or have I missed or misunderstood something?
>
> On 20 Oct 2018, at 01:21, Lukasz Cwik <lc...@google.com> wrote:
>
> Thanks Raghu, added starter and newbie labels to the issue.
>
> On Fri, Oct 19, 2018 at 4:20 PM Raghu Angadi <rang...@google.com> wrote:
>
>> It will be a good starter feature for someone interested in Beam & Kafka.
>> Writer is very simple in Beam. It is little more than a ParDo.
>>
>> On Fri, Oct 19, 2018 at 3:37 PM Dmitry Minaev <mina...@gmail.com> wrote:
>>
>>> Lukasz, I appreciate the quick response and filing the JIRA ticket.
>>> Thanks for the suggestion. Unfortunately, I don't have a fixed number of
>>> topics. Still, we'll probably use your approach for a limited number of
>>> topics until the functionality is added, thank you!
>>>
>>> Thanks,
>>> Dmitry
>>>
>>> On Fri, Oct 19, 2018 at 2:53 PM Lukasz Cwik <lc...@google.com> wrote:
>>>
>>>> If there are a fixed number of topics, you could partition your write
>>>> by structuring your pipeline as such:
>>>> ParDo(PartitionByTopic) ----> KafkaIO.write(topicA)
>>>>                         \---> KafkaIO.write(topicB)
>>>>                         \---> KafkaIO.write(...)
>>>>
>>>> There is no support currently for writing to Kafka dynamically based
>>>> upon a destination that is part of the data.
>>>> I filed https://issues.apache.org/jira/browse/BEAM-5798 for the issue.
>>>>
>>>> On Fri, Oct 19, 2018 at 2:05 PM mina...@gmail.com <mina...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi guys!!
>>>>>
>>>>> I'm trying to find a way to write to a Kafka topic using
>>>>> KafkaIO.write(), but I need to be able to get the topic name dynamically
>>>>> based on the data received. For example, I would like to send data for
>>>>> one tenant to topic "data_feed_1" and for another tenant to topic
>>>>> "data_feed_999".
>>>>> I'm coming from Flink where it's possible via
>>>>> KeyedSerializationSchema.getTargetTopic().
>>>>> Is there anything similar in KafkaIO?
>>>>>
>>>>> Thanks,
>>>>> Dmitry
>>>>>
>>>> --
>>>
>>> --
>>> Dmitry
>>>
>>
>
