[jira] [Commented] (KAFKA-7820) distinct count kafka streams api
[ https://issues.apache.org/jira/browse/KAFKA-7820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744663#comment-16744663 ]

Guozhang Wang commented on KAFKA-7820:
--------------------------------------

[~vinubarro] Thanks for sharing your use case. I think proposal 2) from [~bchen225242] may well fit your needs. To be more specific: say you have 10-20 fields that require distinct counts. You can create a repartition key that is a combo of all of these fields, via a single repartition topic. For example, if your fields of interest are A, B, and C, and you create the combo key (A, B, C), the semantics of the co-partition key are that "all records with the same values of A, B, and C will go to the same partition", which implies "all records with the same value of A will go to the same partition" (same for B and C). So after you have done the repartitioning, to distinctly count on field A you can aggregate on B/C and count on A, aggregate on A/C to count on B, and so on.

> distinct count kafka streams api
> --------------------------------
>
>            Key: KAFKA-7820
>            URL: https://issues.apache.org/jira/browse/KAFKA-7820
>        Project: Kafka
>     Issue Type: New Feature
>     Components: streams
>       Reporter: Vinoth Rajasekar
>       Priority: Minor
>         Labels: needs-kip
>
> We are using Kafka Streams for our real-time analytics use cases. Most of our use cases involve doing a distinct count on certain fields.
> Currently we do the distinct count by storing the hash value of the data in a set and counting as events flow in. There are a lot of challenges doing this in application memory, because storing the hash values and counting them is limited by the allotted memory size. When we get high volume or a spike in traffic, the hash set of the distinct-count fields grows beyond the allotted memory size, leading to issues.
> The other issue is that when we scale the app, we need to use global KTables so we get all the values for doing the distinct count, and this adds back pressure in the cluster; or we have to repartition the topic and count on the key.
> Can we have a feature where distinct count is supported through the Streams API at the framework level, rather than dealing with it at the application level?

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
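The composite-key scheme above can be sketched outside of Kafka Streams. This is a minimal illustration in plain Python, with a dict of buckets standing in for the repartition topic's partitions and the field names A/B/C taken from the hypothetical example in the comment:

```python
from collections import defaultdict

# Hypothetical records with three fields of interest (A, B, C).
records = [
    {"A": "a1", "B": "b1", "C": "c1"},
    {"A": "a1", "B": "b1", "C": "c2"},
    {"A": "a2", "B": "b1", "C": "c1"},
    {"A": "a1", "B": "b2", "C": "c1"},
]

# Step 1: repartition on the combo key (A, B, C) -- every record with the
# same combo key lands in the same "partition" (here: a dict bucket).
NUM_PARTITIONS = 4
partitions = defaultdict(list)
for r in records:
    combo = (r["A"], r["B"], r["C"])
    partitions[hash(combo) % NUM_PARTITIONS].append(r)

# Step 2: to distinctly count field A, aggregate on (B, C) and collect the
# set of A values seen per group; the group's count is the set's size.
distinct_a = defaultdict(set)
for part in partitions.values():
    for r in part:
        distinct_a[(r["B"], r["C"])].add(r["A"])

counts = {group: len(values) for group, values in distinct_a.items()}
```

Here `counts[("b1", "c1")]` is 2 because two distinct A values (a1, a2) were seen for that B/C group; the same pass can be repeated with the roles of the fields swapped to count on B or C.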
[jira] [Commented] (KAFKA-7820) distinct count kafka streams api
[ https://issues.apache.org/jira/browse/KAFKA-7820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16743479#comment-16743479 ]

Boyang Chen commented on KAFKA-7820:
------------------------------------

Thanks [~vinubarro] for the details. I think we need to understand the use case further before we decide whether we need to add a new API.
# About repartitioning: since KStream does aggregation at the partition level, it is required that records with the same key hash to the same partition. My question is: how many unique keys do we have here (the combo out of all 15-20 fields)? If the total number of keys is not that big, it should be OK.
# We don't need multiple KTables to solve the problem. We could just derive a common aggregation key and do the counts, if you are referring to a single streaming instance. At Pinterest, we are building a generic API on top of #aggregate() that extracts the necessary fields to generate a superset join key through our Thrift structure.

In conclusion, with proper repartitioning applied, count() should suffice for our use case. If we want the field extraction at the framework level, the API would need to support multiple data types (JSON, Avro, Thrift). Let me know if this answers your question.
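The superset-key idea in the comment above can be sketched generically. This is a hedged illustration, not the Pinterest API; the record layout and field names are invented for the example:

```python
from collections import Counter

def superset_key(record, fields):
    """Build one compound aggregation key from all fields of interest."""
    return tuple(record[f] for f in fields)

# Hypothetical events; the fields of interest are extracted into one key.
events = [
    {"user": "u1", "page": "home", "device": "ios"},
    {"user": "u1", "page": "home", "device": "ios"},
    {"user": "u2", "page": "home", "device": "web"},
]

# With a proper repartition on this key, a plain count() per key is enough;
# Counter stands in for that aggregation here.
key_counts = Counter(
    superset_key(e, ["user", "page", "device"]) for e in events
)
```

The number of entries in `key_counts` is the distinct count over the full field combo, and each entry's value is the per-key occurrence count that count() would produce.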
[jira] [Commented] (KAFKA-7820) distinct count kafka streams api
[ https://issues.apache.org/jira/browse/KAFKA-7820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16743375#comment-16743375 ]

Vinoth Rajasekar commented on KAFKA-7820:
-----------------------------------------

Hey Boyang,

That's a good idea, and we considered that option too. It comes with a trade-off: repartitioning means allocating more space in the cluster for the distinct count itself. We have a large schema, with around 15-20 fields that qualify for distinct counts based on the use cases for making near-real-time decisions. Repartitioning for that many fields means trading off on space, which will be challenging at the enterprise level, since many teams share the same cluster.

There are other ways we can perform a distinct count for a field, using a state store or KTables, but in those scenarios we have to maintain too many tables at the application level.

This is a nice-to-have feature for apps that use Kafka Streams for analytical use cases making real-time decisions. Since many other streaming frameworks support this feature, we thought it would be very useful in Streams for many use cases, if it is something that can be handled at the framework level. Would implementing this feature at the framework level be a heavy lift?
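The application-level approach the issue describes (keeping hashes in a set and counting as events flow in) looks roughly like the sketch below. The class name is invented for illustration; the point is that memory grows linearly with the number of distinct values, which is exactly the limitation being reported:

```python
class NaiveDistinctCounter:
    """Exact distinct count: the backing set grows with cardinality."""

    def __init__(self):
        self._seen = set()

    def observe(self, value):
        # Store the hash of the value, as the issue describes; the set
        # still grows by one entry per previously unseen value.
        self._seen.add(hash(value))

    def count(self):
        return len(self._seen)

counter = NaiveDistinctCounter()
for v in ["a", "b", "a", "c", "b"]:
    counter.observe(v)
```

Bounded-memory alternatives (e.g. approximate sketches such as HyperLogLog) trade exactness for a fixed footprint, which is part of why framework-level support is attractive for high-volume traffic.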
[jira] [Commented] (KAFKA-7820) distinct count kafka streams api
[ https://issues.apache.org/jira/browse/KAFKA-7820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16742549#comment-16742549 ]

Boyang Chen commented on KAFKA-7820:
------------------------------------

Hey Vinoth, thanks for proposing this! Based on your use case, I'm wondering whether we could repartition the input with all the fields of interest as a compound key, and aggregate based on that key? That should be able to fulfill your requirement.