Just my two cents: I think it might be OK to put this into Kafka if we agree that it is a good use case for people who want to use Kafka as a temporary store for stream processing. At the very least, I don't see a downside to it.
Thanks,

Jiangjie (Becket) Qin

On Tue, Jul 28, 2015 at 3:41 AM, Gianmarco De Francisci Morales <g...@apache.org> wrote:

Jason,
Thanks for starting the discussion and for your very concise (and correct) summary.

Ewen, while what you say is true, those kinds of datasets (a large number of keys with skew) are very typical on the Web (think Twitter users, or Web pages, or even just plain text).
If you want to compute an aggregate on these datasets (either for reporting purposes, or as part of some analytical task such as machine learning), then the skew will kill your performance and limit the amount of parallelism you can effectively extract from your dataset.
PKG is a solution to that, without the full overhead of going to shuffle grouping to compute partial aggregates.
The problem with shuffle grouping is not only the memory, but also the cost of combining the aggregates, which increases with the parallelism level.
Also, by keeping partial aggregates in 2 places, you can query them at runtime with constant overhead (similarly to what you would be able to do with hashing), rather than needing to broadcast the query to all partitions (which you need to do with shuffle grouping).

--
Gianmarco
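A minimal sketch of what a partial key grouping ("two choices") partitioner along these lines could look like, assuming the producer Partitioner interface added by KAFKA-2091. The class name, the second hash derivation, and the producer-local load tracking below are illustrative assumptions, not the actual KAFKA-2092 patch:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.utils.Utils;

    // Illustrative "partial key grouping" partitioner: each key hashes to two
    // candidate partitions, and every message goes to whichever of the two this
    // producer instance has used less so far. Not the KAFKA-2092 implementation.
    public class TwoChoicesPartitioner implements Partitioner {

        // Messages sent per partition, local to this producer instance.
        private final Map<Integer, AtomicLong> sent = new ConcurrentHashMap<>();
        private final AtomicLong unkeyedCounter = new AtomicLong();

        @Override
        public int partition(String topic, Object key, byte[] keyBytes,
                             Object value, byte[] valueBytes, Cluster cluster) {
            int numPartitions = cluster.partitionsForTopic(topic).size();

            // Unkeyed messages: simple round robin, similar to the default partitioner.
            if (keyBytes == null)
                return (int) (unkeyedCounter.getAndIncrement() % numPartitions);

            // Two candidate partitions for the key. A real implementation would use
            // two independent hash functions; here the second is derived from the first.
            int h = Utils.murmur2(keyBytes);
            int first = (h & 0x7fffffff) % numPartitions;
            int second = ((h * 31 + 17) & 0x7fffffff) % numPartitions;

            // Power of two choices: send to the locally less-loaded candidate.
            int choice = count(first) <= count(second) ? first : second;
            sent.computeIfAbsent(choice, p -> new AtomicLong()).incrementAndGet();
            return choice;
        }

        private long count(int partition) {
            AtomicLong c = sent.get(partition);
            return c == null ? 0L : c.get();
        }

        @Override
        public void configure(Map<String, ?> configs) {}

        @Override
        public void close() {}
    }

Because the choice is made from producer-local counts only, no coordination between producers is needed, which is what keeps the overhead low compared to a full shuffle.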
On 28 July 2015 at 00:54, Gwen Shapira <gshap...@cloudera.com> wrote:

I guess it depends on whether the original producer did any "map" tasks or simply wrote raw data. We usually advocate writing raw data, and since we need to write it anyway, the partitioner doesn't introduce any extra "hops".

It's definitely useful to look at use cases, and I need to think a bit more on whether huge-key-space-with-large-skew is the only one. I think there are use cases that are not pure aggregates, where keeping a key list in memory won't help and scaling to a large number of partitions is still required (and therefore skew is a critical problem). However, I may be making stuff up, so I need to double check.

Gwen

On Mon, Jul 27, 2015 at 2:20 PM, Ewen Cheslack-Postava <e...@confluent.io> wrote:

Gwen - this is really like two steps of map reduce though, right? The first step does the partial shuffle to two partitions per key, the second step does the partial reduce plus the final full shuffle, and the final step does the final reduce.

This strikes me as similar to partition assignment strategies in the consumer, in that there will probably be a small handful of commonly used strategies that we can just maintain as part of Kafka. A few people will need more obscure strategies, and they can maintain those implementations themselves. For reference, a quick grep of Spark shows 5 partitioners: HashPartitioner and RangePartitioner, which are in core, PythonPartitioner, GridPartitioner for partitioning matrices, and ShuffleRowRDD for their SQL implementation. So I don't think it would be a big deal to include it here, although I'm not really sure how often it's useful -- compared to normal partitioning, or just doing two steps by starting with unpartitioned data, you need to be performing an aggregation, the key set needs to be large enough for memory usage to be a problem (i.e., you don't want each consumer to have to maintain a map with every key in it), and the distribution needs to be sufficiently skewed (i.e., not just 1 or 2 very hot keys). The key set constraint, in particular, is the one I'm not convinced by, since in practice if you have a skewed distribution, you probably also won't actually see every key in every partition; each worker only needs to maintain a subset of the key set (and associated aggregate data) in memory.

--
Thanks,
Ewen

On Mon, Jul 27, 2015 at 12:56 PM, Gwen Shapira <gshap...@cloudera.com> wrote:

If you are used to map-reduce patterns, this sounds like a perfectly natural way to process streams of data.

Call the first consumer "map-combine-log", the topic "shuffle-log", and the second consumer "reduce-log" :)
I like that a lot. It works well for either "embarrassingly parallel" cases, or "so much data that more parallelism is worth the extra overhead" cases.

I personally don't care if it's in core Kafka, KIP-28, or a GitHub project elsewhere, but I find it useful and non-esoteric.

On Mon, Jul 27, 2015 at 12:51 PM, Jason Gustafson <ja...@confluent.io> wrote:

For a little background, the difference between this partitioner and the default one is that it breaks the deterministic mapping from key to partition. Instead, messages for a given key can end up in either of two partitions. This means that the consumer generally won't see all messages for a given key. Instead, the consumer would compute an aggregate for each key on the partitions it consumes and write them to a separate topic. For example, if you are writing log messages to a "logs" topic with the hostname as the key, you could use this partitioning strategy to compute message counts for each host in each partition and write them to a "log-counts" topic. Then a consumer of the "log-counts" topic would compute total aggregates based on the two intermediate aggregates. The benefit is that you are generally going to get better load balancing across partitions than if you used the default partitioner. (Please correct me if my understanding is incorrect, Gianmarco.)

So I think the question is whether this is a useful primitive for Kafka to provide out of the box. I was a little concerned that this use case is a little esoteric for a core feature, but it may make more sense in the context of KIP-28, which would provide some higher-level processing capabilities (though it doesn't seem like the KStream abstraction would provide a direct way to leverage this partitioner without custom logic).

Thanks,
Jason
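To make Jason's example concrete, here is a rough sketch of the intermediate aggregation step, assuming the "logs" topic is keyed by hostname and partial counts are published to "log-counts". The consumer API usage, serializer choices, and emit-on-every-poll behavior are simplifications for illustration, not a recommended job structure:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Intermediate aggregator for the example: consume the "logs" topic (keyed by
    // hostname), keep per-host counts for the partitions this instance owns, and
    // publish the partial counts to "log-counts". A downstream consumer of
    // "log-counts" then sums the (at most two) partials per host.
    public class LogCountAggregator {

        public static void main(String[] args) {
            Properties consumerProps = new Properties();
            consumerProps.put("bootstrap.servers", "localhost:9092");
            consumerProps.put("group.id", "log-count-aggregators");
            consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            Properties producerProps = new Properties();
            producerProps.put("bootstrap.servers", "localhost:9092");
            producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            Map<String, Long> counts = new HashMap<>();

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
                 KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {
                consumer.subscribe(Collections.singletonList("logs"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records) {
                        if (record.key() != null)
                            counts.merge(record.key(), 1L, Long::sum);
                    }
                    // Emit the running partial counts; a real job would window or
                    // checkpoint these instead of re-sending totals on every poll.
                    for (Map.Entry<String, Long> e : counts.entrySet())
                        producer.send(new ProducerRecord<>("log-counts", e.getKey(), e.getValue().toString()));
                }
            }
        }
    }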
On Wed, Jul 22, 2015 at 12:14 AM, Gianmarco De Francisci Morales <g...@apache.org> wrote:

Hello folks,

I'd like to ask the community about its opinion on the partitioning functions in Kafka.

With KAFKA-2091 <https://issues.apache.org/jira/browse/KAFKA-2091> integrated, we are now able to have custom partitioners in the producer. The question now becomes *which* partitioners should ship with Kafka?
This issue arose in the context of KAFKA-2092 <https://issues.apache.org/jira/browse/KAFKA-2092>, which implements a specific load-balanced partitioning. This partitioner, however, assumes some stages of processing on top of it to make proper use of the data, i.e., it envisions Kafka as a substrate for stream processing, and not only as the I/O component.
Is this a direction that Kafka wants to go towards? Or is this a role better left to the internal communication systems of other stream processing engines (e.g., Storm)?
And if the answer is the latter, how would something such as Samza (which relies mostly on Kafka as its communication substrate) be able to implement advanced partitioning schemes?

Cheers,
--
Gianmarco
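For reference, with KAFKA-2091 in place a custom partitioner is wired into the producer purely through configuration, via the partitioner.class setting. A minimal sketch, where com.example.TwoChoicesPartitioner is the hypothetical class from the earlier sketch and the broker address, topic, and message are placeholders:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class CustomPartitionerExample {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Fully qualified name of the partitioner implementation
            // (hypothetical class from the sketch above).
            props.put("partitioner.class", "com.example.TwoChoicesPartitioner");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // Keyed by hostname, as in the log-counting example.
                producer.send(new ProducerRecord<>("logs", "host-42", "GET /index.html 200"));
            }
        }
    }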