[GitHub] [kafka] Hangleton commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-10 Thread via GitHub


Hangleton commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1102707234


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -683,33 +655,24 @@ abstract class AbstractFetcherThread(name: String,
    * produced to the new leader. While the old leader is trying to handle the OffsetOutOfRangeException and query
    * the log end offset of the new leader, the new leader's log end offset becomes higher than the follower's log end offset.
    *
-   * In the first case, the follower's current log end offset is smaller than the leader's log start offset
-   * (or leader's local log start offset).
-   * So the follower should truncate all its logs, roll out a new segment and start to fetch from the current
-   * leader's log start offset(or leader's local log start offset).
+   * In the first case, the follower's current log end offset is smaller than the leader's log start offset. So the
+   * follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log
+   * start offset.
    * In the second case, the follower should just keep the current log segments and retry the fetch. In the second
    * case, there will be some inconsistency of data between old and new leader. We are not solving it here.
    * If users want to have strong consistency guarantees, appropriate configurations needs to be set for both
    * brokers and producers.
    *
    * Putting the two cases together, the follower should fetch from the higher one of its replica log end offset
-   * and the current leader's (local-log-start-offset or) log start offset.
+   * and the current leader's log start offset.
    */
-  val (epoch, leaderStartOffset) = if (fetchFromLocalLogStartOffset)
-    leader.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch) else
-    leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)
-
+  val (_, leaderStartOffset) = leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)

Review Comment:
   IIUC, this is correct. We should fetch from the leader's log start offset
   rather than the leader's local log start offset. That way, when the leader's
   log start offset < the leader's local log start offset, the leader returns an
   OFFSET_MOVED_TO_TIERED_STORAGE error and the follower takes the corresponding
   code path to reconstruct the prefix of the local replica log.
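
   To make the reasoning above concrete, here is a minimal sketch of how I
   understand the leader classifies a follower fetch. The names
   `FetchPathSketch`, `Path` and `pathFor` are hypothetical and not part of the
   PR; the three-way split is a simplified model that assumes the fetch offset
   never exceeds the leader's log end offset.

```java
// Simplified model of which path a follower fetch takes on the leader.
// All names here are illustrative assumptions, not Kafka APIs.
final class FetchPathSketch {

    enum Path { OUT_OF_RANGE, MOVED_TO_TIERED_STORAGE, FETCH_LOCALLY }

    // leaderLogStartOffset:      earliest offset of the partition, including tiered data
    // leaderLocalLogStartOffset: earliest offset still present on the leader's local disk
    static Path pathFor(long fetchOffset,
                        long leaderLogStartOffset,
                        long leaderLocalLogStartOffset) {
        if (fetchOffset < leaderLogStartOffset) {
            // The offset no longer exists anywhere.
            return Path.OUT_OF_RANGE;
        }
        if (fetchOffset < leaderLocalLogStartOffset) {
            // The offset exists, but only in remote (tiered) storage.
            return Path.MOVED_TO_TIERED_STORAGE;
        }
        // The offset can be served from the leader's local log.
        return Path.FETCH_LOCALLY;
    }

    public static void main(String[] args) {
        long logStart = 100L;
        long localLogStart = 500L;
        // Fetching from the log start offset lands in the tiered-storage path.
        System.out.println(pathFor(logStart, logStart, localLogStart));
        // Fetching from the local log start offset would bypass that path.
        System.out.println(pathFor(localLogStart, logStart, localLogStart));
    }
}
```

   Under this model, a follower that started directly from the leader's local
   log start offset would be served locally and would never be told to rebuild
   the tiered prefix, which is why fetching from the log start offset looks
   right to me.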



##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -683,33 +655,24 @@ abstract class AbstractFetcherThread(name: String,
    * produced to the new leader. While the old leader is trying to handle the OffsetOutOfRangeException and query
    * the log end offset of the new leader, the new leader's log end offset becomes higher than the follower's log end offset.
    *
-   * In the first case, the follower's current log end offset is smaller than the leader's log start offset
-   * (or leader's local log start offset).
-   * So the follower should truncate all its logs, roll out a new segment and start to fetch from the current
-   * leader's log start offset(or leader's local log start offset).
+   * In the first case, the follower's current log end offset is smaller than the leader's log start offset. So the
+   * follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log
+   * start offset.

Review Comment:
   I think this case refers to line 674. This comment reverts to the original
   wording, from before the change introduced for tiered storage (TS). Agreed
   that it could be made clearer, though, perhaps by explicitly referencing the
   case below.
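
   For reference, the rule that the doc comment describes for the two cases
   can be sketched as follows. This is only an illustration of the comment's
   wording; `OutOfRangeSketch`, `offsetToFetch` and `shouldTruncateFully` are
   hypothetical names, not code from the PR.

```java
// Illustration of the two out-of-range cases from the quoted doc comment.
// Names are assumptions made for this sketch.
final class OutOfRangeSketch {

    // "the follower should fetch from the higher one of its replica log end
    // offset and the current leader's log start offset"
    static long offsetToFetch(long replicaEndOffset, long leaderStartOffset) {
        return Math.max(replicaEndOffset, leaderStartOffset);
    }

    // First case: the follower's log end offset is smaller than the leader's
    // log start offset, so the follower truncates all its logs and restarts
    // at the leader's log start offset. Second case: keep the current
    // segments and retry the fetch.
    static boolean shouldTruncateFully(long replicaEndOffset, long leaderStartOffset) {
        return leaderStartOffset > replicaEndOffset;
    }

    public static void main(String[] args) {
        // First case: follower at 40, leader log starts at 100.
        System.out.println(shouldTruncateFully(40L, 100L) + " " + offsetToFetch(40L, 100L));
        // Second case: follower at 250, leader log starts at 100.
        System.out.println(shouldTruncateFully(250L, 100L) + " " + offsetToFetch(250L, 100L));
    }
}
```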






[GitHub] [kafka] Hangleton commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-08 Thread via GitHub


Hangleton commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1100057051


##
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution, and we only need to start the machine.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger log = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);
+
+    private LeaderEndPoint leader;
+    private ReplicaManager replicaMgr;
+    private Integer fetchBackOffMs;
+
+    public ReplicaFetcherTierStateMachine(LeaderEndPoint leader,
+                                          ReplicaManager replicaMgr) {
+        this.leader = leader;
+        this.replicaMgr = replicaMgr;
+    }
+
+
+    /**
+     * Start the tier state machine for the provided topic partition. Currently, this start method will build the
+     * entire remote aux log state synchronously.
+     *
+     * @param topicPartition the topic partition
+     * @param currentFetchState the current PartitionFetchState which will
+     *                          be used to derive the return value
+     * @param fetchPartitionData the data from the fetch response that returned the offset moved to tiered storage error
+     *
+     * @return the new PartitionFetchState after the successful start of the
+     *         tier state machine
+     */
+    public PartitionFetchState start(TopicPartition topicPartition,
+                                     PartitionFetchState currentFetchState,
+                                     PartitionData fetchPartitionData) throws Exception {
+
+        Tuple2 epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        int epoch = (int) epochAndLeaderLocalStartOffset._1;
+        long leaderLocalStartOffset = (long) epochAndLeaderLocalStartOffset._2;
+
+        long offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset);
+
+        Tuple2 fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        long l

[GitHub] [kafka] Hangleton commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-08 Thread via GitHub


Hangleton commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1099947908


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -785,17 +732,18 @@ abstract class AbstractFetcherThread(name: String,
    *
    * @param topicPartition topic partition
    * @param fetchState current partition fetch state.
-   * @param leaderEpochInRequest current leader epoch sent in the fetch request.
-   * @param leaderLogStartOffset log-start-offset in the leader replica.
+   * @param fetchPartitionData the fetch request data for this topic partition

Review Comment:
   What does the returned boolean indicate?


