[jira] [Commented] (BEAM-5798) Add support for dynamic destinations when writing to Kafka
[ https://issues.apache.org/jira/browse/BEAM-5798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16733135#comment-16733135 ] Alexey Romanenko commented on BEAM-5798: [~rangadi] I submitted new PR using {{ProducerRecord}} to write into different topics that can be set dynamically. Could you take a look on this? Thanks! PR: [https://github.com/apache/beam/pull/7371] > Add support for dynamic destinations when writing to Kafka > -- > > Key: BEAM-5798 > URL: https://issues.apache.org/jira/browse/BEAM-5798 > Project: Beam > Issue Type: New Feature > Components: io-java-kafka >Reporter: Luke Cwik >Assignee: Alexey Romanenko >Priority: Major > Labels: newbie, starter > Time Spent: 2.5h > Remaining Estimate: 0h > > Add support for writing to Kafka based upon contents of the data. This is > similar to the dynamic destination approach for file IO and other sinks. > > Source of request: > https://lists.apache.org/thread.html/a89d1d32ecdb50c42271e805cc01a651ee3623b4df97c39baf4f2053@%3Cuser.beam.apache.org%3E -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-5798) Add support for dynamic destinations when writing to Kafka
[ https://issues.apache.org/jira/browse/BEAM-5798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16678582#comment-16678582 ] Raghu Angadi commented on BEAM-5798: [~aromanenko], yeah, that's what I had in mind. But we should go with whichever we are more comfortable with. > Add support for dynamic destinations when writing to Kafka > -- > > Key: BEAM-5798 > URL: https://issues.apache.org/jira/browse/BEAM-5798 > Project: Beam > Issue Type: New Feature > Components: io-java-kafka >Reporter: Luke Cwik >Assignee: Alexey Romanenko >Priority: Major > Labels: newbie, starter > Time Spent: 1h 50m > Remaining Estimate: 0h > > Add support for writing to Kafka based upon contents of the data. This is > similar to the dynamic destination approach for file IO and other sinks. > > Source of request: > https://lists.apache.org/thread.html/a89d1d32ecdb50c42271e805cc01a651ee3623b4df97c39baf4f2053@%3Cuser.beam.apache.org%3E -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-5798) Add support for dynamic destinations when writing to Kafka
[ https://issues.apache.org/jira/browse/BEAM-5798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16678500#comment-16678500 ] Alexey Romanenko commented on BEAM-5798: [~rangadi] You mean to change the internal API from using KV to use ProducerRecord but keep external API as "it is"? Yes, perhaps it would makes sense too, thanks! > Add support for dynamic destinations when writing to Kafka > -- > > Key: BEAM-5798 > URL: https://issues.apache.org/jira/browse/BEAM-5798 > Project: Beam > Issue Type: New Feature > Components: io-java-kafka >Reporter: Luke Cwik >Assignee: Alexey Romanenko >Priority: Major > Labels: newbie, starter > Time Spent: 1h 50m > Remaining Estimate: 0h > > Add support for writing to Kafka based upon contents of the data. This is > similar to the dynamic destination approach for file IO and other sinks. > > Source of request: > https://lists.apache.org/thread.html/a89d1d32ecdb50c42271e805cc01a651ee3623b4df97c39baf4f2053@%3Cuser.beam.apache.org%3E -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-5798) Add support for dynamic destinations when writing to Kafka
[ https://issues.apache.org/jira/browse/BEAM-5798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16677246#comment-16677246 ] Raghu Angadi commented on BEAM-5798: Outline sounds great. Thanks [~aromanenko]. While implementing, it might be simpler for most of the implementation work with ProducerRecords where, the current KV writer just converts each KV to a ProducerRecord. > Add support for dynamic destinations when writing to Kafka > -- > > Key: BEAM-5798 > URL: https://issues.apache.org/jira/browse/BEAM-5798 > Project: Beam > Issue Type: New Feature > Components: io-java-kafka >Reporter: Luke Cwik >Assignee: Alexey Romanenko >Priority: Major > Labels: newbie, starter > Time Spent: 1h 50m > Remaining Estimate: 0h > > Add support for writing to Kafka based upon contents of the data. This is > similar to the dynamic destination approach for file IO and other sinks. > > Source of request: > https://lists.apache.org/thread.html/a89d1d32ecdb50c42271e805cc01a651ee3623b4df97c39baf4f2053@%3Cuser.beam.apache.org%3E -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-5798) Add support for dynamic destinations when writing to Kafka
[ https://issues.apache.org/jira/browse/BEAM-5798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16677083#comment-16677083 ] Alexey Romanenko commented on BEAM-5798: Following a [discussion|https://lists.apache.org/thread.html/a89d1d32ecdb50c42271e805cc01a651ee3623b4df97c39baf4f2053@%3Cuser.beam.apache.org%3E] on user@ list, I'd summarise the final implementation of this feature: - Do not use user-provided function (as I did in the first PR) since the topic name for current key/value could be defined earlier in a pipeline (for example, the name depends on window where window duration is 1 day). Instead, we will use {{org.apache.kafka.clients.producer.ProducerRecord}} as input record format. Actually, we can continue a work, that has been started in BEAM-4038 - The implementation assumes the followings things to do: -- Add new transform {{ProducerRecordWrite}} which will receive {{ProducerRecord}} as input value, create new KV, where key is null and value is {{ProducerRecord}}, and pass it to general {{KafkaIO.Write}} transform. -- Add new public method {{PTransform, PDone> KafkaIO.Write.writeRecords()}} which will create new {{ProducerRecordWrite}} transform and should be used if user wants to use {{ProducerRecord}} with KafkaIO. -- Add {{ProducerRecordCoder}} and use it by default for {{ProducerRecord}} -- Inside {{KafkaWriter}} we check if the value is instance of {{ProducerRecord}} and if it's a case then use topic name encoded there or default topic if it's null. [~rangadi] what do you think? Is it going to work for you? > Add support for dynamic destinations when writing to Kafka > -- > > Key: BEAM-5798 > URL: https://issues.apache.org/jira/browse/BEAM-5798 > Project: Beam > Issue Type: New Feature > Components: io-java-kafka >Reporter: Luke Cwik >Assignee: Alexey Romanenko >Priority: Major > Labels: newbie, starter > Time Spent: 1h 50m > Remaining Estimate: 0h > > Add support for writing to Kafka based upon contents of the data. This is > similar to the dynamic destination approach for file IO and other sinks. > > Source of request: > https://lists.apache.org/thread.html/a89d1d32ecdb50c42271e805cc01a651ee3623b4df97c39baf4f2053@%3Cuser.beam.apache.org%3E -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (BEAM-5798) Add support for dynamic destinations when writing to Kafka
[ https://issues.apache.org/jira/browse/BEAM-5798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16658965#comment-16658965 ] Alexey Romanenko commented on BEAM-5798: If nobody minds, I'd take care of this one. > Add support for dynamic destinations when writing to Kafka > -- > > Key: BEAM-5798 > URL: https://issues.apache.org/jira/browse/BEAM-5798 > Project: Beam > Issue Type: New Feature > Components: io-java-kafka >Reporter: Luke Cwik >Priority: Major > Labels: newbie, starter > > Add support for writing to Kafka based upon contents of the data. This is > similar to the dynamic destination approach for file IO and other sinks. > > Source of request: > https://lists.apache.org/thread.html/a89d1d32ecdb50c42271e805cc01a651ee3623b4df97c39baf4f2053@%3Cuser.beam.apache.org%3E -- This message was sent by Atlassian JIRA (v7.6.3#76005)