Jason Gustafson created KAFKA-9807:
--------------------------------------

             Summary: Race condition updating high watermark allows reads above 
LSO
                 Key: KAFKA-9807
                 URL: https://issues.apache.org/jira/browse/KAFKA-9807
             Project: Kafka
          Issue Type: Bug
            Reporter: Jason Gustafson
            Assignee: Jason Gustafson


We had a transaction system test fail with the following error:

{code}
AssertionError: Detected 37 dups in concurrently consumed messages
{code}

After investigation, we found the duplicates were a result of the consumer 
reading an aborted transaction, which should not be possible with the 
read_committed isolation level.

We tracked down the fetch request which returned the aborted data:

{code}
[2020-03-24 07:27:58,284] INFO Completed request:RequestHeader(apiKey=FETCH, 
apiVersion=11, clientId=console-consumer, correlationId=283) -- 
{replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,isolation_level=1,session_id=2043970605,session_epoch=87,topics=[{topic=output-topic,partitions=[{partition=1,current_leader_epoch=3,fetch_offset=48393,log_start_offset=-1,partition_max_bytes=1048576}]}],forgotten_topics_data=[],rack_id=},response:{throttle_time_ms=0,error_code=0,session_id=2043970605,responses=[{topic=output-topic,partition_responses=[{partition_header={partition=1,error_code=0,high_watermark=50646,last_stable_offset=50646,log_start_offset=0,aborted_transactions=[],preferred_read_replica=-1},record_set=FileRecords(size=31582,
 file=/mnt/kafka/kafka-data-logs-1/output-topic-1/00000000000000045694.log, 
start=37613, end=69195)}]}]} 
{code}

After correlating with the contents of the log segment 
00000000000000045694.log, we found that this fetch response included data which 
was above the returned LSO which is 50646. In fact, the high watermark matched 
the LSO in this case, so the data was above the high watermark as well. 

At the same time this request was received, we noted that the high watermark 
was updated:

{code}
[2020-03-24 07:27:58,284] DEBUG [Partition output-topic-1 broker=3] High 
watermark updated from (offset=50646 segment=[45694:68690]) to (offset=50683 
segment=[45694:69195]) (kafka.cluster.Partition)
{code}

The position of the new high watermark matched the end position from the fetch 
response, so that led us to believe there was a race condition with the 
updating of this value. In the code, we have the following (abridged) logic for 
fetching the LSO:

{code}
    firstUnstableOffsetMetadata match {
      case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermark 
=> offsetMetadata
      case _ => fetchHighWatermarkMetadata
    }
{code}

If the first unstable offset is less than the high watermark, we should use 
that; otherwise we use the high watermark. The problem is that the high 
watermark referenced here could be updated between the range check and the call 
to `fetchHighWatermarkMetadata`. If that happens, we would end up reading data 
which is above the first unstable offset.

The solution to fix this problem is to cache the high watermark value so that 
it is used in both places. We may consider some additional improvements here as 
well, such as fixing the inconsistency problem in the fetch response which 
included data above the returned high watermark. We may also consider having 
the client react more defensively by ignoring fetched data above the high 
watermark. This would fix this problem for newer clients talking to older 
brokers which might hit this problem.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to