Re: [DISCUSS] Partitioning in Kafka
OK, the general consensus seems to be that more elaborate partitioning functions fall within the scope of Kafka. Could somebody have a look at KAFKA-2092 https://issues.apache.org/jira/browse/KAFKA-2092 then?

-- Gianmarco
Re: [DISCUSS] Partitioning in Kafka
Just my two cents. I think it might be OK to put this into Kafka if we agree that it is a good use case for people who want to use Kafka as a temporary store for stream processing. At the very least, I don't see a downside to it.

Thanks,
Jiangjie (Becket) Qin
Re: [DISCUSS] Partitioning in Kafka
Jason,

Thanks for starting the discussion, and for your very concise (and correct) summary.

Ewen, while what you say is true, those kinds of datasets (a large number of keys with skew) are very typical on the Web (think Twitter users, or Web pages, or even just plain text). If you want to compute an aggregate on these datasets (either for reporting purposes, or as part of some analytical task such as machine learning), then the skew will kill your performance and limit the amount of parallelism you can effectively extract from your dataset. PKG is a solution to that, without the full overhead of going to shuffle grouping to compute partial aggregates. The problem with shuffle grouping is not only the memory, but also the cost of combining the aggregates, which increases with the parallelism level.

Also, by keeping partial aggregates in two places, you can query them at runtime with constant overhead (similarly to what you would be able to do with hashing), rather than needing to broadcast the query to all partitions (which you would need to do with shuffle grouping).

-- Gianmarco
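To make the constant-overhead query concrete, here is a minimal Java sketch (not part of KAFKA-2092; the PartialAggregateStore interface and the seeded hash are stand-ins for however the per-partition aggregates are actually exposed). Under PKG, a key's partial aggregates live in exactly the two partitions its two hashes select, so a point query is two lookups plus a merge, independent of the parallelism level:

import java.util.function.LongBinaryOperator;

public class PkgQuery {
    // Assumed interface over the per-partition aggregate state
    // (could be a compacted topic, a cache, or a K/V store).
    interface PartialAggregateStore {
        long get(int partition, String key); // 0 if absent
    }

    private final PartialAggregateStore store;
    private final int numPartitions;

    PkgQuery(PartialAggregateStore store, int numPartitions) {
        this.store = store;
        this.numPartitions = numPartitions;
    }

    // Must match the two hash functions the producer-side partitioner uses.
    static int hash(String key, int seed) {
        int h = seed;
        for (int i = 0; i < key.length(); i++) h = 31 * h + key.charAt(i);
        return h & 0x7fffffff;
    }

    long query(String key, LongBinaryOperator merge) {
        int p1 = hash(key, 0x12345) % numPartitions;
        int p2 = hash(key, 0x54321) % numPartitions;
        long a = store.get(p1, key);
        long b = (p1 == p2) ? 0 : store.get(p2, key); // the two hashes may collide
        return merge.applyAsLong(a, b); // e.g. Long::sum for counts
    }
}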
Re: [DISCUSS] Partitioning in Kafka
If you are used to map-reduce patterns, this sounds like a perfectly natural way to process streams of data. Call the first consumer map-combine-log, the topic shuffle-log, and the second consumer reduce-log :)

I like that a lot. It works well either for embarrassingly parallel cases or for cases with so much data that more parallelism is worth the extra overhead.

I personally don't care if it's in core Kafka, KIP-28, or a GitHub project elsewhere, but I find it useful and non-esoteric.
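A rough sketch of the first stage under that naming, using the Java clients (the bootstrap address, the flush-every-poll policy, and the string-encoded counts are simplifications for illustration):

import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;

public class CombineLog {
    public static void main(String[] args) {
        Properties cprops = new Properties();
        cprops.put("bootstrap.servers", "localhost:9092");
        cprops.put("group.id", "map-combine-log");
        cprops.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        cprops.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cprops);
        consumer.subscribe(Collections.singletonList("logs"));

        Properties pprops = new Properties();
        pprops.put("bootstrap.servers", "localhost:9092");
        pprops.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        pprops.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(pprops);

        // Partial counts only for keys seen in the partitions this consumer
        // owns -- a subset of the key space, per Ewen's point.
        Map<String, Long> partial = new HashMap<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> r : records)
                partial.merge(r.key(), 1L, Long::sum);
            // Flush partials downstream, keyed by the original key so the
            // default partitioner routes all partials for a key together.
            for (Map.Entry<String, Long> e : partial.entrySet())
                producer.send(new ProducerRecord<>("shuffle-log", e.getKey(), e.getValue().toString()));
            partial.clear();
        }
    }
}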
Re: [DISCUSS] Partitioning in Kafka
For a little background, the difference between this partitioner and the default one is that it breaks the deterministic mapping from key to partition. Instead, messages for a given key can end up in either of two partitions. This means that the consumer generally won't see all messages for a given key. Instead, the consumer would compute an aggregate for each key on the partitions it consumes and write them to a separate topic. For example, if you are writing log messages to a logs topic with the hostname as the key, you could use this partitioning strategy to compute message counts for each host in each partition and write them to a log-counts topic. Then a consumer of the log-counts topic would compute total aggregates based on the two intermediate aggregates. The benefit is that you are generally going to get better load balancing across partitions than if you used the default partitioner. (Please correct me if my understanding is incorrect, Gianmarco.)

So I think the question is whether this is a useful primitive for Kafka to provide out of the box. I was a little concerned that this use case is too esoteric for a core feature, but it may make more sense in the context of KIP-28, which would provide some higher-level processing capabilities (though it doesn't seem like the KStream abstraction would provide a direct way to leverage this partitioner without custom logic).

Thanks,
Jason
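For concreteness, a minimal sketch of such a partitioner against the producer's Partitioner interface (this is not the KAFKA-2092 patch; the class name, the seeds, and the local-count load estimate are illustrative assumptions, loosely following the partial key grouping idea): each key hashes to two candidate partitions, and the producer routes each message to whichever candidate it has locally sent fewer messages to so far.

import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLongArray;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class PartialKeyGroupingPartitioner implements Partitioner {

    // Local per-partition send counts, used as a cheap load estimate.
    // (Re)sized lazily; good enough for a sketch.
    private volatile AtomicLongArray counts;

    @Override
    public void configure(Map<String, ?> configs) {}

    // Two seeded hashes give every key two candidate partitions.
    private static int hash(byte[] bytes, int seed) {
        int h = seed;
        for (byte b : bytes) h = 31 * h + b;
        return h & 0x7fffffff;
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int n = cluster.partitionsForTopic(topic).size();
        if (counts == null || counts.length() != n)
            counts = new AtomicLongArray(n);
        if (keyBytes == null) // no key: fall back to a random partition
            return ThreadLocalRandom.current().nextInt(n);
        int p1 = hash(keyBytes, 0x12345) % n;
        int p2 = hash(keyBytes, 0x54321) % n;
        // Send to the less-loaded of the two candidates.
        int choice = counts.get(p1) <= counts.get(p2) ? p1 : p2;
        counts.incrementAndGet(choice);
        return choice;
    }

    @Override
    public void close() {}
}

Because the two candidates are a deterministic function of the key, a downstream consumer (or a query layer) can recompute them and find all partials for a key in exactly two partitions.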
Re: [DISCUSS] Partitioning in Kafka
Gwen - this is really like two steps of map reduce though, right? The first step does the partial shuffle to two partitions per key, the second step does the partial reduce plus the final full shuffle, and the final step does the final reduce.

This strikes me as similar to partition assignment strategies in the consumer, in that there will probably be a small handful of commonly used strategies that we can just maintain as part of Kafka. A few people will need more obscure strategies, and they can maintain those implementations themselves. For reference, a quick grep of Spark shows 5 partitioners: HashPartitioner and RangePartitioner, which are in core, PythonPartitioner, GridPartitioner for partitioning matrices, and ShuffleRowRDD for their SQL implementation.

So I don't think it would be a big deal to include it here, although I'm not really sure how often it's useful -- compared to normal partitioning, or to just doing two steps starting with unpartitioned data, you need to be performing an aggregation, the key set needs to be large enough for memory usage to be a problem (i.e. you don't want each consumer to have to maintain a map with every key in it), and the distribution needs to be sufficiently skewed (i.e. not just 1 or 2 very hot keys). The key set constraint, in particular, is the one I'm not convinced by: in practice, if you have a skewed distribution, you probably also won't actually see every key in every partition; each worker only needs to maintain a subset of the key set (and the associated aggregate data) in memory.
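To make the two-steps-of-map-reduce framing concrete, a rough sketch of the final reduce stage (topic names reuse the shuffle-log naming from earlier in the thread; string-encoded counts and the in-memory totals map are assumptions for illustration):

import java.util.*;
import org.apache.kafka.clients.consumer.*;

public class ReduceLog {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "reduce-log");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("shuffle-log"));

        // shuffle-log uses the default (deterministic) partitioner, so all
        // partials for a key land in one partition and can be summed here.
        // A real job would checkpoint or expose these totals somewhere.
        Map<String, Long> totals = new HashMap<>();
        while (true) {
            for (ConsumerRecord<String, String> r : consumer.poll(1000))
                totals.merge(r.key(), Long.parseLong(r.value()), Long::sum);
        }
    }
}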
--
Thanks,
Ewen
Re: [DISCUSS] Partitioning in Kafka
I guess it depends on whether the original producer did any map tasks or simply wrote raw data. We usually advocate writing raw data, and since we need to write it anyway, the partitioner doesn't introduce any extra hops.

It's definitely useful to look at use cases, and I need to think a bit more on whether huge-key-space-with-large-skew is the only one. I think there are use cases that are not pure aggregates, where keeping the key list in memory won't help, and scaling to a large number of partitions is still required (and therefore skew is a critical problem). However, I may be making stuff up, so I need to double-check.

Gwen
[DISCUSS] Partitioning in Kafka
Hello folks,

I'd like to ask the community about its opinion on the partitioning functions in Kafka.

With KAFKA-2091 https://issues.apache.org/jira/browse/KAFKA-2091 integrated, we are now able to have custom partitioners in the producer. The question now becomes *which* partitioners should ship with Kafka.

This issue arose in the context of KAFKA-2092 https://issues.apache.org/jira/browse/KAFKA-2092, which implements a specific load-balanced partitioning. This partitioner, however, assumes some stages of processing on top of it to make proper use of the data, i.e., it envisions Kafka as a substrate for stream processing, and not only as the I/O component.

Is this a direction that Kafka wants to go in? Or is this a role better left to the internal communication systems of other stream processing engines (e.g., Storm)? And if the answer is the latter, how would something such as Samza (which relies mostly on Kafka as its communication substrate) be able to implement advanced partitioning schemes?

Cheers,
-- Gianmarco
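For anyone catching up on what KAFKA-2091 enables: the producer takes a pluggable partitioner via its partitioner.class config. A minimal usage sketch (the partitioner class name is a placeholder for any org.apache.kafka.clients.producer.Partitioner implementation):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CustomPartitionerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Plug in a custom Partitioner implementation by class name.
        props.put("partitioner.class", "com.example.PartialKeyGroupingPartitioner");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.send(new ProducerRecord<>("logs", "host-17", "GET /index.html 200"));
        producer.close();
    }
}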