[ 
https://issues.apache.org/jira/browse/KAFKA-15641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15641:
------------------------------
    Labels: fetcher  (was: )

> Investigate CompletedFetch handleInitializeErrors for accuracy
> --------------------------------------------------------------
>
>                 Key: KAFKA-15641
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15641
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, consumer
>            Reporter: Kirk True
>            Assignee: Kirk True
>            Priority: Major
>              Labels: fetcher
>
> The {{FetcherTest}} and {{FetchRequestManagerTest}} classes both have a test 
> named {{testFetchedRecordsAfterSeek}} which, [upon closer 
> inspection|https://github.com/apache/kafka/pull/14406#discussion_r1347894828], 
> may reveal incorrect logic in {{FetchCollector.handleInitializeErrors}}.
> Here is the test code:
> {code:java}
> @Test
> public void testFetchedRecordsAfterSeek() {
>     buildFetcher(OffsetResetStrategy.NONE,
>                  new ByteArrayDeserializer(),
>                  new ByteArrayDeserializer(),
>                  2,
>                  IsolationLevel.READ_UNCOMMITTED);
>     assignFromUser(singleton(tp0));
>     // Step 1: seek to offset 0 of our partition.
>     subscriptions.seek(tp0, 0);
>     // Step 2: issue a mock broker request to fetch data from the current offset in our local state,
>     // i.e. offset 0.
>     assertTrue(sendFetches() > 0);
>     // Step 3: mock an OFFSET_OUT_OF_RANGE response from the broker.
>     client.prepareResponse(fullFetchResponse(tidp0, records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
>     // Step 4: process the network I/O to receive the response from the broker with the OFFSET_OUT_OF_RANGE
>     // that was injected. Note, however, that we haven't "collected" the fetch data included in the response.
>     networkClientDelegate.poll(time.timer(0));
>     // Step 5: validate that the partition is not marked as needing its offset reset. The response validation
>     // logic is performed during the fetch collection, which doesn't happen until assertEmptyFetch below.
>     assertFalse(subscriptions.isOffsetResetNeeded(tp0));
>     // Step 6: update the partition's position in our local state to offset 2. We still haven't collected the
>     // fetch, so we haven't performed any validation of the fetch response.
>     subscriptions.seek(tp0, 2);
>     // Step 7: perform the fetch collection. As part of that process, error handling is performed. Since
>     // we intentionally injected an error above, this error will be checked and handled in the
>     // FetchCollector.handleInitializeErrors method. When handling OFFSET_OUT_OF_RANGE, handleInitializeErrors
>     // will notice that the original requested offset (0) is different from the state of our current offset (2).
>     assertEmptyFetch("Should not return records or advance position after seeking to end of topic partition");
> }
> {code}
> Here is the code from {{FetchCollector.handleInitializeErrors}}:
> {code:java}
> private void handleInitializeErrors(final CompletedFetch completedFetch, final Errors error) {
>     final TopicPartition tp = completedFetch.partition;
>     final long fetchOffset = completedFetch.nextFetchOffset();
>     . . .
>     if (error == Errors.OFFSET_OUT_OF_RANGE) {
>         Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);
>         if (!clearedReplicaId.isPresent()) {
>             // If there's no preferred replica to clear, we're fetching from the leader so handle
>             // this error normally
>             SubscriptionState.FetchPosition position = subscriptions.position(tp);
>             if (position == null || fetchOffset != position.offset) {
>                 log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
>                         "does not match the current offset {}", tp, fetchOffset, position);
>             } else {
>                 String errorMessage = "Fetch position " + position + " is out of range for partition " + tp;
>                 if (subscriptions.hasDefaultOffsetResetPolicy()) {
>                     log.info("{}, resetting offset", errorMessage);
>                     subscriptions.requestOffsetReset(tp);
>                 } else {
>                     log.info("{}, raising error to the application since no reset policy is configured",
>                         errorMessage);
>                     throw new OffsetOutOfRangeException(errorMessage,
>                             Collections.singletonMap(tp, position.offset));
>                 }
>             }
>         } else {
>             log.debug("Unset the preferred read replica {} for partition {} since we got {} when fetching {}",
>                     clearedReplicaId.get(), tp, error, fetchOffset);
>         }
>     }
>     . . .
> }
> {code}
> The question is: why is the {{OFFSET_OUT_OF_RANGE}} error discarded 
> whenever the following condition holds?
> {code:java}
>         if (position == null || fetchOffset != position.offset) {
> {code}
> It is also odd that the above check is applied only to the 
> {{OFFSET_OUT_OF_RANGE}} error rather than to every error.
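
To make the branch in question easier to reason about, here is a minimal standalone sketch of the decision it takes. It is not Kafka code: the class StaleFetchSketch, the Outcome enum, and the method onOffsetOutOfRange are hypothetical names invented for illustration. The sketch assumes the fetch was served by the leader (no preferred read replica to clear) and that fetchOffset equals the offset the request was originally issued for, as the Step 7 comment in the test describes.

```java
import java.util.OptionalLong;

// Hypothetical, simplified model of the OFFSET_OUT_OF_RANGE branch quoted above.
// None of these names exist in Kafka; they only mirror the shape of the logic.
final class StaleFetchSketch {

    enum Outcome { DISCARD_STALE, REQUEST_RESET, RAISE_TO_APPLICATION }

    /**
     * @param fetchOffset     offset the fetch request was issued for
     * @param currentPosition the consumer's current position, empty if there is none
     * @param hasResetPolicy  whether a default offset reset policy is configured
     */
    static Outcome onOffsetOutOfRange(long fetchOffset,
                                      OptionalLong currentPosition,
                                      boolean hasResetPolicy) {
        // Mirrors: position == null || fetchOffset != position.offset
        if (!currentPosition.isPresent() || currentPosition.getAsLong() != fetchOffset) {
            return Outcome.DISCARD_STALE;
        }
        // Mirrors the else branch: reset if a policy exists, otherwise raise.
        return hasResetPolicy ? Outcome.REQUEST_RESET : Outcome.RAISE_TO_APPLICATION;
    }

    public static void main(String[] args) {
        // Test scenario: fetch issued at offset 0, position moved to 2, reset strategy NONE.
        System.out.println(onOffsetOutOfRange(0L, OptionalLong.of(2L), false));
        // Same error without the intervening seek: position still matches the fetch offset.
        System.out.println(onOffsetOutOfRange(0L, OptionalLong.of(0L), false));
    }
}
```

Under these assumptions the first call yields DISCARD_STALE and the second yields RAISE_TO_APPLICATION, which is why the test sees an empty fetch rather than an exception after the seek in Step 6.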



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
