[ 
https://issues.apache.org/jira/browse/KAFKA-15641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kirk True updated KAFKA-15641:
------------------------------
    Description: 
The {{FetcherTest}} and {{FetchRequestManagerTest}} classes have a test named 
testFetchedRecordsAfterSeek, which [upon closer 
inspection|https://github.com/apache/kafka/pull/14406#discussion_r1347894828] 
may reveal some incorrect logic in {{FetchCollector.handleInitializeErrors}}.

Here is the test code:

{code:java}
@Test
public void testFetchedRecordsAfterSeek() {
    buildFetcher(OffsetResetStrategy.NONE,
                 new ByteArrayDeserializer(),
                 new ByteArrayDeserializer(),
                 2,
                 IsolationLevel.READ_UNCOMMITTED);

    assignFromUser(singleton(tp0));

    // Step 1: seek to offset 0 of our partition.
    subscriptions.seek(tp0, 0);

    // Step 2: issue a mock broker request to fetch data from the current 
offset in our local state, i.e. offset 0.
    assertTrue(sendFetches() > 0);

    // Step 3: mock an OFFSET_OUT_OF_RANGE response from the broker.
    client.prepareResponse(fullFetchResponse(tidp0, records, 
Errors.OFFSET_OUT_OF_RANGE, 100L, 0));

    // Step 4: process the network I/O to receive the response from the broker 
with the OFFSET_OUT_OF_RANGE
    // that was injected. Note, however, that we haven't "collected" the fetch 
data included in the response.
    networkClientDelegate.poll(time.timer(0));

    // Step 5: validate that the partition is not marked as needing its offset 
reset. The response validation
    // logic is performed during the fetch collection, which doesn't happen 
until we call assertEmptyFetch below.
    assertFalse(subscriptions.isOffsetResetNeeded(tp0));

    // Step 6: update the partition's position in our local state to offset 2. 
We still haven't collected the
    // fetch, so we haven't performed any validation of the fetch response.
    subscriptions.seek(tp0, 2);

    // Step 7: perform the fetch collection. As part of that process, error 
handling is performed. Since
    // we intentionally injected an error above, this error will be checked and 
handled in the
    // FetchCollector.handleInitializeErrors method. When handling 
OFFSET_OUT_OF_RANGE, handleInitializeErrors
    // will notice that the original requested offset (0) is different from the 
state of our current offset (2).
    assertEmptyFetch("Should not return records or advance position after 
seeking to end of topic partition");
}
{code}

Here is the code from FetchCollector.handleInitializeErrors:

{code:java}
private void handleInitializeErrors(final CompletedFetch completedFetch, final 
Errors error) {
    final TopicPartition tp = completedFetch.partition;
    final long fetchOffset = completedFetch.nextFetchOffset();

    if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
            error == Errors.REPLICA_NOT_AVAILABLE ||
            error == Errors.KAFKA_STORAGE_ERROR ||
            error == Errors.FENCED_LEADER_EPOCH ||
            error == Errors.OFFSET_NOT_AVAILABLE) {
        . . .
    } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
        . . .
    } else if (error == Errors.UNKNOWN_TOPIC_ID) {
        . . .
    } else if (error == Errors.INCONSISTENT_TOPIC_ID) {
        . . .
    } else if (error == Errors.OFFSET_OUT_OF_RANGE) {
        Optional<Integer> clearedReplicaId = 
subscriptions.clearPreferredReadReplica(tp);

        if (!clearedReplicaId.isPresent()) {
            // If there's no preferred replica to clear, we're fetching from 
the leader so handle
            // this error normally
            SubscriptionState.FetchPosition position = 
subscriptions.position(tp);

            if (position == null || fetchOffset != position.offset) {
                log.debug("Discarding stale fetch response for partition {} 
since the fetched offset {} " +
                        "does not match the current offset {}", tp, 
fetchOffset, position);
            } else {
                String errorMessage = "Fetch position " + position + " is out 
of range for partition " + tp;

                if (subscriptions.hasDefaultOffsetResetPolicy()) {
                    log.info("{}, resetting offset", errorMessage);
                    subscriptions.requestOffsetReset(tp);
                } else {
                    log.info("{}, raising error to the application since no 
reset policy is configured", 
                        errorMessage);
                    throw new OffsetOutOfRangeException(errorMessage,
                            Collections.singletonMap(tp, position.offset));
                }
            }
        } else {
            log.debug("Unset the preferred read replica {} for partition {} 
since we got {} when fetching {}",
                    clearedReplicaId.get(), tp, error, fetchOffset);
        }
    } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
        . . .
    } else if (error == Errors.UNKNOWN_LEADER_EPOCH) {
        . . .
    } else if (error == Errors.UNKNOWN_SERVER_ERROR) {
        . . .
    } else if (error == Errors.CORRUPT_MESSAGE) {
        . . .
    } else {
        throw new IllegalStateException(. . .);
    }
}
{code}

