[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-24 Thread via GitHub


mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1117568502


##
core/src/main/java/kafka/server/TierStateMachine.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;

Review Comment:
   Actually, I remember why I didn't initially put it in the storage package. 
Moving it there isn't a simple change because the TierStateMachine (TSM) relies 
on the PartitionFetchState case class in AbstractFetcherThread.scala, so the 
move would create a circular dependency across modules, which I wanted to avoid. 
@junrao, what do you think about keeping it in the core module in this PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-24 Thread via GitHub


mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1117432801


##
core/src/main/java/kafka/server/TierStateMachine.java:
##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;

Review Comment:
   Yup, makes sense. I'll move the interface over.






[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-23 Thread via GitHub


mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1116390136


##
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##
@@ -615,14 +649,29 @@ class AbstractFetcherThreadTest {
   @Test
   def testFollowerFetchMovedToTieredStore(): Unit = {
     val partition = new TopicPartition("topic", 0)
-    val fetcher = new MockFetcherThread(new MockLeaderEndPoint)
 
     val replicaLog = Seq(
       mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
       mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)),
       mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
 
     val replicaState = PartitionState(replicaLog, leaderEpoch = 5, highWatermark = 0L, rlmEnabled = true)
+
+    val mockLeaderEndpoint = new MockLeaderEndPoint
+    val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) {
+      // override the start() of MockTierStateMachine to mimic truncateFullyAndStartAt and update the replicaState in the MockFetcherThread
+      override def start(topicPartition: TopicPartition,
+                         currentFetchState: PartitionFetchState,
+                         fetchPartitionData: FetchResponseData.PartitionData): PartitionFetchState = {
+        replicaState.log.clear()
+        replicaState.localLogStartOffset = 5
+        replicaState.logEndOffset = 5

Review Comment:
   Hm, yeah, I see what you are saying; I can make this change. The only somewhat 
tricky part is that we pass the MockTierStateMachine into the fetcher itself, 
so we'd have to pass the fetcher reference to the MockTierStateMachine 
after both are instantiated.
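
   A minimal sketch of the wiring described above, assuming simplified, hypothetical stand-ins for `MockFetcherThread`, `MockTierStateMachine`, and the replica state (none of these classes match the real test code's signatures). The state machine is constructed first, handed to the fetcher's constructor, and only then given the fetcher reference through a setter:

```java
import java.util.ArrayList;
import java.util.List;

public class LateBindingSketch {
    /** Hypothetical stand-in for the replica state held by the mock fetcher. */
    public static final class ReplicaState {
        public final List<String> log = new ArrayList<>();
        public long localLogStartOffset;
        public long logEndOffset;
    }

    /** Hypothetical stand-in for MockTierStateMachine. */
    public static final class MockTierStateMachine {
        private MockFetcher fetcher; // unset until setFetcher is called

        public void setFetcher(MockFetcher fetcher) {
            this.fetcher = fetcher;
        }

        /** Mimics truncateFullyAndStartAt on the fetcher's replica state; returns the next fetch offset. */
        public long start(long leaderLocalLogStartOffset) {
            if (fetcher == null) {
                throw new IllegalStateException("fetcher must be set before start()");
            }
            ReplicaState state = fetcher.replicaState;
            state.log.clear();
            state.localLogStartOffset = leaderLocalLogStartOffset;
            state.logEndOffset = leaderLocalLogStartOffset;
            return leaderLocalLogStartOffset;
        }
    }

    /** Hypothetical stand-in for MockFetcherThread; it receives the state machine at construction. */
    public static final class MockFetcher {
        public final ReplicaState replicaState = new ReplicaState();
        public final MockTierStateMachine tierStateMachine;

        public MockFetcher(MockTierStateMachine tierStateMachine) {
            this.tierStateMachine = tierStateMachine;
        }
    }

    /** Builds both objects, then closes the cycle with the setter. */
    public static MockFetcher wire() {
        MockTierStateMachine tsm = new MockTierStateMachine();
        MockFetcher fetcher = new MockFetcher(tsm);
        tsm.setFetcher(fetcher);
        return fetcher;
    }

    public static void main(String[] args) {
        MockFetcher fetcher = wire();
        fetcher.replicaState.log.add("a");
        long next = fetcher.tierStateMachine.start(5L);
        System.out.println("next fetch offset: " + next);
    }
}
```

   The explicit null check makes the ordering requirement fail fast if a test forgets the setter call.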






[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-23 Thread via GitHub


mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1116326600


##
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##
@@ -633,13 +669,18 @@ class AbstractFetcherThreadTest {
       mkBatch(baseOffset = 7, leaderEpoch = 5, new SimpleRecord("h".getBytes)),
       mkBatch(baseOffset = 8, leaderEpoch = 5, new SimpleRecord("i".getBytes)))
 
-
     val leaderState = PartitionState(leaderLog, leaderEpoch = 5, highWatermark = 8L, rlmEnabled = true)
     // Overriding the log start offset to zero for mocking the scenario of segment 0-4 moved to remote store.
     leaderState.logStartOffset = 0
     fetcher.mockLeader.setLeaderState(partition, leaderState)
     fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
 
+    def buildRemoteLog(topicPartition: TopicPartition, leaderLogStartOffset: Long): Unit = {
+      fetcher.truncateFullyAndStartAt(topicPartition, leaderState.localLogStartOffset)
+      replicaState.logStartOffset = leaderLogStartOffset

Review Comment:
   hm ok I think I see what you're suggesting. Let me try the refactor w/o the 
callback. thanks Jun






[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-23 Thread via GitHub


mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1116325758


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -794,17 +742,18 @@ abstract class AbstractFetcherThread(name: String,
*
* @param topicPartition topic partition
* @param fetchState current partition fetch state.
-   * @param leaderEpochInRequest current leader epoch sent in the fetch request.
-   * @param leaderLogStartOffset log-start-offset in the leader replica.
+   * @param fetchPartitionData the fetch request data for this topic partition

Review Comment:
   ah thanks for the catches. I have fixed those






[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-23 Thread via GitHub


mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1116323021


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -27,9 +27,11 @@ import kafka.utils.{DelayedItem, Logging, Pool}
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.PartitionStates
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
+import org.apache.kafka.common.message.FetchResponseData.PartitionData
 import org.apache.kafka.common.message.{FetchResponseData, OffsetForLeaderEpochRequestData}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Records}
+//import org.apache.kafka.common.requests.FetchRequest.PartitionData

Review Comment:
   yup I think I did this from Rittika's comment in an earlier commit






[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-21 Thread via GitHub


mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1113713186


##
core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala:
##
@@ -633,13 +669,18 @@ class AbstractFetcherThreadTest {
       mkBatch(baseOffset = 7, leaderEpoch = 5, new SimpleRecord("h".getBytes)),
       mkBatch(baseOffset = 8, leaderEpoch = 5, new SimpleRecord("i".getBytes)))
 
-
     val leaderState = PartitionState(leaderLog, leaderEpoch = 5, highWatermark = 8L, rlmEnabled = true)
     // Overriding the log start offset to zero for mocking the scenario of segment 0-4 moved to remote store.
     leaderState.logStartOffset = 0
     fetcher.mockLeader.setLeaderState(partition, leaderState)
     fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
 
+    def buildRemoteLog(topicPartition: TopicPartition, leaderLogStartOffset: Long): Unit = {
+      fetcher.truncateFullyAndStartAt(topicPartition, leaderState.localLogStartOffset)
+      replicaState.logStartOffset = leaderLogStartOffset

Review Comment:
   I had tried to ensure that the MockTierStateMachine code would get invoked 
as part of the test as a sanity check that the TierStateMachine logic is 
getting called from handleOffsetsMovedToTieredStorage. I couldn't think of 
another way to get the replicaPartitionState updated from the 
MockTierStateMachine since it is contained in the MockFetcherThread.
   
   I am not sure about the effectiveness of the test if we override the 
`doWork` from the MockFetcherThread to update the replicaPartitionState given 
my thoughts on trying to invoke the MockTierStateMachine code, but maybe I'm 
misunderstanding the alternative you mentioned.






[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-21 Thread via GitHub


mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1113664534


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -683,33 +655,24 @@ abstract class AbstractFetcherThread(name: String,
    * produced to the new leader. While the old leader is trying to handle the OffsetOutOfRangeException and query
    * the log end offset of the new leader, the new leader's log end offset becomes higher than the follower's log end offset.
    *
-   * In the first case, the follower's current log end offset is smaller than the leader's log start offset
-   * (or leader's local log start offset).
-   * So the follower should truncate all its logs, roll out a new segment and start to fetch from the current
-   * leader's log start offset(or leader's local log start offset).
+   * In the first case, the follower's current log end offset is smaller than the leader's log start offset. So the
+   * follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log
+   * start offset.
    * In the second case, the follower should just keep the current log segments and retry the fetch. In the second
    * case, there will be some inconsistency of data between old and new leader. We are not solving it here.
    * If users want to have strong consistency guarantees, appropriate configurations needs to be set for both
    * brokers and producers.
    *
    * Putting the two cases together, the follower should fetch from the higher one of its replica log end offset
-   * and the current leader's (local-log-start-offset or) log start offset.
+   * and the current leader's log start offset.
    */
-  val (epoch, leaderStartOffset) = if (fetchFromLocalLogStartOffset)
-    leader.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch) else
-    leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)
-
+  val (_, leaderStartOffset) = leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)

Review Comment:
   +1 to @Hangleton's comment. I restored this function to its original 
logic, so it is only called when first trying to fetch the leader 
log start offset (after starting to fetch or after getting the offset out of range 
error). If the follower gets the `OFFSET_MOVED_TO_TIERED_STORAGE` error, it 
proceeds to the other code path, which builds the remote aux log state via the 
TierStateMachine interface.
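
   A rough sketch of the two code paths described above. The `TierStateMachine` interface here is a hypothetical one-method simplification, not the interface added in this PR, and resuming from the leader's local log start offset after the aux state is built is an assumption taken from the class Javadoc and the test. The out-of-range branch only covers the "fetch from the higher of the two offsets" rule quoted in the diff and omits the truncation that precedes it:

```java
public class FetchErrorDispatchSketch {
    /** The two fetch errors discussed in this thread. */
    public enum FetchError { OFFSET_OUT_OF_RANGE, OFFSET_MOVED_TO_TIERED_STORAGE }

    /** Hypothetical, simplified stand-in for the TierStateMachine interface. */
    public interface TierStateMachine {
        /** Builds the remote log aux state up to the given offset; returns the offset to resume fetching from. */
        long start(long leaderLocalLogStartOffset);
    }

    /** Picks the next fetch offset for the follower depending on the error returned by the leader. */
    public static long nextFetchOffset(FetchError error,
                                       long replicaLogEndOffset,
                                       long leaderLogStartOffset,
                                       long leaderLocalLogStartOffset,
                                       TierStateMachine tierStateMachine) {
        switch (error) {
            case OFFSET_OUT_OF_RANGE:
                // Original logic: fetch from the higher of the replica log end offset
                // and the leader's log start offset.
                return Math.max(replicaLogEndOffset, leaderLogStartOffset);
            case OFFSET_MOVED_TO_TIERED_STORAGE:
                // Separate path: delegate to the tier state machine.
                return tierStateMachine.start(leaderLocalLogStartOffset);
            default:
                throw new IllegalArgumentException("Unhandled error: " + error);
        }
    }

    public static void main(String[] args) {
        // Assumption for illustration: the state machine resumes at the leader's local log start offset.
        TierStateMachine tsm = offset -> offset;
        System.out.println(nextFetchOffset(FetchError.OFFSET_OUT_OF_RANGE, 3L, 5L, 7L, tsm));
        System.out.println(nextFetchOffset(FetchError.OFFSET_MOVED_TO_TIERED_STORAGE, 3L, 0L, 5L, tsm));
    }
}
```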






[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-21 Thread via GitHub


mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1113664441


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -400,12 +386,7 @@ abstract class AbstractFetcherThread(name: String,
             case Errors.OFFSET_OUT_OF_RANGE =>
               if (!handleOutOfRangeError(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch))
                 partitionsWithError += topicPartition
-            case Errors.OFFSET_MOVED_TO_TIERED_STORAGE =>
-              debug(s"Received error ${Errors.OFFSET_MOVED_TO_TIERED_STORAGE}, " +
-                s"at fetch offset: ${currentFetchState.fetchOffset}, " + s"topic-partition: $topicPartition")
-              if (!handleOffsetsMovedToTieredStorage(topicPartition, currentFetchState,
-                fetchPartitionData.currentLeaderEpoch, partitionData.logStartOffset()))

Review Comment:
   ah i believe this was a mistake. I wanted the `TierStateMachine.start` to 
use the fetch response as well. thanks for the catch



##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -683,33 +655,24 @@ abstract class AbstractFetcherThread(name: String,
    * produced to the new leader. While the old leader is trying to handle the OffsetOutOfRangeException and query
    * the log end offset of the new leader, the new leader's log end offset becomes higher than the follower's log end offset.
    *
-   * In the first case, the follower's current log end offset is smaller than the leader's log start offset
-   * (or leader's local log start offset).
-   * So the follower should truncate all its logs, roll out a new segment and start to fetch from the current
-   * leader's log start offset(or leader's local log start offset).
+   * In the first case, the follower's current log end offset is smaller than the leader's log start offset. So the
+   * follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log
+   * start offset.

Review Comment:
   Yes this was just a revert to the original comments and logic. but this 
makes sense, I can try to clear up the cases. thanks for the comment suggestion






[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-08 Thread via GitHub


mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1100818551


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -785,17 +732,18 @@ abstract class AbstractFetcherThread(name: String,
*
* @param topicPartition topic partition
* @param fetchState current partition fetch state.
-   * @param leaderEpochInRequest current leader epoch sent in the fetch request.
-   * @param leaderLogStartOffset log-start-offset in the leader replica.
+   * @param fetchPartitionData the fetch request data for this topic partition

Review Comment:
   The returned value indicates whether the handler method returned with or 
without error. This is mentioned in the method's documentation, following the 
convention of the other handler functions.
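
   A minimal sketch of that convention, using hypothetical names rather than the real `AbstractFetcherThread` signatures: a handler returns `true` when it completed without error and `false` otherwise, and the caller records the partition as failed on `false`, in the same way the diff's `if (!handleOutOfRangeError(...)) partitionsWithError += topicPartition` does.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class HandlerResultSketch {
    /** Runs a recovery step; returns true if it completed without error, false otherwise. */
    public static boolean handle(Runnable recovery) {
        try {
            recovery.run();
            return true;
        } catch (RuntimeException e) {
            // The handler swallows the failure and reports it through the return value.
            return false;
        }
    }

    /** Collects the partitions whose handler reported an error. */
    public static Set<String> partitionsWithError(Map<String, Runnable> recoveries) {
        Set<String> failed = new LinkedHashSet<>();
        for (Map.Entry<String, Runnable> entry : recoveries.entrySet()) {
            if (!handle(entry.getValue())) {
                failed.add(entry.getKey());
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        Map<String, Runnable> recoveries = new LinkedHashMap<>();
        recoveries.put("topic-0", () -> { });
        recoveries.put("topic-1", () -> {
            throw new IllegalStateException("simulated failure");
        });
        System.out.println(partitionsWithError(recoveries));
    }
}
```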






[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-08 Thread via GitHub


mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1100818551


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -785,17 +732,18 @@ abstract class AbstractFetcherThread(name: String,
*
* @param topicPartition topic partition
* @param fetchState current partition fetch state.
-   * @param leaderEpochInRequest current leader epoch sent in the fetch request.
-   * @param leaderLogStartOffset log-start-offset in the leader replica.
+   * @param fetchPartitionData the fetch request data for this topic partition

Review Comment:
   the returned value indicates whether the handler method returned with or 
without error. It's mentioned in the documentation, but I'll update it to be 
more noticeable






[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-08 Thread via GitHub


mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1100818551


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -785,17 +732,18 @@ abstract class AbstractFetcherThread(name: String,
*
* @param topicPartition topic partition
* @param fetchState current partition fetch state.
-   * @param leaderEpochInRequest current leader epoch sent in the fetch request.
-   * @param leaderLogStartOffset log-start-offset in the leader replica.
+   * @param fetchPartitionData the fetch request data for this topic partition

Review Comment:
   the returned value indicates whether the handler method returned with or 
without error. I'll update the documentation






[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-08 Thread via GitHub


mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1100802687


##
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution, and we only need to start the machine.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger log = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);

Review Comment:
   i'll remove the static






[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-08 Thread via GitHub


mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1100799705


##
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.checkpoints.LeaderEpochCheckpointFile;
+import kafka.server.epoch.EpochEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.Seq;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution and only the start is needed.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);
+
+    private LeaderEndPoint leader;
+    private ReplicaManager replicaMgr;
+    private Integer fetchBackOffMs;

Review Comment:
   sorry, thought I had it removed in my previous commits, but I guess not. 
I'll pick it up in the next commits






[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-07 Thread via GitHub


mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1099642781


##
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.checkpoints.LeaderEpochCheckpointFile;
+import kafka.server.epoch.EpochEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.Seq;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution and only the start is needed.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);

Review Comment:
   ~~I think the Java checkstyle gave me an error previously. I can change it to LOG perhaps?~~
   
   nvm I was able to change it to `log`






[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-07 Thread via GitHub


mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1099642781


##
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.checkpoints.LeaderEpochCheckpointFile;
+import kafka.server.epoch.EpochEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.Seq;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution and only the start is needed.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);

Review Comment:
   I think the Java checkstyle gave me an error previously. I can change it to LOG perhaps?






[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-07 Thread via GitHub


mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1099641934


##
core/src/main/java/kafka/server/ReplicaFetcherTierStateMachine.java:
##
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server;
+
+import kafka.cluster.Partition;
+import kafka.log.LeaderOffsetIncremented$;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.checkpoints.LeaderEpochCheckpointFile;
+import kafka.server.epoch.EpochEntry;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetForLeaderPartition;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.immutable.Seq;
+
+/**
+ The replica fetcher tier state machine follows a state machine progression.
+
+ Currently, the tier state machine follows a synchronous execution and only the start is needed.
+ There is no need to advance the state.
+
+ When started, the tier state machine will fetch the local log start offset of the
+ leader and then build the follower's remote log aux state until the leader's
+ local log start offset.
+ */
+public class ReplicaFetcherTierStateMachine implements TierStateMachine {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ReplicaFetcherTierStateMachine.class);
+
+    private LeaderEndPoint leader;
+    private ReplicaManager replicaMgr;
+    private Integer fetchBackOffMs;

Review Comment:
   makes sense, I can remove it to clean up the implementation for now






[GitHub] [kafka] mattwong949 commented on a diff in pull request #13206: [KAFKA-14685] Refactor logic to handle OFFSET_MOVED_TO_TIERED_STORAGE error

2023-02-07 Thread via GitHub


mattwong949 commented on code in PR #13206:
URL: https://github.com/apache/kafka/pull/13206#discussion_r1099635815


##
core/src/main/scala/kafka/server/AbstractFetcherThread.scala:
##
@@ -400,12 +386,7 @@ abstract class AbstractFetcherThread(name: String,
 case Errors.OFFSET_OUT_OF_RANGE =>
   if (!handleOutOfRangeError(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch))
     partitionsWithError += topicPartition
-case Errors.OFFSET_MOVED_TO_TIERED_STORAGE =>
-  debug(s"Received error ${Errors.OFFSET_MOVED_TO_TIERED_STORAGE}, " +
-    s"at fetch offset: ${currentFetchState.fetchOffset}, " + s"topic-partition: $topicPartition")
-  if (!handleOffsetsMovedToTieredStorage(topicPartition, currentFetchState,
-    fetchPartitionData.currentLeaderEpoch, partitionData.logStartOffset()))
-    partitionsWithError += topicPartition
+

Review Comment:
   This seems to be the whitespace convention in this part of the code.


