kamalcph commented on code in PR #15690: URL: https://github.com/apache/kafka/pull/15690#discussion_r1582055490
########## storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java: ########## @@ -74,7 +74,7 @@ public Properties topicConfig() { public void maybeWaitForAtLeastOneSegmentUpload(scala.collection.Seq<TopicPartition> topicPartitions) { JavaConverters.seqAsJavaList(topicPartitions).forEach(topicPartition -> { List<BrokerLocalStorage> localStorages = JavaConverters.bufferAsJavaList(brokers()).stream() - .map(b -> new BrokerLocalStorage(b.config().brokerId(), b.config().logDirs().head(), STORAGE_WAIT_TIMEOUT_SEC)) + .map(b -> new BrokerLocalStorage(b.config().brokerId(), JavaConverters.asJava(b.config().logDirs().toSet()), STORAGE_WAIT_TIMEOUT_SEC)) Review Comment: ditto ########## storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java: ########## @@ -31,31 +31,36 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; public final class BrokerLocalStorage { private final Integer brokerId; - private final File brokerStorageDirectory; + private final Set<File> brokerStorageDirectorys; Review Comment: nit: `brokerStorageDirectorys` -> `brokerStorageDirectories` ########## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ########## @@ -3287,11 +3287,9 @@ class ReplicaManagerTest { val path1 = TestUtils.tempRelativeDir("data").getAbsolutePath val path2 = TestUtils.tempRelativeDir("data2").getAbsolutePath if (enableRemoteStorage) { Review Comment: nit: do we need this `if` check? ########## storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java: ########## @@ -141,7 +146,11 @@ private boolean isOffsetPresentInFirstLocalSegment(TopicPartition topicPartition if (offsetToSearch.equals(firstLogFileBaseOffset)) { return true; } - File partitionDir = new File(brokerStorageDirectory.getAbsolutePath(), topicPartition.toString()); + File partitionDir = brokerStorageDirectorys.stream() + .filter(dir -> dirContainsTopicPartition(topicPartition, dir)) + .findFirst() + .orElseThrow(() -> new IllegalArgumentException(String.format("[BrokerId=%d] Directory for the topic-partition %s " + + "was not found", brokerId, topicPartition))); Review Comment: previously, we were returning the `partitionDir` instead of `logDir`: ```suggestion File logDir = brokerStorageDirectorys.stream() .filter(dir -> dirContainsTopicPartition(topicPartition, dir)) .findFirst() .orElseThrow(() -> new IllegalArgumentException(String.format("[BrokerId=%d] Directory for the topic-partition %s " + "was not found", brokerId, topicPartition))); File partitionDir = new File(logDir.getAbsolutePath(), topicPartition.toString()); ``` ########## storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java: ########## @@ -31,31 +31,36 @@ import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Collectors; public final class BrokerLocalStorage { private final Integer brokerId; - private final File brokerStorageDirectory; + private final Set<File> brokerStorageDirectorys; private final Integer storageWaitTimeoutSec; private final int storagePollPeriodSec = 1; private final Time time = Time.SYSTEM; public BrokerLocalStorage(Integer brokerId, - String storageDirname, + Set<String> storageDirnames, Integer storageWaitTimeoutSec) { this.brokerId = brokerId; - this.brokerStorageDirectory = new File(storageDirname); + this.brokerStorageDirectorys = storageDirnames.stream().map(File::new).collect(Collectors.toSet()); this.storageWaitTimeoutSec = storageWaitTimeoutSec; } public Integer getBrokerId() { return brokerId; } + public Set<File> getBrokerStorageDirectory() { Review Comment: rename `getBrokerStorageDirectory` -> `getBrokerStorageDirectories` ########## storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java: ########## @@ -313,6 +314,14 @@ public TieredStorageTestBuilder reassignReplica(String topic, return this; } + public TieredStorageTestBuilder alterLogDir(String topic, + Integer partition, Review Comment: nit: parameter alignment ########## storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java: ########## @@ -154,7 +154,7 @@ public static List<LocalTieredStorage> remoteStorageManagers(Seq<KafkaBroker> br @SuppressWarnings("deprecation") public static List<BrokerLocalStorage> localStorages(Seq<KafkaBroker> brokers) { return JavaConverters.seqAsJavaList(brokers).stream() - .map(b -> new BrokerLocalStorage(b.config().brokerId(), b.config().logDirs().head(), + .map(b -> new BrokerLocalStorage(b.config().brokerId(), JavaConverters.asJava(b.config().logDirs().toSet()), Review Comment: The build will fail to compile with scala 2.12 when `JavaConverters.asJava` is used: ``` ./gradlew clean :storage:build -PscalaVersion=2.12 ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org