Re: [DISCUSS] Partitioning in Kafka

2015-08-06 Thread Gianmarco De Francisci Morales
OK, the general consensus seems to be that more elaborate partitioning
functions do belong within the scope of Kafka.
Could somebody have a look at KAFKA-2092
https://issues.apache.org/jira/browse/KAFKA-2092, then?

--
Gianmarco


Re: [DISCUSS] Partitioning in Kafka

2015-07-29 Thread Jiangjie Qin
Just my two cents. I think it might be OK to put this into Kafka if we
agree that this is a good use case for people who want to use Kafka as a
temporary store for stream processing. At the very least, I don't see a
downside to this.

Thanks,

Jiangjie (Becket) Qin


Re: [DISCUSS] Partitioning in Kafka

2015-07-28 Thread Gianmarco De Francisci Morales
Jason,
Thanks for starting the discussion and for your very concise (and correct)
summary.

Ewen, while what you say is true, those kinds of datasets (a large number
of keys with skew) are very typical on the Web (think Twitter users, or Web
pages, or even just plain text).
If you want to compute an aggregate on these datasets (either for reporting
purposes, or as part of some analytical task such as machine learning),
then the skew will kill your performance and limit the amount of
parallelism you can effectively extract from your dataset.
PKG is a solution to that, without the full overhead of going to shuffle
grouping to compute partial aggregates.
The problem with shuffle grouping is not only the memory, but also the cost
of combining the aggregates, which increases with the parallelism level.
Also, by keeping partial aggregates in two places, you can query them at
runtime with constant overhead (similar to what you would be able to do
with hashing) rather than needing to broadcast the query to all partitions
(which you need to do with shuffle grouping).
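
For illustration, a query only has to merge the two partials for a key
rather than broadcast to every partition. A minimal sketch, assuming a
hypothetical per-partition aggregate store and illustrative hash seeds
(whatever hash functions the producer-side partitioner uses would have to
be reused here):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Illustrative query-time merge for PKG-style partitioning: each key lives
 *  in exactly two candidate partitions, so a lookup reads two partial
 *  aggregates instead of asking every partition. */
public class PartialAggregateQuery {

    // Hypothetical stand-in for the per-partition aggregates kept by consumers.
    private final Map<Integer, Map<String, Long>> partialsByPartition = new HashMap<>();
    private final int numPartitions;

    public PartialAggregateQuery(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    public long query(String key) {
        int p1 = candidate(key, 0x9747b28c);   // first candidate partition
        int p2 = candidate(key, 0x7ed55d16);   // second candidate partition
        long total = partial(p1, key);
        if (p2 != p1) {
            total += partial(p2, key);         // merge, avoiding double counting
        }
        return total;
    }

    // Must match the hash functions used by the producer-side partitioner.
    private int candidate(String key, int seed) {
        int h = seed;
        for (char c : key.toCharArray()) {
            h = 31 * h + c;
        }
        return (h & 0x7fffffff) % numPartitions;
    }

    private long partial(int partition, String key) {
        return partialsByPartition
                .getOrDefault(partition, Collections.<String, Long>emptyMap())
                .getOrDefault(key, 0L);
    }
}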

--
Gianmarco


Re: [DISCUSS] Partitioning in Kafka

2015-07-27 Thread Gwen Shapira
If you are used to map-reduce patterns, this sounds like a perfectly
natural way to process streams of data.

Call the first consumer "map-combine-log", the topic "shuffle-log", and
the second consumer "reduce-log" :)
I like that a lot. It works well both for embarrassingly parallel cases
and for cases with so much data that more parallelism is worth the extra
overhead.

I personally don't care if it's in core Kafka, KIP-28, or a GitHub
project elsewhere, but I find it useful and non-esoteric.
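
A rough sketch of what that first "map-combine" stage could look like with
the Java clients. The topic names ("logs" in, "log-counts" out), the
serializers, and the flush interval are illustrative assumptions, and this
assumes the new Java consumer API:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/** Illustrative "map-combine" stage: count messages per key in the partitions
 *  this instance owns and periodically emit the partial counts downstream. */
public class MapCombineLog {
    public static void main(String[] args) {
        Properties cprops = new Properties();
        cprops.put("bootstrap.servers", "localhost:9092");
        cprops.put("group.id", "map-combine-log");
        cprops.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        cprops.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Properties pprops = new Properties();
        pprops.put("bootstrap.servers", "localhost:9092");
        pprops.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        pprops.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Map<String, Long> partialCounts = new HashMap<>();
        long lastFlush = System.currentTimeMillis();

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cprops);
             KafkaProducer<String, String> producer = new KafkaProducer<>(pprops)) {
            consumer.subscribe(Collections.singletonList("logs"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // Aggregate only the keys seen in the partitions assigned to us.
                    partialCounts.merge(record.key(), 1L, Long::sum);
                }
                if (System.currentTimeMillis() - lastFlush > 10_000) {
                    // Emit partial counts; the "reduce-log" consumer sums the
                    // two partials that exist for each key.
                    for (Map.Entry<String, Long> e : partialCounts.entrySet()) {
                        producer.send(new ProducerRecord<>("log-counts", e.getKey(), e.getValue().toString()));
                    }
                    partialCounts.clear();
                    lastFlush = System.currentTimeMillis();
                }
            }
        }
    }
}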




Re: [DISCUSS] Partitioning in Kafka

2015-07-27 Thread Jason Gustafson
For a little background, the difference between this partitioner and the
default one is that it breaks the deterministic mapping from key to
partition. Instead, messages for a given key can end up in either of two
partitions. This means that the consumer generally won't see all messages
for a given key. Instead, each consumer would compute an aggregate for each
key on the partitions it consumes and write the results to a separate
topic. For example, if you are writing log messages to a "logs" topic with
the hostname as the key, you could use this partitioning strategy to
compute message counts for each host in each partition and write them to a
"log-counts" topic. Then a consumer of the "log-counts" topic would compute
total aggregates based on the two intermediate aggregates. The benefit is
that you are generally going to get better load balancing across partitions
than if you used the default partitioner. (Please correct me if my
understanding is incorrect, Gianmarco.)
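
To make the mechanics concrete, here is a minimal sketch of such a
two-choice partitioner against the Partitioner interface that KAFKA-2091
added to the new producer. The class name, the seeded hashes, and the local
per-partition counters are illustrative assumptions, not the actual
KAFKA-2092 patch:

import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

/** Illustrative two-choice ("partial key grouping") partitioner: each key maps
 *  to two candidate partitions, and each message goes to whichever of the two
 *  has received fewer messages from this producer so far. */
public class TwoChoicesPartitioner implements Partitioner {

    // Local per-partition message counts, used to pick the less loaded candidate.
    private final Map<Integer, AtomicLong> sent = new ConcurrentHashMap<>();

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed for this sketch.
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            // No key: just spread by the value bytes.
            return (Arrays.hashCode(valueBytes) & 0x7fffffff) % numPartitions;
        }
        // Two candidate partitions from two differently seeded hashes of the key.
        int first = (hash(keyBytes, 0x9747b28c) & 0x7fffffff) % numPartitions;
        int second = (hash(keyBytes, 0x7ed55d16) & 0x7fffffff) % numPartitions;
        int choice = load(first) <= load(second) ? first : second;
        sent.computeIfAbsent(choice, p -> new AtomicLong()).incrementAndGet();
        return choice;
    }

    @Override
    public void close() {
        // Nothing to clean up.
    }

    private long load(int partition) {
        AtomicLong counter = sent.get(partition);
        return counter == null ? 0L : counter.get();
    }

    // Simple seeded hash for illustration; a real implementation would use a
    // well-tested function such as murmur hash.
    private static int hash(byte[] data, int seed) {
        int h = seed;
        for (byte b : data) {
            h = 31 * h + b;
        }
        return h;
    }
}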

So I think the question is whether this is a useful primitive for Kafka to
provide out of the box. I was a little concerned that this use case is too
esoteric for a core feature, but it may make more sense in the context of
KIP-28, which would provide some higher-level processing capabilities
(though it doesn't seem like the KStream abstraction would provide a direct
way to leverage this partitioner without custom logic).

Thanks,
Jason



Re: [DISCUSS] Partitioning in Kafka

2015-07-27 Thread Ewen Cheslack-Postava
Gwen - this is really like two steps of map reduce though, right? The first
step does the partial shuffle to two partitions per key, second step does
partial reduce + final full shuffle, final step does the final reduce.

This strikes me as similar to partition assignment strategies in the
consumer in that there will probably be a small handful of commonly used
strategies that we can just maintain as part of Kafka. A few people will
need more obscure strategies and they can maintain those implementations
themselves. For reference, a quick grep of Spark shows 5 partitioners: Hash
and RangePartitioner, which are in core, PythonPartitioner, GridPartitioner
for partitioning matrices, and ShuffleRowRDD for their SQL implementation.
So I don't think it would be a big deal to include it here, although I'm
not really sure how often it's useful -- compared to normal partitioning,
or just doing two steps starting with unpartitioned data, you need to be
performing an aggregation, the key set needs to be large enough for memory
usage to be a problem (i.e. you don't want each consumer to have to
maintain a map with every key in it), and the distribution needs to be
sufficiently skewed (i.e. not just 1 or 2 very hot keys). The key set
constraint, in particular, is the one I'm not convinced by, since in
practice if you have a skewed distribution you probably won't actually see
every key in every partition; each worker only needs to maintain a subset
of the key set (and its associated aggregate data) in memory.



-- 
Thanks,
Ewen


Re: [DISCUSS] Partitioning in Kafka

2015-07-27 Thread Gwen Shapira
I guess it depends on whether the original producer did any map
tasks or simply wrote raw data. We usually advocate writing raw data,
and since we need to write it anyway, the partitioner doesn't
introduce any extra hops.

It's definitely useful to look at use-cases, and I need to think a bit
more on whether huge-key-space-with-large-skew is the only one.
I think that there are use-cases that are not pure-aggregate, where
keeping a key list in memory won't help and scaling to a large number of
partitions is still required (and therefore skew is a critical problem).
However, I may be making stuff up, so I need to double check.

Gwen

[DISCUSS] Partitioning in Kafka

2015-07-22 Thread Gianmarco De Francisci Morales
Hello folks,

I'd like to ask the community about its opinion on the partitioning
functions in Kafka.

With KAFKA-2091 https://issues.apache.org/jira/browse/KAFKA-2091
integrated, we are now able to have custom partitioners in the producer.
The question now becomes *which* partitioners should ship with Kafka.
This issue arose in the context of KAFKA-2092
https://issues.apache.org/jira/browse/KAFKA-2092, which implements a
specific load-balanced partitioning. This partitioner, however, assumes
some stages of processing on top of it to make proper use of the data,
i.e., it envisions Kafka as a substrate for stream processing, and not
only as the I/O component.
Is this a direction that Kafka wants to go in? Or is this a role better
left to the internal communication systems of other stream processing
engines (e.g., Storm)?
And if the answer is the latter, how would something such as Samza (which
relies mostly on Kafka as its communication substrate) be able to
implement advanced partitioning schemes?
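
For reference, with KAFKA-2091 a custom partitioner is wired into the new
Java producer purely through configuration -- a minimal sketch, where
com.example.TwoChoicesPartitioner stands in for whatever Partitioner
implementation ships with Kafka or is user-provided:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerWithCustomPartitioner {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // KAFKA-2091: plug in any org.apache.kafka.clients.producer.Partitioner implementation.
        props.put("partitioner.class", "com.example.TwoChoicesPartitioner");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The configured partitioner decides which partition each record lands in.
            producer.send(new ProducerRecord<>("logs", "host-42", "GET /index.html 200"));
        }
    }
}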

Cheers,
--
Gianmarco