OK, the general consensus seems to be that more elaborate partitioning functions belong within the scope of Kafka. Could somebody have a look at KAFKA-2092 <https://issues.apache.org/jira/browse/KAFKA-2092> then?
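(For context, KAFKA-2092 proposes a "partial key grouping" style partitioner: each key hashes to two candidate partitions, and every message goes to whichever of the two has received fewer messages so far. The sketch below is a minimal, self-contained illustration of that two-choices idea only; the class name and hash choices are illustrative and are not taken from the actual patch.)

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/**
 * Minimal sketch of partial key grouping (the "power of two choices"
 * idea behind KAFKA-2092). Not the actual patch: names, the CRC-based
 * hashing, and the local load counters are all illustrative.
 */
public class PartialKeyPartitioner {
    private final long[] load; // messages sent to each partition so far

    public PartialKeyPartitioner(int numPartitions) {
        this.load = new long[numPartitions];
    }

    // Seeded hash: prepend a seed byte so the two choices differ.
    private int hash(byte[] key, int seed) {
        CRC32 crc = new CRC32();
        crc.update(seed);
        crc.update(key);
        return (int) (crc.getValue() % load.length);
    }

    /** Route to the less-loaded of the key's two candidate partitions. */
    public int partition(String key) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        int p1 = hash(k, 0);
        int p2 = hash(k, 1);
        int chosen = load[p1] <= load[p2] ? p1 : p2;
        load[chosen]++;
        return chosen;
    }
}
```

The point of the scheme is visible in the invariant: any single key lands in at most two partitions, so even a very hot key is spread over two consumers instead of one, at the cost of the deterministic key-to-partition mapping discussed further down the thread.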
--
Gianmarco

On 30 July 2015 at 05:57, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

> Just my two cents. I think it might be OK to put this into Kafka if we
> agree that this might be a good use case for people who want to use Kafka
> as a temporary store for stream processing. At the very least, I don't
> see a downside to this.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Jul 28, 2015 at 3:41 AM, Gianmarco De Francisci Morales <
> g...@apache.org> wrote:
>
> > Jason,
> > Thanks for starting the discussion and for your very concise (and
> > correct) summary.
> >
> > Ewen, while what you say is true, those kinds of datasets (large
> > numbers of keys with skew) are very typical on the Web (think Twitter
> > users, or Web pages, or even just plain text).
> > If you want to compute an aggregate on these datasets (either for
> > reporting purposes, or as part of some analytical task such as machine
> > learning), then the skew will kill your performance and the amount of
> > parallelism you can effectively extract from your dataset.
> > PKG is a solution to that, without the full overhead of going to
> > shuffle grouping to compute partial aggregates.
> > The problem with shuffle grouping is not only the memory, but also the
> > cost of combining the aggregates, which increases with the parallelism
> > level.
> > Also, by keeping partial aggregates in 2 places, you can query those at
> > runtime with constant overhead (similarly to what you would be able to
> > do with hashing), rather than needing to broadcast the query to all
> > partitions (which you need to do with shuffle grouping).
> >
> > --
> > Gianmarco
> >
> > On 28 July 2015 at 00:54, Gwen Shapira <gshap...@cloudera.com> wrote:
> >
> > > I guess it depends on whether the original producer did any "map"
> > > tasks or simply wrote raw data. We usually advocate writing raw data,
> > > and since we need to write it anyway, the partitioner doesn't
> > > introduce any extra "hops".
> > >
> > > It's definitely useful to look at use cases, and I need to think a
> > > bit more on whether huge-key-space-with-large-skew is the only one.
> > > I think that there are use cases that are not pure-aggregate, where
> > > keeping the key list in memory therefore won't help and scaling to a
> > > large number of partitions is still required (and therefore skew is a
> > > critical problem). However, I may be making stuff up, so I need to
> > > double check.
> > >
> > > Gwen
> > >
> > > On Mon, Jul 27, 2015 at 2:20 PM, Ewen Cheslack-Postava
> > > <e...@confluent.io> wrote:
> > > > Gwen - this is really like two steps of map-reduce though, right?
> > > > The first step does the partial shuffle to two partitions per key,
> > > > the second step does a partial reduce + final full shuffle, and the
> > > > final step does the final reduce.
> > > >
> > > > This strikes me as similar to partition assignment strategies in
> > > > the consumer, in that there will probably be a small handful of
> > > > commonly used strategies that we can just maintain as part of
> > > > Kafka. A few people will need more obscure strategies, and they can
> > > > maintain those implementations themselves. For reference, a quick
> > > > grep of Spark shows 5 partitioners: HashPartitioner and
> > > > RangePartitioner, which are in core, PythonPartitioner,
> > > > GridPartitioner for partitioning matrices, and ShuffleRowRDD for
> > > > their SQL implementation.
> > > > So I don't think it would be a big deal to include it here,
> > > > although I'm not really sure how often it's useful -- compared to
> > > > normal partitioning or just doing two steps by starting with
> > > > unpartitioned data, you need to be performing an aggregation, the
> > > > key set needs to be large enough for memory usage to be a problem
> > > > (i.e. you don't want each consumer to have to maintain a map with
> > > > every key in it), and you need a sufficiently skewed distribution
> > > > (i.e. not just 1 or 2 very hot keys). The key set constraint, in
> > > > particular, is the one I'm not convinced by, since in practice if
> > > > you have a skewed distribution, you probably also won't actually
> > > > see every key in every partition; each worker actually only needs
> > > > to maintain a subset of the key set (and associated aggregate data)
> > > > in memory.
> > > >
> > > >
> > > > On Mon, Jul 27, 2015 at 12:56 PM, Gwen Shapira <gshap...@cloudera.com>
> > > > wrote:
> > > >
> > > >> If you are used to map-reduce patterns, this sounds like a
> > > >> perfectly natural way to process streams of data.
> > > >>
> > > >> Call the first consumer "map-combine-log", the topic
> > > >> "shuffle-log", and the second consumer "reduce-log" :)
> > > >> I like that a lot. It works well for either "embarrassingly
> > > >> parallel" cases, or "so much data that more parallelism is worth
> > > >> the extra overhead" cases.
> > > >>
> > > >> I personally don't care whether it's in core Kafka, KIP-28, or a
> > > >> GitHub project elsewhere, but I find it useful and non-esoteric.
> > > >>
> > > >>
> > > >>
> > > >> On Mon, Jul 27, 2015 at 12:51 PM, Jason Gustafson <ja...@confluent.io>
> > > >> wrote:
> > > >> > For a little background, the difference between this partitioner
> > > >> > and the default one is that it breaks the deterministic mapping
> > > >> > from key to partition. Instead, messages for a given key can end
> > > >> > up in either of two partitions. This means that the consumer
> > > >> > generally won't see all messages for a given key. Instead, the
> > > >> > consumer would compute an aggregate for each key on the
> > > >> > partitions it consumes and write them to a separate topic.
> > > >> > For example, if you are writing log messages to a "logs" topic
> > > >> > with the hostname as the key, you could use this partitioning
> > > >> > strategy to compute message counts for each host in each
> > > >> > partition and write them to a "log-counts" topic. Then a
> > > >> > consumer of the "log-counts" topic would compute total
> > > >> > aggregates based on the two intermediate aggregates. The benefit
> > > >> > is that you are generally going to get better load balancing
> > > >> > across partitions than if you used the default partitioner.
> > > >> > (Please correct me if my understanding is incorrect, Gianmarco)
> > > >> >
> > > >> > So I think the question is whether this is a useful primitive
> > > >> > for Kafka to provide out of the box. I was a little concerned
> > > >> > that this use case is a little esoteric for a core feature, but
> > > >> > it may make more sense in the context of KIP-28, which would
> > > >> > provide some higher-level processing capabilities (though it
> > > >> > doesn't seem like the KStream abstraction would provide a direct
> > > >> > way to leverage this partitioner without custom logic).
> > > >> >
> > > >> > Thanks,
> > > >> > Jason
> > > >> >
> > > >> >
> > > >> > On Wed, Jul 22, 2015 at 12:14 AM, Gianmarco De Francisci Morales <
> > > >> > g...@apache.org> wrote:
> > > >> >
> > > >> >> Hello folks,
> > > >> >>
> > > >> >> I'd like to ask the community for its opinion on the
> > > >> >> partitioning functions in Kafka.
> > > >> >>
> > > >> >> With KAFKA-2091 <https://issues.apache.org/jira/browse/KAFKA-2091>
> > > >> >> integrated, we are now able to have custom partitioners in the
> > > >> >> producer. The question now becomes *which* partitioners should
> > > >> >> ship with Kafka?
> > > >> >> This issue arose in the context of KAFKA-2092
> > > >> >> <https://issues.apache.org/jira/browse/KAFKA-2092>, which
> > > >> >> implements a specific load-balanced partitioning. This
> > > >> >> partitioner, however, assumes some stages of processing on top
> > > >> >> of it to make proper use of the data, i.e., it envisions Kafka
> > > >> >> as a substrate for stream processing, and not only as the I/O
> > > >> >> component.
> > > >> >> Is this a direction that Kafka wants to go in? Or is this a
> > > >> >> role better left to the internal communication systems of other
> > > >> >> stream processing engines (e.g., Storm)?
> > > >> >> And if the answer is the latter, how would something such as
> > > >> >> Samza (which relies mostly on Kafka as its communication
> > > >> >> substrate) be able to implement advanced partitioning schemes?
> > > >> >>
> > > >> >> Cheers,
> > > >> >> --
> > > >> >> Gianmarco
> > > >> >>
> > > >> >
> > > >
> > > >
> > > >
> > > > --
> > > > Thanks,
> > > > Ewen
> > >
> >
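(An editorial aside: the two-step aggregation Jason describes above -- partial per-host counts computed independently per partition of a "logs" topic, merged by a consumer of "log-counts" -- can be sketched in plain Java. The class, method names, and in-memory lists standing in for topics are all hypothetical, kept minimal to show only the combine/merge structure.)

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Sketch of two-step aggregation over a partially-key-grouped topic.
 * Step 1 runs once per partition; step 2 merges the (at most two)
 * partial aggregates that exist for each key. Names are illustrative.
 */
public class TwoStepCount {
    /** Step 1: partial counts, computed independently per partition. */
    public static Map<String, Long> partialCounts(List<String> hostKeys) {
        Map<String, Long> counts = new HashMap<>();
        for (String host : hostKeys) {
            counts.merge(host, 1L, Long::sum); // combine locally
        }
        return counts;
    }

    /** Step 2: merge the partial aggregates into total counts per host. */
    public static Map<String, Long> merge(List<Map<String, Long>> partials) {
        Map<String, Long> total = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((host, c) -> total.merge(host, c, Long::sum));
        }
        return total;
    }
}
```

This also illustrates Gianmarco's constant-overhead query point: because each key has partial aggregates in at most two places, answering "what is the count for host X?" means summing two entries rather than broadcasting the query to every partition.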