[
https://issues.apache.org/jira/browse/KAFKA-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
PoAn Yang resolved KAFKA-19067.
-------------------------------
Resolution: Won't Fix
> AsyncKafkaConsumer may return stale fetch result after seek operation
> ---------------------------------------------------------------------
>
> Key: KAFKA-19067
> URL: https://issues.apache.org/jira/browse/KAFKA-19067
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: PoAn Yang
> Assignee: PoAn Yang
> Priority: Major
> Labels: consumer-threading-refactor
>
> The KafkaConsumer sends a FetchRequest after it subscribes to topics, and
> the FetchResponse data is stored in the FetchBuffer. For a KafkaConsumer#seek
> operation, the FetchState changes to AWAIT_RESET and the consumer sends a
> LIST_OFFSETS request. The state changes back to FETCHING after the consumer
> receives the LIST_OFFSETS response.
> If a KafkaConsumer subscribes to topics and then calls seek, there may be
> stale FetchResponse data in the FetchBuffer. ClassicKafkaConsumer#poll gets
> data from the FetchBuffer first and then calls ConsumerNetworkClient#poll.
> Any stale data in the FetchBuffer is ignored because the FetchState is still
> AWAIT_RESET; the FetchState in ClassicKafkaConsumer only changes back to
> FETCHING after ConsumerNetworkClient#poll receives the LIST_OFFSETS
> response.
> However, AsyncKafkaConsumer may return stale FetchResponse data to users,
> because the ConsumerNetworkThread runs in another thread: the FetchState may
> change back to FETCHING before AsyncKafkaConsumer#poll performs the
> valid-position check.
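The ordering described above can be sketched with a tiny single-process simulation. All names below (FetchState, poll, the integer "records") are illustrative stand-ins for this sketch, not the real client internals; the only thing it models is whether the validity check runs before or after the offset reset completes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class StaleFetchSketch {
    public enum FetchState { FETCHING, AWAIT_RESET }

    // Simulates one poll() pass: drain the fetch buffer, handing records to
    // the user only if the partition still has a valid position (FETCHING)
    // at the moment of the check; otherwise they are dropped, matching the
    // "Ignoring fetched records ... no longer has valid position" log line.
    public static List<Integer> poll(Queue<Integer> fetchBuffer, FetchState stateAtCheck) {
        List<Integer> returned = new ArrayList<>();
        while (!fetchBuffer.isEmpty()) {
            int offset = fetchBuffer.poll();
            if (stateAtCheck == FetchState.FETCHING) {
                returned.add(offset);
            }
        }
        return returned;
    }

    public static void main(String[] args) {
        // A FETCH response buffered before seekToBeginning(): offsets 0..9.
        List<Integer> stale = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);

        // ClassicKafkaConsumer: the buffer is drained before the network poll
        // completes the offset reset, so the check still sees AWAIT_RESET and
        // the stale records are discarded.
        System.out.println("classic: " + poll(new ArrayDeque<>(stale), FetchState.AWAIT_RESET));

        // AsyncKafkaConsumer: the background thread may receive the
        // LIST_OFFSETS response and flip the state back to FETCHING before the
        // application thread runs the check, so the stale records leak through.
        System.out.println("async:   " + poll(new ArrayDeque<>(stale), FetchState.FETCHING));
    }
}
```

In the second call the stale pre-seek records survive the check, which is exactly why the async consumer's next FetchRequest in the logs below starts from offset 10 instead of 0.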
> The following logs show the case for the ClassicKafkaConsumer:
> {noformat}
> [Consumer clientId=consumer-group-1, groupId=group] Added read_uncommitted
> fetch request for partition topic-0 at position FetchPosition{offset=0,
> offsetEpoch=Optional.empty,
> currentLeader=LeaderAndEpoch{leader=Optional[localhost:50923 (id: 1 rack:
> null isFenced: false)], epoch=0}} to node localhost:50923 (id: 1 rack: null
> isFenced: false)
> (org.apache.kafka.clients.consumer.internals.AbstractFetch:471)
> [Consumer clientId=consumer-group-1, groupId=group] Sending FETCH request
> with header RequestHeader(apiKey=FETCH, apiVersion=17,
> clientId=consumer-group-1, correlationId=12, headerVersion=2) and timeout
> 30000 to node 1: FetchRequestData(clusterId=null, replicaId=-1,
> replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500,
> minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=0, sessionEpoch=0,
> topics=[FetchTopic(topic='topic', topicId=BatA1H3WQ6KdwhZpMq6fOw,
> partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=0,
> lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576,
> replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], forgottenTopicsData=[],
> rackId='') (org.apache.kafka.clients.NetworkClient:604)
> ### consumer calls seekToBeginning
> [Consumer clientId=consumer-group-1, groupId=group] Sending LIST_OFFSETS
> request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=10,
> clientId=consumer-group-1, correlationId=13, headerVersion=2) and timeout
> 30000 to node 1: ListOffsetsRequestData(replicaId=-1, isolationLevel=0,
> topics=[ListOffsetsTopic(name='topic',
> partitions=[ListOffsetsPartition(partitionIndex=0, currentLeaderEpoch=0,
> timestamp=-2)])], timeoutMs=30000)
> (org.apache.kafka.clients.NetworkClient:604)
> [Consumer clientId=consumer-group-1, groupId=group] Fetch read_uncommitted at
> offset 0 for partition topic-0 returned fetch data
> PartitionData(partitionIndex=0, errorCode=0, highWatermark=10,
> lastStableOffset=10, logStartOffset=0,
> divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1),
> currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1),
> snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null,
> preferredReadReplica=-1, records=MemoryRecords(size=151,
> buffer=java.nio.HeapByteBuffer[pos=0 lim=151 cap=154]))
> (org.apache.kafka.clients.consumer.internals.AbstractFetch:203) <-- The
> response to the FetchRequest that was sent before calling seekToBeginning.
> The FetchState is AWAIT_RESET, so the data is ignored.
> [Consumer clientId=consumer-group-1, groupId=group] Ignoring fetched records
> for partition topic-0 since it no longer has valid position
> (org.apache.kafka.clients.consumer.internals.FetchCollector:226)
> [Consumer clientId=consumer-group-1, groupId=group] Resetting offset for
> partition topic-0 to position FetchPosition{offset=0,
> offsetEpoch=Optional.empty,
> currentLeader=LeaderAndEpoch{leader=Optional[localhost:50923 (id: 1 rack:
> null isFenced: false)], epoch=0}}.
> (org.apache.kafka.clients.consumer.internals.SubscriptionState:451) <-- The
> result of seekToBeginning.
> [Consumer clientId=consumer-group-1, groupId=group] Added read_uncommitted
> fetch request for partition topic-0 at position FetchPosition{offset=0,
> offsetEpoch=Optional.empty,
> currentLeader=LeaderAndEpoch{leader=Optional[localhost:50923 (id: 1 rack:
> null isFenced: false)], epoch=0}} to node localhost:50923 (id: 1 rack: null
> isFenced: false)
> (org.apache.kafka.clients.consumer.internals.AbstractFetch:471) <-- Sends
> another FetchRequest starting from offset 0.
> [Consumer clientId=consumer-group-1, groupId=group] Sending FETCH request
> with header RequestHeader(apiKey=FETCH, apiVersion=17,
> clientId=consumer-group-1, correlationId=14, headerVersion=2) and timeout
> 30000 to node 1: FetchRequestData(clusterId=null, replicaId=-1,
> replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500,
> minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=166492720,
> sessionEpoch=1, topics=[], forgottenTopicsData=[], rackId='')
> (org.apache.kafka.clients.NetworkClient:604)
> {noformat}
> The following logs show the case for the AsyncKafkaConsumer:
> {noformat}
> [Consumer clientId=consumer-group-2, groupId=group] Added read_uncommitted
> fetch request for partition topic-0 at position FetchPosition{offset=0,
> offsetEpoch=Optional.empty,
> currentLeader=LeaderAndEpoch{leader=Optional[localhost:50970 (id: 2 rack:
> null isFenced: false)], epoch=0}} to node localhost:50970 (id: 2 rack: null
> isFenced: false)
> (org.apache.kafka.clients.consumer.internals.AbstractFetch:471)
> [Consumer clientId=consumer-group-2, groupId=group] Sending FETCH request
> with header RequestHeader(apiKey=FETCH, apiVersion=17,
> clientId=consumer-group-2, correlationId=30, headerVersion=2) and timeout
> 30000 to node 2: FetchRequestData(clusterId=null, replicaId=-1,
> replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500,
> minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=0, sessionEpoch=0,
> topics=[FetchTopic(topic='topic', topicId=E2BqIjY8RU2mbcUClbcx3A,
> partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=0,
> lastFetchedEpoch=-1, logStartOffset=-1, partitionMaxBytes=1048576,
> replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])], forgottenTopicsData=[],
> rackId='') (org.apache.kafka.clients.NetworkClient:604)
> [Consumer clientId=consumer-group-2, groupId=group] Fetch read_uncommitted at
> offset 0 for partition topic-0 returned fetch data
> PartitionData(partitionIndex=0, errorCode=0, highWatermark=10,
> lastStableOffset=10, logStartOffset=0,
> divergingEpoch=EpochEndOffset(epoch=-1, endOffset=-1),
> currentLeader=LeaderIdAndEpoch(leaderId=-1, leaderEpoch=-1),
> snapshotId=SnapshotId(endOffset=-1, epoch=-1), abortedTransactions=null,
> preferredReadReplica=-1, records=MemoryRecords(size=151,
> buffer=java.nio.HeapByteBuffer[pos=0 lim=151 cap=154]))
> (org.apache.kafka.clients.consumer.internals.AbstractFetch:203) <-- The data
> from the FetchRequest sent before calling seekToBeginning; it has not yet
> been consumed by the application thread.
> ### consumer calls seekToBeginning
> [Consumer clientId=consumer-group-2, groupId=group] Sending LIST_OFFSETS
> request with header RequestHeader(apiKey=LIST_OFFSETS, apiVersion=10,
> clientId=consumer-group-2, correlationId=31, headerVersion=2) and timeout
> 30000 to node 2: ListOffsetsRequestData(replicaId=-1, isolationLevel=0,
> topics=[ListOffsetsTopic(name='topic',
> partitions=[ListOffsetsPartition(partitionIndex=0, currentLeaderEpoch=0,
> timestamp=-2)])], timeoutMs=30000)
> (org.apache.kafka.clients.NetworkClient:604)
> [Consumer clientId=consumer-group-2, groupId=group] Resetting offset for
> partition topic-0 to position FetchPosition{offset=0,
> offsetEpoch=Optional.empty,
> currentLeader=LeaderAndEpoch{leader=Optional[localhost:50970 (id: 2 rack:
> null isFenced: false)], epoch=0}}.
> (org.apache.kafka.clients.consumer.internals.SubscriptionState:451)
> ### The stale data buffered before seekToBeginning has been returned to the
> application thread, so the next FetchRequest starts from offset 10.
> [Consumer clientId=consumer-group-2, groupId=group] Added read_uncommitted
> fetch request for partition topic-0 at position FetchPosition{offset=10,
> offsetEpoch=Optional[0],
> currentLeader=LeaderAndEpoch{leader=Optional[localhost:50970 (id: 2 rack:
> null isFenced: false)], epoch=0}} to node localhost:50970 (id: 2 rack: null
> isFenced: false)
> (org.apache.kafka.clients.consumer.internals.AbstractFetch:471)
> [Consumer clientId=consumer-group-2, groupId=group] Sending FETCH request
> with header RequestHeader(apiKey=FETCH, apiVersion=17,
> clientId=consumer-group-2, correlationId=32, headerVersion=2) and timeout
> 30000 to node 2: FetchRequestData(clusterId=null, replicaId=-1,
> replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=500,
> minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=784149360,
> sessionEpoch=1, topics=[FetchTopic(topic='topic',
> topicId=E2BqIjY8RU2mbcUClbcx3A, partitions=[FetchPartition(partition=0,
> currentLeaderEpoch=0, fetchOffset=10, lastFetchedEpoch=-1, logStartOffset=-1,
> partitionMaxBytes=1048576, replicaDirectoryId=AAAAAAAAAAAAAAAAAAAAAA)])],
> forgottenTopicsData=[], rackId='')
> (org.apache.kafka.clients.NetworkClient:604)
> {noformat}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)