[jira] [Comment Edited] (KAFKA-3511) Provide built-in aggregators sum() and avg() in Kafka Streams DSL

2016-04-11 Thread Michael Noll (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15235113#comment-15235113
 ] 

Michael Noll edited comment on KAFKA-3511 at 4/11/16 1:54 PM:
--

PS: As [~guozhang] hinted at, we should also keep in mind that we would prefer 
an API that is pleasant to use in both Java 7 and Java 8+ (notably with 
lambdas).  Perhaps the work on this ticket means that we should take a step 
back and look at the state of the current API again before we introduce 
changes driven by the additions of this ticket.
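A small sketch of the Java 7 / Java 8+ point, assuming every function-typed parameter of the DSL is a single-abstract-method interface: Java 8+ users can then pass a lambda, while Java 7 users pass an anonymous class to the very same method. {{Reducer}} and {{reduceAll}} below are hypothetical stand-ins, not the actual Kafka Streams types, and the lambda line itself needs Java 8+ to compile.

{code:java}
import java.util.Arrays;
import java.util.List;

class LambdaFriendlyApi {

    // Hypothetical single-abstract-method interface (a stand-in, not the real
    // Kafka Streams type). One abstract method means Java 8+ code can pass a
    // lambda, while Java 7 code can pass an anonymous class.
    interface Reducer<V> {
        V apply(V value1, V value2);
    }

    // Hypothetical helper: combines the elements of a non-empty list pairwise,
    // left to right, using the given reducer.
    static <V> V reduceAll(List<V> values, Reducer<V> reducer) {
        V result = values.get(0);
        for (int i = 1; i < values.size(); i++) {
            result = reducer.apply(result, values.get(i));
        }
        return result;
    }

    // Java 7 style: an anonymous inner class.
    static final Reducer<Long> SUM_AS_ANONYMOUS_CLASS = new Reducer<Long>() {
        @Override
        public Long apply(Long value1, Long value2) {
            return value1 + value2;
        }
    };

    // Java 8+ style: the same reducer as a lambda.
    static final Reducer<Long> SUM_AS_LAMBDA = (value1, value2) -> value1 + value2;

    public static void main(String[] args) {
        List<Long> pageviews = Arrays.asList(3L, 4L, 5L);
        System.out.println(reduceAll(pageviews, SUM_AS_ANONYMOUS_CLASS)); // 12
        System.out.println(reduceAll(pageviews, SUM_AS_LAMBDA));          // 12
    }
}
{code}

Both call sites hit the same {{reduceAll}} method; only the syntax at the call site differs between Java versions.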


> Provide built-in aggregators sum() and avg() in Kafka Streams DSL
> -
>
> Key: KAFKA-3511
> URL: https://issues.apache.org/jira/browse/KAFKA-3511
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>  Labels: api, newbie
> Fix For: 0.10.0.0
>
>
> Currently we only have one built-in aggregate function count() in the Kafka 
> Streams DSL, but we want to add more aggregation functions like sum() and 
> avg().



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3511) Provide built-in aggregators sum() and avg() in Kafka Streams DSL

2016-04-11 Thread Michael Noll (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15234770#comment-15234770
 ] 

Michael Noll edited comment on KAFKA-3511 at 4/11/16 9:08 AM:
--

Hmm.  I'd prefer an API that allows me to write:

* {{stream.max()}} rather than
* {{stream.aggregate(new Max())}}

That said, I think we are talking about two related but different questions:

1. Which aggregations do we provide out of the box?
2. How do we expose these built-in aggregations?

Regarding point 1, we could consider the following built-in aggregations (cf. 
Scala's collection API, for example), each followed by an example use case.  In 
general, it might be a good idea to think in terms of common use cases when 
deciding whether, and which, aggregations to provide out of the box, taking 
into account which would make sense on windowed streams, which on non-windowed 
streams, which on KStream, which on KTable, and which on both.  Again, I am 
listing concrete function names here, but the purpose is to list their 
*functionality* (point 2 addresses how this functionality is exposed).

* {{count}} / {{countBy}} -- "how many pageviews between 4 AM and 5 AM?" 
({{countBy}} and {{filter}} are related: {{countBy}} is essentially {{filter}} 
followed by {{count}})
* {{min}} / {{minBy}} -- "the page with the fewest views"
* {{max}} / {{maxBy}} -- "the page with the most views", "the IP address with 
the most hits (e.g., to implement throttling or blocklists)"
* {{sum}} -- "total number of pageviews per user"
* {{distinct}} -- "distinct elements in the stream" (this is related to 
changelog streams, aka KTable, and the stream-table duality; perhaps 
{{distinct}} would make sense only for KStream, as "distinct" is essentially a 
native feature of KTable, which retains only the latest value per key)
* Somewhat related: we may also want to consider {{sortBy}} and {{take}} 
operators to help implement "give me the top N things" use cases, since 
{{min}} and {{max}} return only a single element.  Idea: 
{{windowedStream.sortBy(...).take(5)}}.  (For {{take}}, I'm ignoring the 
question of lazy vs. non-lazy evaluation here.)
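To make the functionality list above a bit more concrete, here is a rough sketch of the same operations over a bounded, in-memory list using {{java.util.stream}}. It is not a Kafka stream, so it says nothing about windowing, KTable semantics, or lazy evaluation; the {{PageView}} type, the sample data, and the helper method names are all hypothetical.

{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

class BuiltInAggregationsSketch {

    // Hypothetical event: one page view of `page` by `user` during hour `hour`.
    static final class PageView {
        final String user;
        final String page;
        final int hour;

        PageView(String user, String page, int hour) {
            this.user = user;
            this.page = page;
            this.hour = hour;
        }
    }

    // Hypothetical sample data.
    static List<PageView> sampleData() {
        return Arrays.asList(
                new PageView("alice", "/home", 4),
                new PageView("bob", "/home", 4),
                new PageView("alice", "/docs", 5),
                new PageView("carol", "/home", 9),
                new PageView("bob", "/docs", 4),
                new PageView("alice", "/about", 7));
    }

    // count with a predicate, i.e. filter followed by count:
    // "how many pageviews between 4 AM and 5 AM?"
    static long pageviewsBetween4And5(List<PageView> views) {
        return views.stream().filter(v -> v.hour == 4).count();
    }

    // Helper: number of views per page.
    static Map<String, Long> viewsPerPage(List<PageView> views) {
        return views.stream()
                .collect(Collectors.groupingBy(v -> v.page, Collectors.counting()));
    }

    // maxBy: "the page with the most views"
    static Optional<String> mostViewedPage(List<PageView> views) {
        return viewsPerPage(views).entrySet().stream()
                .max(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey);
    }

    // minBy: "the page with the fewest views"
    static Optional<String> leastViewedPage(List<PageView> views) {
        return viewsPerPage(views).entrySet().stream()
                .min(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey);
    }

    // distinct: the distinct pages, in order of first appearance
    static List<String> distinctPages(List<PageView> views) {
        return views.stream().map(v -> v.page).distinct().collect(Collectors.toList());
    }

    // sortBy + take: "give me the top N pages"
    static List<String> topPages(List<PageView> views, int n) {
        return viewsPerPage(views).entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .limit(n)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<PageView> views = sampleData();
        System.out.println(pageviewsBetween4And5(views));          // 3
        System.out.println(mostViewedPage(views).orElse("none"));  // /home
        System.out.println(leastViewedPage(views).orElse("none")); // /about
        System.out.println(distinctPages(views));                  // [/home, /docs, /about]
        System.out.println(topPages(views, 2));                    // [/home, /docs]
    }
}
{code}

Note how {{min}}/{{max}} yield a single element, while the top-N case needs a sort plus a limit, which is the {{sortBy}}/{{take}} pair mentioned above.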

Note: Unfortunately, because we use Java, we can't benefit from implicits / 
type classes as we could in, e.g., Scala.  For example, here's the function 
signature of {{sum}} in Scala:

{code}
// Element type Int here; on a generic collection of A this reads sum[B >: A]
def sum[B >: Int](implicit num: Numeric[B]): B
{code}

The benefit is that {{sum}} works automagically as long as type {{B}} "looks 
like a number" (i.e., an implicit {{Numeric[B]}} instance is in scope), which 
is a great help when you need to implement an out-of-the-box version of 
{{sum}} that always does the right thing.
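A rough Java approximation of the type-class approach, assuming we simply pass the "numeric" behavior explicitly since Java has no implicits. {{Numeric}}, {{LONG}}, {{DOUBLE}} and {{Avg}} below are hypothetical names, not part of Kafka or the JDK. The sketch also keeps {{avg}} as a running (sum, count) pair: unlike plain averages, such pairs can be updated one element at a time and combined without losing information.

{code:java}
import java.util.Arrays;
import java.util.List;

class ExplicitNumeric {

    // Hypothetical Java stand-in for Scala's Numeric[T] type class. Without
    // implicits, the caller has to pass an instance explicitly.
    interface Numeric<T> {
        T zero();
        T plus(T a, T b);
        double toDouble(T value);
    }

    static final Numeric<Long> LONG = new Numeric<Long>() {
        public Long zero() { return 0L; }
        public Long plus(Long a, Long b) { return a + b; }
        public double toDouble(Long value) { return value; }
    };

    static final Numeric<Double> DOUBLE = new Numeric<Double>() {
        public Double zero() { return 0.0; }
        public Double plus(Double a, Double b) { return a + b; }
        public double toDouble(Double value) { return value; }
    };

    // sum() written once for every type T that has a Numeric<T> instance.
    static <T> T sum(List<T> values, Numeric<T> num) {
        T total = num.zero();
        for (T value : values) {
            total = num.plus(total, value);
        }
        return total;
    }

    // avg() kept as a running (sum, count) pair so it can be updated one
    // element at a time; the average itself is only derived when read.
    static final class Avg<T> {
        private final Numeric<T> num;
        private T sum;
        private long count;

        Avg(Numeric<T> num) {
            this.num = num;
            this.sum = num.zero();
        }

        Avg<T> add(T value) {
            sum = num.plus(sum, value);
            count++;
            return this;
        }

        double value() {
            if (count == 0) {
                throw new IllegalStateException("average of zero elements");
            }
            return num.toDouble(sum) / count;
        }
    }

    public static void main(String[] args) {
        System.out.println(sum(Arrays.asList(1L, 2L, 3L), LONG)); // 6
        System.out.println(sum(Arrays.asList(0.5, 1.5), DOUBLE)); // 2.0
        Avg<Long> avg = new Avg<>(LONG).add(1L).add(2L).add(6L);
        System.out.println(avg.value());                          // 3.0
    }
}
{code}

The price compared to Scala is that every call site has to supply the instance (e.g., {{sum(values, LONG)}}), or the API has to offer one overload per supported numeric type.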

Regarding point 2, my personal opinion is that *some* basic functionality 
should be provided via dedicated operators -- call it sugar if you will -- to 
achieve a more pleasant API for common use cases (e.g. "count()" rather than 
"aggregate(new Count())").  Of course, we could opt to implement these 
dedicated operators via "aggregate(new Func())" behind the scenes, which may 
help with composability when one wants to write a new API layer on top.
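A minimal sketch of "sugar on top of aggregate", under the assumption of a hypothetical in-memory {{Grouped}} type: it exposes one general {{aggregate(initializer, aggregator)}} operator, and {{count()}} and {{max()}} are thin wrappers around it. The names loosely echo the DSL's initializer/aggregator concepts but are hypothetical stand-ins, not the real Kafka Streams signatures; the sample data (client IP, request size) is made up.

{code:java}
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

class SugarOverAggregate {

    // Hypothetical aggregator: folds one (key, value) record into the key's
    // current aggregate and returns the new aggregate.
    interface Aggregator<K, V, A> {
        A apply(K key, V value, A aggregate);
    }

    // Hypothetical in-memory stand-in for a grouped stream of (key, value) records.
    static final class Grouped<K, V> {
        private final List<Map.Entry<K, V>> records;

        Grouped(List<Map.Entry<K, V>> records) {
            this.records = records;
        }

        // The one general-purpose operator; the sugar below is built on it.
        <A> Map<K, A> aggregate(Supplier<A> initializer, Aggregator<K, V, A> aggregator) {
            Map<K, A> result = new LinkedHashMap<>();
            for (Map.Entry<K, V> entry : records) {
                K key = entry.getKey();
                A current = result.containsKey(key) ? result.get(key) : initializer.get();
                result.put(key, aggregator.apply(key, entry.getValue(), current));
            }
            return result;
        }

        // Sugar: count() instead of aggregate(new Count()).
        Map<K, Long> count() {
            return aggregate(() -> 0L, (key, value, total) -> total + 1);
        }

        // Sugar: max() instead of aggregate(new Max()); null means "no value yet".
        Map<K, V> max(Comparator<? super V> comparator) {
            return aggregate(() -> null,
                    (key, value, best) -> best == null || comparator.compare(value, best) > 0 ? value : best);
        }
    }

    // Helper to build a (key, value) record without Java 9's Map.entry.
    static <K, V> Map.Entry<K, V> pair(K key, V value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    public static void main(String[] args) {
        // Hypothetical sample data: (client IP, request size in bytes).
        List<Map.Entry<String, Integer>> requests = Arrays.asList(
                pair("10.0.0.1", 300),
                pair("10.0.0.2", 120),
                pair("10.0.0.1", 900),
                pair("10.0.0.1", 50));
        Grouped<String, Integer> byIp = new Grouped<>(requests);
        System.out.println(byIp.count());                        // {10.0.0.1=3, 10.0.0.2=1}
        System.out.println(byIp.max(Comparator.naturalOrder())); // {10.0.0.1=900, 10.0.0.2=120}
    }
}
{code}

Because {{count()}} and {{max()}} are expressed through {{aggregate}}, a new API layer on top would only need to target {{aggregate}}.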

See also:

* Scala API:
** http://docs.scala-lang.org/overviews/collections/trait-traversable
** http://docs.scala-lang.org/overviews/collections/seqs
* Spark Streaming API: 
http://spark.apache.org/docs/latest/streaming-programming-guide.html
* Flink DataStream API: 
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html#datastream-transformations


[jira] [Comment Edited] (KAFKA-3511) Provide built-in aggregators sum() and avg() in Kafka Streams DSL

2016-04-11 Thread Michael Noll (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15234770#comment-15234770
 ] 

Michael Noll edited comment on KAFKA-3511 at 4/11/16 9:07 AM:
--

Hmm.  I'd prefer an API that allows me to write:

* {{stream.max()}} rather than
* {{stream.aggregate(new Max())}}

That said, I think we are talking about two related but different questions:

1. Which aggregations do we provide out of the box?
2. How do we expose these built-in aggregations?

Regarding point 1, we could consider the following built-in aggregations (cf. 
Scala's collection API, for example), each followed by an example use case.  In 
general, it might be a good idea to think in terms of common use cases when 
deciding whether, and which, aggregations to provide out of the box, taking 
into account which would make sense on windowed streams, which on non-windowed 
streams, which on KStream, which on KTable, and which on both.  Again, I am 
listing concrete function names here, but the purpose is to list their 
*functionality* (point 2 addresses how this functionality is exposed).

* {{count}} / {{countBy}} -- "how many pageviews between 4 AM and 5 AM?" 
({{countBy}} and {{filter}} are related: {{countBy}} is essentially {{filter}} 
followed by {{count}})
* {{min}} / {{minBy}} -- "the page with the fewest views"
* {{max}} / {{maxBy}} -- "the page with the most views", "the IP address with 
the most hits (e.g., to implement throttling or blocklists)"
* {{sum}} -- "total number of pageviews per user"
* {{distinct}} -- "distinct elements in the stream" (this is related to 
changelog streams, aka KTable, and the stream-table duality; perhaps 
{{distinct}} would make sense only for KStream, as "distinct" is essentially a 
native feature of KTable, which retains only the latest value per key)
* Somewhat related: we may also want to consider {{sortBy}} and {{take}} 
operators to help implement "give me the top N things" use cases, since 
{{min}} and {{max}} return only a single element.  Idea: 
{{windowedStream.sortBy(...).take(5)}}.  I'm ignoring the question of 
lazy vs. non-lazy evaluation here.

Note: Unfortunately, because we use Java, we can't benefit from implicits / 
type classes as we could in, e.g., Scala.  For example, here's the function 
signature of {{sum}} in Scala:

{code}
def sum[B >: Int](implicit num: Numeric[B]): B
{code}

The benefit is that {{sum}} works automagically as long as type {{B}} "looks 
like a number" (i.e., an implicit {{Numeric[B]}} instance is in scope), which 
is a great help when you need to implement an out-of-the-box version of 
{{sum}} that always does the right thing.

Regarding point 2, my personal opinion is that *some* basic functionality 
should be provided via dedicated operators -- call it sugar if you will -- to 
achieve a more pleasant API for common use cases (e.g. "count()" rather than 
"aggregate(new Count())").  Of course, we could opt to implement these 
dedicated operators via "aggregate(new Func())" behind the scenes, which may 
help with composability when one wants to write a new API layer on top.

See also:

* Scala API:
** http://docs.scala-lang.org/overviews/collections/trait-traversable
** http://docs.scala-lang.org/overviews/collections/seqs
* Spark Streaming API: 
http://spark.apache.org/docs/latest/streaming-programming-guide.html
* Flink DataStream API: 
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html#datastream-transformations


[jira] [Comment Edited] (KAFKA-3511) Provide built-in aggregators sum() and avg() in Kafka Streams DSL

2016-04-11 Thread Michael Noll (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15234770#comment-15234770
 ] 

Michael Noll edited comment on KAFKA-3511 at 4/11/16 9:05 AM:
--

Hmm.  I'd prefer an API that allows me to write:

* {{stream.max()}} rather than
* {{stream.aggregate(new Max())}}

That said, I think we are talking about two related but different questions:

1. Which aggregations do we provide out of the box?
2. How do we expose these built-in aggregations?

Regarding point 1, we could consider the following built-in aggregations (cf. 
Scala's collection API, for example), each followed by an example use case.  In 
general, it might be a good idea to think in terms of common use cases when 
deciding whether, and which, aggregations to provide out of the box, taking 
into account which would make sense on windowed streams, which on non-windowed 
streams, which on KStream, which on KTable, and which on both.  Again, I am 
listing concrete function names here, but the purpose is to list their 
*functionality* (point 2 addresses how this functionality is exposed).

* {{count}} / {{countBy}} -- "how many pageviews between 4 AM and 5 AM?" 
({{countBy}} and {{filter}} are related: {{countBy}} is essentially {{filter}} 
followed by {{count}})
* {{min}} / {{minBy}} -- "the page with the fewest views"
* {{max}} / {{maxBy}} -- "the page with the most views", "the IP address with 
the most hits (e.g., to implement throttling or blocklists)"
* {{sum}} -- "total number of pageviews per user"
* {{distinct}} -- "distinct elements in the stream" (this is related to 
changelog streams, aka KTable, and the stream-table duality; perhaps 
{{distinct}} would make sense only for KStream, as "distinct" is essentially a 
native feature of KTable, which retains only the latest value per key)
* Somewhat related: we may also want to consider {{sortBy}} and {{take}} 
operators to help implement "give me the top N things" use cases, since 
{{min}} and {{max}} return only a single element.  Idea: 
{{windowedStream.sortBy(...).take(5)}}.

Note: Unfortunately, because we use Java, we can't benefit from implicits / 
type classes as we could in, e.g., Scala.  For example, here's the function 
signature of {{sum}} in Scala:

{code}
def sum[B >: Int](implicit num: Numeric[B]): B
{code}

The benefit is that {{sum}} works automagically as long as type {{B}} "looks 
like a number" (i.e., an implicit {{Numeric[B]}} instance is in scope), which 
is a great help when you need to implement an out-of-the-box version of 
{{sum}} that always does the right thing.

Regarding point 2, my personal opinion is that *some* basic functionality 
should be provided via dedicated operators -- call it sugar if you will -- to 
achieve a more pleasant API for common use cases (e.g. "count()" rather than 
"aggregate(new Count())").  Of course, we could opt to implement these 
dedicated operators via "aggregate(new Func())" behind the scenes, which may 
help with composability when one wants to write a new API layer on top.

See also:

* Scala API:
** http://docs.scala-lang.org/overviews/collections/trait-traversable
** http://docs.scala-lang.org/overviews/collections/seqs
* Spark Streaming API: 
http://spark.apache.org/docs/latest/streaming-programming-guide.html
* Flink DataStream API: 
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html#datastream-transformations

