This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new caaa4c55fee KAFKA-15410: Expand partitions, segment deletion by 
retention and enable remote log on topic integration tests (1/4) (#14307)
caaa4c55fee is described below

commit caaa4c55fee68c5893d54ffe84287f3b5205fff1
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Tue Sep 5 05:13:16 2023 +0530

    KAFKA-15410: Expand partitions, segment deletion by retention and enable 
remote log on topic integration tests (1/4) (#14307)
    
    Added the below integration tests with tiered storage:
     - PartitionsExpandTest
     - DeleteSegmentsByRetentionSizeTest
     - DeleteSegmentsByRetentionTimeTest
     - EnableRemoteLogOnTopicTest
    
    These tests are enabled for both ZK and KRaft modes.
    
    Reviewers: Satish Duggana <[email protected]>, Luke Chen 
<[email protected]>, Christo Lolov <[email protected]>, Divij Vaidya 
<[email protected]>
---
 .../java/kafka/log/remote/RemoteLogManager.java    |   2 +-
 .../src/main/scala/kafka/server/BrokerServer.scala |  14 ++-
 core/src/main/scala/kafka/server/KafkaServer.scala |   9 +-
 .../tiered/storage/TieredStorageTestContext.java   |   3 +-
 .../tiered/storage/TieredStorageTestHarness.java   |  28 ++++--
 .../actions/ExpectEmptyRemoteStorageAction.java    |   8 +-
 .../integration/BaseDeleteSegmentsTest.java        |  72 ++++++++++++++
 .../DeleteSegmentsByRetentionSizeTest.java         |  30 ++++++
 .../DeleteSegmentsByRetentionTimeTest.java         |  30 ++++++
 .../integration/EnableRemoteLogOnTopicTest.java    |  89 +++++++++++++++++
 .../storage/integration/PartitionsExpandTest.java  | 106 +++++++++++++++++++++
 .../tiered/storage/utils/BrokerLocalStorage.java   |   2 +-
 12 files changed, 372 insertions(+), 21 deletions(-)

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java 
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index fdd16347a89..8296acbc8ad 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -302,7 +302,7 @@ public class RemoteLogManager implements Closeable {
 
     private void cacheTopicPartitionIds(TopicIdPartition topicIdPartition) {
         Uuid previousTopicId = 
topicIdByPartitionMap.put(topicIdPartition.topicPartition(), 
topicIdPartition.topicId());
-        if (previousTopicId != null && previousTopicId != 
topicIdPartition.topicId()) {
+        if (previousTopicId != null && 
!previousTopicId.equals(topicIdPartition.topicId())) {
             LOGGER.info("Previous cached topic id {} for {} does not match 
updated topic id {}",
                     previousTopicId, topicIdPartition.topicPartition(), 
topicIdPartition.topicId());
         }
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 523959e037c..6decbcfd2ec 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -463,17 +463,21 @@ class BrokerServer(
       new KafkaConfig(config.originals(), true)
 
       // Start RemoteLogManager before broker start serving the requests.
-      remoteLogManagerOpt.foreach(rlm => {
+      remoteLogManagerOpt.foreach { rlm =>
         val listenerName = 
config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName()
         if (listenerName != null) {
-          val endpoint = endpoints.stream.filter(e => 
e.listenerName.equals(ListenerName.normalised(listenerName)))
+          val endpoint = endpoints.stream
+            .filter(e =>
+              e.listenerName().isPresent &&
+              
ListenerName.normalised(e.listenerName().get()).equals(ListenerName.normalised(listenerName))
+            )
             .findFirst()
-            .orElseThrow(() => new 
ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP
 +
-              " should be set as a listener name within valid broker listener 
name list."))
+            .orElseThrow(() => new 
ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
+              listenerName, "Should be set as a listener name within valid 
broker listener name list: " + endpoints))
           rlm.onEndPointCreated(EndPoint.fromJava(endpoint))
         }
         rlm.startup()
-      })
+      }
 
       // We're now ready to unfence the broker. This also allows this broker 
to transition
       // from RECOVERY state to RUNNING state, once the controller unfences 
the broker.
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index bcc12ef3978..091391e10ca 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -506,17 +506,18 @@ class KafkaServer(
             KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
 
         // Start RemoteLogManager before broker start serving the requests.
-        remoteLogManagerOpt.foreach(rlm => {
+        remoteLogManagerOpt.foreach { rlm =>
           val listenerName = 
config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName()
           if (listenerName != null) {
             brokerInfo.broker.endPoints
               .find(e => 
e.listenerName.equals(ListenerName.normalised(listenerName)))
-              .orElse(throw new 
ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP
 +
-                " should be set as a listener name within valid broker 
listener name list."))
+              .orElse(throw new 
ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
+                listenerName, "Should be set as a listener name within valid 
broker listener name list: "
+                  + 
brokerInfo.broker.endPoints.map(_.listenerName).mkString(",")))
               .foreach(e => rlm.onEndPointCreated(e))
           }
           rlm.startup()
-        })
+        }
 
         /* start processing requests */
         val zkSupport = ZkSupport(adminManager, kafkaController, zkClient, 
forwardingManager, metadataCache, brokerEpochManager)
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
index 99e76293e45..1975a1690cf 100644
--- 
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
@@ -190,13 +190,14 @@ public final class TieredStorageTestContext implements 
AutoCloseable {
      * @param batchSize the batch size
      */
     public void produce(List<ProducerRecord<String, String>> recordsToProduce, 
Integer batchSize) {
-        int counter = 0;
+        int counter = 1;
         for (ProducerRecord<String, String> record : recordsToProduce) {
             producer.send(record);
             if (counter++ % batchSize == 0) {
                 producer.flush();
             }
         }
+        producer.flush();
     }
 
     public List<ConsumerRecord<String, String>> consume(TopicPartition 
topicPartition,
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
index bed5452bdf5..34ff17f9d57 100644
--- 
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
@@ -33,11 +33,13 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import scala.collection.Seq;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.Optional;
@@ -77,9 +79,9 @@ public abstract class TieredStorageTestHarness extends 
IntegrationTestHarness {
     private static final Integer LOG_CLEANUP_INTERVAL_MS = 500;
     private static final Integer RLM_TASK_INTERVAL_MS = 500;
 
-    protected int numRemoteLogMetadataPartitions = 5;
     private TieredStorageTestContext context;
     private String testClassName = "";
+    private String storageDirPath = "";
 
     @SuppressWarnings("deprecation")
     @Override
@@ -89,6 +91,16 @@ public abstract class TieredStorageTestHarness extends 
IntegrationTestHarness {
         }
     }
 
+    @SuppressWarnings("deprecation")
+    @Override
+    public Seq<Properties> kraftControllerConfigs() {
+        return 
JavaConverters.asScalaBuffer(Collections.singletonList(overridingProps())).toSeq();
+    }
+
+    protected int numRemoteLogMetadataPartitions() {
+        return 5;
+    }
+
     public Properties overridingProps() {
         Assertions.assertTrue(STORAGE_WAIT_TIMEOUT_SEC > 
TimeUnit.MILLISECONDS.toSeconds(RLM_TASK_INTERVAL_MS),
                 "STORAGE_WAIT_TIMEOUT_SEC should be greater than 
RLM_TASK_INTERVAL_MS");
@@ -114,7 +126,7 @@ public abstract class TieredStorageTestHarness extends 
IntegrationTestHarness {
 
         overridingProps.setProperty(
                 
metadataConfigPrefix(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP),
-                String.valueOf(numRemoteLogMetadataPartitions));
+                String.valueOf(numRemoteLogMetadataPartitions()));
         overridingProps.setProperty(
                 
metadataConfigPrefix(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP),
                 String.valueOf(brokerCount()));
@@ -130,8 +142,7 @@ public abstract class TieredStorageTestHarness extends 
IntegrationTestHarness {
         // in every broker and throughout the test. Indeed, as brokers are 
restarted during the test.
         // You can override this property with a fixed path of your choice if 
you wish to use a non-temporary
         // directory to access its content after a test terminated.
-        overridingProps.setProperty(storageConfigPrefix(STORAGE_DIR_CONFIG),
-                TestUtils.tempDirectory("kafka-remote-tier-" + 
testClassName).getAbsolutePath());
+        overridingProps.setProperty(storageConfigPrefix(STORAGE_DIR_CONFIG), 
storageDirPath);
         // This configuration will remove all the remote files when close is 
called in remote storage manager.
         // Storage manager close is being called while the server is actively 
processing the socket requests,
         // so enabling this config can break the existing tests.
@@ -150,12 +161,15 @@ public abstract class TieredStorageTestHarness extends 
IntegrationTestHarness {
     @Override
     public void setUp(TestInfo testInfo) {
         testClassName = 
testInfo.getTestClass().get().getSimpleName().toLowerCase(Locale.getDefault());
+        storageDirPath = TestUtils.tempDirectory("kafka-remote-tier-" + 
testClassName).getAbsolutePath();
         super.setUp(testInfo);
         context = new TieredStorageTestContext(this);
     }
 
-    @Test
-    public void executeTieredStorageTest() {
+    // NOTE: Not able to refer TestInfoUtils#TestWithParameterizedQuorumName() 
in the ParameterizedTest name.
+    @ParameterizedTest(name = "{displayName}.quorum={0}")
+    @ValueSource(strings = {"zk", "kraft"})
+    public void executeTieredStorageTest(String quorum) {
         TieredStorageTestBuilder builder = new TieredStorageTestBuilder();
         writeTestSpecifications(builder);
         try {
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectEmptyRemoteStorageAction.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectEmptyRemoteStorageAction.java
index 6f9722cce4e..29b8ce22ace 100644
--- 
a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectEmptyRemoteStorageAction.java
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectEmptyRemoteStorageAction.java
@@ -36,8 +36,12 @@ public final class ExpectEmptyRemoteStorageAction implements 
TieredStorageTestAc
     public void doExecute(TieredStorageTestContext context) throws 
InterruptedException {
         TestUtils.waitForCondition(() -> {
             LocalTieredStorageSnapshot snapshot = 
context.takeTieredStorageSnapshot();
-            return !snapshot.getTopicPartitions().contains(topicPartition) &&
-                    snapshot.getFilesets(topicPartition).isEmpty();
+            // We don't differentiate the case between segment deletion and 
topic deletion so the underlying
+            // remote-storage-manager (RSM) doesn't know when to remove any 
topic-level marker files/folders.
+            // In case of LocalTieredStorage (RSM), there will be empty 
partition directories.
+            // With KAFKA-15166, the RSM will be able to delete the 
topic-level marker folders, then the
+            // `LocalTieredStorageSnapshot` should not contain the partition 
directories.
+            return snapshot.getFilesets(topicPartition).isEmpty();
         }, 2000L, "Remote storage is not empty for " + topicPartition);
     }
 
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
new file mode 100644
index 00000000000..0147c9f7c54
--- /dev/null
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+
+public abstract class BaseDeleteSegmentsTest extends TieredStorageTestHarness {
+
+    @Override
+    public int brokerCount() {
+        return 1;
+    }
+
+    @Override
+    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+        final Integer broker0 = 0;
+        final String topicA = "topicA";
+        final Integer p0 = 0;
+        final Integer partitionCount = 1;
+        final Integer replicationFactor = 1;
+        final Integer maxBatchCountPerSegment = 1;
+        final Map<Integer, List<Integer>> replicaAssignment = null;
+        final boolean enableRemoteLogStorage = true;
+        final int beginEpoch = 0;
+        final long startOffset = 3;
+
+        // Create topicA with 1 partition, 1 RF and enabled with remote 
storage.
+        builder.createTopic(topicA, partitionCount, replicationFactor, 
maxBatchCountPerSegment, replicaAssignment,
+                        enableRemoteLogStorage)
+                // produce events to partition 0 and expect 3 segments to be 
offloaded
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new 
KeyValueSpec("k0", "v0"))
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new 
KeyValueSpec("k1", "v1"))
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new 
KeyValueSpec("k2", "v2"))
+                .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
+                .produce(topicA, p0, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", 
"v3"))
+                // update the topic config such that it triggers the deletion 
of segments
+                .updateTopicConfig(topicA, configsToBeAdded(), 
Collections.emptyList())
+                // expect that the three offloaded remote log segments are 
deleted
+                .expectDeletionInRemoteStorage(broker0, topicA, p0, 
DELETE_SEGMENT, 3)
+                .waitForRemoteLogSegmentDeletion(topicA)
+                // expect that the leader epoch checkpoint is updated
+                .expectLeaderEpochCheckpoint(broker0, topicA, p0, beginEpoch, 
startOffset)
+                // consume from the beginning of the topic to read data from 
local and remote storage
+                .expectFetchFromTieredStorage(broker0, topicA, p0, 0)
+                .consume(topicA, p0, 0L, 1, 0);
+    }
+
+    protected abstract Map<String, String> configsToBeAdded();
+}
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsByRetentionSizeTest.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsByRetentionSizeTest.java
new file mode 100644
index 00000000000..f767bfe9dc2
--- /dev/null
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsByRetentionSizeTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.common.config.TopicConfig;
+
+import java.util.Collections;
+import java.util.Map;
+
+public final class DeleteSegmentsByRetentionSizeTest extends 
BaseDeleteSegmentsTest {
+
+    @Override
+    protected Map<String, String> configsToBeAdded() {
+        return Collections.singletonMap(TopicConfig.RETENTION_BYTES_CONFIG, 
"1");
+    }
+}
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsByRetentionTimeTest.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsByRetentionTimeTest.java
new file mode 100644
index 00000000000..2c196713d12
--- /dev/null
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsByRetentionTimeTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.common.config.TopicConfig;
+
+import java.util.Collections;
+import java.util.Map;
+
+public final class DeleteSegmentsByRetentionTimeTest extends 
BaseDeleteSegmentsTest {
+
+    @Override
+    protected Map<String, String> configsToBeAdded() {
+        return Collections.singletonMap(TopicConfig.RETENTION_MS_CONFIG, "1");
+    }
+}
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/EnableRemoteLogOnTopicTest.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/EnableRemoteLogOnTopicTest.java
new file mode 100644
index 00000000000..d63a9e1f276
--- /dev/null
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/EnableRemoteLogOnTopicTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class EnableRemoteLogOnTopicTest extends TieredStorageTestHarness 
{
+
+    @Override
+    public int brokerCount() {
+        return 2;
+    }
+
+    @Override
+    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+        final Integer broker0 = 0;
+        final Integer broker1 = 1;
+        final String topicA = "topicA";
+        final Integer p0 = 0;
+        final Integer p1 = 1;
+        final Integer partitionCount = 2;
+        final Integer replicationFactor = 2;
+        final Integer maxBatchCountPerSegment = 1;
+        final boolean enableRemoteLogStorage = false;
+        final Map<Integer, List<Integer>> assignment = mkMap(
+                mkEntry(p0, Arrays.asList(broker0, broker1)),
+                mkEntry(p1, Arrays.asList(broker1, broker0))
+        );
+
+        builder
+                .createTopic(topicA, partitionCount, replicationFactor, 
maxBatchCountPerSegment, assignment,
+                        enableRemoteLogStorage)
+                // send records to partition 0
+                .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 0L)
+                .produce(topicA, p0, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"))
+                // send records to partition 1
+                .expectEarliestLocalOffsetInLogDirectory(topicA, p1, 0L)
+                .produce(topicA, p1, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"))
+                // enable remote log storage
+                .updateTopicConfig(topicA,
+                        
Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"),
+                        Collections.emptyList())
+                // produce some more records to partition 0
+                // Note that the segment 0-2 gets offloaded for p0, but we 
cannot expect those events deterministically
+                // because the rlm-task-thread runs in background and this 
framework doesn't support it.
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 3, new 
KeyValueSpec("k3", "v3"))
+                .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 4L)
+                .produce(topicA, p0, new KeyValueSpec("k3", "v3"), new 
KeyValueSpec("k4", "v4"))
+                // produce some more records to partition 1
+                // Note that the segment 0-2 gets offloaded for p1, but we 
cannot expect those events deterministically
+                // because the rlm-task-thread runs in background and this 
framework doesn't support it.
+                .expectSegmentToBeOffloaded(broker1, topicA, p1, 3, new 
KeyValueSpec("k3", "v3"))
+                .expectEarliestLocalOffsetInLogDirectory(topicA, p1, 4L)
+                .produce(topicA, p1, new KeyValueSpec("k3", "v3"), new 
KeyValueSpec("k4", "v4"))
+                // consume from the beginning of the topic to read data from 
local and remote storage for partition 0
+                .expectFetchFromTieredStorage(broker0, topicA, p0, 4)
+                .consume(topicA, p0, 0L, 5, 4)
+                // consume from the beginning of the topic to read data from 
local and remote storage for partition 1
+                .expectFetchFromTieredStorage(broker1, topicA, p1, 4)
+                .consume(topicA, p1, 0L, 5, 4);
+    }
+}
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java
new file mode 100644
index 00000000000..e3453bdc5ee
--- /dev/null
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class PartitionsExpandTest extends TieredStorageTestHarness {
+
+    @Override
+    public int brokerCount() {
+        return 2;
+    }
+
+    @Override
+    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+        final Integer broker0 = 0;
+        final Integer broker1 = 1;
+        final String topicA = "topicA";
+        final Integer p0 = 0;
+        final Integer p1 = 1;
+        final Integer p2 = 2;
+        final Integer partitionCount = 1;
+        final Integer replicationFactor = 2;
+        final Integer maxBatchCountPerSegment = 1;
+        final boolean enableRemoteLogStorage = true;
+        final List<Integer> p0Assignment = Arrays.asList(broker0, broker1);
+        final List<Integer> p1Assignment = Arrays.asList(broker0, broker1);
+        final List<Integer> p2Assignment = Arrays.asList(broker1, broker0);
+
+        builder
+                .createTopic(topicA, partitionCount, replicationFactor, 
maxBatchCountPerSegment,
+                        Collections.singletonMap(p0, p0Assignment), 
enableRemoteLogStorage)
+                // produce events to partition 0
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new 
KeyValueSpec("k0", "v0"))
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new 
KeyValueSpec("k1", "v1"))
+                .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 2L)
+                .produce(topicA, p0, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"))
+                // expand the topicA partition-count to 3
+                .createPartitions(topicA, 3, mkMap(mkEntry(p1, p1Assignment), 
mkEntry(p2, p2Assignment)))
+                // consume from the beginning of the topic to read data from 
local and remote storage for partition 0
+                .expectFetchFromTieredStorage(broker0, topicA, p0, 2)
+                .consume(topicA, p0, 0L, 3, 2)
+
+                .expectLeader(topicA, p1, broker0, false)
+                .expectLeader(topicA, p2, broker1, false)
+
+                // produce events to partition 1
+                .expectSegmentToBeOffloaded(broker0, topicA, p1, 0, new 
KeyValueSpec("k0", "v0"))
+                .expectSegmentToBeOffloaded(broker0, topicA, p1, 1, new 
KeyValueSpec("k1", "v1"))
+                .expectEarliestLocalOffsetInLogDirectory(topicA, p1, 2L)
+                .produce(topicA, p1, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"))
+
+                // produce events to partition 2
+                .expectSegmentToBeOffloaded(broker1, topicA, p2, 0, new 
KeyValueSpec("k0", "v0"))
+                .expectSegmentToBeOffloaded(broker1, topicA, p2, 1, new 
KeyValueSpec("k1", "v1"))
+                .expectEarliestLocalOffsetInLogDirectory(topicA, p2, 2L)
+                .produce(topicA, p2, new KeyValueSpec("k0", "v0"), new 
KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"))
+
+                // produce some more events to partition 0 and expect the 
segments to be offloaded
+                // NOTE: Support needs to be added to capture the offloaded 
segment event for already sent message (k2, v2)
+                // .expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new 
KeyValueSpec("k2", "v2"))
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 3, new 
KeyValueSpec("k3", "v3"))
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 4, new 
KeyValueSpec("k4", "v4"))
+                .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 5L)
+                .produce(topicA, p0, new KeyValueSpec("k3", "v3"), new 
KeyValueSpec("k4", "v4"),
+                        new KeyValueSpec("k5", "v5"))
+
+                // consume from the beginning of the topic to read data from 
local and remote storage for partition 0
+                .expectFetchFromTieredStorage(broker0, topicA, p0, 5)
+                .consume(topicA, p0, 0L, 6, 5)
+
+                // consume from the beginning of the topic to read data from 
local and remote storage for partition 1
+                .expectFetchFromTieredStorage(broker0, topicA, p1, 2)
+                .consume(topicA, p1, 0L, 3, 2)
+
+                // consume from the middle of the topic for partition 2
+                .expectFetchFromTieredStorage(broker1, topicA, p2, 1)
+                .consume(topicA, p2, 1L, 2, 1);
+    }
+}
diff --git 
a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java
 
b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java
index 2367714c304..7197f3a9dff 100644
--- 
a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java
+++ 
b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java
@@ -56,7 +56,7 @@ public final class BrokerLocalStorage {
 
     /**
      * Wait until the first segment offset in Apache Kafka storage for the 
given topic-partition is
-     * equal or greater to the provided offset.
+     * equal to the provided offset.
      * This ensures segments can be retrieved from the local tiered storage 
when expected.
      *
      * @param topicPartition The topic-partition to check.

Reply via email to