I went ahead and have a patch submitted here: https://github.com/apache/kafka/pull/2638
Per Joel's suggestion, I changed the compression ratio to be per topic as well. It seems working well. Since there is an important behavior change and a new sensor is added, I'll keep the KIP and update it according. Thanks, Jiangjie (Becket) Qin On Mon, Feb 27, 2017 at 3:50 PM, Joel Koshy <jjkosh...@gmail.com> wrote: > > > > Lets say we sent the batch over the wire and received a > > RecordTooLargeException, how do we split it as once we add the message to > > the batch we loose the message level granularity. We will have to > > decompress, do deep iteration and split and again compress. right? This > > looks like a performance bottle neck in case of multi topic producers > like > > mirror maker. > > > > Yes, but these should be outliers if we do estimation on a per-topic basis > and if we target a conservative-enough compression ratio. The producer > should also avoid sending over the wire if it can be made aware of the > max-message size limit on the broker, and split if it determines that a > record exceeds the broker's config. Ideally this should be part of topic > metadata but is not - so it could be off a periodic describe-configs > <https://cwiki.apache.org/confluence/display/KAFKA/KIP- > 4+-+Command+line+and+centralized+administrative+operations#KIP-4- > Commandlineandcentralizedadministrativeoperations-DescribeConfigsRequest> > (which isn't available yet). This doesn't remove the need to split and > recompress though. > > > > On Mon, Feb 27, 2017 at 10:51 AM, Becket Qin <becket....@gmail.com> > wrote: > > > > > Hey Mayuresh, > > > > > > 1) The batch would be split when an RecordTooLargeException is > received. > > > 2) Not lower the actual compression ratio, but lower the estimated > > > compression ratio "according to" the Actual Compression Ratio(ACR). > > > > > > An example, let's start with Estimated Compression Ratio (ECR) = 1.0. > Say > > > the compression ratio of ACR is ~0.8, instead of letting the ECR > dropped > > to > > > 0.8 very quickly, we only drop 0.001 every time when ACR < ECR. > However, > > > once we see an ACR > ECR, we increment ECR by 0.05. If a > > > RecordTooLargeException is received, we reset the ECR back to 1.0 and > > split > > > the batch. > > > > > > Thanks, > > > > > > Jiangjie (Becket) Qin > > > > > > > > > > > > On Mon, Feb 27, 2017 at 10:30 AM, Mayuresh Gharat < > > > gharatmayures...@gmail.com> wrote: > > > > > > > Hi Becket, > > > > > > > > Seems like an interesting idea. > > > > I had couple of questions : > > > > 1) How do we decide when the batch should be split? > > > > 2) What do you mean by slowly lowering the "actual" compression > ratio? > > > > An example would really help here. > > > > > > > > Thanks, > > > > > > > > Mayuresh > > > > > > > > On Fri, Feb 24, 2017 at 3:17 PM, Becket Qin <becket....@gmail.com> > > > wrote: > > > > > > > > > Hi Jay, > > > > > > > > > > Yeah, I got your point. > > > > > > > > > > I think there might be a solution which do not require adding a new > > > > > configuration. We can start from a very conservative compression > > ratio > > > > say > > > > > 1.0 and lower it very slowly according to the actual compression > > ratio > > > > > until we hit a point that we have to split a batch. At that point, > we > > > > > exponentially back off on the compression ratio. The idea is > somewhat > > > > like > > > > > TCP. This should help avoid frequent split. > > > > > > > > > > The upper bound of the batch size is also a little awkward today > > > because > > > > we > > > > > say the batch size is based on compressed size, but users cannot > set > > it > > > > to > > > > > the max message size because that will result in oversized > messages. > > > With > > > > > this change we will be able to allow the users to set the message > > size > > > to > > > > > close to max message size. > > > > > > > > > > However the downside is that there could be latency spikes in the > > > system > > > > in > > > > > this case due to the splitting, especially when there are many > > messages > > > > > need to be split at the same time. That could potentially be an > issue > > > for > > > > > some users. > > > > > > > > > > What do you think about this approach? > > > > > > > > > > Thanks, > > > > > > > > > > Jiangjie (Becket) Qin > > > > > > > > > > > > > > > > > > > > On Thu, Feb 23, 2017 at 1:31 PM, Jay Kreps <j...@confluent.io> > wrote: > > > > > > > > > > > Hey Becket, > > > > > > > > > > > > Yeah that makes sense. > > > > > > > > > > > > I agree that you'd really have to both fix the estimation (i.e. > > make > > > it > > > > > per > > > > > > topic or make it better estimate the high percentiles) AND have > the > > > > > > recovery mechanism. If you are underestimating often and then > > paying > > > a > > > > > high > > > > > > recovery price that won't fly. > > > > > > > > > > > > I think you take my main point though, which is just that I hate > to > > > > > exposes > > > > > > these super low level options to users because it is so hard to > > > explain > > > > > to > > > > > > people what it means and how they should set it. So if it is > > possible > > > > to > > > > > > make either some combination of better estimation and splitting > or > > > > better > > > > > > tolerance of overage that would be preferrable. > > > > > > > > > > > > -Jay > > > > > > > > > > > > On Thu, Feb 23, 2017 at 11:51 AM, Becket Qin < > becket....@gmail.com > > > > > > > > wrote: > > > > > > > > > > > > > @Dong, > > > > > > > > > > > > > > Thanks for the comments. The default behavior of the producer > > won't > > > > > > change. > > > > > > > If the users want to use the uncompressed message size, they > > > probably > > > > > > will > > > > > > > also bump up the batch size to somewhere close to the max > message > > > > size. > > > > > > > This would be in the document. BTW the default batch size is > 16K > > > > which > > > > > is > > > > > > > pretty small. > > > > > > > > > > > > > > @Jay, > > > > > > > > > > > > > > Yeah, we actually had debated quite a bit internally what is > the > > > best > > > > > > > solution to this. > > > > > > > > > > > > > > I completely agree it is a bug. In practice we usually leave > some > > > > > > headroom > > > > > > > to allow the compressed size to grow a little if the the > original > > > > > > messages > > > > > > > are not compressible, for example, 1000 KB instead of exactly 1 > > MB. > > > > It > > > > > is > > > > > > > likely safe enough. > > > > > > > > > > > > > > The major concern for the rejected alternative is performance. > It > > > > > largely > > > > > > > depends on how frequent we need to split a batch, i.e. how > likely > > > the > > > > > > > estimation can go off. If we only need to the split work > > > > occasionally, > > > > > > the > > > > > > > cost would be amortized so we don't need to worry about it too > > > much. > > > > > > > However, it looks that for a producer with shared topics, the > > > > > estimation > > > > > > is > > > > > > > always off. As an example, consider two topics, one with > > > compression > > > > > > ratio > > > > > > > 0.6 the other 0.2, assuming exactly same traffic, the average > > > > > compression > > > > > > > ratio would be roughly 0.4, which is not right for either of > the > > > > > topics. > > > > > > So > > > > > > > almost half of the batches (of the topics with 0.6 compression > > > ratio) > > > > > > will > > > > > > > end up larger than the configured batch size. When it comes to > > more > > > > > > topics > > > > > > > such as mirror maker, this becomes more unpredictable. To avoid > > > > > frequent > > > > > > > rejection / split of the batches, we need to configured the > batch > > > > size > > > > > > > pretty conservatively. This could actually hurt the performance > > > > because > > > > > > we > > > > > > > are shoehorn the messages that are highly compressible to a > small > > > > batch > > > > > > so > > > > > > > that the other topics that are not that compressible will not > > > become > > > > > too > > > > > > > large with the same batch size. At LinkedIn, our batch size is > > > > > configured > > > > > > > to 64 KB because of this. I think we may actually have better > > > > batching > > > > > if > > > > > > > we just use the uncompressed message size and 800 KB batch > size. > > > > > > > > > > > > > > We did not think about loosening the message size restriction, > > but > > > > that > > > > > > > sounds a viable solution given that the consumer now can fetch > > > > > oversized > > > > > > > messages. One concern would be that on the broker side > oversized > > > > > messages > > > > > > > will bring more memory pressure. With KIP-92, we may mitigate > > that, > > > > but > > > > > > the > > > > > > > memory allocation for large messages may not be very GC > > friendly. I > > > > > need > > > > > > to > > > > > > > think about this a little more. > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > Jiangjie (Becket) Qin > > > > > > > > > > > > > > > > > > > > > On Wed, Feb 22, 2017 at 8:57 PM, Jay Kreps <j...@confluent.io> > > > wrote: > > > > > > > > > > > > > > > Hey Becket, > > > > > > > > > > > > > > > > I get the problem we want to solve with this, but I don't > think > > > > this > > > > > is > > > > > > > > something that makes sense as a user controlled knob that > > > everyone > > > > > > > sending > > > > > > > > data to kafka has to think about. It is basically a bug, > right? > > > > > > > > > > > > > > > > First, as a technical question is it true that using the > > > > uncompressed > > > > > > > size > > > > > > > > for batching actually guarantees that you observe the limit? > I > > > > think > > > > > > that > > > > > > > > implies that compression always makes the messages smaller, > > > which i > > > > > > think > > > > > > > > usually true but is not guaranteed, right? e.g. if someone > > > encrypts > > > > > > their > > > > > > > > data which tends to randomize it and then enables > > compressesion, > > > it > > > > > > could > > > > > > > > slightly get bigger? > > > > > > > > > > > > > > > > I also wonder if the rejected alternatives you describe > > couldn't > > > be > > > > > > made > > > > > > > to > > > > > > > > work: basically try to be a bit better at estimation and > > recover > > > > when > > > > > > we > > > > > > > > guess wrong. I don't think the memory usage should be a > > problem: > > > > > isn't > > > > > > it > > > > > > > > the same memory usage the consumer of that topic would need? > > And > > > > > can't > > > > > > > you > > > > > > > > do the splitting and recompression in a streaming fashion? If > > we > > > an > > > > > > make > > > > > > > > the estimation rate low and the recovery cost is just ~2x the > > > > normal > > > > > > cost > > > > > > > > for that batch that should be totally fine, right? (It's > > > > technically > > > > > > true > > > > > > > > you might have to split more than once, but since you halve > it > > > each > > > > > > time > > > > > > > I > > > > > > > > think should you get a number of halvings that is logarithmic > > in > > > > the > > > > > > miss > > > > > > > > size, which, with better estimation you'd hope would be super > > > duper > > > > > > > small). > > > > > > > > > > > > > > > > Alternatively maybe we could work on the other side of the > > > problem > > > > > and > > > > > > > try > > > > > > > > to make it so that a small miss on message size isn't a big > > > > problem. > > > > > I > > > > > > > > think original issue was that max size and fetch size were > > > tightly > > > > > > > coupled > > > > > > > > and the way memory in the consumer worked you really wanted > > fetch > > > > > size > > > > > > to > > > > > > > > be as small as possible because you'd use that much memory > per > > > > > fetched > > > > > > > > partition and the consumer would get stuck if its fetch size > > > wasn't > > > > > big > > > > > > > > enough. I think we made some progress on that issue and maybe > > > more > > > > > > could > > > > > > > be > > > > > > > > done there so that a small bit of fuzziness around the size > > would > > > > not > > > > > > be > > > > > > > an > > > > > > > > issue? > > > > > > > > > > > > > > > > -Jay > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Feb 21, 2017 at 12:30 PM, Becket Qin < > > > becket....@gmail.com > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > Hi folks, > > > > > > > > > > > > > > > > > > I would like to start the discussion thread on KIP-126. The > > KIP > > > > > > propose > > > > > > > > > adding a new configuration to KafkaProducer to allow > batching > > > > based > > > > > > on > > > > > > > > > uncompressed message size. > > > > > > > > > > > > > > > > > > Comments are welcome. > > > > > > > > > > > > > > > > > > The KIP wiki is following: > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP- > > > > > > > > > 126+-+Allow+KafkaProducer+to+batch+based+on+uncompressed+ > siz > > e > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > > > Jiangjie (Becket) Qin > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > -Regards, > > > > Mayuresh R. Gharat > > > > (862) 250-7125 > > > > > > > > > > > > > > > -- > > -Regards, > > Mayuresh R. Gharat > > (862) 250-7125 > > >