showuon commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1102353791
##########
core/src/main/java/kafka/server/TierStateMachine.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import java.util.Optional;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+
+/**
+ * This interface defines the APIs needed to handle any state transitions
+ * related to tiering in AbstractFetcherThread.

Review Comment:
   nit: Could we remove the `AbstractFetcherThread` class name here? Just `This interface defines the APIs needed to handle any state transitions related to tiering` to make it general. Otherwise, this will confuse readers if something changes in `AbstractFetcherThread` later.



##########
core/src/main/java/kafka/server/TierStateMachine.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import java.util.Optional;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+
+/**
+ * This interface defines the APIs needed to handle any state transitions
+ * related to tiering in AbstractFetcherThread.
+ */
+public interface TierStateMachine {
+
+    /**
+     * Start the tier state machine for the provided topic partition.
+     *
+     * @param topicPartition the topic partition
+     * @param currentFetchState the current PartitionFetchState which will
+     *                          be used to derive the return value
+     * @param fetchPartitionData the data from the fetch response that returned the offset moved to tiered storage error
+     *
+     * @return the new PartitionFetchState after the successful start of the
+     *         tier state machine
+     */
+    PartitionFetchState start(TopicPartition topicPartition,
+                              PartitionFetchState currentFetchState,
+                              PartitionData fetchPartitionData) throws Exception;
+
+    /**
+     * Optionally advance the state of the tier state machine, based on the
+     * current PartitionFetchState. The decision to advance the tier
+     * state machine is implementation specific.
+     *
+     * @param topicPartition the topic partition
+     * @param currentFetchState the current PartitionFetchState which will
+     *                          be used to derive the return value
+     *
+     * @return the new PartitionFetchState if the tier state machine was advanced

Review Comment:
   nit: return the new PartitionFetchState if the tier state machine was advanced, **otherwise, return the currentFetchState**
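To make the contract under discussion concrete, here is a minimal no-op sketch of the interface. The class name `NoOpTierStateMachine` is made up for illustration, and the `maybeAdvance` signature is assumed to return `Optional<PartitionFetchState>` (the hunk above is cut off before that declaration appears), so this is not the PR's actual implementation.

```java
package kafka.server;

import java.util.Optional;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.FetchRequest.PartitionData;

// Hypothetical no-op implementation, shown only to illustrate the contract discussed in
// the review comments above; it is not part of the PR.
public class NoOpTierStateMachine implements TierStateMachine {

    @Override
    public PartitionFetchState start(TopicPartition topicPartition,
                                     PartitionFetchState currentFetchState,
                                     PartitionData fetchPartitionData) throws Exception {
        // Nothing to rebuild from tiered storage: keep fetching from the caller's state.
        return currentFetchState;
    }

    @Override
    public Optional<PartitionFetchState> maybeAdvance(TopicPartition topicPartition,
                                                      PartitionFetchState currentFetchState) {
        // No in-flight tiering work, so there is never anything to advance.
        return Optional.empty();
    }
}
```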
##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -683,33 +655,24 @@ abstract class AbstractFetcherThread(name: String,
    * produced to the new leader. While the old leader is trying to handle the OffsetOutOfRangeException and query
    * the log end offset of the new leader, the new leader's log end offset becomes higher than the follower's log end offset.
    *
-   * In the first case, the follower's current log end offset is smaller than the leader's log start offset
-   * (or leader's local log start offset).
-   * So the follower should truncate all its logs, roll out a new segment and start to fetch from the current
-   * leader's log start offset(or leader's local log start offset).
+   * In the first case, the follower's current log end offset is smaller than the leader's log start offset. So the
+   * follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log
+   * start offset.

Review Comment:
   I read through the code and comments, and I don't think this is quite correct. The "first case" here is the `leaderEndOffset >= replicaEndOffset` case, not the `leaderStartOffset` one; the `leaderStartOffset` comparison is a sub-case under `leaderEndOffset >= replicaEndOffset`. So, maybe change it to: In the first case, [if] the follower's current log end offset is smaller than the leader's log start offset, the follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log start offset since the data are all stale. WDYT?
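As a reading aid for the two cases described in this comment block, here is a rough sketch in plain Java of how they collapse into a single fetch-offset choice. It is a hypothetical helper, not the actual Scala code in `AbstractFetcherThread` (which also truncates the follower log as needed).

```java
// Hypothetical helper, for illustration only.
static long nextFetchOffset(long replicaEndOffset, long leaderStartOffset, long leaderEndOffset) {
    if (leaderEndOffset < replicaEndOffset) {
        // Second case (e.g. unclean leader election): the new leader is behind this
        // follower, so keep the current segments and retry from the leader's end offset.
        return leaderEndOffset;
    }
    // First case: anything below the leader's log start offset is gone on the leader, so
    // fetch from the higher of the follower's log end offset and the leader's start offset.
    return Math.max(replicaEndOffset, leaderStartOffset);
}
```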
##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -400,12 +386,7 @@ abstract class AbstractFetcherThread(name: String,
           case Errors.OFFSET_OUT_OF_RANGE =>
             if (!handleOutOfRangeError(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch))
               partitionsWithError += topicPartition
-          case Errors.OFFSET_MOVED_TO_TIERED_STORAGE =>
-            debug(s"Received error ${Errors.OFFSET_MOVED_TO_TIERED_STORAGE}, " +
-              s"at fetch offset: ${currentFetchState.fetchOffset}, " + s"topic-partition: $topicPartition")
-            if (!handleOffsetsMovedToTieredStorage(topicPartition, currentFetchState,
-              fetchPartitionData.currentLeaderEpoch, partitionData.logStartOffset()))

Review Comment:
   For the `logStartOffset`, we used to retrieve it from `partitionData`, which comes from the fetch response (i.e. from the leader), but now we get it from `fetchPartitionData`, which comes from the fetch request. Is there a reason for this change? I'm thinking the logStartOffset should still rely on the leader's response to avoid inconsistency. WDYT?



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -683,33 +655,24 @@ abstract class AbstractFetcherThread(name: String,
    * produced to the new leader. While the old leader is trying to handle the OffsetOutOfRangeException and query
    * the log end offset of the new leader, the new leader's log end offset becomes higher than the follower's log end offset.
    *
-   * In the first case, the follower's current log end offset is smaller than the leader's log start offset
-   * (or leader's local log start offset).
-   * So the follower should truncate all its logs, roll out a new segment and start to fetch from the current
-   * leader's log start offset(or leader's local log start offset).
+   * In the first case, the follower's current log end offset is smaller than the leader's log start offset. So the
+   * follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log
+   * start offset.
    * In the second case, the follower should just keep the current log segments and retry the fetch. In the second
    * case, there will be some inconsistency of data between old and new leader. We are not solving it here.
    * If users want to have strong consistency guarantees, appropriate configurations needs to be set for both
    * brokers and producers.
    *
    * Putting the two cases together, the follower should fetch from the higher one of its replica log end offset
-   * and the current leader's (local-log-start-offset or) log start offset.
+   * and the current leader's log start offset.
    */
-    val (epoch, leaderStartOffset) = if (fetchFromLocalLogStartOffset)
-      leader.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch) else
-      leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)
-
+    val (_, leaderStartOffset) = leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)

Review Comment:
   Could you explain why we changed this behavior? I think we should try to fetch from the local log start offset if possible, to save time catching up. But after this change, we always fetch from the log start offset (not the local log start offset). Why should we change it?
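For reference, the behavior this comment asks to keep can be sketched as follows. The helper name and parameters are hypothetical; the actual code is the removed Scala branch quoted above.

```java
// Illustration of the removed branch above, not the PR's code: when fetching from the
// local log start offset is possible (tiered storage), the follower starts replicating
// from the leader's local log start offset instead of re-fetching data that is already
// in remote storage, which is the reviewer's point about catching up faster.
static long leaderStartOffsetToFetchFrom(boolean fetchFromLocalLogStartOffset,
                                         long leaderLocalLogStartOffset,
                                         long leaderLogStartOffset) {
    return fetchFromLocalLogStartOffset ? leaderLocalLogStartOffset : leaderLogStartOffset;
}
```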