Just my two cents. I think it might be OK to put this into Kafka if we
agree that this is a good use case for people who want to use Kafka
as a temporary store for stream processing. At the very least, I don't
see a downside to it.

Thanks,

Jiangjie (Becket) Qin

On Tue, Jul 28, 2015 at 3:41 AM, Gianmarco De Francisci Morales <
g...@apache.org> wrote:

> Jason,
> Thanks for starting the discussion and for your very concise (and correct)
> summary.
>
> Ewen, while what you say is true, those kinds of datasets (large number of
> keys with skew) are very typical in the Web (think Twitter users, or Web
> pages, or even just plain text).
> If you want to compute an aggregate on these datasets (either for reporting
> purposes, or as part of some analytical task such as machine learning),
> then the skew will kill your performance and limit the amount of parallelism
> you can effectively extract from your dataset.
> PKG is a solution to that, without the full overhead of going to shuffle
> grouping to compute partial aggregates.
> The problem with shuffle grouping is not only the memory, but also the cost
> of combining the aggregates, which increases with the parallelism level.
> Also, by keeping partial aggregates in 2 places, you can query those at
> runtime with constant overhead (similarly to what you would be able to do
> with hashing) rather than needing to broadcast the query to all partitions
> (which you need to do with shuffle grouping).
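>
> To make the constant-overhead point concrete, here is a rough Java sketch
> (not actual PKG/KAFKA-2092 code; the hash functions and data layout are
> illustrative only). Because a key's partial aggregates live in exactly the
> two partitions its two hashes map to, a point query reads two stores
> instead of broadcasting to every partition:
>
>     import java.util.List;
>     import java.util.Map;
>
>     class PartialAggregateQuery {
>         // partitionCounts.get(p) = partial counts kept by the consumer of partition p
>         static long totalFor(String key, List<Map<String, Long>> partitionCounts) {
>             int n = partitionCounts.size();
>             int p1 = Math.floorMod(key.hashCode(), n);           // first candidate partition
>             int p2 = Math.floorMod((key + "#2").hashCode(), n);  // second, illustrative hash
>             long total = partitionCounts.get(p1).getOrDefault(key, 0L);
>             if (p2 != p1)
>                 total += partitionCounts.get(p2).getOrDefault(key, 0L);
>             return total; // two lookups, independent of the number of partitions
>         }
>     }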
>
> --
> Gianmarco
>
> On 28 July 2015 at 00:54, Gwen Shapira <gshap...@cloudera.com> wrote:
>
> > I guess it depends on whether the original producer did any "map"
> > tasks or simply wrote raw data. We usually advocate writing raw data,
> > and since we need to write it anyway, the partitioner doesn't
> > introduce any extra "hops".
> >
> > It's definitely useful to look at use-cases, and I need to think a bit
> > more on whether huge-key-space-with-large-skew is the only one.
> > I think there are use-cases that are not pure aggregation, where keeping
> > a key list in memory therefore won't help, scaling to a large number of
> > partitions is still required, and skew is therefore a critical problem.
> > However, I may be making stuff up, so I need to double check.
> >
> > Gwen
> >
> >
> >
> >
> >
> > On Mon, Jul 27, 2015 at 2:20 PM, Ewen Cheslack-Postava
> > <e...@confluent.io> wrote:
> > > Gwen - this is really like two steps of map reduce though, right? The
> > first
> > > step does the partial shuffle to two partitions per key, second step
> does
> > > partial reduce + final full shuffle, final step does the final reduce.
> > >
> > > This strikes me as similar to partition assignment strategies in the
> > > consumer in that there will probably be a small handful of commonly
> used
> > > strategies that we can just maintain as part of Kafka. A few people
> will
> > > need more obscure strategies and they can maintain those
> implementations
> > > themselves. For reference, a quick grep of Spark shows 5 partitioners:
> > Hash
> > > and RangePartitioner, which are in core, PythonPartitioner,
> > GridPartitioner
> > > for partitioning matrices, and ShuffleRowRDD for their SQL
> > implementation.
> > > So I don't think it would be a big deal to include it here, although
> I'm
> > > not really sure how often it's useful -- compared to normal
> partitioning
> > or
> > > just doing two steps by starting with unpartitioned data, you need to
> be
> > > performing an aggregation, the key set needs to be large enough for
> > memory
> > > usage to be a problem (i.e. you don't want each consumer to have to
> > > maintain a map with every key in it), and the distribution needs to be
> > > sufficiently skewed (i.e. not just 1 or 2 very hot keys). The key set
> > constraint,
> > > in particular, is the one I'm not convinced by since in practice if you
> > > have a skewed distribution, you probably also won't actually see every
> > key
> > > in every partition; each worker actually only needs to maintain a
> subset
> > of
> > > the key set (and associated aggregate data) in memory.
> > >
> > >
> > > On Mon, Jul 27, 2015 at 12:56 PM, Gwen Shapira <gshap...@cloudera.com>
> > > wrote:
> > >
> > >> If you are used to map-reduce patterns, this sounds like a perfectly
> > >> natural way to process streams of data.
> > >>
> > >> Call the first consumer "map-combine-log", the topic "shuffle-log" and
> > >> the second consumer "reduce-log" :)
> > >> I like that a lot. It works well for either "embarrassingly parallel"
> > >> cases, or "so much data that more parallelism is worth the extra
> > >> overhead" cases.
> > >>
> > >> I personally don't care if it's in core Kafka, KIP-28, or a GitHub
> > >> project elsewhere, but I find it useful and non-esoteric.
> > >>
> > >>
> > >>
> > >> On Mon, Jul 27, 2015 at 12:51 PM, Jason Gustafson <ja...@confluent.io
> >
> > >> wrote:
> > >> > For a little background, the difference between this partitioner and
> > the
> > >> > default one is that it breaks the deterministic mapping from key to
> > >> > partition. Instead, messages for a given key can end up in either of
> > two
> > >> > partitions. This means that the consumer generally won't see all
> > messages
> > >> > for a given key. Instead the consumer would compute an aggregate for
> > each
> > >> > key on the partitions it consumes and write them to a separate
> topic.
> > For
> > >> > example, if you are writing log messages to a "logs" topic with the
> > >> > hostname as the key, you could use this partitioning strategy to compute
> > >> > message counts for each host in each partition and write them to a
> > >> > "log-counts" topic. Then a consumer of the "log-counts" topic would
> > >> compute
> > >> > total aggregates based on the two intermediate aggregates. The
> > benefit is
> > >> > that you are generally going to get better load balancing across
> > >> partitions
> > >> > than if you used the default partitioner. (Please correct me if my
> > >> > understanding is incorrect, Gianmarco)
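> > >> >
> > >> > To make this concrete, below is a minimal sketch of what such a "power of
> > >> > two choices" partitioner could look like against the pluggable producer
> > >> > Partitioner interface from KAFKA-2091 (assuming the new Java producer's
> > >> > interface). This is not the KAFKA-2092 patch itself: the second hash and
> > >> > the per-partition load tracking are simplified for illustration, and a
> > >> > real implementation would also have to be thread-safe.
> > >> >
> > >> >     import java.util.Arrays;
> > >> >     import java.util.Map;
> > >> >     import org.apache.kafka.clients.producer.Partitioner;
> > >> >     import org.apache.kafka.common.Cluster;
> > >> >
> > >> >     public class PartialKeyGroupingPartitioner implements Partitioner {
> > >> >         // Local estimate of how many messages this producer sent to each partition.
> > >> >         private long[] sentPerPartition;
> > >> >
> > >> >         @Override
> > >> >         public void configure(Map<String, ?> configs) {}
> > >> >
> > >> >         @Override
> > >> >         public int partition(String topic, Object key, byte[] keyBytes,
> > >> >                              Object value, byte[] valueBytes, Cluster cluster) {
> > >> >             int numPartitions = cluster.partitionsForTopic(topic).size();
> > >> >             if (sentPerPartition == null || sentPerPartition.length != numPartitions)
> > >> >                 sentPerPartition = new long[numPartitions];
> > >> >
> > >> >             // Two candidate partitions per key: the key is no longer sticky to one.
> > >> >             int h = Arrays.hashCode(keyBytes); // stand-in for a proper hash like murmur2
> > >> >             int first = Math.floorMod(h, numPartitions);
> > >> >             int second = Math.floorMod(Integer.rotateLeft(h, 16) ^ 0x9E3779B9, numPartitions);
> > >> >
> > >> >             // Send to whichever candidate this producer has loaded less so far.
> > >> >             int chosen = sentPerPartition[first] <= sentPerPartition[second] ? first : second;
> > >> >             sentPerPartition[chosen]++;
> > >> >             return chosen;
> > >> >         }
> > >> >
> > >> >         @Override
> > >> >         public void close() {}
> > >> >     }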
> > >> >
> > >> > So I think the question is whether this is a useful primitive for
> > Kafka
> > >> to
> > >> > provide out of the box? I was a little concerned that this use case
> > is a
> > >> > little esoteric for a core feature, but it may make more sense in
> the
> > >> > context of KIP-28 which would provide some higher-level processing
> > >> > capabilities (though it doesn't seem like the KStream abstraction
> > would
> > >> > provide a direct way to leverage this partitioner without custom
> > logic).
> > >> >
> > >> > Thanks,
> > >> > Jason
> > >> >
> > >> >
> > >> > On Wed, Jul 22, 2015 at 12:14 AM, Gianmarco De Francisci Morales <
> > >> > g...@apache.org> wrote:
> > >> >
> > >> >> Hello folks,
> > >> >>
> > >> >> I'd like to ask the community about its opinion on the partitioning
> > >> >> functions in Kafka.
> > >> >>
> > >> >> With KAFKA-2091 <https://issues.apache.org/jira/browse/KAFKA-2091>
> > >> >> integrated we are now able to have custom partitioners in the
> > producer.
> > >> >> The question now becomes *which* partitioners should ship with
> Kafka?
> > >> >> This issue arose in the context of KAFKA-2092
> > >> >> <https://issues.apache.org/jira/browse/KAFKA-2092>, which
> > implements a
> > >> >> specific load-balanced partitioning. This partitioner however
> assumes
> > >> some
> > >> >> stages of processing on top of it to make proper use of the data,
> > i.e.,
> > >> it
> > >> >> envisions Kafka as a substrate for stream processing, and not only
> as
> > >> the
> > >> >> I/O component.
> > >> >> Is this a direction that Kafka wants to go towards? Or is this a
> role
> > >> >> better left to the internal communication systems of other stream
> > >> >> processing engines (e.g., Storm)?
> > >> >> And if the answer is the latter, how would something such as Samza
> > (which
> > >> >> relies mostly on Kafka as its communication substrate) be able to
> > >> implement
> > >> >> advanced partitioning schemes?
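> > >> >>
> > >> >> As a side note on the mechanics: with KAFKA-2091 the choice of
> > >> >> partitioner is just a producer configuration, so whatever is decided
> > >> >> here, users can point partitioner.class at a class that ships with
> > >> >> Kafka or at their own. A rough sketch of what that looks like (the
> > >> >> class name and broker address are made up):
> > >> >>
> > >> >>     import java.util.Properties;
> > >> >>     import org.apache.kafka.clients.producer.KafkaProducer;
> > >> >>     import org.apache.kafka.clients.producer.ProducerRecord;
> > >> >>
> > >> >>     Properties props = new Properties();
> > >> >>     props.put("bootstrap.servers", "localhost:9092");
> > >> >>     props.put("key.serializer",
> > >> >>               "org.apache.kafka.common.serialization.StringSerializer");
> > >> >>     props.put("value.serializer",
> > >> >>               "org.apache.kafka.common.serialization.StringSerializer");
> > >> >>     // KAFKA-2091: any Partitioner implementation can be plugged in by class name.
> > >> >>     props.put("partitioner.class", "com.example.PartialKeyGroupingPartitioner");
> > >> >>
> > >> >>     KafkaProducer<String, String> producer = new KafkaProducer<>(props);
> > >> >>     producer.send(new ProducerRecord<>("logs", "host-42", "a log line"));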
> > >> >>
> > >> >> Cheers,
> > >> >> --
> > >> >> Gianmarco
> > >> >>
> > >>
> > >
> > >
> > >
> > > --
> > > Thanks,
> > > Ewen
> >
>
