OK, the general consensus seems to be that more elaborate partitioning
functions belong to the scope of Kafka.
Could somebody have a look at KAFKA-2092
<https://issues.apache.org/jira/browse/KAFKA-2092> then?
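For anyone new to the thread: the partitioner in KAFKA-2092 applies the
"power of two choices" idea — each key maps to two candidate partitions, and
each record goes to whichever candidate is currently less loaded. A minimal
sketch of the logic (names and API are illustrative only, not Kafka's actual
Partitioner interface):

```python
from collections import defaultdict


def make_pkg_partitioner(num_partitions):
    """Sketch of partial key grouping (KAFKA-2092): each key has two
    candidate partitions, and each message goes to whichever candidate
    has received fewer messages so far."""
    load = [0] * num_partitions  # messages sent to each partition so far

    def partition(key):
        # Two candidate partitions derived from the same key
        h1 = hash(key) % num_partitions
        h2 = hash(key + "#salt") % num_partitions
        # Power of two choices: pick the less loaded candidate
        chosen = h1 if load[h1] <= load[h2] else h2
        load[chosen] += 1
        return chosen

    return partition
```

By construction, messages for a given key land in at most two partitions, so
a downstream consumer has to merge at most two partial aggregates per key.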

--
Gianmarco

On 30 July 2015 at 05:57, Jiangjie Qin <j...@linkedin.com.invalid> wrote:

> Just my two cents. I think it might be OK to put this into Kafka if we
> agree that this might be a good use case for people who want to use Kafka
> as a temporary store for stream processing. At the very least I don't see
> a downside to this.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Tue, Jul 28, 2015 at 3:41 AM, Gianmarco De Francisci Morales <
> g...@apache.org> wrote:
>
> > Jason,
> > Thanks for starting the discussion and for your very concise (and
> > correct) summary.
> >
> > Ewen, while what you say is true, those kinds of datasets (large number
> > of keys with skew) are very typical in the Web (think Twitter users, or
> > Web pages, or even just plain text).
> > If you want to compute an aggregate on these datasets (either for
> > reporting purposes, or as part of some analytical task such as machine
> > learning), then the skew will kill your performance, and the amount of
> > parallelism you can effectively extract from your dataset.
> > PKG is a solution to that, without the full overhead of going to shuffle
> > grouping to compute partial aggregates.
> > The problem with shuffle grouping is not only the memory, but also the
> > cost of combining the aggregates, which increases with the parallelism
> > level.
> > Also, by keeping partial aggregates in 2 places, you can query those at
> > runtime with constant overhead (similarly to what you would be able to
> > do with hashing) rather than needing to broadcast the query to all
> > partitions (which you need to do with shuffle grouping).
> >
> > --
> > Gianmarco
> >
> > On 28 July 2015 at 00:54, Gwen Shapira <gshap...@cloudera.com> wrote:
> >
> > > I guess it depends on whether the original producer did any "map"
> > > tasks or simply wrote raw data. We usually advocate writing raw data,
> > > and since we need to write it anyway, the partitioner doesn't
> > > introduce any extra "hops".
> > >
> > > It's definitely useful to look at use-cases and I need to think a bit
> > > more on whether huge-key-space-with-large-skew is the only one.
> > > I think that there are use-cases that are not pure-aggregate and
> > > therefore keeping key-list in memory won't help and scaling to large
> > > number of partitions is still required (and therefore skew is a
> > > critical problem). However, I may be making stuff up, so need to
> > > double check.
> > >
> > > Gwen
> > >
> > >
> > >
> > >
> > >
> > > On Mon, Jul 27, 2015 at 2:20 PM, Ewen Cheslack-Postava
> > > <e...@confluent.io> wrote:
> > > > Gwen - this is really like two steps of map reduce though, right?
> > > > The first step does the partial shuffle to two partitions per key,
> > > > second step does partial reduce + final full shuffle, final step
> > > > does the final reduce.
> > > >
> > > > This strikes me as similar to partition assignment strategies in the
> > > > consumer in that there will probably be a small handful of commonly
> > > > used strategies that we can just maintain as part of Kafka. A few
> > > > people will need more obscure strategies and they can maintain those
> > > > implementations themselves. For reference, a quick grep of Spark
> > > > shows 5 partitioners: Hash and RangePartitioner, which are in core,
> > > > PythonPartitioner, GridPartitioner for partitioning matrices, and
> > > > ShuffleRowRDD for their SQL implementation.
> > > > So I don't think it would be a big deal to include it here, although
> > > > I'm not really sure how often it's useful -- compared to normal
> > > > partitioning or just doing two steps by starting with unpartitioned
> > > > data, you need to be performing an aggregation, the key set needs to
> > > > be large enough for memory usage to be a problem (i.e. you don't
> > > > want each consumer to have to maintain a map with every key in it),
> > > > and a sufficiently skewed distribution (i.e. not just 1 or 2 very
> > > > hot keys). The key set constraint, in particular, is the one I'm not
> > > > convinced by since in practice if you have a skewed distribution,
> > > > you probably also won't actually see every key in every partition;
> > > > each worker actually only needs to maintain a subset of the key set
> > > > (and associated aggregate data) in memory.
> > > >
> > > >
> > > > On Mon, Jul 27, 2015 at 12:56 PM, Gwen Shapira <gshap...@cloudera.com>
> > > > wrote:
> > > >
> > > >> If you are used to map-reduce patterns, this sounds like a perfectly
> > > >> natural way to process streams of data.
> > > >>
> > > >> Call the first consumer "map-combine-log", the topic "shuffle-log"
> > > >> and the second consumer "reduce-log" :)
> > > >> I like that a lot. It works well for either "embarrassingly
> > > >> parallel" cases, or "so much data that more parallelism is worth
> > > >> the extra overhead" cases.
> > > >>
> > > >> I personally don't care if it's in core-Kafka, KIP-28 or a github
> > > >> project elsewhere, but I find it useful and non-esoteric.
> > > >>
> > > >>
> > > >>
> > > >> On Mon, Jul 27, 2015 at 12:51 PM, Jason Gustafson <ja...@confluent.io>
> > > >> wrote:
> > > >> > For a little background, the difference between this partitioner
> > > >> > and the default one is that it breaks the deterministic mapping
> > > >> > from key to partition. Instead, messages for a given key can end
> > > >> > up in either of two partitions. This means that the consumer
> > > >> > generally won't see all messages for a given key. Instead the
> > > >> > consumer would compute an aggregate for each key on the
> > > >> > partitions it consumes and write them to a separate topic. For
> > > >> > example, if you are writing log messages to a "logs" topic with
> > > >> > the hostname as the key, you could use this partitioning strategy
> > > >> > to compute message counts for each host in each partition and
> > > >> > write them to a "log-counts" topic. Then a consumer of the
> > > >> > "log-counts" topic would compute total aggregates based on the
> > > >> > two intermediate aggregates. The benefit is that you are
> > > >> > generally going to get better load balancing across partitions
> > > >> > than if you used the default partitioner. (Please correct me if
> > > >> > my understanding is incorrect, Gianmarco)
> > > >> >
> > > >> > So I think the question is whether this is a useful primitive
> > > >> > for Kafka to provide out of the box? I was a little concerned
> > > >> > that this use case is a little esoteric for a core feature, but
> > > >> > it may make more sense in the context of KIP-28 which would
> > > >> > provide some higher-level processing capabilities (though it
> > > >> > doesn't seem like the KStream abstraction would provide a direct
> > > >> > way to leverage this partitioner without custom logic).
> > > >> >
> > > >> > Thanks,
> > > >> > Jason
> > > >> >
> > > >> >
> > > >> > On Wed, Jul 22, 2015 at 12:14 AM, Gianmarco De Francisci Morales <
> > > >> > g...@apache.org> wrote:
> > > >> >
> > > >> >> Hello folks,
> > > >> >>
> > > >> >> I'd like to ask the community about its opinion on the
> > > >> >> partitioning functions in Kafka.
> > > >> >>
> > > >> >> With KAFKA-2091 <https://issues.apache.org/jira/browse/KAFKA-2091>
> > > >> >> integrated we are now able to have custom partitioners in the
> > > >> >> producer.
> > > >> >> The question now becomes *which* partitioners should ship with
> > > >> >> Kafka?
> > > >> >> This issue arose in the context of KAFKA-2092
> > > >> >> <https://issues.apache.org/jira/browse/KAFKA-2092>, which
> > > >> >> implements a specific load-balanced partitioning. This
> > > >> >> partitioner however assumes some stages of processing on top of
> > > >> >> it to make proper use of the data, i.e., it envisions Kafka as a
> > > >> >> substrate for stream processing, and not only as the I/O
> > > >> >> component.
> > > >> >> Is this a direction that Kafka wants to go towards? Or is this
> > > >> >> a role better left to the internal communication systems of
> > > >> >> other stream processing engines (e.g., Storm)?
> > > >> >> And if the answer is the latter, how would something such as
> > > >> >> Samza (which relies mostly on Kafka as its communication
> > > >> >> substrate) be able to implement advanced partitioning schemes?
> > > >> >>
> > > >> >> Cheers,
> > > >> >> --
> > > >> >> Gianmarco
> > > >> >>
> > > >>
> > > >
> > > >
> > > >
> > > > --
> > > > Thanks,
> > > > Ewen
> > >
> >
>
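The two-step aggregation Jason describes above (per-partition partial counts
on the "logs" topic, then a final merge by a consumer of "log-counts") can be
sketched as follows; consumer plumbing is omitted and the names are
illustrative, not an actual Kafka API:

```python
from collections import Counter


def partial_counts(records):
    """First step: a consumer of one 'logs' partition counts messages per
    hostname key and would write the result to the 'log-counts' topic."""
    counts = Counter()
    for key, _value in records:
        counts[key] += 1
    return counts


def merge_counts(partials):
    """Second step: a consumer of 'log-counts' sums the (at most two)
    partial counts observed for each key into the final total."""
    total = Counter()
    for partial in partials:
        total.update(partial)
    return total
```

Because PKG spreads each key across at most two partitions, the final merge
touches at most two partial aggregates per key, regardless of parallelism.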
