showuon commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1102353791


##########
core/src/main/java/kafka/server/TierStateMachine.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import java.util.Optional;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+
+/**
+ * This interface defines the APIs needed to handle any state transitions
+ * related to tiering in AbstractFetcherThread.

Review Comment:
   nit: Could we remove the `AbstractFetcherThread` class name here? Just `This interface defines the APIs needed to handle any state transitions related to tiering`, to keep it general. Otherwise, this could confuse readers if something changes in `AbstractFetcherThread` later.



##########
core/src/main/java/kafka/server/TierStateMachine.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import java.util.Optional;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+
+/**
+ * This interface defines the APIs needed to handle any state transitions
+ * related to tiering in AbstractFetcherThread.
+ */
+public interface TierStateMachine {
+
+    /**
+     * Start the tier state machine for the provided topic partition.
+     *
+     * @param topicPartition the topic partition
+     * @param currentFetchState the current PartitionFetchState which will
+     *                          be used to derive the return value
+     * @param fetchPartitionData the data from the fetch response that returned the offset moved to tiered storage error
+     *
+     * @return the new PartitionFetchState after the successful start of the
+     *         tier state machine
+     */
+    PartitionFetchState start(TopicPartition topicPartition,
+                              PartitionFetchState currentFetchState,
+                              PartitionData fetchPartitionData) throws Exception;
+
+    /**
+     * Optionally advance the state of the tier state machine, based on the
+     * current PartitionFetchState. The decision to advance the tier
+     * state machine is implementation specific.
+     *
+     * @param topicPartition the topic partition
+     * @param currentFetchState the current PartitionFetchState which will
+     *                          be used to derive the return value
+     *
+     * @return the new PartitionFetchState if the tier state machine was advanced

Review Comment:
   nit: return the new PartitionFetchState if the tier state machine was advanced, **otherwise, return the currentFetchState**



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -683,33 +655,24 @@ abstract class AbstractFetcherThread(name: String,
       * produced to the new leader. While the old leader is trying to handle the OffsetOutOfRangeException and query
       * the log end offset of the new leader, the new leader's log end offset becomes higher than the follower's log end offset.
       *
-       * In the first case, the follower's current log end offset is smaller than the leader's log start offset
-       * (or leader's local log start offset).
-       * So the follower should truncate all its logs, roll out a new segment and start to fetch from the current
-       * leader's log start offset(or leader's local log start offset).
+       * In the first case, the follower's current log end offset is smaller than the leader's log start offset. So the
+       * follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log
+       * start offset.

Review Comment:
   I read through the code and comments, and this is not correct. What we're describing here is the `leaderEndOffset >= replicaEndOffset` case, not the `leaderStartOffset` case; the `leaderStartOffset` comparison is a sub-case under `leaderEndOffset >= replicaEndOffset`. So, maybe change it to:
   
   In the first case, [if] the follower's current log end offset is smaller than the leader's log start offset, the follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log start offset since the data are all stale.
   
   WDYT?
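   To make the two cases concrete, here is a small, self-contained Scala sketch; it only illustrates the offset comparison being discussed and is not Kafka code:

```scala
// Case 1: leaderStartOffset > replicaEndOffset -> all of the follower's data is
//         stale, so it truncates everything and fetches from the leader's log
//         start offset.
// Case 2: otherwise (still under leaderEndOffset >= replicaEndOffset) -> keep
//         the current segments and retry from the follower's own log end offset.
def offsetToFetchFrom(replicaEndOffset: Long, leaderStartOffset: Long): Long =
  math.max(replicaEndOffset, leaderStartOffset)

// offsetToFetchFrom(replicaEndOffset = 100L, leaderStartOffset = 250L)  // => 250 (case 1)
// offsetToFetchFrom(replicaEndOffset = 300L, leaderStartOffset = 250L)  // => 300 (case 2)
```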



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -400,12 +386,7 @@ abstract class AbstractFetcherThread(name: String,
                 case Errors.OFFSET_OUT_OF_RANGE =>
                  if (!handleOutOfRangeError(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch))
                    partitionsWithError += topicPartition
-                case Errors.OFFSET_MOVED_TO_TIERED_STORAGE =>
-                  debug(s"Received error ${Errors.OFFSET_MOVED_TO_TIERED_STORAGE}, " +
-                    s"at fetch offset: ${currentFetchState.fetchOffset}, " + s"topic-partition: $topicPartition")
-                  if (!handleOffsetsMovedToTieredStorage(topicPartition, currentFetchState,
-                    fetchPartitionData.currentLeaderEpoch, partitionData.logStartOffset()))

Review Comment:
   For the `logStartOffset`, we used to retrieve it from `partitionData`, which comes from the fetch response (i.e. from the leader), but now we get it from `fetchPartitionData`, which comes from the fetch request. Is there a reason for this change? I think the `logStartOffset` should still come from the leader's response to avoid inconsistency. WDYT?
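   To make the distinction concrete, here is a small, self-contained Scala sketch; the case classes below are simplified stand-ins for illustration, not Kafka's actual request/response types:

```scala
// The same field name exists on both sides, but the request carries the
// follower's (possibly stale) view, while the response carries the leader's
// current view of the partition.
final case class RequestPartitionData(logStartOffset: Long)   // sent by the follower
final case class ResponsePartitionData(logStartOffset: Long)  // returned by the leader

// The comment above argues for keeping the leader's value as the source of truth
// when handling OFFSET_MOVED_TO_TIERED_STORAGE.
def logStartOffsetToUse(request: RequestPartitionData,
                        response: ResponsePartitionData): Long =
  response.logStartOffset
```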



##########
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##########
@@ -683,33 +655,24 @@ abstract class AbstractFetcherThread(name: String,
       * produced to the new leader. While the old leader is trying to handle the OffsetOutOfRangeException and query
       * the log end offset of the new leader, the new leader's log end offset becomes higher than the follower's log end offset.
       *
-       * In the first case, the follower's current log end offset is smaller than the leader's log start offset
-       * (or leader's local log start offset).
-       * So the follower should truncate all its logs, roll out a new segment and start to fetch from the current
-       * leader's log start offset(or leader's local log start offset).
+       * In the first case, the follower's current log end offset is smaller than the leader's log start offset. So the
+       * follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log
+       * start offset.
       * In the second case, the follower should just keep the current log segments and retry the fetch. In the second
       * case, there will be some inconsistency of data between old and new leader. We are not solving it here.
       * If users want to have strong consistency guarantees, appropriate configurations needs to be set for both
       * brokers and producers.
       *
       * Putting the two cases together, the follower should fetch from the higher one of its replica log end offset
-       * and the current leader's (local-log-start-offset or) log start offset.
+       * and the current leader's log start offset.
        */
-      val (epoch, leaderStartOffset) = if (fetchFromLocalLogStartOffset)
-        leader.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch) else
-        leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)
-
+      val (_, leaderStartOffset) = leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)

Review Comment:
   Could you explain why we changed this behavior? I think we should try to fetch from the local log start offset when possible, to save catch-up time. But after this change, we always fetch from the log start offset (not the local log start offset). Why should we change it?
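   For reference, a rough, self-contained Scala sketch of the pre-change choice described above; the function and parameter names are illustrative only, not the actual Kafka APIs:

```scala
// When the leader has tiered older data to remote storage, starting from its
// *local* log start offset means the follower only replicates the local suffix
// (the remote part can be rebuilt from tiered storage); otherwise it falls back
// to the full log start offset.
def leaderOffsetToStartFetchingFrom(fetchFromLocalLogStartOffset: Boolean,
                                    earliestLocalOffset: Long,
                                    earliestOffset: Long): Long =
  if (fetchFromLocalLogStartOffset) earliestLocalOffset
  else earliestOffset

// e.g. leaderOffsetToStartFetchingFrom(fetchFromLocalLogStartOffset = true,
//                                      earliestLocalOffset = 90000L,
//                                      earliestOffset = 0L)        // => 90000
```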