The question is: why is the OFFSET_OUT_OF_RANGE error ignored just because of 
the following code?

{code:java}
        if (position == null || fetchOffset != position.offset) {
{code}

It's a bit weird that the above check is only done for the OFFSET_OUT_OF_RANGE 
error, instead of any error.

  was:
The {{FetcherTest}} and {{FetchRequestManagerTest}} classes have a test named 
testFetchedRecordsAfterSeek, which [upon closer 
inspection|https://github.com/apache/kafka/pull/14406#discussion_r1347894828] 
may reveal some incorrect logic in 
{{{}FetchCollector.handleInitializeErrors{}}}.

Here is the test code:

 
{code:java}
@Test
public void testFetchedRecordsAfterSeek() {
    buildFetcher(OffsetResetStrategy.NONE,
                 new ByteArrayDeserializer(),
                 new ByteArrayDeserializer(),
                 2,
                 IsolationLevel.READ_UNCOMMITTED);

    assignFromUser(singleton(tp0));

    // Step 1: seek to offset 0 of our partition.
    subscriptions.seek(tp0, 0);

    // Step 2: issue a mock broker request to fetch data from the current 
offset in our local state, i.e. offset 0.
    assertTrue(sendFetches() > 0);

    // Step 3: mock an OFFSET_OUT_OF_RANGE response from the broker.
    client.prepareResponse(fullFetchResponse(tidp0, records, 
Errors.OFFSET_OUT_OF_RANGE, 100L, 0));

    // Step 4: process the network I/O to receive the response from the broker 
with the OFFSET_OUT_OF_RANGE
    // that was injected. Note, however, that we haven't "collected" the fetch 
data included in the response.
    networkClientDelegate.poll(time.timer(0));

    // Step 5: validate that the partition is not marked as needing its offset 
reset. The response validation
    // logic is performed during the fetch collection, which doesn't happen 
until we call assertEmptyFetch below.
    assertFalse(subscriptions.isOffsetResetNeeded(tp0));

    // Step 6: update the partition's position in our local state to offset 2. 
We still haven't collected the
    // fetch, so we haven't performed any validation of the fetch response.
    subscriptions.seek(tp0, 2);

    // Step 7: perform the fetch collection. As part of that process, error 
handling is performed. Since
    // we intentionally injected an error above, this error will be checked and 
handled in the
    // FetchCollector.handleInitializeErrors method. When handling 
OFFSET_OUT_OF_RANGE, handleInitializeErrors
    // will notice that the original requested offset (0) is different from the 
state of our current offset (2).
    assertEmptyFetch("Should not return records or advance position after 
seeking to end of topic partition");
}{code}
 

 

Here is the code from FetchCollector.handleInitializeErrors:

 

 
{code:java}
private void handleInitializeErrors(final CompletedFetch completedFetch, final 
Errors error) {
    final TopicPartition tp = completedFetch.partition;
    final long fetchOffset = completedFetch.nextFetchOffset();

    if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
            error == Errors.REPLICA_NOT_AVAILABLE ||
            error == Errors.KAFKA_STORAGE_ERROR ||
            error == Errors.FENCED_LEADER_EPOCH ||
            error == Errors.OFFSET_NOT_AVAILABLE) {
        . . .
    } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
        . . .
    } else if (error == Errors.UNKNOWN_TOPIC_ID) {
        . . .
    } else if (error == Errors.INCONSISTENT_TOPIC_ID) {
        . . .
    } else if (error == Errors.OFFSET_OUT_OF_RANGE) {
        Optional<Integer> clearedReplicaId = 
subscriptions.clearPreferredReadReplica(tp);

        if (!clearedReplicaId.isPresent()) {
            // If there's no preferred replica to clear, we're fetching from 
the leader so handle
            // this error normally
            SubscriptionState.FetchPosition position = 
subscriptions.position(tp);

            if (position == null || fetchOffset != position.offset) {
                log.debug("Discarding stale fetch response for partition {} 
since the fetched offset {} " +
                        "does not match the current offset {}", tp, 
fetchOffset, position);
            } else {
                String errorMessage = "Fetch position " + position + " is out 
of range for partition " + tp;

                if (subscriptions.hasDefaultOffsetResetPolicy()) {
                    log.info("{}, resetting offset", errorMessage);
                    subscriptions.requestOffsetReset(tp);
                } else {
                    log.info("{}, raising error to the application since no 
reset policy is configured", 
                        errorMessage);
                    throw new OffsetOutOfRangeException(errorMessage,
                            Collections.singletonMap(tp, position.offset));
                }
            }
        } else {
            log.debug("Unset the preferred read replica {} for partition {} 
since we got {} when fetching {}",
                    clearedReplicaId.get(), tp, error, fetchOffset);
        }
    } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
        . . .
    } else if (error == Errors.UNKNOWN_LEADER_EPOCH) {
        . . .
    } else if (error == Errors.UNKNOWN_SERVER_ERROR) {
        . . .
    } else if (error == Errors.CORRUPT_MESSAGE) {
        . . .
    } else {
        throw new IllegalStateException(. . .);
    }
}
 {code}
 

 

 

 

