I added simple support for this for the usual type of Kafka sink (PR: 
https://github.com/apache/beam/pull/6776 - reviews are welcome, btw :) ).

At the same time, there is another, more complicated type of sink - EOS 
(Exactly Once Sink). In this case, the data is partitioned among a fixed number 
of shards, and one ShardWriter is created per shard. In turn, a ShardWriter 
depends on the Kafka topic. So it seems that, in the case of multiple and 
dynamic sink topics, we would need to create a new ShardWriter for every new 
topic per shard.

Is my assumption correct, or have I missed/misunderstood something? 
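To make the concern concrete, here is a hypothetical, in-memory sketch (plain Java, no Beam or Kafka on the classpath; `ShardedTopicRouter` and its methods are invented names, and a `List` of buffered records stands in for a real ShardWriter) of what "one ShardWriter per (shard, topic) pair" would mean once topics become dynamic:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not the actual Beam EOS sink code.
// Records are hashed to a fixed number of shards, and a writer is created
// lazily the first time a (shard, topic) pair is seen -- so with dynamic
// topics the number of writers grows with (shards x distinct topics).
class ShardedTopicRouter {
    private final int numShards;
    // Key: "shard:topic"; value: records buffered by that writer
    // (a stand-in for a real per-shard Kafka writer).
    private final Map<String, List<String>> writers = new HashMap<>();

    ShardedTopicRouter(int numShards) {
        this.numShards = numShards;
    }

    void write(String topic, String record) {
        int shard = Math.floorMod(record.hashCode(), numShards);
        String key = shard + ":" + topic;
        // Lazily create a writer for each new (shard, topic) pair.
        writers.computeIfAbsent(key, k -> new ArrayList<>()).add(record);
    }

    int writerCount() {
        return writers.size();
    }
}
```

With, say, 2 shards and 2 topics, up to 4 writers can exist; each new topic can add up to `numShards` more.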

> On 20 Oct 2018, at 01:21, Lukasz Cwik <lc...@google.com> wrote:
> 
> Thanks Raghu, added starter and newbie labels to the issue.
> 
> On Fri, Oct 19, 2018 at 4:20 PM Raghu Angadi <rang...@google.com> wrote:
> It will be a good starter feature for someone interested in Beam & Kafka. 
> The writer is very simple in Beam; it is little more than a ParDo. 
> 
> On Fri, Oct 19, 2018 at 3:37 PM Dmitry Minaev <mina...@gmail.com> wrote:
> Lukasz, I appreciate the quick response and filing the JIRA ticket. Thanks 
> for the suggestion, unfortunately, I don't have a fixed number of topics. 
> Still, we'll probably use your approach for a limited number of topics until 
> the functionality is added, thank you!
> 
> Thanks,
> Dmitry
> 
> On Fri, Oct 19, 2018 at 2:53 PM Lukasz Cwik <lc...@google.com> wrote:
> If there are a fixed number of topics, you could partition your write by 
> structuring your pipeline as such:
> ParDo(PartitionByTopic) ----> KafkaIO.write(topicA)
>                         \---> KafkaIO.write(topicB)
>                         \---> KafkaIO.write(...)
> 
> There is no support currently for writing to Kafka dynamically based upon a 
> destination that is part of the data.
> I filed https://issues.apache.org/jira/browse/BEAM-5798 for the issue.
> 
> On Fri, Oct 19, 2018 at 2:05 PM mina...@gmail.com wrote:
> Hi guys!!
> 
> I'm trying to find a way to write to a Kafka topic using KafkaIO.write(), but 
> I need to be able to get the topic name dynamically based on the data received. 
> For example, I would like to send data for one tenant to topic "data_feed_1" 
> and for another tenant to topic "data_feed_999".
> I'm coming from Flink where it's possible via 
> KeyedSerializationSchema.getTargetTopic().
> Is there anything similar in KafkaIO?
> 
> Thanks,
> Dmitry
> --
> Dmitry
> 
