Jason Gustafson created KAFKA-9807:
--------------------------------------
Summary: Race condition updating high watermark allows reads above
LSO
Key: KAFKA-9807
URL: https://issues.apache.org/jira/browse/KAFKA-9807
Project: Kafka
Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson
We had a transaction system test fail with the following error:
{code}
AssertionError: Detected 37 dups in concurrently consumed messages
{code}
After investigation, we found the duplicates were a result of the consumer
reading an aborted transaction, which should not be possible with the
read_committed isolation level.
We tracked down the fetch request which returned the aborted data:
{code}
[2020-03-24 07:27:58,284] INFO Completed request:RequestHeader(apiKey=FETCH,
apiVersion=11, clientId=console-consumer, correlationId=283) --
{replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=2043970605,session_epoch=87,topics=[{topic=output-topic,partitions=[{partition=1,current_leader_epoch=3,fetch_offset=48393,log_start_offset=-1,partition_max_bytes=1048576}]}],forgotten_topics_data=[],rack_id=},response:{throttle_time_ms=0,error_code=0,session_id=2043970605,responses=[{topic=output-topic,partition_responses=[{partition_header={partition=1,error_code=0,high_watermark=50646,last_stable_offset=50646,log_start_offset=0,aborted_transactions=[],preferred_read_replica=-1},record_set=FileRecords(size=31582,
file=/mnt/kafka/kafka-data-logs-1/output-topic-1/00000000000000045694.log,
start=37613, end=69195)}]}]}
{code}
After correlating with the contents of the log segment
00000000000000045694.log, we found that this fetch response included data which
was above the returned LSO which is 50646. In fact, the high watermark matched
the LSO in this case, so the data was above the high watermark as well.
At the same time this request was received, we noted that the high watermark
was updated:
{code}
[2020-03-24 07:27:58,284] DEBUG [Partition output-topic-1 broker=3] High
watermark updated from (offset=50646 segment=[45694:68690]) to (offset=50683
segment=[45694:69195]) (kafka.cluster.Partition)
{code}
The position of the new high watermark matched the end position from the fetch
response, so that led us to believe there was a race condition with the
updating of this value. In the code, we have the following (abridged) logic for
fetching the LSO:
{code}
firstUnstableOffsetMetadata match {
case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermark
=> offsetMetadata
case _ => fetchHighWatermarkMetadata
}
{code}
If the first unstable offset is less than the high watermark, we should use
that; otherwise we use the high watermark. The problem is that the high
watermark referenced here could be updated between the range check and the call
to `fetchHighWatermarkMetadata`. If that happens, we would end up reading data
which is above the first unstable offset.
The solution to fix this problem is to cache the high watermark value so that
it is used in both places. We may consider some additional improvements here as
well, such as fixing the inconsistency problem in the fetch response which
included data above the returned high watermark. We may also consider having
the client react more defensively by ignoring fetched data above the high
watermark. This would fix this problem for newer clients talking to older
brokers which might hit this problem.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)