[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1117568502

## core/src/main/java/kafka/server/TierStateMachine.java: ##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;

Review Comment:
Actually, I remember why I didn't initially put it in the storage module. Moving it there wouldn't be a simple change, because the TSM (TierStateMachine) relies on the `PartitionFetchState` case class in AbstractFetcherThread.scala. Moving it would have created a circular dependency across modules, which I wanted to avoid. @junrao, what do you think about keeping it in the core module in this PR?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
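The dependency problem is visible in the shape of the interface itself. Below is a minimal Java sketch that assumes simplified stand-in types. `TopicPartition`, `FetchResponsePartitionData` and `PartitionFetchState` here are hypothetical placeholders for Kafka's `TopicPartition`, `FetchResponseData.PartitionData` and the Scala case class `PartitionFetchState` from AbstractFetcherThread.scala. The `start` parameters mirror the override that appears in the AbstractFetcherThreadTest diff in this thread. Any other methods the real interface may declare are omitted.

```java
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

// Hypothetical stand-in for FetchResponseData.PartitionData (only one field kept).
final class FetchResponsePartitionData {
    final long logStartOffset; // leader's log start offset reported in the fetch response

    FetchResponsePartitionData(long logStartOffset) {
        this.logStartOffset = logStartOffset;
    }
}

// Hypothetical stand-in for the Scala case class PartitionFetchState,
// which lives in AbstractFetcherThread.scala in the core module.
final class PartitionFetchState {
    final long fetchOffset;
    final int currentLeaderEpoch;

    PartitionFetchState(long fetchOffset, int currentLeaderEpoch) {
        this.fetchOffset = fetchOffset;
        this.currentLeaderEpoch = currentLeaderEpoch;
    }
}

// Sketch of the interface: it both accepts and returns PartitionFetchState,
// so wherever this interface is declared must be able to see that core type.
interface TierStateMachine {
    PartitionFetchState start(TopicPartition topicPartition,
                              PartitionFetchState currentFetchState,
                              FetchResponsePartitionData fetchPartitionData);
}
```

Because `start` takes and returns a `PartitionFetchState`, declaring the interface in the storage module would make storage import a core class. Core already imports from storage: the ReplicaFetcherTierStateMachine diff in this thread imports `org.apache.kafka.storage.internals...` classes. That combination is the module cycle described in the comment.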
[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1117432801

## core/src/main/java/kafka/server/TierStateMachine.java: ##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;

Review Comment:
Yup, makes sense. I'll move the interface over.
[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1116390136

## core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala: ##
@@ -615,14 +649,29 @@ class AbstractFetcherThreadTest {
   @Test
   def testFollowerFetchMovedToTieredStore(): Unit = {
     val partition = new TopicPartition("topic", 0)
-    val fetcher = new MockFetcherThread(new MockLeaderEndPoint)
     val replicaLog = Seq(
       mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
       mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)),
       mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
     val replicaState = PartitionState(replicaLog, leaderEpoch = 5, highWatermark = 0L, rlmEnabled = true)
+
+    val mockLeaderEndpoint = new MockLeaderEndPoint
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) {
+      // override the start() of MockTierStateMachine to mimic truncateFullyAndStartAt and update the replicaState in the MockFetcherThread
+      override def start(topicPartition: TopicPartition,
+                         currentFetchState: PartitionFetchState,
+                         fetchPartitionData: FetchResponseData.PartitionData): PartitionFetchState = {
+        replicaState.log.clear()
+        replicaState.localLogStartOffset = 5
+        replicaState.logEndOffset = 5

Review Comment:
Hm, yeah, I see what you're saying; I can make this change. The only somewhat tricky part is that we pass the MockTierStateMachine into the fetcher itself, so we'd have to pass the fetcher reference to the MockTierStateMachine after both are instantiated.
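The two-phase wiring described above can be sketched in Java. All class and method names below are hypothetical simplifications of the Scala test helpers (`MockFetcherThread`, `MockTierStateMachine`, `PartitionState`), not Kafka's actual test code. The fetcher receives the state machine in its constructor, and the state machine receives the fetcher reference through a setter once both objects exist. The `startCalls` counter is an added assumption that lets a test check the state machine was actually invoked.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified replica state (stands in for the test's PartitionState).
final class ReplicaState {
    final List<String> log = new ArrayList<>();
    long localLogStartOffset;
    long logEndOffset;
}

// Hypothetical mock fetcher: it is constructed with its tier state machine.
final class MockFetcherThread {
    final ReplicaState replicaState = new ReplicaState();
    private final MockTierStateMachine tierStateMachine;

    MockFetcherThread(MockTierStateMachine tierStateMachine) {
        this.tierStateMachine = tierStateMachine;
    }

    // Simplified analogue of truncateFullyAndStartAt: drop the local log and restart at `offset`.
    void truncateFullyAndStartAt(long offset) {
        replicaState.log.clear();
        replicaState.localLogStartOffset = offset;
        replicaState.logEndOffset = offset;
    }

    // Simulates the OFFSET_MOVED_TO_TIERED_STORAGE path delegating to the state machine.
    long onOffsetMovedToTieredStorage(long leaderLocalLogStartOffset) {
        return tierStateMachine.start(leaderLocalLogStartOffset);
    }
}

// Hypothetical mock state machine: the fetcher is injected after both objects exist,
// because the fetcher already needs the state machine in its constructor.
final class MockTierStateMachine {
    private MockFetcherThread fetcher;
    int startCalls; // lets a test assert that start() was really reached

    void setFetcher(MockFetcherThread fetcher) {
        this.fetcher = fetcher;
    }

    long start(long leaderLocalLogStartOffset) {
        if (fetcher == null) {
            throw new IllegalStateException("setFetcher must be called before start");
        }
        startCalls++;
        // Update the state owned by the fetcher instead of a captured replicaState.
        fetcher.truncateFullyAndStartAt(leaderLocalLogStartOffset);
        return leaderLocalLogStartOffset; // next fetch offset
    }
}
```

To use it, construct the state machine, pass it to the fetcher, then call `setFetcher(fetcher)` before triggering the tiered-storage path. The null check in `start` turns a forgotten setter call into an immediate, descriptive failure instead of a `NullPointerException` deeper in the test.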
[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1116326600

## core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala: ##
@@ -633,13 +669,18 @@ class AbstractFetcherThreadTest {
       mkBatch(baseOffset = 7, leaderEpoch = 5, new SimpleRecord("h".getBytes)),
       mkBatch(baseOffset = 8, leaderEpoch = 5, new SimpleRecord("i".getBytes)))
-
     val leaderState = PartitionState(leaderLog, leaderEpoch = 5, highWatermark = 8L, rlmEnabled = true)
     // Overriding the log start offset to zero for mocking the scenario of segment 0-4 moved to remote store.
     leaderState.logStartOffset = 0
     fetcher.mockLeader.setLeaderState(partition, leaderState)
     fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+    def buildRemoteLog(topicPartition: TopicPartition, leaderLogStartOffset: Long): Unit = {
+      fetcher.truncateFullyAndStartAt(topicPartition, leaderState.localLogStartOffset)
+      replicaState.logStartOffset = leaderLogStartOffset

Review Comment:
Hm, OK, I think I see what you're suggesting. Let me try the refactor without the callback. Thanks, Jun.
[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1116325758

## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ##
@@ -794,17 +742,18 @@ abstract class AbstractFetcherThread(name: String,
    *
    * @param topicPartition topic partition
    * @param fetchState current partition fetch state.
-   * @param leaderEpochInRequest current leader epoch sent in the fetch request.
-   * @param leaderLogStartOffset log-start-offset in the leader replica.
+   * @param fetchPartitionData the fetch request data for this topic partition

Review Comment:
Ah, thanks for the catches. I have fixed those.
[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1116323021

## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ##
@@ -27,9 +27,11 @@ import kafka.utils.{DelayedItem, Logging, Pool}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.PartitionStates
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
+import org.apache.kafka.common.message.FetchResponseData.PartitionData
 import org.apache.kafka.common.message.{FetchResponseData, OffsetForLeaderEpochRequestData}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
+//import org.apache.kafka.common.requests.FetchRequest.PartitionData

Review Comment:
Yup, I think I did this based on Rittika's comment in an earlier commit.
[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1113713186

## core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala: ##
@@ -633,13 +669,18 @@ class AbstractFetcherThreadTest {
       mkBatch(baseOffset = 7, leaderEpoch = 5, new SimpleRecord("h".getBytes)),
       mkBatch(baseOffset = 8, leaderEpoch = 5, new SimpleRecord("i".getBytes)))
-
     val leaderState = PartitionState(leaderLog, leaderEpoch = 5, highWatermark = 8L, rlmEnabled = true)
     // Overriding the log start offset to zero for mocking the scenario of segment 0-4 moved to remote store.
     leaderState.logStartOffset = 0
     fetcher.mockLeader.setLeaderState(partition, leaderState)
     fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+    def buildRemoteLog(topicPartition: TopicPartition, leaderLogStartOffset: Long): Unit = {
+      fetcher.truncateFullyAndStartAt(topicPartition, leaderState.localLogStartOffset)
+      replicaState.logStartOffset = leaderLogStartOffset

Review Comment:
I wanted to make sure the MockTierStateMachine code gets invoked during the test, as a sanity check that handleOffsetsMovedToTieredStorage actually calls into the TierStateMachine logic. I couldn't think of another way to update the replicaPartitionState from the MockTierStateMachine, since that state is held by the MockFetcherThread. Given that the goal was to exercise the MockTierStateMachine code, I'm not sure how effective the test would be if we instead override `doWork` in the MockFetcherThread to update the replicaPartitionState. But maybe I'm misunderstanding the alternative you mentioned.
[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1113664534

## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ##
@@ -683,33 +655,24 @@ abstract class AbstractFetcherThread(name: String,
      * produced to the new leader. While the old leader is trying to handle the OffsetOutOfRangeException and query
      * the log end offset of the new leader, the new leader's log end offset becomes higher than the follower's log end offset.
      *
-     * In the first case, the follower's current log end offset is smaller than the leader's log start offset
-     * (or leader's local log start offset).
-     * So the follower should truncate all its logs, roll out a new segment and start to fetch from the current
-     * leader's log start offset(or leader's local log start offset).
+     * In the first case, the follower's current log end offset is smaller than the leader's log start offset. So the
+     * follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log
+     * start offset.
      * In the second case, the follower should just keep the current log segments and retry the fetch. In the second
      * case, there will be some inconsistency of data between old and new leader. We are not solving it here.
      * If users want to have strong consistency guarantees, appropriate configurations needs to be set for both
      * brokers and producers.
      *
      * Putting the two cases together, the follower should fetch from the higher one of its replica log end offset
-     * and the current leader's (local-log-start-offset or) log start offset.
+     * and the current leader's log start offset.
      */
-    val (epoch, leaderStartOffset) = if (fetchFromLocalLogStartOffset)
-      leader.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch) else
-      leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)
-
+    val (_, leaderStartOffset) = leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)

Review Comment:
+1 to @Hangleton's comment.
I restored this function to the original logic, so it is only called when first trying to fetch the leader log start offset (after starting the fetch or after getting the offset-out-of-range error). If the follower gets the `OFFSET_MOVED_TO_TIERED_STORAGE` error, it takes the other code path, which builds the remote log aux state via the TierStateMachine interface.
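The offset choice in the doc comment above reduces to a small decision. The Java sketch below assumes only the two described cases matter. `OutOfRangeDecision` and `OutOfRangeHandling.decide` are hypothetical names, and the real handler also has to query the leader for its log start offset first (the `leader.fetchEarliestOffset(...)` call in the diff).

```java
// Hypothetical result type: whether to truncate the whole local log, and where to fetch next.
final class OutOfRangeDecision {
    final boolean truncateFully;
    final long nextFetchOffset;

    OutOfRangeDecision(boolean truncateFully, long nextFetchOffset) {
        this.truncateFully = truncateFully;
        this.nextFetchOffset = nextFetchOffset;
    }
}

final class OutOfRangeHandling {
    // Covers only the two cases from the doc comment; the offsets are assumed to be
    // already known (the real code fetches the leader's log start offset over the network).
    static OutOfRangeDecision decide(long replicaLogEndOffset, long leaderLogStartOffset) {
        if (replicaLogEndOffset < leaderLogStartOffset) {
            // Case 1: the follower's log ends before the leader's log starts,
            // so it truncates everything and restarts at the leader's log start offset.
            return new OutOfRangeDecision(true, leaderLogStartOffset);
        }
        // Case 2: keep the current segments and retry the fetch from the local log end offset.
        return new OutOfRangeDecision(false, replicaLogEndOffset);
    }
}
```

In both branches `nextFetchOffset` equals `Math.max(replicaLogEndOffset, leaderLogStartOffset)`, which is the "higher one of" rule stated at the end of the comment.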
[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1113664534

## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ##
@@ -683,33 +655,24 @@ abstract class AbstractFetcherThread(name: String,
      * produced to the new leader. While the old leader is trying to handle the OffsetOutOfRangeException and query
      * the log end offset of the new leader, the new leader's log end offset becomes higher than the follower's log end offset.
      *
-     * In the first case, the follower's current log end offset is smaller than the leader's log start offset
-     * (or leader's local log start offset).
-     * So the follower should truncate all its logs, roll out a new segment and start to fetch from the current
-     * leader's log start offset(or leader's local log start offset).
+     * In the first case, the follower's current log end offset is smaller than the leader's log start offset. So the
+     * follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log
+     * start offset.
      * In the second case, the follower should just keep the current log segments and retry the fetch. In the second
      * case, there will be some inconsistency of data between old and new leader. We are not solving it here.
      * If users want to have strong consistency guarantees, appropriate configurations needs to be set for both
      * brokers and producers.
      *
      * Putting the two cases together, the follower should fetch from the higher one of its replica log end offset
-     * and the current leader's (local-log-start-offset or) log start offset.
+     * and the current leader's log start offset.
      */
-    val (epoch, leaderStartOffset) = if (fetchFromLocalLogStartOffset)
-      leader.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch) else
-      leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)
-
+    val (_, leaderStartOffset) = leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)

Review Comment:
+1 to @Hangleton's comment.
I restored this function to the original logic, so it is only called when first trying to fetch the leader log start offset. If the follower gets the `OFFSET_MOVED_TO_TIERED_STORAGE` error, it takes the code path that builds the remote log aux state.
[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1113664441

## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ##
@@ -400,12 +386,7 @@ abstract class AbstractFetcherThread(name: String,
             case Errors.OFFSET_OUT_OF_RANGE =>
               if (!handleOutOfRangeError(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch))
                 partitionsWithError += topicPartition
-            case Errors.OFFSET_MOVED_TO_TIERED_STORAGE =>
-              debug(s"Received error ${Errors.OFFSET_MOVED_TO_TIERED_STORAGE}, " +
-                s"at fetch offset: ${currentFetchState.fetchOffset}, " + s"topic-partition: $topicPartition")
-              if (!handleOffsetsMovedToTieredStorage(topicPartition, currentFetchState,
-                fetchPartitionData.currentLeaderEpoch, partitionData.logStartOffset()))

Review Comment:
Ah, I believe this was a mistake. I wanted `TierStateMachine.start` to use the fetch response as well. Thanks for the catch.

## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ##
@@ -683,33 +655,24 @@ abstract class AbstractFetcherThread(name: String,
      * produced to the new leader. While the old leader is trying to handle the OffsetOutOfRangeException and query
      * the log end offset of the new leader, the new leader's log end offset becomes higher than the follower's log end offset.
      *
-     * In the first case, the follower's current log end offset is smaller than the leader's log start offset
-     * (or leader's local log start offset).
-     * So the follower should truncate all its logs, roll out a new segment and start to fetch from the current
-     * leader's log start offset(or leader's local log start offset).
+     * In the first case, the follower's current log end offset is smaller than the leader's log start offset. So the
+     * follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log
+     * start offset.

Review Comment:
Yes, this was just a revert to the original comments and logic.
But this makes sense; I can try to clarify the cases. Thanks for the comment suggestion.
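The error dispatch in the first hunk above can be sketched in Java. The sketch assumes a hypothetical `FetchError` enum and `PartitionResponse` class in place of Kafka's `Errors` and `FetchResponseData.PartitionData`. It illustrates two points. First, the tiered-storage handler receives the whole fetch response for the partition, as `TierStateMachine.start` is meant to. Second, each handler returns `false` when the partition should be added to `partitionsWithError`.

```java
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical subset of the error codes a fetch response can carry.
enum FetchError { NONE, OFFSET_OUT_OF_RANGE, OFFSET_MOVED_TO_TIERED_STORAGE }

// Hypothetical per-partition response: the error plus the leader's log start offset.
final class PartitionResponse {
    final FetchError error;
    final long logStartOffset;

    PartitionResponse(FetchError error, long logStartOffset) {
        this.error = error;
        this.logStartOffset = logStartOffset;
    }
}

// Each handler returns true if it handled the error, false if the partition should be marked failed.
interface ErrorHandlers {
    boolean handleOutOfRangeError(String partition);

    // Receives the whole response so the tier state machine can read what it needs from it.
    boolean handleOffsetsMovedToTieredStorage(String partition, PartitionResponse response);
}

final class FetchErrorDispatcher {
    // Returns the partitions whose error handling failed (analogous to partitionsWithError).
    static Set<String> dispatch(Map<String, PartitionResponse> responses, ErrorHandlers handlers) {
        Set<String> partitionsWithError = new LinkedHashSet<>();
        for (Map.Entry<String, PartitionResponse> entry : responses.entrySet()) {
            String partition = entry.getKey();
            PartitionResponse response = entry.getValue();
            switch (response.error) {
                case OFFSET_OUT_OF_RANGE:
                    if (!handlers.handleOutOfRangeError(partition)) {
                        partitionsWithError.add(partition);
                    }
                    break;
                case OFFSET_MOVED_TO_TIERED_STORAGE:
                    if (!handlers.handleOffsetsMovedToTieredStorage(partition, response)) {
                        partitionsWithError.add(partition);
                    }
                    break;
                default:
                    break; // NONE and any other errors are out of scope for this sketch
            }
        }
        return partitionsWithError;
    }
}
```

Passing the full response, rather than selected fields such as the leader epoch and log start offset, lets the state machine's signature stay stable if it later needs more of the response.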
[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1100818551

## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ##
@@ -785,17 +732,18 @@ abstract class AbstractFetcherThread(name: String,
    *
    * @param topicPartition topic partition
    * @param fetchState current partition fetch state.
-   * @param leaderEpochInRequest current leader epoch sent in the fetch request.
-   * @param leaderLogStartOffset log-start-offset in the leader replica.
+   * @param fetchPartitionData the fetch request data for this topic partition

Review Comment:
The return value indicates whether the handler method returned with or without an error. This is mentioned in the documentation's description, following the convention of the other handler functions.
[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1100818551

## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ##
@@ -785,17 +732,18 @@ abstract class AbstractFetcherThread(name: String,
    *
    * @param topicPartition topic partition
    * @param fetchState current partition fetch state.
-   * @param leaderEpochInRequest current leader epoch sent in the fetch request.
-   * @param leaderLogStartOffset log-start-offset in the leader replica.
+   * @param fetchPartitionData the fetch request data for this topic partition

Review Comment:
The return value indicates whether the handler method returned with or without an error. It's mentioned in the documentation, but I'll update it to be more noticeable.
[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1100818551

## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ##
@@ -785,17 +732,18 @@ abstract class AbstractFetcherThread(name: String,
    *
    * @param topicPartition topic partition
    * @param fetchState current partition fetch state.
-   * @param leaderEpochInRequest current leader epoch sent in the fetch request.
-   * @param leaderLogStartOffset log-start-offset in the leader replica.
+   * @param fetchPartitionData the fetch request data for this topic partition

Review Comment:
The return value indicates whether the handler method returned with or without an error. I'll update the documentation.
[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1100802687

## core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java: ##
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution, and we only need to start the machine.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger log = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);

Review Comment:
I'll remove the `static`.
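The synchronous progression described in the class Javadoc above can be sketched in Java. This is an illustration under stated assumptions, not the PR's implementation. `LeaderEndPoint` here is a hypothetical one-method stand-in whose `fetchEarliestLocalOffset` mirrors the `(epoch, offset)` call seen in the AbstractFetcherThread diff. `RemoteLogAuxStateBuilder` is a hypothetical hook for the actual rebuilding work, which, judging from the imports, involves leader-epoch checkpoints and remote segment metadata.

```java
// Hypothetical pair mirroring the (epoch, offset) tuple returned by fetchEarliestLocalOffset.
final class EpochAndOffset {
    final int epoch;
    final long offset;

    EpochAndOffset(int epoch, long offset) {
        this.epoch = epoch;
        this.offset = offset;
    }
}

// Hypothetical, simplified view of the leader endpoint: only the call this sketch needs.
interface LeaderEndPoint {
    EpochAndOffset fetchEarliestLocalOffset(String topicPartition, int currentLeaderEpoch);
}

// Hypothetical hook that rebuilds the follower's auxiliary state from remote storage.
interface RemoteLogAuxStateBuilder {
    void build(String topicPartition, int leaderEpoch, long leaderLocalLogStartOffset, long leaderLogStartOffset);
}

// Hypothetical, simplified fetch state.
final class FetchState {
    final long fetchOffset;
    final int currentLeaderEpoch;

    FetchState(long fetchOffset, int currentLeaderEpoch) {
        this.fetchOffset = fetchOffset;
        this.currentLeaderEpoch = currentLeaderEpoch;
    }
}

final class SynchronousTierStateMachine {
    private final LeaderEndPoint leader;
    private final RemoteLogAuxStateBuilder auxStateBuilder;

    SynchronousTierStateMachine(LeaderEndPoint leader, RemoteLogAuxStateBuilder auxStateBuilder) {
        this.leader = leader;
        this.auxStateBuilder = auxStateBuilder;
    }

    // All work happens inside start(); there is no separate "advance" step.
    FetchState start(String topicPartition, FetchState current, long leaderLogStartOffset) {
        // 1. Ask the leader where its local (non-tiered) log begins.
        EpochAndOffset local = leader.fetchEarliestLocalOffset(topicPartition, current.currentLeaderEpoch);
        // 2. Build the follower's remote log aux state up to that offset.
        auxStateBuilder.build(topicPartition, local.epoch, local.offset, leaderLogStartOffset);
        // 3. Resume normal fetching from the leader's local log start offset.
        return new FetchState(local.offset, current.currentLeaderEpoch);
    }
}
```

Since everything happens inside `start`, there is no step that advances the state, which matches the Javadoc's note that only the start is needed.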
[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1100799705

## core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java: ##
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.checkpoints.LeaderEpochCheckpointFile;
+import kafka.server.epoch.EpochEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.Seq;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution and only the start is needed.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);
+
+    private LeaderEndPoint leader;
+    private ReplicaManager replicaMgr;
+    private Integer fetchBackOffMs;

Review Comment:
Sorry, I thought I had removed it in my previous commits, but I guess not. I'll pick it up in the next commits.
[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1099642781

## core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java: ##
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.checkpoints.LeaderEpochCheckpointFile;
+import kafka.server.epoch.EpochEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.Seq;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution and only the start is needed.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */ +public class ReplicaFetcherTierStateMachine implements TierStateMachine { +private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class); Review Comment: ~~I think the Java checkstyle gave me an error previously. I can change it to LOG perhaps?~~ nvm I was able to change it to `log`
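The `ReplicaFetcherTierStateMachine` Javadoc quoted in this diff describes a synchronous start: fetch the leader's local log start offset, then build the follower's remote log aux state up to that offset. Below is a minimal Java sketch of that flow under stated assumptions. `LeaderView`, `FollowerState`, and `start(...)` are hypothetical names, not Kafka APIs. The "aux state" is modeled as a leader-epoch cache (epoch to start offset); the `LeaderEpochCheckpointFile`/`EpochEntry` imports hint at this, but the model is an assumption, not the PR's implementation.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of the synchronous "start" described in the
// ReplicaFetcherTierStateMachine Javadoc. These types are illustrative only;
// they are not Kafka's LeaderEndPoint, PartitionFetchState, or epoch cache APIs.
public class TierStateMachineSketch {

    /** Hypothetical leader view: only the leader's local log start offset. */
    @FunctionalInterface
    interface LeaderView {
        long localLogStartOffset();
    }

    /** Hypothetical follower state: next fetch offset plus an epoch -> start-offset cache. */
    static final class FollowerState {
        long fetchOffset;
        final TreeMap<Integer, Long> epochCache = new TreeMap<>();
    }

    /**
     * Reads the leader's local log start offset, rebuilds the follower's auxiliary
     * state from a (hypothetical) remote epoch checkpoint up to that offset, and
     * moves the follower's fetch offset there. Returns the new fetch offset.
     */
    static long start(LeaderView leader, Map<Integer, Long> remoteEpochCheckpoint, FollowerState follower) {
        long leaderLocalStart = leader.localLogStartOffset();
        follower.epochCache.clear();
        for (Map.Entry<Integer, Long> entry : remoteEpochCheckpoint.entrySet()) {
            // Keep only epochs that began before the leader's local log start offset.
            if (entry.getValue() < leaderLocalStart) {
                follower.epochCache.put(entry.getKey(), entry.getValue());
            }
        }
        // Resume regular fetching from the leader's local log start offset.
        follower.fetchOffset = leaderLocalStart;
        return leaderLocalStart;
    }

    public static void main(String[] args) {
        FollowerState follower = new FollowerState();
        Map<Integer, Long> checkpoint = Map.of(0, 0L, 1, 40L, 2, 120L);
        long next = start(() -> 100L, checkpoint, follower);
        System.out.println(next + " " + follower.epochCache); // 100 {0=0, 1=40}
    }
}
```

Because the Javadoc says only `start` is needed and the state never has to be advanced, the sketch models the whole transition as a single synchronous call that returns the new fetch offset.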
[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
mattwong949 commented on code in PR #13206: URL: https://github.com/apache/kafka/pull/13206#discussion_r1099642781 ## core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java: ## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server; + +import kafka.cluster.Partition; +import kafka.log.LeaderOffsetIncremented$; +import kafka.log.UnifiedLog; +import kafka.log.remote.RemoteLogManager; +import kafka.server.checkpoints.LeaderEpochCheckpointFile; +import kafka.server.epoch.EpochEntry; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition; +import org.apache.kafka.common.protocol.Errors; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.HashMap; +import java.util.List; +import java.util.Optional; + +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.CheckpointFile; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.Tuple2; +import scala.collection.JavaConverters; +import scala.collection.immutable.Seq; + +/** + The replica fetcher tier state machine follows a state machine progression. + + Currently, the tier state machine follows a synchronous execution and only the start is needed. + There is no need to advance the state. + + When started, the tier state machine will fetch the local log start offset of the + leader and then build the follower's remote log aux state until the leader's + local log start offset. 
+ */ +public class ReplicaFetcherTierStateMachine implements TierStateMachine { +private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class); Review Comment: I think the Java checkstyle gave me an error previously. I can change it to LOG perhaps?
[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
mattwong949 commented on code in PR #13206: URL: https://github.com/apache/kafka/pull/13206#discussion_r1099641934 ## core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java: ## @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server; + +import kafka.cluster.Partition; +import kafka.log.LeaderOffsetIncremented$; +import kafka.log.UnifiedLog; +import kafka.log.remote.RemoteLogManager; +import kafka.server.checkpoints.LeaderEpochCheckpointFile; +import kafka.server.epoch.EpochEntry; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset; +import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition; +import org.apache.kafka.common.protocol.Errors; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.HashMap; +import java.util.List; +import java.util.Optional; + +import org.apache.kafka.common.requests.FetchRequest.PartitionData; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.common.CheckpointFile; +import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata; +import org.apache.kafka.server.log.remote.storage.RemoteStorageException; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.Tuple2; +import scala.collection.JavaConverters; +import scala.collection.immutable.Seq; + +/** + The replica fetcher tier state machine follows a state machine progression. + + Currently, the tier state machine follows a synchronous execution and only the start is needed. + There is no need to advance the state. + + When started, the tier state machine will fetch the local log start offset of the + leader and then build the follower's remote log aux state until the leader's + local log start offset. 
+ */ +public class ReplicaFetcherTierStateMachine implements TierStateMachine { +private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class); + +private LeaderEndPoint leader; +private ReplicaManager replicaMgr; +private Integer fetchBackOffMs; Review Comment: makes sense, I can remove `fetchBackOffMs` to clean up the implementation for now
[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error
mattwong949 commented on code in PR #13206: URL: https://github.com/apache/kafka/pull/13206#discussion_r1099635815 ## core/src/main/scala/kafka/server/AbstractFetcherThread.scala: ## @@ -400,12 +386,7 @@ abstract class AbstractFetcherThread(name: String, case Errors.OFFSET_OUT_OF_RANGE => if (!handleOutOfRangeError(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch)) partitionsWithError += topicPartition -case Errors.OFFSET_MOVED_TO_TIERED_STORAGE => - debug(s"Received error ${Errors.OFFSET_MOVED_TO_TIERED_STORAGE}, " + -s"at fetch offset: ${currentFetchState.fetchOffset}, " + s"topic-partition: $topicPartition") - if (!handleOffsetsMovedToTieredStorage(topicPartition, currentFetchState, -fetchPartitionData.currentLeaderEpoch, partitionData.logStartOffset())) -partitionsWithError += topicPartition + Review Comment: this seems to be the whitespace convention in this part of the code
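The hunk above shows the per-partition error dispatch in `AbstractFetcherThread`. Each recoverable error is delegated to a handler that returns a boolean, and a `false` result adds the partition to `partitionsWithError`. The removed `OFFSET_MOVED_TO_TIERED_STORAGE` branch followed the same shape. Below is a minimal Java rendering of that pattern, for illustration only. `FetchError`, `dispatch`, and the `Predicate` handlers are hypothetical stand-ins, not Kafka's `Errors` class or fetcher API. The sketch also does not show where the PR relocates the tiered-storage handling, because the hunk does not say.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical, simplified Java rendering of the per-partition error dispatch
// in the AbstractFetcherThread hunk. The enum and handlers are illustrative,
// not Kafka's Errors class or fetcher API.
public class FetchErrorDispatchSketch {

    enum FetchError { NONE, OFFSET_OUT_OF_RANGE, OFFSET_MOVED_TO_TIERED_STORAGE }

    /** Returns the partitions whose error the corresponding handler could not resolve. */
    static List<String> dispatch(List<String> partitions, List<FetchError> errors,
                                 Predicate<String> handleOutOfRange,
                                 Predicate<String> handleMovedToTieredStorage) {
        List<String> partitionsWithError = new ArrayList<>();
        for (int i = 0; i < partitions.size(); i++) {
            String tp = partitions.get(i);
            switch (errors.get(i)) {
                case OFFSET_OUT_OF_RANGE:
                    // Mirrors: if (!handleOutOfRangeError(...)) partitionsWithError += topicPartition
                    if (!handleOutOfRange.test(tp)) partitionsWithError.add(tp);
                    break;
                case OFFSET_MOVED_TO_TIERED_STORAGE:
                    // Mirrors the branch removed in this hunk: failure marks the partition as errored.
                    if (!handleMovedToTieredStorage.test(tp)) partitionsWithError.add(tp);
                    break;
                default:
                    // No error to handle for this partition.
                    break;
            }
        }
        return partitionsWithError;
    }

    public static void main(String[] args) {
        List<String> failed = dispatch(
                List.of("t-0", "t-1", "t-2"),
                List.of(FetchError.NONE, FetchError.OFFSET_OUT_OF_RANGE, FetchError.OFFSET_MOVED_TO_TIERED_STORAGE),
                tp -> true,   // out-of-range handler succeeds
                tp -> false); // tiered-storage handler fails
        System.out.println(failed); // [t-2]
    }
}
```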