Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-29 Thread via GitHub


showuon merged PR #15690:
URL: https://github.com/apache/kafka/pull/15690


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-29 Thread via GitHub


showuon commented on PR #15690:
URL: https://github.com/apache/kafka/pull/15690#issuecomment-2136661825

   > @showuon Thanks for addressing [KAFKA-16709](https://issues.apache.org/jira/browse/KAFKA-16709) and [KAFKA-16711](https://issues.apache.org/jira/browse/KAFKA-16711). Do you want to rebase the PR onto trunk to pull in those changes, which remove the test flakiness?
   
   Just rebased. Thanks.





Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-28 Thread via GitHub


satishd commented on PR #15690:
URL: https://github.com/apache/kafka/pull/15690#issuecomment-2135628611

   @showuon Thanks for addressing KAFKA-16709 and KAFKA-16711. Do you want to rebase the PR onto trunk to pull in those changes, which remove the test flakiness?





Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-13 Thread via GitHub


showuon commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1598169071


##
storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java:
##
@@ -55,7 +57,7 @@ public class TieredStorageTestUtils {
 
 // Log cleanup interval is configured to be 500 ms. We need to wait at least that amount of time before
 // segments eligible for deletion gets physically removed.
-public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 5;
+public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 10;

Review Comment:
   I've identified that the test flakiness is caused by these issues: [KAFKA-16709](https://issues.apache.org/jira/browse/KAFKA-16709) and [KAFKA-16711](https://issues.apache.org/jira/browse/KAFKA-16711). I will work on fixing them first.






Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-07 Thread via GitHub


showuon commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1593390067


##
storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java:
##
@@ -55,7 +57,7 @@ public class TieredStorageTestUtils {
 
 // Log cleanup interval is configured to be 500 ms. We need to wait at least that amount of time before
 // segments eligible for deletion gets physically removed.
-public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 5;
+public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 10;

Review Comment:
   The test is still flaky. Investigating.






Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-07 Thread via GitHub


showuon commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1592297330


##
storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java:
##
@@ -55,7 +57,7 @@ public class TieredStorageTestUtils {
 
 // Log cleanup interval is configured to be 500 ms. We need to wait at least that amount of time before
 // segments eligible for deletion gets physically removed.
-public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 5;
+public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 10;

Review Comment:
   Yes, I think the test is flaky because the CI environment is quite slow, and the I/O may be slower than we thought. From the log, I can see that we copied `0002.log` to remote storage at `08:59:27,187`. About one second later (`08:59:28,300`), the test timed out and all resources started to close.
   
   I waited 10 seconds for the log deletion to complete, but that is evidently not enough in the CI environment. I've increased the timeout to 20 seconds to see if that fixes the issue. I think we've already done what we can to make the test faster (i.e. set the configs to speed up the tests).
   
   ```
   [2024-05-04 08:59:27,187] INFO [RemoteLogManager=0 partition=DcnVRVRSQd675ZLtCIn21A:topicB-0] Copied 0002.log to remote storage with segment-id: RemoteLogSegmentId{topicIdPartition=DcnVRVRSQd675ZLtCIn21A:topicB-0, id=gcVp790dRlmFCr_0tN0NTg} (kafka.log.remote.RemoteLogManager$RLMTask:792)
   
   [2024-05-04 08:59:28,300] INFO Closing topic-based RLMM resources 
   [2024-05-04 08:59:28,304] INFO Closing the instance (org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask:328)
   [2024-05-04 08:59:28,308] INFO Exited from consumer task thread (org.apache.kafka.server.log.remote.metadata.storage.ConsumerTask:151)
   ```
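
The trade-off discussed here, a fixed wait versus a slow CI machine, is commonly handled by polling a condition up to a deadline, so that fast runs finish early and only slow runs consume the full timeout. The following is a minimal sketch of that pattern, not the helper the tiered storage tests actually use; `WaitUtil`, `waitUntil`, and the chosen durations are hypothetical names and values introduced for illustration.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of the Kafka code base.
final class WaitUtil {
    private WaitUtil() { }

    /**
     * Polls the condition until it holds or the timeout elapses.
     * Returns true as soon as the condition is met, false on timeout
     * or if the waiting thread is interrupted.
     */
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            // Overflow-safe comparison against the deadline.
            if (System.nanoTime() - deadlineNanos >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                // Preserve the interrupt flag for the caller and give up.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        // Stand-in condition: becomes true roughly 200 ms after start.
        boolean done = waitUntil(
            () -> System.nanoTime() - start > Duration.ofMillis(200).toNanos(),
            Duration.ofSeconds(10),   // upper bound, comparable in spirit to a storage wait timeout
            Duration.ofMillis(50));   // how often the condition is re-checked
        System.out.println("condition met: " + done);
    }
}
```

With this shape, raising the timeout from 10 to 20 seconds only lengthens runs that would otherwise have failed; it does not slow down runs where the segment is deleted quickly.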






Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-06 Thread via GitHub


satishd commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1590586656


##
storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java:
##
@@ -55,7 +57,7 @@ public class TieredStorageTestUtils {
 
 // Log cleanup interval is configured to be 500 ms. We need to wait at least that amount of time before
 // segments eligible for deletion gets physically removed.
-public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 5;
+public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 10;

Review Comment:
   Why is it increased to 10 secs?






Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-06 Thread via GitHub


kamalcph commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1590764089


##
storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java:
##
@@ -55,7 +57,7 @@ public class TieredStorageTestUtils {
 
 // Log cleanup interval is configured to be 500 ms. We need to wait at least that amount of time before
 // segments eligible for deletion gets physically removed.
-public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 5;
+public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 10;

Review Comment:
   The AlterLogDirTest is still flaky:
   
   ```
   Build / JDK 17 and Scala 2.13 / executeTieredStorageTest(String).quorum=kraft – org.apache.kafka.tiered.storage.integration.AlterLogDirTest
   
   Error
   java.lang.AssertionError: [BrokerId=0] The base offset of the first log segment of topicB-0 in the log directory is 2 which is smaller than the expected offset 3. The directory of topicB-0 is made of the following files: 0003.timeindex
   0002.log
   0002.timeindex
   leader-epoch-checkpoint
   0003.snapshot
   0002.snapshot
   partition.metadata
   0003.index
   0002.index
   0003.log
   
   Stacktrace
   java.lang.AssertionError: [BrokerId=0] The base offset of the first log segment of topicB-0 in the log directory is 2 which is smaller than the expected offset 3. The directory of topicB-0 is made of the following files: 0003.timeindex
   0002.log
   0002.timeindex
   leader-epoch-checkpoint
   0003.snapshot
   0002.snapshot
   partition.metadata
   0003.index
   0002.index
   0003.log
       at org.apache.kafka.tiered.storage.utils.BrokerLocalStorage.waitForOffset(BrokerLocalStorage.java:129)
       at org.apache.kafka.tiered.storage.utils.BrokerLocalStorage.waitForEarliestLocalOffset(BrokerLocalStorage.java:86)
       at org.apache.kafka.tiered.storage.actions.ProduceAction.doExecute(ProduceAction.java:124)
       at org.apache.kafka.tiered.storage.TieredStorageTestAction.execute(TieredStorageTestAction.java:25)
   ```
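
To make the assertion message easier to read: the check compares the smallest base offset among the `.log` segment files still present in the partition directory with the expected earliest local offset. The sketch below reproduces that comparison on the file listing from the failure. It is an illustration under assumptions, not the code in `BrokerLocalStorage`; `SegmentOffsets` and `earliestBaseOffset` are hypothetical names, and the file names are taken as they appear in this message.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical helper, not part of the Kafka code base.
final class SegmentOffsets {
    private SegmentOffsets() { }

    /**
     * Returns the smallest base offset among the ".log" segment files,
     * or an empty result if the directory holds no segment files.
     * Assumes each segment file is named "<baseOffset>.log".
     */
    static OptionalLong earliestBaseOffset(List<String> fileNames) {
        return fileNames.stream()
            .filter(name -> name.endsWith(".log"))
            .map(name -> name.substring(0, name.length() - ".log".length()))
            .mapToLong(Long::parseLong) // leading zeros are accepted by parseLong
            .min();
    }

    public static void main(String[] args) {
        // File listing copied from the assertion message above.
        List<String> files = List.of(
            "0003.timeindex", "0002.log", "0002.timeindex", "leader-epoch-checkpoint",
            "0003.snapshot", "0002.snapshot", "partition.metadata",
            "0003.index", "0002.index", "0003.log");
        long expectedEarliestLocalOffset = 3L;
        long actual = earliestBaseOffset(files).orElseThrow();
        // The segment with base offset 2 has not been deleted locally yet,
        // so the earliest base offset is still below the expected value.
        System.out.println("earliest=" + actual
            + ", expected=" + expectedEarliestLocalOffset
            + ", ok=" + (actual >= expectedEarliestLocalOffset));
    }
}
```

Read this way, the failure means the local copy of the segment starting at offset 2 was still on disk when the wait expired, which is consistent with the slow-deletion explanation given elsewhere in this thread.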






Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-06 Thread via GitHub


kamalcph commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1590764089


##
storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java:
##
@@ -55,7 +57,7 @@ public class TieredStorageTestUtils {
 
 // Log cleanup interval is configured to be 500 ms. We need to wait at least that amount of time before
 // segments eligible for deletion gets physically removed.
-public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 5;
+public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 10;

Review Comment:
   The AlterLogDirTest seems flaky:
   
   ```
   Build / JDK 17 and Scala 2.13 / executeTieredStorageTest(String).quorum=kraft – org.apache.kafka.tiered.storage.integration.AlterLogDirTest
   ```






Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-06 Thread via GitHub


kamalcph commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1590762158


##
storage/src/test/java/org/apache/kafka/tiered/storage/integration/AlterLogDirTest.java:
##
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class AlterLogDirTest extends BaseReassignReplicaTest {

Review Comment:
   Can we extend `TieredStorageTestHarness` instead of 
`BaseReassignReplicaTest`?






Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-06 Thread via GitHub


kamalcph commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1590749298


##
storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java:
##
@@ -55,7 +57,7 @@ public class TieredStorageTestUtils {
 
 // Log cleanup interval is configured to be 500 ms. We need to wait at least that amount of time before
 // segments eligible for deletion gets physically removed.
-public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 5;
+public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 10;

Review Comment:
   For context, why was the timeout increased?






Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-06 Thread via GitHub


satishd commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1590570923


##
core/src/main/java/kafka/server/TierStateMachine.java:
##
@@ -40,19 +90,176 @@ public interface TierStateMachine {
      */
     PartitionFetchState start(TopicPartition topicPartition,
                               PartitionFetchState currentFetchState,
-                              PartitionData fetchPartitionData) throws Exception;
+                              PartitionData fetchPartitionData) throws Exception {
+        OffsetAndEpoch epochAndLeaderLocalStartOffset = leader.fetchEarliestLocalOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        int epoch = epochAndLeaderLocalStartOffset.leaderEpoch();
+        long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset();
+
+        long offsetToFetch;
+        replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark();
+        replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark();
+
+        UnifiedLog unifiedLog;
+        if (useFutureLog) {
+            unifiedLog = replicaMgr.futureLogOrException(topicPartition);
+        } else {
+            unifiedLog = replicaMgr.localLogOrException(topicPartition);
+        }
+
+        try {
+            offsetToFetch = buildRemoteLogAuxState(topicPartition, currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, fetchPartitionData.logStartOffset(), unifiedLog);
+        } catch (RemoteStorageException e) {
+            replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark();
+            replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark();
+            throw e;
+        }
+
+        OffsetAndEpoch fetchLatestOffsetResult = leader.fetchLatestOffset(topicPartition, currentFetchState.currentLeaderEpoch());
+        long leaderEndOffset = fetchLatestOffsetResult.offset();
+
+        long initialLag = leaderEndOffset - offsetToFetch;
+
+        return PartitionFetchState.apply(currentFetchState.topicId(), offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(),
+                Fetching$.MODULE$, unifiedLog.latestEpoch());
+
+    }
+
+    private OffsetForLeaderEpochResponseData.EpochEndOffset fetchEarlierEpochEndOffset(Integer epoch,
+                                                                                       TopicPartition partition,
+                                                                                       Integer currentLeaderEpoch) {
+        int previousEpoch = epoch - 1;
+
+        // Find the end-offset for the epoch earlier to the given epoch from the leader
+        Map<TopicPartition, OffsetForLeaderEpochRequestData.OffsetForLeaderPartition> partitionsWithEpochs = new HashMap<>();
+        partitionsWithEpochs.put(partition, new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition().setPartition(partition.partition()).setCurrentLeaderEpoch(currentLeaderEpoch).setLeaderEpoch(previousEpoch));
+        Option<OffsetForLeaderEpochResponseData.EpochEndOffset> maybeEpochEndOffset = leader.fetchEpochEndOffsets(JavaConverters.mapAsScalaMap(partitionsWithEpochs)).get(partition);
+        if (maybeEpochEndOffset.isEmpty()) {
+            throw new KafkaException("No response received for partition: " + partition);
+        }
+
+        OffsetForLeaderEpochResponseData.EpochEndOffset epochEndOffset = maybeEpochEndOffset.get();
+        if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+            throw Errors.forCode(epochEndOffset.errorCode()).exception();
+        }
+
+        return epochEndOffset;
+    }
+
+    private List<EpochEntry> readLeaderEpochCheckpoint(RemoteLogManager rlm,
+                                                       RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws IOException, RemoteStorageException {
+        InputStream inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH);
+        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+            CheckpointFile.CheckpointReadBuffer<EpochEntry> readBuffer = new CheckpointFile.CheckpointReadBuffer<>("", bufferedReader, 0, LeaderEpochCheckpointFile.FORMATTER);
+            return readBuffer.read();
+        }
+    }
+
+    private void buildProducerSnapshotFile(UnifiedLog unifiedLog,
+                                           long nextOffset,
+                                           RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                                           RemoteLogManager rlm) throws IOException, RemoteStorageException {
+        // Restore producer snapshot
+        File snapshotFile = LogFileUtils.producerSnapshotFile(unifiedLog.dir(), nextOffset);
+        Path tmpSnapshotFile = Paths.get(snapshotFile.getAbsolutePath() + ".tmp");
+        // Copy it to snapshot file in atomic 

Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-06 Thread via GitHub


satishd commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1590567527


##
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##
@@ -1858,16 +1858,6 @@ class KafkaConfigTest {
 }
   }
 
-  @Test
-  def testMultipleLogDirectoriesNotSupportedWithRemoteLogStorage(): Unit = {

Review Comment:
   Can we also add a positive test for multiple log dirs, like the one created below for a single dir? Maybe refactor the test below to cover both scenarios.



Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-03 Thread via GitHub


soarez commented on PR #15690:
URL: https://github.com/apache/kafka/pull/15690#issuecomment-2092980307

   Thanks for fixing the import. There are some failing tests; please have a look.
   
   I think we need to delete `kafka.server.KafkaConfigTest#testMultipleLogDirectoriesNotSupportedWithRemoteLogStorage()`.
   There is also at least one other related test failure in `AlterLogDirTest`.
   
   





Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-02 Thread via GitHub


kamalcph commented on PR #15690:
URL: https://github.com/apache/kafka/pull/15690#issuecomment-2092058377

   The latest changes lgtm.





Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-02 Thread via GitHub


soarez commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1587465657


##
core/src/main/java/kafka/server/TierStateMachine.java:
##
@@ -17,15 +17,66 @@
 
 package kafka.server;
 
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;

Review Comment:
   `:core:checkstyleMain` is failing because this import is unused:
   
   ```
   
   > Task :core:checkstyleMain FAILED
   [ant:checkstyle] [ERROR] 
   (...)/kafka/core/src/main/java/kafka/server/TierStateMachine.java:33:8: 
Unused import - java.util.Optional. [UnusedImports]
   ```






Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-02 Thread via GitHub


showuon commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1587440706


##
core/src/main/java/kafka/server/TierStateMachine.java:
##
@@ -17,15 +17,69 @@
 
 package kafka.server;
 
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
+import kafka.cluster.Partition;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.message.FetchResponseData.PartitionData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.common.OffsetAndEpoch;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+import org.apache.kafka.storage.internals.log.LogFileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import static 
org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason.LeaderOffsetIncremented;
 
 /**
- * This interface defines the APIs needed to handle any state transitions 
related to tiering
+ *  This class defines the APIs and implementation needed to handle any state 
transitions related to tiering
+ *
+ *  Currently, the tier state machine follows a synchronous execution, and we 
only need to start the machine.
+ *  There is no need to advance the state.
+ *

Review Comment:
   Updated. Thanks.






Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-02 Thread via GitHub


soarez commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1587367900


##
core/src/main/java/kafka/server/TierStateMachine.java:
##
@@ -17,15 +17,69 @@
 
 package kafka.server;
 
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
+import kafka.cluster.Partition;
+import kafka.log.UnifiedLog;
+import kafka.log.remote.RemoteLogManager;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.message.FetchResponseData.PartitionData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData;
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.common.OffsetAndEpoch;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+import org.apache.kafka.storage.internals.log.LogFileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import static 
org.apache.kafka.storage.internals.log.LogStartOffsetIncrementReason.LeaderOffsetIncremented;
 
 /**
- * This interface defines the APIs needed to handle any state transitions 
related to tiering
+ *  This class defines the APIs and implementation needed to handle any state 
transitions related to tiering
+ *
+ *  Currently, the tier state machine follows a synchronous execution, and we 
only need to start the machine.
+ *  There is no need to advance the state.
+ *

Review Comment:
   Do we still need this?






Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-02 Thread via GitHub


showuon commented on PR #15690:
URL: https://github.com/apache/kafka/pull/15690#issuecomment-2090048261

   @soarez @kamalcph, thanks for the comments. I've updated the PR in this 
commit: 
https://github.com/apache/kafka/pull/15690/commits/97e2b47c68254928c0baf065eee14a7aeb6e12b2
 .





Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-02 Thread via GitHub


showuon commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1587357829


##
core/src/main/java/kafka/server/TierStateMachine.java:
##
@@ -54,5 +180,109 @@ PartitionFetchState start(TopicPartition topicPartition,
  * @return the new PartitionFetchState if the tier state machine was 
advanced, otherwise, return the currentFetchState
  */
 Optional maybeAdvanceState(TopicPartition 
topicPartition,
-PartitionFetchState 
currentFetchState);
+PartitionFetchState 
currentFetchState) {
+// This is currently a no-op but will be used for implementing async 
tiering logic in KAFKA-13560.
+return Optional.of(currentFetchState);
+}
+
+/**
+ * It tries to build the required state for this partition from leader and 
remote storage so that it can start
+ * fetching records from the leader. The return value is the next offset 
to fetch from the leader, which is the
+ * next offset following the end offset of the remote log portion.
+ */
+private Long buildRemoteLogAuxState(TopicPartition topicPartition,
+Integer currentLeaderEpoch,
+Long leaderLocalLogStartOffset,
+Integer 
epochForLeaderLocalLogStartOffset,
+Long leaderLogStartOffset,
+UnifiedLog unifiedLog) throws 
IOException, RemoteStorageException {
+
+long nextOffset;
+
+if (unifiedLog.remoteStorageSystemEnable() && 
unifiedLog.config().remoteStorageEnable()) {
+if (replicaMgr.remoteLogManager().isEmpty()) throw new 
IllegalStateException("RemoteLogManager is not yet instantiated");
+
+RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
+
+// Find the respective leader epoch for (leaderLocalLogStartOffset 
- 1). We need to build the leader epoch cache
+// until that offset
+long previousOffsetToLeaderLocalLogStartOffset = 
leaderLocalLogStartOffset - 1;
+int targetEpoch;
+// If the existing epoch is 0, no need to fetch from earlier epoch 
as the desired offset(leaderLogStartOffset - 1)
+// will have the same epoch.
+if (epochForLeaderLocalLogStartOffset == 0) {
+targetEpoch = epochForLeaderLocalLogStartOffset;
+} else {

Review Comment:
   For this, I'd prefer to keep the original way, which is more readable to 
me.






Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-02 Thread via GitHub


showuon commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1587356992


##
core/src/main/java/kafka/server/TierStateMachine.java:
##
@@ -54,5 +180,109 @@ PartitionFetchState start(TopicPartition topicPartition,
  * @return the new PartitionFetchState if the tier state machine was 
advanced, otherwise, return the currentFetchState
  */
 Optional maybeAdvanceState(TopicPartition 
topicPartition,
-PartitionFetchState 
currentFetchState);
+PartitionFetchState 
currentFetchState) {
+// This is currently a no-op but will be used for implementing async 
tiering logic in KAFKA-13560.

Review Comment:
   I think it makes sense to remove this unused placeholder first and add it 
back when it is implemented.






Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-02 Thread via GitHub


showuon commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1587346384


##
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java:
##
@@ -154,7 +154,7 @@ public static List 
remoteStorageManagers(Seq br
 @SuppressWarnings("deprecation")
 public static List localStorages(Seq 
brokers) {
 return JavaConverters.seqAsJavaList(brokers).stream()
-.map(b -> new BrokerLocalStorage(b.config().brokerId(), 
b.config().logDirs().head(),
+.map(b -> new BrokerLocalStorage(b.config().brokerId(), 
JavaConverters.asJava(b.config().logDirs().toSet()),

Review Comment:
   You're right! Thanks.






Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-02 Thread via GitHub


showuon commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1587318846


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -3287,11 +3287,9 @@ class ReplicaManagerTest {
 val path1 = TestUtils.tempRelativeDir("data").getAbsolutePath
 val path2 = TestUtils.tempRelativeDir("data2").getAbsolutePath
 if (enableRemoteStorage) {

Review Comment:
   You're right!






Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-05-02 Thread via GitHub


showuon commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1587154091


##
core/src/main/java/kafka/server/TierStateMachine.java:
##
@@ -54,5 +180,109 @@ PartitionFetchState start(TopicPartition topicPartition,
  * @return the new PartitionFetchState if the tier state machine was 
advanced, otherwise, return the currentFetchState
  */
 Optional maybeAdvanceState(TopicPartition 
topicPartition,
-PartitionFetchState 
currentFetchState);
+PartitionFetchState 
currentFetchState) {
+// This is currently a no-op but will be used for implementing async 
tiering logic in KAFKA-13560.
+return Optional.of(currentFetchState);
+}
+
+/**
+ * It tries to build the required state for this partition from leader and 
remote storage so that it can start
+ * fetching records from the leader. The return value is the next offset 
to fetch from the leader, which is the
+ * next offset following the end offset of the remote log portion.
+ */
+private Long buildRemoteLogAuxState(TopicPartition topicPartition,
+Integer currentLeaderEpoch,
+Long leaderLocalLogStartOffset,
+Integer 
epochForLeaderLocalLogStartOffset,
+Long leaderLogStartOffset,
+UnifiedLog unifiedLog) throws 
IOException, RemoteStorageException {
+
+long nextOffset;
+
+if (unifiedLog.remoteStorageSystemEnable() && 
unifiedLog.config().remoteStorageEnable()) {
+if (replicaMgr.remoteLogManager().isEmpty()) throw new 
IllegalStateException("RemoteLogManager is not yet instantiated");
+
+RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
+
+// Find the respective leader epoch for (leaderLocalLogStartOffset 
- 1). We need to build the leader epoch cache
+// until that offset
+long previousOffsetToLeaderLocalLogStartOffset = 
leaderLocalLogStartOffset - 1;
+int targetEpoch;
+// If the existing epoch is 0, no need to fetch from earlier epoch 
as the desired offset(leaderLogStartOffset - 1)
+// will have the same epoch.
+if (epochForLeaderLocalLogStartOffset == 0) {
+targetEpoch = epochForLeaderLocalLogStartOffset;
+} else {
+// Fetch the earlier epoch/end-offset(exclusive) from the 
leader.
+OffsetForLeaderEpochResponseData.EpochEndOffset 
earlierEpochEndOffset = 
fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset, topicPartition, 
currentLeaderEpoch);
+// Check if the target offset lies within the range of earlier 
epoch. Here, epoch's end-offset is exclusive.
+if (earlierEpochEndOffset.endOffset() > 
previousOffsetToLeaderLocalLogStartOffset) {
+// Always use the leader epoch from returned 
earlierEpochEndOffset.
+// This gives the respective leader epoch, that will 
handle any gaps in epochs.
+// For ex, leader epoch cache contains:
+// leader-epoch   start-offset
+//  0   20
+//  1   85
+//  <2> - gap no messages were appended in this leader 
epoch.
+//  3   90
+//  4   98
+// There is a gap in leader epoch. For 
leaderLocalLogStartOffset as 90, leader-epoch is 3.
+// fetchEarlierEpochEndOffset(2) will return leader-epoch 
as 1, end-offset as 90.
+// So, for offset 89, we should return leader epoch as 1 
like below.
+targetEpoch = earlierEpochEndOffset.leaderEpoch();
+} else {
+targetEpoch = epochForLeaderLocalLogStartOffset;
+}
+}
+
+Optional maybeRlsm = 
rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, 
previousOffsetToLeaderLocalLogStartOffset);
+
+if (maybeRlsm.isPresent()) {
+RemoteLogSegmentMetadata remoteLogSegmentMetadata = 
maybeRlsm.get();

Review Comment:
   Nice refactor! Thanks.
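
For readers following the epoch-gap comment in the hunk above, here is a minimal, self-contained sketch of that worked example. It is not the PR's code: `EpochGapSketch`, `endOffsetFor`, and the `logEndOffset` parameter are hypothetical names, and the lookup is a local stand-in that assumes the leader answers with the largest known epoch not greater than the requested one, plus the start offset of the next epoch as the exclusive end offset.

```java
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: models the leader epoch cache as epoch -> start offset.
final class EpochGapSketch {

    /** Leader epoch plus its exclusive end offset, as described in the quoted comment. */
    static final class EpochEndOffset {
        final int leaderEpoch;
        final long endOffset;

        EpochEndOffset(int leaderEpoch, long endOffset) {
            this.leaderEpoch = leaderEpoch;
            this.endOffset = endOffset;
        }
    }

    // Hypothetical stand-in for the leader lookup: returns the largest cached
    // epoch <= requestedEpoch and the start offset of the next cached epoch.
    static EpochEndOffset endOffsetFor(TreeMap<Integer, Long> epochStartOffsets,
                                       int requestedEpoch,
                                       long logEndOffset) {
        Map.Entry<Integer, Long> floor = epochStartOffsets.floorEntry(requestedEpoch);
        if (floor == null) {
            throw new IllegalArgumentException("No epoch <= " + requestedEpoch);
        }
        Map.Entry<Integer, Long> next = epochStartOffsets.higherEntry(floor.getKey());
        long endOffset = next != null ? next.getValue() : logEndOffset;
        return new EpochEndOffset(floor.getKey(), endOffset);
    }

    // Picks the epoch that owns (leaderLocalLogStartOffset - 1).
    static int targetEpoch(TreeMap<Integer, Long> epochStartOffsets,
                           int epochForLeaderLocalLogStartOffset,
                           long leaderLocalLogStartOffset,
                           long logEndOffset) {
        long previousOffset = leaderLocalLogStartOffset - 1;
        if (epochForLeaderLocalLogStartOffset == 0) {
            // No earlier epoch exists, so the previous offset shares epoch 0.
            return epochForLeaderLocalLogStartOffset;
        }
        EpochEndOffset earlier = endOffsetFor(
            epochStartOffsets, epochForLeaderLocalLogStartOffset - 1, logEndOffset);
        // The end offset is exclusive: if it lies beyond the previous offset,
        // the previous offset belongs to the earlier epoch that was returned.
        return earlier.endOffset > previousOffset
            ? earlier.leaderEpoch
            : epochForLeaderLocalLogStartOffset;
    }

    // The cache from the quoted comment: epochs 0, 1, 3, 4 with epoch 2 missing.
    static int workedExample() {
        TreeMap<Integer, Long> cache = new TreeMap<>();
        cache.put(0, 20L);
        cache.put(1, 85L);
        cache.put(3, 90L);
        cache.put(4, 98L);
        return targetEpoch(cache, 3, 90L, 100L);
    }
}
```

With the cache from the comment and a leader local log start offset of 90 in epoch 3, `workedExample()` asks about epoch 2, gets back epoch 1 with end offset 90, and so resolves offset 89 to epoch 1, matching the comment's conclusion.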






Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-04-29 Thread via GitHub


soarez commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1582791175


##
core/src/main/java/kafka/server/TierStateMachine.java:
##
@@ -54,5 +180,109 @@ PartitionFetchState start(TopicPartition topicPartition,
  * @return the new PartitionFetchState if the tier state machine was 
advanced, otherwise, return the currentFetchState
  */
 Optional maybeAdvanceState(TopicPartition 
topicPartition,
-PartitionFetchState 
currentFetchState);
+PartitionFetchState 
currentFetchState) {
+// This is currently a no-op but will be used for implementing async 
tiering logic in KAFKA-13560.
+return Optional.of(currentFetchState);
+}
+
+/**
+ * It tries to build the required state for this partition from leader and 
remote storage so that it can start
+ * fetching records from the leader. The return value is the next offset 
to fetch from the leader, which is the
+ * next offset following the end offset of the remote log portion.
+ */
+private Long buildRemoteLogAuxState(TopicPartition topicPartition,
+Integer currentLeaderEpoch,
+Long leaderLocalLogStartOffset,
+Integer 
epochForLeaderLocalLogStartOffset,
+Long leaderLogStartOffset,
+UnifiedLog unifiedLog) throws 
IOException, RemoteStorageException {
+
+long nextOffset;
+
+if (unifiedLog.remoteStorageSystemEnable() && 
unifiedLog.config().remoteStorageEnable()) {
+if (replicaMgr.remoteLogManager().isEmpty()) throw new 
IllegalStateException("RemoteLogManager is not yet instantiated");
+
+RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
+
+// Find the respective leader epoch for (leaderLocalLogStartOffset 
- 1). We need to build the leader epoch cache
+// until that offset
+long previousOffsetToLeaderLocalLogStartOffset = 
leaderLocalLogStartOffset - 1;
+int targetEpoch;
+// If the existing epoch is 0, no need to fetch from earlier epoch 
as the desired offset(leaderLogStartOffset - 1)
+// will have the same epoch.
+if (epochForLeaderLocalLogStartOffset == 0) {
+targetEpoch = epochForLeaderLocalLogStartOffset;
+} else {

Review Comment:
   @kamalcph what specific part of the review are you concerned with? My 
thinking is that a refactoring is already in place with the merge of 
`ReplicaAlterLogDirsTierStateMachine` and `ReplicaFetcherTierStateMachine`. In 
my experience, it's generally easier to reason about what's happening 
after the code is simplified.






Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-04-28 Thread via GitHub


kamalcph commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1582059485


##
core/src/main/java/kafka/server/TierStateMachine.java:
##
@@ -54,5 +180,109 @@ PartitionFetchState start(TopicPartition topicPartition,
  * @return the new PartitionFetchState if the tier state machine was 
advanced, otherwise, return the currentFetchState
  */
 Optional maybeAdvanceState(TopicPartition 
topicPartition,
-PartitionFetchState 
currentFetchState);
+PartitionFetchState 
currentFetchState) {
+// This is currently a no-op but will be used for implementing async 
tiering logic in KAFKA-13560.
+return Optional.of(currentFetchState);
+}
+
+/**
+ * It tries to build the required state for this partition from leader and 
remote storage so that it can start
+ * fetching records from the leader. The return value is the next offset 
to fetch from the leader, which is the
+ * next offset following the end offset of the remote log portion.
+ */
+private Long buildRemoteLogAuxState(TopicPartition topicPartition,
+Integer currentLeaderEpoch,
+Long leaderLocalLogStartOffset,
+Integer 
epochForLeaderLocalLogStartOffset,
+Long leaderLogStartOffset,
+UnifiedLog unifiedLog) throws 
IOException, RemoteStorageException {
+
+long nextOffset;
+
+if (unifiedLog.remoteStorageSystemEnable() && 
unifiedLog.config().remoteStorageEnable()) {
+if (replicaMgr.remoteLogManager().isEmpty()) throw new 
IllegalStateException("RemoteLogManager is not yet instantiated");
+
+RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
+
+// Find the respective leader epoch for (leaderLocalLogStartOffset 
- 1). We need to build the leader epoch cache
+// until that offset
+long previousOffsetToLeaderLocalLogStartOffset = 
leaderLocalLogStartOffset - 1;
+int targetEpoch;
+// If the existing epoch is 0, no need to fetch from earlier epoch 
as the desired offset(leaderLogStartOffset - 1)
+// will have the same epoch.
+if (epochForLeaderLocalLogStartOffset == 0) {
+targetEpoch = epochForLeaderLocalLogStartOffset;
+} else {

Review Comment:
   I would suggest moving the refactoring to a separate follow-up PR so that 
this PR can be reviewed effectively.






Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-04-28 Thread via GitHub


kamalcph commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1582055490


##
storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java:
##
@@ -74,7 +74,7 @@ public Properties topicConfig() {
 public void 
maybeWaitForAtLeastOneSegmentUpload(scala.collection.Seq 
topicPartitions) {
 JavaConverters.seqAsJavaList(topicPartitions).forEach(topicPartition 
-> {
 List localStorages = 
JavaConverters.bufferAsJavaList(brokers()).stream()
-.map(b -> new BrokerLocalStorage(b.config().brokerId(), 
b.config().logDirs().head(), STORAGE_WAIT_TIMEOUT_SEC))
+.map(b -> new BrokerLocalStorage(b.config().brokerId(), 
JavaConverters.asJava(b.config().logDirs().toSet()), STORAGE_WAIT_TIMEOUT_SEC))

Review Comment:
   ditto



##
storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java:
##
@@ -31,31 +31,36 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public final class BrokerLocalStorage {
 
 private final Integer brokerId;
-private final File brokerStorageDirectory;
+private final Set brokerStorageDirectorys;

Review Comment:
   nit: `brokerStorageDirectorys` -> `brokerStorageDirectories`
   



##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -3287,11 +3287,9 @@ class ReplicaManagerTest {
 val path1 = TestUtils.tempRelativeDir("data").getAbsolutePath
 val path2 = TestUtils.tempRelativeDir("data2").getAbsolutePath
 if (enableRemoteStorage) {

Review Comment:
   nit: do we need this `if` check?



##
storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java:
##
@@ -141,7 +146,11 @@ private boolean 
isOffsetPresentInFirstLocalSegment(TopicPartition topicPartition
 if (offsetToSearch.equals(firstLogFileBaseOffset)) {
 return true;
 }
-File partitionDir = new File(brokerStorageDirectory.getAbsolutePath(), 
topicPartition.toString());
+File partitionDir = brokerStorageDirectorys.stream()
+.filter(dir -> dirContainsTopicPartition(topicPartition, dir))
+.findFirst()
+.orElseThrow(() -> new 
IllegalArgumentException(String.format("[BrokerId=%d] Directory for the 
topic-partition %s " +
+"was not found", brokerId, topicPartition)));

Review Comment:
   Previously, we were returning the `partitionDir` instead of the `logDir`:
   
   ```suggestion
   File logDir = brokerStorageDirectorys.stream()
   .filter(dir -> dirContainsTopicPartition(topicPartition, 
dir))
   .findFirst()
   .orElseThrow(() -> new 
IllegalArgumentException(String.format("[BrokerId=%d] Directory for the 
topic-partition %s " +
   "was not found", brokerId, topicPartition)));
   File partitionDir = new File(logDir.getAbsolutePath(), 
topicPartition.toString());
   ```
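
To make the `logDir` versus `partitionDir` distinction concrete, the following is a small standalone sketch of the lookup across several log directories. It is an illustration under assumptions, not the test utility itself: `PartitionDirLookupSketch` and `containsPartitionDir` are hypothetical names, and `containsPartitionDir` only approximates what `dirContainsTopicPartition` is presumed to check (a sub-directory named after the topic-partition).

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.LinkedHashSet;
import java.util.Set;

// Illustrative only: resolves a partition directory when a broker has several log dirs.
final class PartitionDirLookupSketch {

    // Hypothetical check: the log dir holds a sub-directory named after the partition.
    static boolean containsPartitionDir(File logDir, String topicPartition) {
        return new File(logDir, topicPartition).isDirectory();
    }

    // First find the log dir that hosts the partition, then build the partition dir from it.
    static File partitionDir(Set<File> logDirs, String topicPartition) {
        File logDir = logDirs.stream()
            .filter(dir -> containsPartitionDir(dir, topicPartition))
            .findFirst()
            .orElseThrow(() -> new IllegalArgumentException(String.format(
                "Directory for the topic-partition %s was not found", topicPartition)));
        return new File(logDir.getAbsolutePath(), topicPartition);
    }

    // Builds two temporary log dirs, places the partition in the second one,
    // and reports whether the lookup returns the partition dir, not the log dir.
    static boolean demoResolvesPartitionDir() {
        try {
            File dir1 = Files.createTempDirectory("logdir1").toFile();
            File dir2 = Files.createTempDirectory("logdir2").toFile();
            File expected = new File(dir2, "topicA-0");
            if (!expected.mkdir()) {
                throw new IOException("Could not create " + expected);
            }
            Set<File> logDirs = new LinkedHashSet<>();
            logDirs.add(dir1);
            logDirs.add(dir2);
            File found = partitionDir(logDirs, "topicA-0");
            return found.getAbsolutePath().equals(expected.getAbsolutePath());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The point of the two-step form is that `findFirst()` on the set of log dirs yields the log dir itself, so the partition directory still has to be derived from it before segment files are listed.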



##
storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java:
##
@@ -31,31 +31,36 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public final class BrokerLocalStorage {
 
 private final Integer brokerId;
-private final File brokerStorageDirectory;
+private final Set brokerStorageDirectorys;
 private final Integer storageWaitTimeoutSec;
 
 private final int storagePollPeriodSec = 1;
 private final Time time = Time.SYSTEM;
 
 public BrokerLocalStorage(Integer brokerId,
-  String storageDirname,
+  Set storageDirnames,
   Integer storageWaitTimeoutSec) {
 this.brokerId = brokerId;
-this.brokerStorageDirectory = new File(storageDirname);
+this.brokerStorageDirectorys = 
storageDirnames.stream().map(File::new).collect(Collectors.toSet());
 this.storageWaitTimeoutSec = storageWaitTimeoutSec;
 }
 
 public Integer getBrokerId() {
 return brokerId;
 }
 
+public Set getBrokerStorageDirectory() {

Review Comment:
   rename `getBrokerStorageDirectory` -> `getBrokerStorageDirectories`



##
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java:
##
@@ -313,6 +314,14 @@ public TieredStorageTestBuilder reassignReplica(String 
topic,
 return this;
 }
 
+public TieredStorageTestBuilder alterLogDir(String topic,
+Integer partition,

Review Comment:
   nit: parameter alignment




Re: [PR] KAFKA-16399: Add JBOD support in tiered storage [kafka]

2024-04-26 Thread via GitHub


soarez commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1580567800


##
core/src/main/java/kafka/server/TierStateMachine.java:
##
@@ -54,5 +180,109 @@ PartitionFetchState start(TopicPartition topicPartition,
  * @return the new PartitionFetchState if the tier state machine was 
advanced, otherwise, return the currentFetchState
  */
 Optional maybeAdvanceState(TopicPartition 
topicPartition,
-PartitionFetchState 
currentFetchState);
+PartitionFetchState 
currentFetchState) {
+// This is currently a no-op but will be used for implementing async 
tiering logic in KAFKA-13560.
+return Optional.of(currentFetchState);
+}
+
+/**
+ * It tries to build the required state for this partition from leader and 
remote storage so that it can start
+ * fetching records from the leader. The return value is the next offset 
to fetch from the leader, which is the
+ * next offset following the end offset of the remote log portion.
+ */
+private Long buildRemoteLogAuxState(TopicPartition topicPartition,
+Integer currentLeaderEpoch,
+Long leaderLocalLogStartOffset,
+Integer 
epochForLeaderLocalLogStartOffset,
+Long leaderLogStartOffset,
+UnifiedLog unifiedLog) throws 
IOException, RemoteStorageException {
+
+long nextOffset;
+
+if (unifiedLog.remoteStorageSystemEnable() && 
unifiedLog.config().remoteStorageEnable()) {
+if (replicaMgr.remoteLogManager().isEmpty()) throw new 
IllegalStateException("RemoteLogManager is not yet instantiated");
+
+RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
+
+// Find the respective leader epoch for (leaderLocalLogStartOffset 
- 1). We need to build the leader epoch cache
+// until that offset
+long previousOffsetToLeaderLocalLogStartOffset = 
leaderLocalLogStartOffset - 1;
+int targetEpoch;
+// If the existing epoch is 0, no need to fetch from earlier epoch 
as the desired offset(leaderLogStartOffset - 1)
+// will have the same epoch.
+if (epochForLeaderLocalLogStartOffset == 0) {
+targetEpoch = epochForLeaderLocalLogStartOffset;
+} else {

Review Comment:
   This can also be simplified:
   
   ```java
   int targetEpoch = epochForLeaderLocalLogStartOffset;
   // If the existing epoch is 0, no need to fetch from earlier epoch as the
   // desired offset (leaderLogStartOffset - 1) will have the same epoch.
   if (epochForLeaderLocalLogStartOffset != 0) {
   ...
   }
   ```
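
As a sanity check on the suggestion above, this sketch places the two shapes side by side and shows that they return the same value. The names are hypothetical, and `earlierEpochLookup` merely stands in for whatever the elided `...` branch computes in the real method.

```java
import java.util.function.IntSupplier;

// Illustrative only: two equivalent ways to pick targetEpoch.
final class TargetEpochFormsSketch {

    // Shape in the PR: declare first, then assign in both branches.
    static int twoBranchForm(int epochForLeaderLocalLogStartOffset,
                             IntSupplier earlierEpochLookup) {
        int targetEpoch;
        if (epochForLeaderLocalLogStartOffset == 0) {
            targetEpoch = epochForLeaderLocalLogStartOffset;
        } else {
            targetEpoch = earlierEpochLookup.getAsInt();
        }
        return targetEpoch;
    }

    // Suggested shape: start from the default and override only for non-zero epochs.
    static int defaultFirstForm(int epochForLeaderLocalLogStartOffset,
                                IntSupplier earlierEpochLookup) {
        int targetEpoch = epochForLeaderLocalLogStartOffset;
        if (epochForLeaderLocalLogStartOffset != 0) {
            targetEpoch = earlierEpochLookup.getAsInt();
        }
        return targetEpoch;
    }
}
```

Both forms skip the lookup when the epoch is 0, so the choice between them is about readability rather than behavior.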



##
core/src/main/java/kafka/server/TierStateMachine.java:
##
@@ -54,5 +180,109 @@ PartitionFetchState start(TopicPartition topicPartition,
  * @return the new PartitionFetchState if the tier state machine was 
advanced, otherwise, return the currentFetchState
  */
 Optional maybeAdvanceState(TopicPartition 
topicPartition,
-PartitionFetchState 
currentFetchState);
+PartitionFetchState 
currentFetchState) {
+// This is currently a no-op but will be used for implementing async 
tiering logic in KAFKA-13560.
+return Optional.of(currentFetchState);
+}
+
+/**
+ * It tries to build the required state for this partition from leader and 
remote storage so that it can start
+ * fetching records from the leader. The return value is the next offset 
to fetch from the leader, which is the
+ * next offset following the end offset of the remote log portion.
+ */
+private Long buildRemoteLogAuxState(TopicPartition topicPartition,
+Integer currentLeaderEpoch,
+Long leaderLocalLogStartOffset,
+Integer 
epochForLeaderLocalLogStartOffset,
+Long leaderLogStartOffset,
+UnifiedLog unifiedLog) throws 
IOException, RemoteStorageException {
+
+long nextOffset;
+
+if (unifiedLog.remoteStorageSystemEnable() && 
unifiedLog.config().remoteStorageEnable()) {
+if (replicaMgr.remoteLogManager().isEmpty()) throw new 
IllegalStateException("RemoteLogManager is not yet instantiated");
+
+RemoteLogManager rlm = replicaMgr.remoteLogManager().get();
+
+// Find the respective leader epoch for (leaderLocalLogStartOffset 
- 1). We need to build the leader epoch cache
+// until that offset
+long previousOffsetToLeaderLocalLogStartOffset = 

Re: [PR] KAFKA:-16399: Add JBOD support in tiered storage [kafka]

2024-04-25 Thread via GitHub


showuon commented on PR #15690:
URL: https://github.com/apache/kafka/pull/15690#issuecomment-2076987711

   @mimaison @soarez, PR updated. Please take a look when you are available. Thanks.





Re: [PR] KAFKA:-16399: Add JBOD support in tiered storage [kafka]

2024-04-25 Thread via GitHub


showuon commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1579328510


##
storage/src/test/java/org/apache/kafka/tiered/storage/actions/AlterLogDirAction.java:
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.actions;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+
+import java.io.File;
+import java.io.PrintStream;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+public final class AlterLogDirAction implements TieredStorageTestAction {
+
+private final TopicPartition topicPartition;
+private final int brokerId;
+
+public AlterLogDirAction(TopicPartition topicPartition,
+ int brokerId) {
+this.topicPartition = topicPartition;
+this.brokerId = brokerId;
+}
+
+@Override
+public void doExecute(TieredStorageTestContext context) throws 
InterruptedException, ExecutionException {
+Optional<BrokerLocalStorage> localStorage = 
context.localStorages().stream().filter(storage -> 
storage.getBrokerId().intValue() == brokerId).findFirst();
+if (!localStorage.isPresent()) {
+throw new IllegalArgumentException("cannot find local storage for 
this topic partition:" + topicPartition + " in this broker id:" + brokerId);
+}
+
+Optional<File> sourceDir = 
localStorage.get().getBrokerStorageDirectory().stream().filter(dir -> 
localStorage.get().isTopicPartitionFileExistInDir(topicPartition, 
dir)).findFirst();
+Optional<File> targetDir = 
localStorage.get().getBrokerStorageDirectory().stream().filter(dir -> 
!localStorage.get().isTopicPartitionFileExistInDir(topicPartition, 
dir)).findFirst();
+if (!sourceDir.isPresent()) {
+throw new IllegalArgumentException("No log dir with topic 
partition:" + topicPartition + " in this broker id:" + brokerId);
+}
+
+if (!targetDir.isPresent()) {
+throw new IllegalArgumentException("No log dir without topic 
partition:" + topicPartition + " in this broker id:" + brokerId);
+}
+
+// build alterReplicaLogDirs request content to move from sourceDir to 
targetDir
+Map<TopicPartitionReplica, String> logDirs = 
Collections.singletonMap(new TopicPartitionReplica(topicPartition.topic(), 
topicPartition.partition(), brokerId), targetDir.get().getAbsolutePath());
+
+context.admin().alterReplicaLogDirs(logDirs);

Review Comment:
   Nice suggestion. Updated.






Re: [PR] KAFKA:-16399: Add JBOD support in tiered storage [kafka]

2024-04-25 Thread via GitHub


showuon commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1579328199


##
core/src/main/java/kafka/server/TierStateMachine.java:
##
@@ -40,7 +92,72 @@ public interface TierStateMachine {
  */
 PartitionFetchState start(TopicPartition topicPartition,
   PartitionFetchState currentFetchState,
-  PartitionData fetchPartitionData) throws 
Exception;
+  PartitionData fetchPartitionData) throws 
Exception {
+OffsetAndEpoch epochAndLeaderLocalStartOffset = 
leader.fetchEarliestLocalOffset(topicPartition, 
currentFetchState.currentLeaderEpoch());
+int epoch = epochAndLeaderLocalStartOffset.leaderEpoch();
+long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset();
+
+long offsetToFetch;
+
replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark();
+
replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark();
+
+try {
+offsetToFetch = buildRemoteLogAuxState(topicPartition, 
currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, 
fetchPartitionData.logStartOffset());
+} catch (RemoteStorageException e) {
+
replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark();
+
replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark();
+throw e;
+}
+
+OffsetAndEpoch fetchLatestOffsetResult = 
leader.fetchLatestOffset(topicPartition, 
currentFetchState.currentLeaderEpoch());
+long leaderEndOffset = fetchLatestOffsetResult.offset();
+
+long initialLag = leaderEndOffset - offsetToFetch;
+
+return PartitionFetchState.apply(currentFetchState.topicId(), 
offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(),
+Fetching$.MODULE$, 
replicaMgr.localLogOrException(topicPartition).latestEpoch());

Review Comment:
   I've checked: because we've already done the truncation earlier, we'll return 
the fetching state. In the fetching state, `latestEpoch` is ignored; it is 
only used when doing truncation. But I've still changed it to the correct one 
in case we use it in the future.






Re: [PR] KAFKA:-16399: Add JBOD support in tiered storage [kafka]

2024-04-25 Thread via GitHub


showuon commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1579325677


##
core/src/test/scala/unit/kafka/server/TierStateMachineTest.scala:
##
@@ -153,12 +157,13 @@ class ReplicaFetcherTierStateMachineTest {
 assertEquals(11L, replicaState.logEndOffset)
   }
 
-  @Test
-  def testFencedOffsetResetAfterMovedToRemoteTier(): Unit = {
+  @ParameterizedTest
+  @ArgumentsSource(classOf[TierStateMachineTest.Params])
+  def testFencedOffsetResetAfterMovedToRemoteTier(truncateOnFetch: Boolean, 
useFutureLog: Boolean): Unit = {
 val partition = new TopicPartition("topic", 0)
 var isErrorHandled = false
 val mockLeaderEndpoint = new MockLeaderEndPoint(truncateOnFetch = 
truncateOnFetch, version = version)
-val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint) {
+val mockTierStateMachine = new MockTierStateMachine(mockLeaderEndpoint, 
useFutureLog) {

Review Comment:
   Unfortunately, you're correct! I've removed `useFutureLog`. I'll think about 
how to unit test it. Thanks.






Re: [PR] KAFKA:-16399: Add JBOD support in tiered storage [kafka]

2024-04-25 Thread via GitHub


showuon commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1579212500


##
storage/src/test/java/org/apache/kafka/tiered/storage/actions/AlterLogDirAction.java:
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.actions;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+
+import java.io.File;
+import java.io.PrintStream;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+public final class AlterLogDirAction implements TieredStorageTestAction {
+
+private final TopicPartition topicPartition;
+private final int brokerId;
+
+public AlterLogDirAction(TopicPartition topicPartition,
+ int brokerId) {
+this.topicPartition = topicPartition;
+this.brokerId = brokerId;
+}
+
+@Override
+public void doExecute(TieredStorageTestContext context) throws 
InterruptedException, ExecutionException {
+Optional<BrokerLocalStorage> localStorage = 
context.localStorages().stream().filter(storage -> 
storage.getBrokerId().intValue() == brokerId).findFirst();
+if (!localStorage.isPresent()) {
+throw new IllegalArgumentException("cannot find local storage for 
this topic partition:" + topicPartition + " in this broker id:" + brokerId);
+}
+
+Optional<File> sourceDir = 
localStorage.get().getBrokerStorageDirectory().stream().filter(dir -> 
localStorage.get().isTopicPartitionFileExistInDir(topicPartition, 
dir)).findFirst();
+Optional<File> targetDir = 
localStorage.get().getBrokerStorageDirectory().stream().filter(dir -> 
!localStorage.get().isTopicPartitionFileExistInDir(topicPartition, 
dir)).findFirst();
+if (!sourceDir.isPresent()) {
+throw new IllegalArgumentException("No log dir with topic 
partition:" + topicPartition + " in this broker id:" + brokerId);
+}
+
+if (!targetDir.isPresent()) {
+throw new IllegalArgumentException("No log dir without topic 
partition:" + topicPartition + " in this broker id:" + brokerId);
+}
+
+// build alterReplicaLogDirs request content to move from sourceDir to 
targetDir
+Map<TopicPartitionReplica, String> logDirs = 
Collections.singletonMap(new TopicPartitionReplica(topicPartition.topic(), 
topicPartition.partition(), brokerId), targetDir.get().getAbsolutePath());
+
+context.admin().alterReplicaLogDirs(logDirs);
+
+// wait until the topic partition folder disappearing from source dir 
and appearing in the target dir
+TestUtils.waitForCondition(() -> 
localStorage.get().isTopicPartitionFileExistInDir(topicPartition, 
targetDir.get()) &&
+
!localStorage.get().isTopicPartitionFileExistInDir(topicPartition, 
sourceDir.get()),
+"Failed to alter dir:" + logDirs);
+}
+
+@Override
+public void describe(PrintStream output) {
+output.print("alter di for topic partition:" + topicPartition + " in 
this broker id:" + brokerId);

Review Comment:
   Ah, nice catch!






Re: [PR] KAFKA:-16399: Add JBOD support in tiered storage [kafka]

2024-04-15 Thread via GitHub


showuon commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1565635052


##
core/src/main/java/kafka/server/TierStateMachine.java:
##
@@ -40,7 +92,72 @@ public interface TierStateMachine {
  */
 PartitionFetchState start(TopicPartition topicPartition,
   PartitionFetchState currentFetchState,
-  PartitionData fetchPartitionData) throws 
Exception;
+  PartitionData fetchPartitionData) throws 
Exception {
+OffsetAndEpoch epochAndLeaderLocalStartOffset = 
leader.fetchEarliestLocalOffset(topicPartition, 
currentFetchState.currentLeaderEpoch());
+int epoch = epochAndLeaderLocalStartOffset.leaderEpoch();
+long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset();
+
+long offsetToFetch;
+
replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark();
+
replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark();
+
+try {
+offsetToFetch = buildRemoteLogAuxState(topicPartition, 
currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, 
fetchPartitionData.logStartOffset());
+} catch (RemoteStorageException e) {
+
replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark();
+
replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark();
+throw e;
+}
+
+OffsetAndEpoch fetchLatestOffsetResult = 
leader.fetchLatestOffset(topicPartition, 
currentFetchState.currentLeaderEpoch());
+long leaderEndOffset = fetchLatestOffsetResult.offset();
+
+long initialLag = leaderEndOffset - offsetToFetch;
+
+return PartitionFetchState.apply(currentFetchState.topicId(), 
offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(),
+Fetching$.MODULE$, 
replicaMgr.localLogOrException(topicPartition).latestEpoch());

Review Comment:
   Ah, nice catch! Interestingly, no test caught this error. Let me 
write a test for it.






Re: [PR] KAFKA:-16399: Add JBOD support in tiered storage [kafka]

2024-04-11 Thread via GitHub


mimaison commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1560966402


##
storage/src/test/java/org/apache/kafka/tiered/storage/actions/AlterLogDirAction.java:
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.actions;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+
+import java.io.File;
+import java.io.PrintStream;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+public final class AlterLogDirAction implements TieredStorageTestAction {
+
+private final TopicPartition topicPartition;
+private final int brokerId;
+
+public AlterLogDirAction(TopicPartition topicPartition,
+ int brokerId) {
+this.topicPartition = topicPartition;
+this.brokerId = brokerId;
+}
+
+@Override
+public void doExecute(TieredStorageTestContext context) throws 
InterruptedException, ExecutionException {
+Optional<BrokerLocalStorage> localStorage = 
context.localStorages().stream().filter(storage -> 
storage.getBrokerId().intValue() == brokerId).findFirst();
+if (!localStorage.isPresent()) {
+throw new IllegalArgumentException("cannot find local storage for 
this topic partition:" + topicPartition + " in this broker id:" + brokerId);
+}
+
+Optional<File> sourceDir = 
localStorage.get().getBrokerStorageDirectory().stream().filter(dir -> 
localStorage.get().isTopicPartitionFileExistInDir(topicPartition, 
dir)).findFirst();
+Optional<File> targetDir = 
localStorage.get().getBrokerStorageDirectory().stream().filter(dir -> 
!localStorage.get().isTopicPartitionFileExistInDir(topicPartition, 
dir)).findFirst();
+if (!sourceDir.isPresent()) {

Review Comment:
   Should we move this before the computation of `targetDir`?
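
   A minimal, self-contained sketch of the reordering suggested above: validate 
`sourceDir` before computing `targetDir`, so a missing source fails fast. 
`LogDirSelection`, `Move`, and the `containsPartition` predicate are hypothetical 
stand-ins (the predicate plays the role of `isTopicPartitionFileExistInDir`); 
this is not the code in the PR.

```java
import java.io.File;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

public final class LogDirSelection {

    /** Source and target directories chosen for a replica move (hypothetical holder). */
    public static final class Move {
        public final File source;
        public final File target;

        Move(File source, File target) {
            this.source = source;
            this.target = target;
        }
    }

    // containsPartition is an assumed stand-in for
    // BrokerLocalStorage#isTopicPartitionFileExistInDir(topicPartition, dir).
    public static Move select(List<File> logDirs, Predicate<File> containsPartition) {
        Optional<File> sourceDir = logDirs.stream().filter(containsPartition).findFirst();
        // Check the source first: there is no point looking for a target without one.
        if (!sourceDir.isPresent()) {
            throw new IllegalArgumentException("No log dir with the topic partition");
        }

        Optional<File> targetDir = logDirs.stream().filter(containsPartition.negate()).findFirst();
        if (!targetDir.isPresent()) {
            throw new IllegalArgumentException("No log dir without the topic partition");
        }
        return new Move(sourceDir.get(), targetDir.get());
    }
}
```

   The behaviour is the same as checking both at the end; the only difference is 
that the second stream is skipped when the first lookup already failed.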



##
storage/src/test/java/org/apache/kafka/tiered/storage/actions/AlterLogDirAction.java:
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.actions;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+
+import java.io.File;
+import java.io.PrintStream;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+
+public final class AlterLogDirAction implements TieredStorageTestAction {
+
+private final TopicPartition topicPartition;
+private final int brokerId;
+
+public AlterLogDirAction(TopicPartition topicPartition,
+ int brokerId) {
+this.topicPartition = topicPartition;
+this.brokerId = brokerId;
+}
+
+@Override
+public void doExecute(TieredStorageTestContext context) throws 
InterruptedException, ExecutionException {
+

Re: [PR] KAFKA:-16399: Add JBOD support in tiered storage [kafka]

2024-04-10 Thread via GitHub


soarez commented on code in PR #15690:
URL: https://github.com/apache/kafka/pull/15690#discussion_r1560139446


##
core/src/main/java/kafka/server/TierStateMachine.java:
##
@@ -40,7 +92,72 @@ public interface TierStateMachine {
  */
 PartitionFetchState start(TopicPartition topicPartition,
   PartitionFetchState currentFetchState,
-  PartitionData fetchPartitionData) throws 
Exception;
+  PartitionData fetchPartitionData) throws 
Exception {
+OffsetAndEpoch epochAndLeaderLocalStartOffset = 
leader.fetchEarliestLocalOffset(topicPartition, 
currentFetchState.currentLeaderEpoch());
+int epoch = epochAndLeaderLocalStartOffset.leaderEpoch();
+long leaderLocalStartOffset = epochAndLeaderLocalStartOffset.offset();
+
+long offsetToFetch;
+
replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).buildRemoteLogAuxStateRequestRate().mark();
+
replicaMgr.brokerTopicStats().allTopicsStats().buildRemoteLogAuxStateRequestRate().mark();
+
+try {
+offsetToFetch = buildRemoteLogAuxState(topicPartition, 
currentFetchState.currentLeaderEpoch(), leaderLocalStartOffset, epoch, 
fetchPartitionData.logStartOffset());
+} catch (RemoteStorageException e) {
+
replicaMgr.brokerTopicStats().topicStats(topicPartition.topic()).failedBuildRemoteLogAuxStateRate().mark();
+
replicaMgr.brokerTopicStats().allTopicsStats().failedBuildRemoteLogAuxStateRate().mark();
+throw e;
+}
+
+OffsetAndEpoch fetchLatestOffsetResult = 
leader.fetchLatestOffset(topicPartition, 
currentFetchState.currentLeaderEpoch());
+long leaderEndOffset = fetchLatestOffsetResult.offset();
+
+long initialLag = leaderEndOffset - offsetToFetch;
+
+return PartitionFetchState.apply(currentFetchState.topicId(), 
offsetToFetch, Option.apply(initialLag), currentFetchState.currentLeaderEpoch(),
+Fetching$.MODULE$, 
replicaMgr.localLogOrException(topicPartition).latestEpoch());

Review Comment:
   Should `replicaMgr.futureLogOrException` be used instead, if `useFutureLog`?
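
   One way to read the question above, as a sketch under assumptions: pick the 
log to query based on `useFutureLog`. `LogSelector` and `EpochLog` are 
hypothetical; the suppliers stand in for 
`replicaMgr.localLogOrException(topicPartition)` and 
`replicaMgr.futureLogOrException(topicPartition)`, and `latestEpoch()` is 
simplified here to return an `int`.

```java
import java.util.function.Supplier;

public final class LogSelector {

    /** Minimal stand-in for the part of the log used here (hypothetical). */
    public interface EpochLog {
        int latestEpoch();
    }

    // Suppliers are used so that only the selected log is looked up; the
    // other lookup (which may throw if that log does not exist) is never run.
    public static int latestEpoch(boolean useFutureLog,
                                  Supplier<EpochLog> localLog,
                                  Supplier<EpochLog> futureLog) {
        EpochLog log = useFutureLog ? futureLog.get() : localLog.get();
        return log.latestEpoch();
    }
}
```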



##
storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java:
##
@@ -175,8 +186,38 @@ private OffsetHolder getEarliestLocalOffset(TopicPartition 
topicPartition) {
 return new 
OffsetHolder(LogFileUtils.offsetFromFileName(firstLogFile.get()), 
partitionFiles);
 }
 
-private List<File> getTopicPartitionFiles(TopicPartition topicPartition) 
{
-File[] files = brokerStorageDirectory.listFiles((dir, name) -> 
name.equals(topicPartition.toString()));
+public boolean isTopicPartitionFileExistInDir(TopicPartition 
topicPartition, File logDir) {

Review Comment:
   Maybe `dirContainsTopicPartition` is a better name for this method?
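
   A sketch of what the renamed method could look like, assuming it keeps the 
`listFiles` name filter visible in the removed `getTopicPartitionFiles` lines. 
`LogDirs` and the `partitionDirName` parameter are hypothetical; in the test 
utility the name would come from `topicPartition.toString()`.

```java
import java.io.File;

public final class LogDirs {

    // Returns true if logDir has an entry whose name equals partitionDirName
    // (for example "topic-0"). listFiles returns null when logDir does not
    // exist or is not a directory, which is treated as "not contained".
    public static boolean dirContainsTopicPartition(String partitionDirName, File logDir) {
        File[] matches = logDir.listFiles((dir, name) -> name.equals(partitionDirName));
        return matches != null && matches.length > 0;
    }
}
```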



##
storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java:
##
@@ -31,31 +31,36 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public final class BrokerLocalStorage {
 
 private final Integer brokerId;
-private final File brokerStorageDirectory;
+private final Set<File> brokerStorageDirectory;
 private final Integer storageWaitTimeoutSec;
 
 private final int storagePollPeriodSec = 1;
 private final Time time = Time.SYSTEM;
 
 public BrokerLocalStorage(Integer brokerId,
-  String storageDirname,
+  Set<String> storageDirname,
   Integer storageWaitTimeoutSec) {
 this.brokerId = brokerId;
-this.brokerStorageDirectory = new File(storageDirname);
+this.brokerStorageDirectory = 
storageDirname.stream().map(File::new).collect(Collectors.toSet());

Review Comment:
   The names for parameter `storageDirname` and field `brokerStorageDirectory` 
should be pluralized.



##
core/src/test/scala/unit/kafka/server/TierStateMachineTest.scala:
##
@@ -23,20 +23,23 @@ import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
 import kafka.server.FetcherThreadTestUtils.{initialFetchState, mkBatch}
+import org.junit.jupiter.api.extension.ExtensionContext
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, ArgumentsProvider, 
ArgumentsSource}
 
 import scala.collection.Map
 
-class ReplicaFetcherTierStateMachineTest {
+class TierStateMachineTest {
 
-  val truncateOnFetch = true
+  val truncateOnFetch = false

Review Comment:
   This can be removed



##