[jira] [Commented] (KAFKA-7820) distinct count kafka streams api

2019-01-16 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744663#comment-16744663
 ] 

Guozhang Wang commented on KAFKA-7820:
--

[~vinubarro] Thanks for sharing your use case. I think proposal 2) from 
[~bchen225242] may well fit your needs. To be more specific: say you have 10-20 
fields that require distinct counts. You can create a repartition key that is a 
combo of all of these fields, via a single repartition topic. For example, if 
your fields of interest are A, B, C, you can create a combo key (A, B, C). The 
semantics of this key are that "all records with the same values in A, B, C go 
to the same partition", which implies "all records with the same value of A go 
to the same partition" (same for B and C). So after the repartitioning, to 
distinctly count on field A you can aggregate on B/C and count on A, aggregate 
on A/C to count on B, and so on.
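The combo-key idea above can be sketched in plain Java (this is not the Streams API itself; the hypothetical `Event` record, field names, and the `|` separator are all assumptions). Once every record carries the combo key, the distinct count of one field is just the number of distinct components at that position across the observed keys:

```java
import java.util.*;

public class ComboKeyDistinct {
    // Hypothetical event carrying the three fields of interest.
    record Event(String a, String b, String c) {}

    // The combo repartition key (A, B, C); "|" as a separator is an assumption.
    public static String comboKey(Event e) {
        return e.a() + "|" + e.b() + "|" + e.c();
    }

    // Distinct count of one field = number of distinct components at that
    // position across all observed combo keys.
    public static long distinctField(Set<String> comboKeys, int position) {
        Set<String> values = new HashSet<>();
        for (String k : comboKeys) {
            values.add(k.split("\\|", -1)[position]);
        }
        return values.size();
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("a1", "b1", "c1"),
            new Event("a1", "b2", "c1"),
            new Event("a2", "b1", "c2"));

        // The set of combo keys is what the repartition topic would carry.
        Set<String> keys = new HashSet<>();
        for (Event e : events) keys.add(comboKey(e));

        System.out.println(distinctField(keys, 0)); // distinct values of A -> 2
    }
}
```

In an actual topology, the key building would happen in a selectKey/groupBy step that triggers the single repartition topic mentioned above.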

> distinct count kafka streams api
> 
>
> Key: KAFKA-7820
> URL: https://issues.apache.org/jira/browse/KAFKA-7820
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Vinoth Rajasekar
>Priority: Minor
>  Labels: needs-kip
>
> We are using Kafka Streams for our real-time analytics use cases. Most of our 
> use cases involve doing a distinct count on certain fields.
> Currently we do the distinct count by storing the hash value of the data in a 
> set and counting as events flow in. There are a lot of challenges doing this 
> in application memory, because storing the hash values and counting them is 
> limited by the allotted memory size. When we get high volume or a spike in 
> traffic, the hash set of the distinct-count fields grows beyond the allotted 
> memory size, leading to issues.
> The other issue is that when we scale the app, we need to use global KTables 
> so we get all the values for doing the distinct count, which adds back 
> pressure in the cluster, or we have to re-partition the topic and count on 
> the key.
> Can we have a feature where distinct count is supported through the Streams 
> API at the framework level, rather than dealing with it at the application 
> level?
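The application-level approach described in the report (tracking seen hashes in a set) can be sketched as below; it is a minimal illustration, and the unbounded growth of the set is exactly the memory limitation the report points out:

```java
import java.util.*;

public class AppLevelDistinct {
    // Keep the hash of every value seen so far and count as events flow in.
    // The set holds one entry per distinct value, so memory grows with
    // cardinality -- the limitation described in the issue.
    public static long distinctCount(List<String> values) {
        Set<Integer> seen = new HashSet<>();
        for (String v : values) {
            seen.add(v.hashCode()); // unbounded: grows with distinct values
        }
        return seen.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctCount(List.of("x", "y", "x", "z"))); // 3
    }
}
```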



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7820) distinct count kafka streams api

2019-01-15 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16743479#comment-16743479
 ] 

Boyang Chen commented on KAFKA-7820:


Thanks [~vinubarro] for the details. I think we need to further understand the 
use case before we decide whether to add a new API.
 # About repartitioning: since KStreams does aggregation at the partition 
level, it requires the same key to hash to the same partition. My question is: 
how many unique keys do we have here (the combos out of all 15-20 fields)? If 
the total number of keys is not that big, it should be OK.
 # We don't need multiple KTables to solve the problem. We could just derive a 
common aggregation key and do the counts, if you are referring to one single 
streaming instance. At Pinterest, we are building a generic API on top of 
#aggregate() that extracts the necessary fields to generate a superset join 
key through our thrift structure.

In conclusion, with proper repartitioning applied, count() should be 
sufficient for our use case. If we want field extraction at the framework 
level, we need the API to support multiple data types (JSON, Avro, Thrift). 
Let me know if this answers your question.
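The superset-join-key extraction mentioned above can be sketched as follows. This is a hypothetical simplification using a map-based record; the version described in the comment works over Thrift structs, and the method name and separator are assumptions:

```java
import java.util.*;
import java.util.stream.*;

public class SupersetKey {
    // Hypothetical: build a superset join key by extracting the named fields
    // from a map-based record, joined with a "|" separator. Missing fields
    // contribute an empty component so key positions stay stable.
    public static String supersetKey(Map<String, String> record,
                                     List<String> fields) {
        return fields.stream()
                     .map(f -> record.getOrDefault(f, ""))
                     .collect(Collectors.joining("|"));
    }

    public static void main(String[] args) {
        Map<String, String> record = Map.of("A", "a1", "B", "b2", "C", "c3");
        System.out.println(supersetKey(record, List.of("A", "B", "C")));
        // a1|b2|c3
    }
}
```

A key built this way would then be handed to a groupBy/aggregate step, so that count() over the repartitioned stream does the rest.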



[jira] [Commented] (KAFKA-7820) distinct count kafka streams api

2019-01-15 Thread Vinoth Rajasekar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16743375#comment-16743375
 ] 

Vinoth Rajasekar commented on KAFKA-7820:
-

Hey Boyang,

That's a good idea, and we considered that option too. It comes with a 
trade-off: allocating more space in the cluster for the distinct count itself 
through re-partitioning. We have a large schema with around 15-20 fields that 
qualify for distinct counts, based on the use cases for making near-real-time 
decisions. Re-partitioning on that many fields means trading off on space, 
which will be challenging at the enterprise level, since many teams share the 
same cluster.

There are other ways to perform a distinct count on a field, using a state 
store or KTables, but in that scenario we have to maintain too many tables at 
the application level.

This is a nice-to-have feature for apps that use Kafka Streams for analytical 
use cases making real-time decisions. Since many other streaming frameworks 
support this feature, we thought it would be very useful in Streams for many 
use cases, if it is something that can be handled at the framework level.

Would implementing this feature at the framework level be a heavy lift?

 



[jira] [Commented] (KAFKA-7820) distinct count kafka streams api

2019-01-14 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7820?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16742549#comment-16742549
 ] 

Boyang Chen commented on KAFKA-7820:


Hey Vinoth,

thanks for proposing this! Based on your use case, I'm wondering whether we 
could repartition the input with all the fields of interest as a compound key, 
and aggregate based on that key? That should fulfill your requirement.
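As a minimal sketch of repartition-then-aggregate, the per-compound-key counts that count() would maintain can be simulated with a map (the `Event` record, its field names, and the `|` separator are assumptions for illustration):

```java
import java.util.*;

public class CompoundKeyCount {
    // Hypothetical event with the fields of interest.
    record Event(String a, String b, String c) {}

    // Compound key over all fields of interest.
    public static String compoundKey(Event e) {
        return e.a() + "|" + e.b() + "|" + e.c();
    }

    // What a count() over the repartitioned stream would produce: one running
    // count per compound key.
    public static Map<String, Long> countByKey(List<Event> events) {
        Map<String, Long> counts = new HashMap<>();
        for (Event e : events) {
            counts.merge(compoundKey(e), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Event> events = List.of(
            new Event("a1", "b1", "c1"),
            new Event("a1", "b1", "c1"),
            new Event("a2", "b1", "c2"));
        // Two distinct compound keys; the distinct count is the map's size.
        System.out.println(countByKey(events).size()); // 2
    }
}
```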