The question is: why does 

 


> Investigate CompletedFetch handleInitializeErrors for accuracy
> --------------------------------------------------------------
>
>                 Key: KAFKA-15641
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15641
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients, consumer
>            Reporter: Kirk True
>            Assignee: Kirk True
>            Priority: Major
>
> The {{FetcherTest}} and {{FetchRequestManagerTest}} classes have a test named 
> testFetchedRecordsAfterSeek, which [upon closer 
> inspection|https://github.com/apache/kafka/pull/14406#discussion_r1347894828] 
> may reveal some incorrect logic in {{FetchCollector.handleInitializeErrors}}.
> Here is the test code:
> {code:java}
> @Test
> public void testFetchedRecordsAfterSeek() {
>     buildFetcher(OffsetResetStrategy.NONE,
>                  new ByteArrayDeserializer(),
>                  new ByteArrayDeserializer(),
>                  2,
>                  IsolationLevel.READ_UNCOMMITTED);
>     assignFromUser(singleton(tp0));
>     // Step 1: seek to offset 0 of our partition.
>     subscriptions.seek(tp0, 0);
>     // Step 2: issue a mock broker request to fetch data from the current 
> offset in our local state, i.e. offset 0.
>     assertTrue(sendFetches() > 0);
>     // Step 3: mock an OFFSET_OUT_OF_RANGE response from the broker.
>     client.prepareResponse(fullFetchResponse(tidp0, records, 
> Errors.OFFSET_OUT_OF_RANGE, 100L, 0));
>     // Step 4: process the network I/O to receive the response from the 
> broker with the OFFSET_OUT_OF_RANGE
>     // that was injected. Note, however, that we haven't "collected" the 
> fetch data included in the response.
>     networkClientDelegate.poll(time.timer(0));
>     // Step 5: validate that the partition is not marked as needing its 
> offset reset. The response validation
>     // logic is performed during the fetch collection, which doesn't happen 
> until we call assertEmptyFetch below.
>     assertFalse(subscriptions.isOffsetResetNeeded(tp0));
>     // Step 6: update the partition's position in our local state to offset 
> 2. We still haven't collected the
>     // fetch, so we haven't performed any validation of the fetch response.
>     subscriptions.seek(tp0, 2);
>     // Step 7: perform the fetch collection. As part of that process, error 
> handling is performed. Since
>     // we intentionally injected an error above, this error will be checked 
> and handled in the
>     // FetchCollector.handleInitializeErrors method. When handling 
> OFFSET_OUT_OF_RANGE, handleInitializeErrors
>     // will notice that the original requested offset (0) is different from 
> the state of our current offset (2).
>     assertEmptyFetch("Should not return records or advance position after 
> seeking to end of topic partition");
> }
> {code}
> Here is the code from FetchCollector.handleInitializeErrors:
> {code:java}
> private void handleInitializeErrors(final CompletedFetch completedFetch, 
> final Errors error) {
>     final TopicPartition tp = completedFetch.partition;
>     final long fetchOffset = completedFetch.nextFetchOffset();
>     if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
>             error == Errors.REPLICA_NOT_AVAILABLE ||
>             error == Errors.KAFKA_STORAGE_ERROR ||
>             error == Errors.FENCED_LEADER_EPOCH ||
>             error == Errors.OFFSET_NOT_AVAILABLE) {
>         . . .
>     } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
>         . . .
>     } else if (error == Errors.UNKNOWN_TOPIC_ID) {
>         . . .
>     } else if (error == Errors.INCONSISTENT_TOPIC_ID) {
>         . . .
>     } else if (error == Errors.OFFSET_OUT_OF_RANGE) {
>         Optional<Integer> clearedReplicaId = 
> subscriptions.clearPreferredReadReplica(tp);
>         if (!clearedReplicaId.isPresent()) {
>             // If there's no preferred replica to clear, we're fetching from 
> the leader so handle
>             // this error normally
>             SubscriptionState.FetchPosition position = 
> subscriptions.position(tp);
>             if (position == null || fetchOffset != position.offset) {
>                 log.debug("Discarding stale fetch response for partition {} 
> since the fetched offset {} " +
>                         "does not match the current offset {}", tp, 
> fetchOffset, position);
>             } else {
>                 String errorMessage = "Fetch position " + position + " is out 
> of range for partition " + tp;
>                 if (subscriptions.hasDefaultOffsetResetPolicy()) {
>                     log.info("{}, resetting offset", errorMessage);
>                     subscriptions.requestOffsetReset(tp);
>                 } else {
>                     log.info("{}, raising error to the application since no 
> reset policy is configured", 
>                         errorMessage);
>                     throw new OffsetOutOfRangeException(errorMessage,
>                             Collections.singletonMap(tp, position.offset));
>                 }
>             }
>         } else {
>             log.debug("Unset the preferred read replica {} for partition {} 
> since we got {} when fetching {}",
>                     clearedReplicaId.get(), tp, error, fetchOffset);
>         }
>     } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
>         . . .
>     } else if (error == Errors.UNKNOWN_LEADER_EPOCH) {
>         . . .
>     } else if (error == Errors.UNKNOWN_SERVER_ERROR) {
>         . . .
>     } else if (error == Errors.CORRUPT_MESSAGE) {
>         . . .
>     } else {
>         throw new IllegalStateException(. . .);
>     }
> }
> {code}
> The question is: why is the OFFSET_OUT_OF_RANGE error ignored just because of 
> the following code?
> {code:java}
>         if (position == null || fetchOffset != position.offset) {
> {code}
> It's a bit weird that the above check is only done for the 
> OFFSET_OUT_OF_RANGE error, instead of any error.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to