[ https://issues.apache.org/jira/browse/KAFKA-1196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14173943#comment-14173943 ]
Ewen Cheslack-Postava commented on KAFKA-1196:
----------------------------------------------
This is a WIP patch for this issue, which earlier discussion suggests is caused
by the FetchResponse exceeding 2GB. My way of triggering the issue doesn't
reproduce exactly the same symptom, but it does cause an unrecoverable error
that terminates the consumer connection. (For reference, the server fails when
FetchResponseSend.writeTo calls expectIncomplete while sendSize is negative due
to integer overflow. This confuses the server: it looks as if the message has
already finished sending, so the server forcibly closes the consumer's
connection.)
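The overflow can be shown with a minimal, standalone Java sketch. This is not the actual Scala code in FetchResponseSend, and the class and method names are hypothetical. It sums per-topic sizes in a 32-bit int, using the 4 topics x 600 MB reproduction mentioned in this comment (600 MB is taken as 600,000,000 bytes). Once the total passes Int.MaxValue, the sum wraps to a negative value. Adding a 4-byte size prefix to a size just under the limit wraps the same way.

```java
// Hypothetical standalone illustration; not Kafka's actual FetchResponseSend code.
public final class SendSizeOverflow {

    /** Sums sizes in a 32-bit int, as a naive size computation would. */
    static int naiveTotal(int[] sizes) {
        int total = 0;
        for (int s : sizes) {
            total += s; // silently wraps past Integer.MAX_VALUE
        }
        return total;
    }

    /** Same sum, but Math.addExact throws instead of wrapping. */
    static int checkedTotal(int[] sizes) {
        int total = 0;
        for (int s : sizes) {
            total = Math.addExact(total, s); // ArithmeticException on overflow
        }
        return total;
    }

    public static void main(String[] args) {
        int[] fourTopics = {600_000_000, 600_000_000, 600_000_000, 600_000_000};
        System.out.println("naive total = " + naiveTotal(fourTopics));

        // A send size computed as 4 + sizeInBytes also wraps near the limit.
        int sendSize = 4 + (Integer.MAX_VALUE - 1);
        System.out.println("send size = " + sendSize);

        try {
            checkedTotal(fourTopics);
        } catch (ArithmeticException e) {
            System.out.println("checked total overflowed: " + e.getMessage());
        }
    }
}
```

Running main prints "naive total = -1894967296" and "send size = -2147483646". This is the same kind of negative size that confuses expectIncomplete. Math.addExact (Java 8+) is one way to turn silent wraparound into an immediate error rather than a corrupted send.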
The patch addresses the core issue by dropping parts of the response so that it
never exceeds 2GB, in a way that shouldn't otherwise affect the consumer.
However, several points still need to be addressed:
* I started by writing an integration test that triggers the issue, included in
PrimitiveApiTest. However, since triggering it necessarily requires > 2GB of
data, it's probably too expensive to include this way. Offline discussion
suggested that a system test might be a better place for it. It's still
included here for completeness.
* The implementation filters FetchResponse itself down to a subset of its data.
The main reason is that this process needs the exact (or at least a
conservative estimate of the) serialized size, which only FetchResponse knows.
But this is a bit odd compared to the other message classes, which are case
classes and don't modify their inputs.
* Algorithm for choosing the subset to return: the initial approach is to remove
random elements until the size drops below the limit. This is simple to
understand and avoids starving specific TopicAndPartitions. Any concerns with
this basic approach?
* I'm fairly sure I've kept the < 2GB case at effectively the same
computational cost as before (the serialized size, grouped data, etc. are each
computed exactly once). For the > 2GB case, however, I've only ensured
correctness. In particular, progressively removing elements and re-evaluating
the serialized size could perform very badly on very large data sets (e.g.
starting a mirror maker from scratch against a large data set with a large
number of partitions).
* Note that the algorithm never touches the actual message data, only metadata
about which messages are available. This matters because it is what suggested
the approach could still be performant: ReplicaManager.readMessageSets
processes the entire FetchRequest, and the result can be filtered down cheaply
because the metadata involved is relatively small.
* Given the previous two points, this really needs more realistic large-scale
system tests to confirm that the approach is not only correct but also performs
reasonably (or to show that we need to revise the algorithm for selecting the
subset of data).
* Testing isn't complete yet: I triggered the issue with 4 topics x 600 MB per
topic (2,400 MB in total, which is > 2GB). Another obvious case to check is
when individual partitions contain > 2GB on their own.
* I'd like someone to help sanity-check the exact maximum serialized
FetchResponse size we cap messages at. It's not Int.MaxValue, because the
FetchResponseSend class computes its own size as 4 + FetchResponse.sizeInBytes.
Is reserving those extra 4 bytes enough, or is there additional wrapping we
need to account for? Getting a test to hit exactly that narrow range could be
tricky.
* The tests cover both the immediate-response and purgatory paths, but the
purgatory version relies on a timeout, which could make the test flaky and
slow, and there doesn't seem to be a good way to mock that right now. Maybe
this doesn't matter if the test moves to a system test?
* One case this doesn't handle yet is when the data grows past 2GB after the
request is already in purgatory. The result is still correct, but the response
isn't sent as soon as that condition is met. This is because evaluating it
exactly appears to require calling readMessageSets and computing the message
size on every DelayedFetch.isSatisfied call, which sounds like it could be
quite expensive. Is there a better way, perhaps an approximate scheme?
* The test needs some extra bytes in the per-partition fetchSize, presumably
for encoding overhead. I haven't tracked down exactly how large that should be,
but I suspect it could affect the results of more comprehensive tests.
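The random-removal approach and its size cap could look roughly like the following Java sketch. The cap is the one implied above: Int.MaxValue minus the 4 bytes FetchResponseSend adds. The sketch works only on per-partition size metadata, never on message data. Everything here is an assumption for illustration and not the patch itself:
- Partitions are identified by plain strings rather than TopicAndPartition.
- Each partition's serialized size is a precomputed long.
- A fixed response overhead is passed in rather than derived from the real FetchResponse encoding.
- FetchTrimmer and trimToLimit are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hypothetical sketch of "drop random partitions until the response fits".
public final class FetchTrimmer {

    // Assumed cap: FetchResponseSend adds a 4-byte size prefix to sizeInBytes,
    // so the response body must leave room for it within a signed 32-bit size.
    static final long MAX_RESPONSE_BYTES = Integer.MAX_VALUE - 4L;

    /**
     * Returns a copy of partitionSizes with randomly chosen entries removed until
     * overhead + sum(sizes) <= limit. Sizes are longs so the sum cannot overflow.
     * The input map is not modified.
     */
    static Map<String, Long> trimToLimit(Map<String, Long> partitionSizes,
                                         long overhead, long limit, Random rng) {
        Map<String, Long> kept = new HashMap<>(partitionSizes);
        List<String> candidates = new ArrayList<>(kept.keySet());
        long total = overhead;
        for (long size : kept.values()) {
            total += size;
        }
        while (total > limit && !candidates.isEmpty()) {
            // Pick uniformly at random so no partition is starved systematically.
            int i = rng.nextInt(candidates.size());
            String victim = candidates.get(i);
            // Swap-remove: move the last candidate into slot i, then drop the tail.
            candidates.set(i, candidates.get(candidates.size() - 1));
            candidates.remove(candidates.size() - 1);
            total -= kept.remove(victim);
        }
        return kept;
    }

    public static void main(String[] args) {
        Map<String, Long> sizes = new HashMap<>();
        for (int t = 0; t < 4; t++) {
            sizes.put("topic-" + t + "/0", 600_000_000L);
        }
        Map<String, Long> kept = trimToLimit(sizes, 0L, MAX_RESPONSE_BYTES, new Random(42));
        long total = 0;
        for (long size : kept.values()) {
            total += size;
        }
        System.out.println(kept.size() + " partitions kept, " + total + " bytes");
    }
}
```

With four 600,000,000-byte partitions, main prints "3 partitions kept, 1800000000 bytes"; which partition gets dropped depends on the Random seed. Keeping a running total avoids recomputing the full serialized size after every removal, which is the cost concern raised above. However, this assumes per-partition sizes are simply additive, which the real encoding may not satisfy exactly (e.g. if per-topic headers are shared). A single partition that is larger than the limit on its own would be dropped every time, so this sketch does not handle that case.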
> java.lang.IllegalArgumentException Buffer.limit on FetchResponse.scala + 33
> ---------------------------------------------------------------------------
>
> Key: KAFKA-1196
> URL: https://issues.apache.org/jira/browse/KAFKA-1196
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 0.8.0
> Environment: running java 1.7, linux and kafka compiled against scala
> 2.9.2
> Reporter: Gerrit Jansen van Vuuren
> Assignee: Ewen Cheslack-Postava
> Priority: Blocker
> Labels: newbie
> Fix For: 0.9.0
>
> Attachments: KAFKA-1196.patch
>
>
> I have 6 topics, each with 8 partitions, spread over 4 Kafka servers.
> The servers have 24 cores and 72 GB of RAM.
> While consuming from the topics I get an IllegalArgumentException and all
> consumption stops; the error keeps being thrown.
> I've tracked it down to FetchResponse.scala line 33.
> The error happens when the FetchResponsePartitionData object's readFrom
> method calls:
> messageSetBuffer.limit(messageSetSize)
> I added some debug code: messageSetSize is 671758648, while
> buffer.capacity() gives 155733313, so for some reason the buffer is smaller
> than the required message size.
> I don't know the consumer code well enough to debug this. It happens whether
> or not compression is used.
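The exception in the report comes from java.nio.Buffer.limit(int), which throws IllegalArgumentException when the new limit is larger than the buffer's capacity or is negative. The following is a minimal standalone Java illustration with scaled-down sizes. The class name is hypothetical, and it does not reproduce the actual readFrom code.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of the failing call; not Kafka's readFrom code.
public final class LimitDemo {

    /** Returns true if buffer.limit(newLimit) is rejected with IllegalArgumentException. */
    static boolean limitRejected(ByteBuffer buffer, int newLimit) {
        try {
            buffer.limit(newLimit);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Small capacity keeps the demo cheap; the report saw capacity 155733313
        // and messageSetSize 671758648, i.e. a limit larger than the capacity.
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        System.out.println(limitRejected(buffer, 512));  // within capacity
        System.out.println(limitRejected(buffer, 4096)); // exceeds capacity
        System.out.println(limitRejected(buffer, -1));   // e.g. an overflowed size
    }
}
```

main prints false, true, true: only the limit within capacity is accepted.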
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)