This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push:
new 991c5c0610f KAFKA-15410: Expand partitions, segment deletion by
retention and enable remote log on topic integration tests (1/4) (#14307)
991c5c0610f is described below
commit 991c5c0610f52319d5af5d7489899ec0604b77db
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Tue Sep 5 05:13:16 2023 +0530
KAFKA-15410: Expand partitions, segment deletion by retention and enable
remote log on topic integration tests (1/4) (#14307)
Added the below integration tests with tiered storage:
- PartitionsExpandTest
- DeleteSegmentsByRetentionSizeTest
- DeleteSegmentsByRetentionTimeTest
- EnableRemoteLogOnTopicTest
These tests are enabled for both ZK and KRaft modes.
Reviewers: Satish Duggana <[email protected]>, Luke Chen
<[email protected]>, Christo Lolov <[email protected]>, Divij Vaidya
<[email protected]>
---
.../java/kafka/log/remote/RemoteLogManager.java | 2 +-
.../src/main/scala/kafka/server/BrokerServer.scala | 14 ++-
core/src/main/scala/kafka/server/KafkaServer.scala | 9 +-
.../tiered/storage/TieredStorageTestContext.java | 3 +-
.../tiered/storage/TieredStorageTestHarness.java | 28 ++++--
.../actions/ExpectEmptyRemoteStorageAction.java | 8 +-
.../integration/BaseDeleteSegmentsTest.java | 72 ++++++++++++++
.../DeleteSegmentsByRetentionSizeTest.java | 30 ++++++
.../DeleteSegmentsByRetentionTimeTest.java | 30 ++++++
.../integration/EnableRemoteLogOnTopicTest.java | 89 +++++++++++++++++
.../storage/integration/PartitionsExpandTest.java | 106 +++++++++++++++++++++
.../tiered/storage/utils/BrokerLocalStorage.java | 2 +-
12 files changed, 372 insertions(+), 21 deletions(-)
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index fdd16347a89..8296acbc8ad 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -302,7 +302,7 @@ public class RemoteLogManager implements Closeable {
private void cacheTopicPartitionIds(TopicIdPartition topicIdPartition) {
Uuid previousTopicId =
topicIdByPartitionMap.put(topicIdPartition.topicPartition(),
topicIdPartition.topicId());
- if (previousTopicId != null && previousTopicId !=
topicIdPartition.topicId()) {
+ if (previousTopicId != null &&
!previousTopicId.equals(topicIdPartition.topicId())) {
LOGGER.info("Previous cached topic id {} for {} does not match
updated topic id {}",
previousTopicId, topicIdPartition.topicPartition(),
topicIdPartition.topicId());
}
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 523959e037c..6decbcfd2ec 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -463,17 +463,21 @@ class BrokerServer(
new KafkaConfig(config.originals(), true)
// Start RemoteLogManager before broker start serving the requests.
- remoteLogManagerOpt.foreach(rlm => {
+ remoteLogManagerOpt.foreach { rlm =>
val listenerName =
config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName()
if (listenerName != null) {
- val endpoint = endpoints.stream.filter(e =>
e.listenerName.equals(ListenerName.normalised(listenerName)))
+ val endpoint = endpoints.stream
+ .filter(e =>
+ e.listenerName().isPresent &&
+
ListenerName.normalised(e.listenerName().get()).equals(ListenerName.normalised(listenerName))
+ )
.findFirst()
- .orElseThrow(() => new
ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP
+
- " should be set as a listener name within valid broker listener
name list."))
+ .orElseThrow(() => new
ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
+ listenerName, "Should be set as a listener name within valid
broker listener name list: " + endpoints))
rlm.onEndPointCreated(EndPoint.fromJava(endpoint))
}
rlm.startup()
- })
+ }
// We're now ready to unfence the broker. This also allows this broker
to transition
// from RECOVERY state to RUNNING state, once the controller unfences
the broker.
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala
b/core/src/main/scala/kafka/server/KafkaServer.scala
index bcc12ef3978..091391e10ca 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -506,17 +506,18 @@ class KafkaServer(
KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
// Start RemoteLogManager before broker start serving the requests.
- remoteLogManagerOpt.foreach(rlm => {
+ remoteLogManagerOpt.foreach { rlm =>
val listenerName =
config.remoteLogManagerConfig.remoteLogMetadataManagerListenerName()
if (listenerName != null) {
brokerInfo.broker.endPoints
.find(e =>
e.listenerName.equals(ListenerName.normalised(listenerName)))
- .orElse(throw new
ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP
+
- " should be set as a listener name within valid broker
listener name list."))
+ .orElse(throw new
ConfigException(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP,
+ listenerName, "Should be set as a listener name within valid
broker listener name list: "
+ +
brokerInfo.broker.endPoints.map(_.listenerName).mkString(",")))
.foreach(e => rlm.onEndPointCreated(e))
}
rlm.startup()
- })
+ }
/* start processing requests */
val zkSupport = ZkSupport(adminManager, kafkaController, zkClient,
forwardingManager, metadataCache, brokerEpochManager)
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
index 99e76293e45..1975a1690cf 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
@@ -190,13 +190,14 @@ public final class TieredStorageTestContext implements
AutoCloseable {
* @param batchSize the batch size
*/
public void produce(List<ProducerRecord<String, String>> recordsToProduce,
Integer batchSize) {
- int counter = 0;
+ int counter = 1;
for (ProducerRecord<String, String> record : recordsToProduce) {
producer.send(record);
if (counter++ % batchSize == 0) {
producer.flush();
}
}
+ producer.flush();
}
public List<ConsumerRecord<String, String>> consume(TopicPartition
topicPartition,
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
index bed5452bdf5..34ff17f9d57 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
@@ -33,11 +33,13 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import scala.collection.Seq;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
@@ -77,9 +79,9 @@ public abstract class TieredStorageTestHarness extends
IntegrationTestHarness {
private static final Integer LOG_CLEANUP_INTERVAL_MS = 500;
private static final Integer RLM_TASK_INTERVAL_MS = 500;
- protected int numRemoteLogMetadataPartitions = 5;
private TieredStorageTestContext context;
private String testClassName = "";
+ private String storageDirPath = "";
@SuppressWarnings("deprecation")
@Override
@@ -89,6 +91,16 @@ public abstract class TieredStorageTestHarness extends
IntegrationTestHarness {
}
}
+ @SuppressWarnings("deprecation")
+ @Override
+ public Seq<Properties> kraftControllerConfigs() {
+ return
JavaConverters.asScalaBuffer(Collections.singletonList(overridingProps())).toSeq();
+ }
+
+ protected int numRemoteLogMetadataPartitions() {
+ return 5;
+ }
+
public Properties overridingProps() {
Assertions.assertTrue(STORAGE_WAIT_TIMEOUT_SEC >
TimeUnit.MILLISECONDS.toSeconds(RLM_TASK_INTERVAL_MS),
"STORAGE_WAIT_TIMEOUT_SEC should be greater than
RLM_TASK_INTERVAL_MS");
@@ -114,7 +126,7 @@ public abstract class TieredStorageTestHarness extends
IntegrationTestHarness {
overridingProps.setProperty(
metadataConfigPrefix(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP),
- String.valueOf(numRemoteLogMetadataPartitions));
+ String.valueOf(numRemoteLogMetadataPartitions()));
overridingProps.setProperty(
metadataConfigPrefix(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP),
String.valueOf(brokerCount()));
@@ -130,8 +142,7 @@ public abstract class TieredStorageTestHarness extends
IntegrationTestHarness {
// in every broker and throughout the test. Indeed, as brokers are
restarted during the test.
// You can override this property with a fixed path of your choice if
you wish to use a non-temporary
// directory to access its content after a test terminated.
- overridingProps.setProperty(storageConfigPrefix(STORAGE_DIR_CONFIG),
- TestUtils.tempDirectory("kafka-remote-tier-" +
testClassName).getAbsolutePath());
+ overridingProps.setProperty(storageConfigPrefix(STORAGE_DIR_CONFIG),
storageDirPath);
// This configuration will remove all the remote files when close is
called in remote storage manager.
// Storage manager close is being called while the server is actively
processing the socket requests,
// so enabling this config can break the existing tests.
@@ -150,12 +161,15 @@ public abstract class TieredStorageTestHarness extends
IntegrationTestHarness {
@Override
public void setUp(TestInfo testInfo) {
testClassName =
testInfo.getTestClass().get().getSimpleName().toLowerCase(Locale.getDefault());
+ storageDirPath = TestUtils.tempDirectory("kafka-remote-tier-" +
testClassName).getAbsolutePath();
super.setUp(testInfo);
context = new TieredStorageTestContext(this);
}
- @Test
- public void executeTieredStorageTest() {
+ // NOTE: Not able to refer TestInfoUtils#TestWithParameterizedQuorumName()
in the ParameterizedTest name.
+ @ParameterizedTest(name = "{displayName}.quorum={0}")
+ @ValueSource(strings = {"zk", "kraft"})
+ public void executeTieredStorageTest(String quorum) {
TieredStorageTestBuilder builder = new TieredStorageTestBuilder();
writeTestSpecifications(builder);
try {
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectEmptyRemoteStorageAction.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectEmptyRemoteStorageAction.java
index 6f9722cce4e..29b8ce22ace 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectEmptyRemoteStorageAction.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectEmptyRemoteStorageAction.java
@@ -36,8 +36,12 @@ public final class ExpectEmptyRemoteStorageAction implements
TieredStorageTestAc
public void doExecute(TieredStorageTestContext context) throws
InterruptedException {
TestUtils.waitForCondition(() -> {
LocalTieredStorageSnapshot snapshot =
context.takeTieredStorageSnapshot();
- return !snapshot.getTopicPartitions().contains(topicPartition) &&
- snapshot.getFilesets(topicPartition).isEmpty();
+ // We don't differentiate the case between segment deletion and
topic deletion so the underlying
+ // remote-storage-manager (RSM) doesn't know when to remove any
topic-level marker files/folders.
+ // In case of LocalTieredStorage (RSM), there will be empty
partition directories.
+ // With KAFKA-15166, the RSM will be able to delete the
topic-level marker folders, then the
+ // `LocalTieredStorageSnapshot` should not contain the partition
directories.
+ return snapshot.getFilesets(topicPartition).isEmpty();
}, 2000L, "Remote storage is not empty for " + topicPartition);
}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
new file mode 100644
index 00000000000..0147c9f7c54
--- /dev/null
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/BaseDeleteSegmentsTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+
+public abstract class BaseDeleteSegmentsTest extends TieredStorageTestHarness {
+
+ @Override
+ public int brokerCount() {
+ return 1;
+ }
+
+ @Override
+ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+ final Integer broker0 = 0;
+ final String topicA = "topicA";
+ final Integer p0 = 0;
+ final Integer partitionCount = 1;
+ final Integer replicationFactor = 1;
+ final Integer maxBatchCountPerSegment = 1;
+ final Map<Integer, List<Integer>> replicaAssignment = null;
+ final boolean enableRemoteLogStorage = true;
+ final int beginEpoch = 0;
+ final long startOffset = 3;
+
+ // Create topicA with 1 partition, 1 RF and enabled with remote
storage.
+ builder.createTopic(topicA, partitionCount, replicationFactor,
maxBatchCountPerSegment, replicaAssignment,
+ enableRemoteLogStorage)
+ // produce events to partition 0 and expect 3 segments to be
offloaded
+ .expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new
KeyValueSpec("k0", "v0"))
+ .expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new
KeyValueSpec("k1", "v1"))
+ .expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new
KeyValueSpec("k2", "v2"))
+ .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 3L)
+ .produce(topicA, p0, new KeyValueSpec("k0", "v0"), new
KeyValueSpec("k1", "v1"),
+ new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3",
"v3"))
+ // update the topic config such that it triggers the deletion
of segments
+ .updateTopicConfig(topicA, configsToBeAdded(),
Collections.emptyList())
+ // expect that the three offloaded remote log segments are
deleted
+ .expectDeletionInRemoteStorage(broker0, topicA, p0,
DELETE_SEGMENT, 3)
+ .waitForRemoteLogSegmentDeletion(topicA)
+ // expect that the leader epoch checkpoint is updated
+ .expectLeaderEpochCheckpoint(broker0, topicA, p0, beginEpoch,
startOffset)
+ // consume from the beginning of the topic to read data from
local and remote storage
+ .expectFetchFromTieredStorage(broker0, topicA, p0, 0)
+ .consume(topicA, p0, 0L, 1, 0);
+ }
+
+ protected abstract Map<String, String> configsToBeAdded();
+}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsByRetentionSizeTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsByRetentionSizeTest.java
new file mode 100644
index 00000000000..f767bfe9dc2
--- /dev/null
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsByRetentionSizeTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.common.config.TopicConfig;
+
+import java.util.Collections;
+import java.util.Map;
+
+public final class DeleteSegmentsByRetentionSizeTest extends
BaseDeleteSegmentsTest {
+
+ @Override
+ protected Map<String, String> configsToBeAdded() {
+ return Collections.singletonMap(TopicConfig.RETENTION_BYTES_CONFIG,
"1");
+ }
+}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsByRetentionTimeTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsByRetentionTimeTest.java
new file mode 100644
index 00000000000..2c196713d12
--- /dev/null
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteSegmentsByRetentionTimeTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.common.config.TopicConfig;
+
+import java.util.Collections;
+import java.util.Map;
+
+public final class DeleteSegmentsByRetentionTimeTest extends
BaseDeleteSegmentsTest {
+
+ @Override
+ protected Map<String, String> configsToBeAdded() {
+ return Collections.singletonMap(TopicConfig.RETENTION_MS_CONFIG, "1");
+ }
+}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/EnableRemoteLogOnTopicTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/EnableRemoteLogOnTopicTest.java
new file mode 100644
index 00000000000..d63a9e1f276
--- /dev/null
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/EnableRemoteLogOnTopicTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class EnableRemoteLogOnTopicTest extends TieredStorageTestHarness
{
+
+ @Override
+ public int brokerCount() {
+ return 2;
+ }
+
+ @Override
+ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+ final Integer broker0 = 0;
+ final Integer broker1 = 1;
+ final String topicA = "topicA";
+ final Integer p0 = 0;
+ final Integer p1 = 1;
+ final Integer partitionCount = 2;
+ final Integer replicationFactor = 2;
+ final Integer maxBatchCountPerSegment = 1;
+ final boolean enableRemoteLogStorage = false;
+ final Map<Integer, List<Integer>> assignment = mkMap(
+ mkEntry(p0, Arrays.asList(broker0, broker1)),
+ mkEntry(p1, Arrays.asList(broker1, broker0))
+ );
+
+ builder
+ .createTopic(topicA, partitionCount, replicationFactor,
maxBatchCountPerSegment, assignment,
+ enableRemoteLogStorage)
+ // send records to partition 0
+ .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 0L)
+ .produce(topicA, p0, new KeyValueSpec("k0", "v0"), new
KeyValueSpec("k1", "v1"),
+ new KeyValueSpec("k2", "v2"))
+ // send records to partition 1
+ .expectEarliestLocalOffsetInLogDirectory(topicA, p1, 0L)
+ .produce(topicA, p1, new KeyValueSpec("k0", "v0"), new
KeyValueSpec("k1", "v1"),
+ new KeyValueSpec("k2", "v2"))
+ // enable remote log storage
+ .updateTopicConfig(topicA,
+
Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"),
+ Collections.emptyList())
+ // produce some more records to partition 0
+ // Note that the segment 0-2 gets offloaded for p0, but we
cannot expect those events deterministically
+ // because the rlm-task-thread runs in background and this
framework doesn't support it.
+ .expectSegmentToBeOffloaded(broker0, topicA, p0, 3, new
KeyValueSpec("k3", "v3"))
+ .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 4L)
+ .produce(topicA, p0, new KeyValueSpec("k3", "v3"), new
KeyValueSpec("k4", "v4"))
+ // produce some more records to partition 1
+ // Note that the segment 0-2 gets offloaded for p1, but we
cannot expect those events deterministically
+ // because the rlm-task-thread runs in background and this
framework doesn't support it.
+ .expectSegmentToBeOffloaded(broker1, topicA, p1, 3, new
KeyValueSpec("k3", "v3"))
+ .expectEarliestLocalOffsetInLogDirectory(topicA, p1, 4L)
+ .produce(topicA, p1, new KeyValueSpec("k3", "v3"), new
KeyValueSpec("k4", "v4"))
+ // consume from the beginning of the topic to read data from
local and remote storage for partition 0
+ .expectFetchFromTieredStorage(broker0, topicA, p0, 4)
+ .consume(topicA, p0, 0L, 5, 4)
+ // consume from the beginning of the topic to read data from
local and remote storage for partition 1
+ .expectFetchFromTieredStorage(broker1, topicA, p1, 4)
+ .consume(topicA, p1, 0L, 5, 4);
+ }
+}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java
new file mode 100644
index 00000000000..e3453bdc5ee
--- /dev/null
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/PartitionsExpandTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+
+public final class PartitionsExpandTest extends TieredStorageTestHarness {
+
+ @Override
+ public int brokerCount() {
+ return 2;
+ }
+
+ @Override
+ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+ final Integer broker0 = 0;
+ final Integer broker1 = 1;
+ final String topicA = "topicA";
+ final Integer p0 = 0;
+ final Integer p1 = 1;
+ final Integer p2 = 2;
+ final Integer partitionCount = 1;
+ final Integer replicationFactor = 2;
+ final Integer maxBatchCountPerSegment = 1;
+ final boolean enableRemoteLogStorage = true;
+ final List<Integer> p0Assignment = Arrays.asList(broker0, broker1);
+ final List<Integer> p1Assignment = Arrays.asList(broker0, broker1);
+ final List<Integer> p2Assignment = Arrays.asList(broker1, broker0);
+
+ builder
+ .createTopic(topicA, partitionCount, replicationFactor,
maxBatchCountPerSegment,
+ Collections.singletonMap(p0, p0Assignment),
enableRemoteLogStorage)
+ // produce events to partition 0
+ .expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new
KeyValueSpec("k0", "v0"))
+ .expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new
KeyValueSpec("k1", "v1"))
+ .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 2L)
+ .produce(topicA, p0, new KeyValueSpec("k0", "v0"), new
KeyValueSpec("k1", "v1"),
+ new KeyValueSpec("k2", "v2"))
+ // expand the topicA partition-count to 3
+ .createPartitions(topicA, 3, mkMap(mkEntry(p1, p1Assignment),
mkEntry(p2, p2Assignment)))
+ // consume from the beginning of the topic to read data from
local and remote storage for partition 0
+ .expectFetchFromTieredStorage(broker0, topicA, p0, 2)
+ .consume(topicA, p0, 0L, 3, 2)
+
+ .expectLeader(topicA, p1, broker0, false)
+ .expectLeader(topicA, p2, broker1, false)
+
+ // produce events to partition 1
+ .expectSegmentToBeOffloaded(broker0, topicA, p1, 0, new
KeyValueSpec("k0", "v0"))
+ .expectSegmentToBeOffloaded(broker0, topicA, p1, 1, new
KeyValueSpec("k1", "v1"))
+ .expectEarliestLocalOffsetInLogDirectory(topicA, p1, 2L)
+ .produce(topicA, p1, new KeyValueSpec("k0", "v0"), new
KeyValueSpec("k1", "v1"),
+ new KeyValueSpec("k2", "v2"))
+
+ // produce events to partition 2
+ .expectSegmentToBeOffloaded(broker1, topicA, p2, 0, new
KeyValueSpec("k0", "v0"))
+ .expectSegmentToBeOffloaded(broker1, topicA, p2, 1, new
KeyValueSpec("k1", "v1"))
+ .expectEarliestLocalOffsetInLogDirectory(topicA, p2, 2L)
+ .produce(topicA, p2, new KeyValueSpec("k0", "v0"), new
KeyValueSpec("k1", "v1"),
+ new KeyValueSpec("k2", "v2"))
+
+ // produce some more events to partition 0 and expect the
segments to be offloaded
+ // NOTE: Support needs to be added to capture the offloaded
segment event for already sent message (k2, v2)
+ // .expectSegmentToBeOffloaded(broker0, topicA, p0, 2, new
KeyValueSpec("k2", "v2"))
+ .expectSegmentToBeOffloaded(broker0, topicA, p0, 3, new
KeyValueSpec("k3", "v3"))
+ .expectSegmentToBeOffloaded(broker0, topicA, p0, 4, new
KeyValueSpec("k4", "v4"))
+ .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 5L)
+ .produce(topicA, p0, new KeyValueSpec("k3", "v3"), new
KeyValueSpec("k4", "v4"),
+ new KeyValueSpec("k5", "v5"))
+
+ // consume from the beginning of the topic to read data from
local and remote storage for partition 0
+ .expectFetchFromTieredStorage(broker0, topicA, p0, 5)
+ .consume(topicA, p0, 0L, 6, 5)
+
+ // consume from the beginning of the topic to read data from
local and remote storage for partition 1
+ .expectFetchFromTieredStorage(broker0, topicA, p1, 2)
+ .consume(topicA, p1, 0L, 3, 2)
+
+ // consume from the middle of the topic for partition 2
+ .expectFetchFromTieredStorage(broker1, topicA, p2, 1)
+ .consume(topicA, p2, 1L, 2, 1);
+ }
+}
diff --git
a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java
b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java
index 2367714c304..7197f3a9dff 100644
---
a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java
+++
b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java
@@ -56,7 +56,7 @@ public final class BrokerLocalStorage {
/**
* Wait until the first segment offset in Apache Kafka storage for the
given topic-partition is
- * equal or greater to the provided offset.
+ * equal to the provided offset.
* This ensures segments can be retrieved from the local tiered storage
when expected.
*
* @param topicPartition The topic-partition to check.