divijvaidya commented on code in PR #14116:
URL: https://github.com/apache/kafka/pull/14116#discussion_r1285556220


##########
storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test Cases (A):
+ *    Elementary offloads and fetches from tiered storage.
+ */
+public final class OffloadAndConsumeFromLeaderTest extends TieredStorageTestHarness {

Review Comment:
   Since this test is currently failing, I suggest marking it as `@Disabled`, since it will add noise for other PRs and their authors may wonder why this test is failing. Folks who are working on TS-related changes can choose to remove the annotation once it passes.
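   
   For illustration, something along these lines (the reason string is only a placeholder):
   ```java
   import org.junit.jupiter.api.Disabled;

   @Disabled("Enable once the tiered storage test harness passes reliably")
   public final class OffloadAndConsumeFromLeaderTest extends TieredStorageTestHarness {
       // ... test body unchanged ...
   }
   ```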



##########
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+import kafka.api.IntegrationTestHarness;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.KafkaBroker;
+import kafka.server.KafkaConfig;
+import kafka.utils.TestUtils;
+import org.apache.kafka.common.replica.ReplicaSelector;
+import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;
+import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import scala.collection.Seq;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConverters;
+
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP;
+
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP;
+
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.DELETE_ON_CLOSE_CONFIG;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.STORAGE_DIR_CONFIG;
+
+/**
+ * Base class for integration tests exercising the tiered storage functionality in Apache Kafka.
+ */
+public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
+
+    /**
+     * InitialTaskDelayMs is set to 30 seconds for the delete-segment scheduler in Apache Kafka.
+     * Hence, we need to wait at least that amount of time before segments eligible for deletion
+     * get physically removed.
+     */
+    private static final Integer STORAGE_WAIT_TIMEOUT_SEC = 35;
+
+    private TieredStorageTestContext context;
+
+    protected int numMetadataPartition() {
+        return 5;
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public Seq<KafkaConfig> generateConfigs() {
+        Properties overridingProps = getOverridingProps();
+        List<KafkaConfig> configs =
+                JavaConverters.seqAsJavaList(TestUtils.createServerConfigs(brokerCount(), zkConnectOrNull()))
+                        .stream()
+                        .map(config -> KafkaConfig.fromProps(config, overridingProps))
+                        .collect(Collectors.toList());
+        return JavaConverters.asScalaBuffer(configs).toSeq();
+    }
+
+    public Properties getOverridingProps() {

Review Comment:
   In the Kafka code base, we don't use prefixes such as "get". May I suggest renaming this to `overrideProps()`?



##########
core/src/main/scala/kafka/server/KafkaBroker.scala:
##########
@@ -19,6 +19,7 @@ package kafka.server
 
 import com.yammer.metrics.core.MetricName
 import kafka.log.LogManager
+import kafka.log.remote.RemoteLogManager

Review Comment:
   Extra import?



##########
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+import kafka.api.IntegrationTestHarness;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.KafkaBroker;
+import kafka.server.KafkaConfig;
+import kafka.utils.TestUtils;
+import org.apache.kafka.common.replica.ReplicaSelector;
+import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;
+import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import scala.collection.Seq;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConverters;
+
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP;
+
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP;
+
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.DELETE_ON_CLOSE_CONFIG;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.STORAGE_DIR_CONFIG;
+
+/**
+ * Base class for integration tests exercising the tiered storage functionality in Apache Kafka.
+ */
+public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
+
+    /**
+     * InitialTaskDelayMs is set to 30 seconds for the delete-segment scheduler in Apache Kafka.
+     * Hence, we need to wait at least that amount of time before segments eligible for deletion
+     * get physically removed.
+     */
+    private static final Integer STORAGE_WAIT_TIMEOUT_SEC = 35;
+
+    private TieredStorageTestContext context;
+
+    protected int numMetadataPartition() {
+        return 5;
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public Seq<KafkaConfig> generateConfigs() {
+        Properties overridingProps = getOverridingProps();
+        List<KafkaConfig> configs =
+                JavaConverters.seqAsJavaList(TestUtils.createServerConfigs(brokerCount(), zkConnectOrNull()))
+                        .stream()
+                        .map(config -> KafkaConfig.fromProps(config, overridingProps))
+                        .collect(Collectors.toList());
+        return JavaConverters.asScalaBuffer(configs).toSeq();
+    }
+
+    public Properties getOverridingProps() {
+        Properties overridingProps = new Properties();
+        // Configure the tiered storage in Kafka. Set an interval of 1 second for the remote log manager background
+        // activity to ensure the tiered storage has enough room to be exercised within the lifetime of a test.
+        //
+        // The replication factor of the remote log metadata topic needs to be chosen so that in resiliency
+        // tests, metadata can survive the loss of one replica for its topic-partitions.
+        //
+        // The second-tier storage system is mocked via the LocalTieredStorage instance which persists transferred
+        // data files on the local file system.
+        overridingProps.setProperty(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
+        overridingProps.setProperty(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, LocalTieredStorage.class.getName());
+        overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
+                TopicBasedRemoteLogMetadataManager.class.getName());
+        overridingProps.setProperty(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "1000");
+
+        overridingProps.setProperty(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP, storageConfigPrefix(""));
+        overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP, metadataConfigPrefix(""));
+
+        overridingProps.setProperty(
+                metadataConfigPrefix(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP),
+                String.valueOf(numMetadataPartition()));
+        overridingProps.setProperty(
+                metadataConfigPrefix(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP),
+                String.valueOf(brokerCount()));
+        // This configuration ensures inactive log segments are deleted fast enough so that
+        // the integration tests can confirm a given log segment is present only in the second-tier storage.
+        // Note that this does not impact the eligibility of a log segment to be offloaded to the
+        // second-tier storage.
+        overridingProps.setProperty(KafkaConfig.LogCleanupIntervalMsProp(), "1000");
+        // This can be customized to read remote log segments from followers.
+        readReplicaSelectorClass()
+                .ifPresent(c -> overridingProps.put(KafkaConfig.ReplicaSelectorClassProp(), c.getName()));
+        // The directory of the second-tier storage needs to be constant across all instances of storage managers
+        // in every broker and throughout the test, since brokers are restarted during the test.
+        // You can override this property with a fixed path of your choice if you wish to use a non-temporary
+        // directory to access its content after a test terminates.
+        overridingProps.setProperty(storageConfigPrefix(STORAGE_DIR_CONFIG), TestUtils.tempDir().getAbsolutePath());
+        overridingProps.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp(), "true");
+        // This configuration will remove all the remote files when close is called in the remote storage manager.
+        // Storage manager close is called while the server is actively processing socket requests,
+        // so enabling this config can break the existing tests.
+        // NOTE: When using TestUtils#tempDir(), the folder gets deleted when the VM terminates.
+        overridingProps.setProperty(storageConfigPrefix(DELETE_ON_CLOSE_CONFIG), "false");
+        return overridingProps;
+    }
+
+    protected Optional<Class<ReplicaSelector>> readReplicaSelectorClass() {
+        return Optional.empty();
+    }
+
+    protected abstract void writeTestSpecifications(TieredStorageTestBuilder builder);
+
+    @BeforeEach
+    @Override
+    public void setUp(TestInfo testInfo) {
+        super.setUp(testInfo);
+        context = new TieredStorageTestContext(brokers(), producerConfig(), consumerConfig(), listenerName());
+    }
+
+    @Test
+    public void executeTieredStorageTest() {

Review Comment:
   Please add the tag `@Tag("integration")`, which tells JUnit 5 that any class extending this class will be run as part of `./gradlew integrationTest` and not with `./gradlew unitTest`.
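   
   A rough sketch of what I mean, placed on the base class so that every subclass inherits the tag:
   ```java
   import org.junit.jupiter.api.Tag;

   @Tag("integration")
   public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
       // ... existing members unchanged ...
   }
   ```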



##########
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+import kafka.api.IntegrationTestHarness;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.KafkaBroker;
+import kafka.server.KafkaConfig;
+import kafka.utils.TestUtils;
+import org.apache.kafka.common.replica.ReplicaSelector;
+import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;
+import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import scala.collection.Seq;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConverters;
+
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP;
+
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP;
+
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.DELETE_ON_CLOSE_CONFIG;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.STORAGE_DIR_CONFIG;
+
+/**
+ * Base class for integration tests exercising the tiered storage functionality in Apache Kafka.
+ */
+public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
+
+    /**
+     * InitialTaskDelayMs is set to 30 seconds for the delete-segment scheduler in Apache Kafka.
+     * Hence, we need to wait at least that amount of time before segments eligible for deletion
+     * get physically removed.
+     */
+    private static final Integer STORAGE_WAIT_TIMEOUT_SEC = 35;
+
+    private TieredStorageTestContext context;
+
+    protected int numMetadataPartition() {
+        return 5;
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public Seq<KafkaConfig> generateConfigs() {
+        Properties overridingProps = getOverridingProps();
+        List<KafkaConfig> configs =
+                JavaConverters.seqAsJavaList(TestUtils.createServerConfigs(brokerCount(), zkConnectOrNull()))
+                        .stream()
+                        .map(config -> KafkaConfig.fromProps(config, overridingProps))
+                        .collect(Collectors.toList());
+        return JavaConverters.asScalaBuffer(configs).toSeq();
+    }
+
+    public Properties getOverridingProps() {
+        Properties overridingProps = new Properties();
+        // Configure the tiered storage in Kafka. Set an interval of 1 second for the remote log manager background
+        // activity to ensure the tiered storage has enough room to be exercised within the lifetime of a test.
+        //
+        // The replication factor of the remote log metadata topic needs to be chosen so that in resiliency
+        // tests, metadata can survive the loss of one replica for its topic-partitions.
+        //
+        // The second-tier storage system is mocked via the LocalTieredStorage instance which persists transferred
+        // data files on the local file system.
+        overridingProps.setProperty(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
+        overridingProps.setProperty(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, LocalTieredStorage.class.getName());
+        overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
+                TopicBasedRemoteLogMetadataManager.class.getName());
+        overridingProps.setProperty(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "1000");
+
+        overridingProps.setProperty(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP, storageConfigPrefix(""));
+        overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP, metadataConfigPrefix(""));
+
+        overridingProps.setProperty(
+                metadataConfigPrefix(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP),
+                String.valueOf(numMetadataPartition()));
+        overridingProps.setProperty(
+                metadataConfigPrefix(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP),
+                String.valueOf(brokerCount()));
+        // This configuration ensures inactive log segments are deleted fast enough so that
+        // the integration tests can confirm a given log segment is present only in the second-tier storage.
+        // Note that this does not impact the eligibility of a log segment to be offloaded to the
+        // second-tier storage.
+        overridingProps.setProperty(KafkaConfig.LogCleanupIntervalMsProp(), "1000");
+        // This can be customized to read remote log segments from followers.
+        readReplicaSelectorClass()
+                .ifPresent(c -> overridingProps.put(KafkaConfig.ReplicaSelectorClassProp(), c.getName()));
+        // The directory of the second-tier storage needs to be constant across all instances of storage managers
+        // in every broker and throughout the test, since brokers are restarted during the test.
+        // You can override this property with a fixed path of your choice if you wish to use a non-temporary
+        // directory to access its content after a test terminates.
+        overridingProps.setProperty(storageConfigPrefix(STORAGE_DIR_CONFIG), TestUtils.tempDir().getAbsolutePath());
+        overridingProps.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp(), "true");
+        // This configuration will remove all the remote files when close is called in the remote storage manager.
+        // Storage manager close is called while the server is actively processing socket requests,
+        // so enabling this config can break the existing tests.
+        // NOTE: When using TestUtils#tempDir(), the folder gets deleted when the VM terminates.
+        overridingProps.setProperty(storageConfigPrefix(DELETE_ON_CLOSE_CONFIG), "false");
+        return overridingProps;
+    }
+
+    protected Optional<Class<ReplicaSelector>> readReplicaSelectorClass() {
+        return Optional.empty();
+    }
+
+    protected abstract void writeTestSpecifications(TieredStorageTestBuilder builder);
+
+    @BeforeEach
+    @Override
+    public void setUp(TestInfo testInfo) {
+        super.setUp(testInfo);
+        context = new TieredStorageTestContext(brokers(), producerConfig(), consumerConfig(), listenerName());
+    }
+
+    @Test
+    public void executeTieredStorageTest() {
+        TieredStorageTestBuilder builder = new TieredStorageTestBuilder();
+        writeTestSpecifications(builder);
+        try {
+            for (TieredStorageTestAction action : builder.complete()) {
+                action.execute(context);
+            }
+        } catch (Exception ex) {
+            throw new AssertionError("Could not build test specifications. No 
test was executed.", ex);
+        }
+    }
+
+    @AfterEach
+    @Override
+    public void tearDown() {
+        try {
+            context.close();
+            super.tearDown();
+            context.printReport(System.out);
+        } catch (Exception ex) {
+            throw new AssertionError("Failed to close the tear down the test 
harness.", ex);
+        }
+    }
+
+    private String storageConfigPrefix(String key) {
+        return LocalTieredStorage.STORAGE_CONFIG_PREFIX + key;
+    }
+
+    private String metadataConfigPrefix(String key) {
+        return "rlmm.config." + key;
+    }
+
+    @SuppressWarnings("deprecation")
+    public static List<LocalTieredStorage> getTieredStorages(Seq<KafkaBroker> brokers) {
+        List<LocalTieredStorage> storages = new ArrayList<>();
+        JavaConverters.seqAsJavaList(brokers).forEach(broker -> {
+            if (broker.remoteLogManagerOpt().isDefined()) {
+                RemoteLogManager remoteLogManager = broker.remoteLogManagerOpt().get();
+                RemoteStorageManager storageManager = remoteLogManager.storageManager();
+                if (storageManager instanceof ClassLoaderAwareRemoteStorageManager) {
+                    ClassLoaderAwareRemoteStorageManager loaderAwareRSM =
+                            (ClassLoaderAwareRemoteStorageManager) storageManager;
+                    if (loaderAwareRSM.delegate() instanceof LocalTieredStorage) {
+                        storages.add((LocalTieredStorage) loaderAwareRSM.delegate());
+                    }
+                    }
+                }
+            }
+        });
+        return storages;
+    }
+
+    @SuppressWarnings("deprecation")

Review Comment:
   What is the deprecated API that we are using here, and why?
   
   (Same question for the rest of the places.)
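   
   If it is the `scala.collection.JavaConverters` usage (deprecated since Scala 2.13), a possible replacement for Java callers is `scala.jdk.javaapi.CollectionConverters`, roughly:
   ```java
   import scala.jdk.javaapi.CollectionConverters;

   // Converts the Scala Seq to a java.util.List without deprecated APIs.
   List<KafkaBroker> brokerList = CollectionConverters.asJava(brokers);
   ```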



##########
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+import kafka.api.IntegrationTestHarness;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.KafkaBroker;
+import kafka.server.KafkaConfig;
+import kafka.utils.TestUtils;
+import org.apache.kafka.common.replica.ReplicaSelector;
+import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;
+import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import scala.collection.Seq;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConverters;
+
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP;
+
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP;
+
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.DELETE_ON_CLOSE_CONFIG;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.STORAGE_DIR_CONFIG;
+
+/**
+ * Base class for integration tests exercising the tiered storage functionality in Apache Kafka.
+ */
+public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
+
+    /**
+     * InitialTaskDelayMs is set to 30 seconds for the delete-segment scheduler in Apache Kafka.
+     * Hence, we need to wait at least that amount of time before segments eligible for deletion
+     * get physically removed.
+     */
+    private static final Integer STORAGE_WAIT_TIMEOUT_SEC = 35;
+
+    private TieredStorageTestContext context;
+
+    protected int numMetadataPartition() {

Review Comment:
   This is a very confusing name, since it could mean the partitions of the consumer metadata topic or of the KRaft metadata topic. Can you please rename it appropriately? Also, could this be moved to a member of this class instead (which can be accessed by subclasses)?
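   
   Something along these lines, perhaps (the name is only a suggestion):
   ```java
   // Partition count of the remote log metadata topic; subclasses may override it.
   protected int numRemoteLogMetadataTopicPartitions = 5;
   ```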



##########
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+import kafka.api.IntegrationTestHarness;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.KafkaBroker;
+import kafka.server.KafkaConfig;
+import kafka.utils.TestUtils;
+import org.apache.kafka.common.replica.ReplicaSelector;
+import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;
+import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import scala.collection.Seq;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConverters;
+
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP;
+
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP;
+
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.DELETE_ON_CLOSE_CONFIG;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.STORAGE_DIR_CONFIG;
+
+/**
+ * Base class for integration tests exercising the tiered storage functionality in Apache Kafka.
+ */
+public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
+
+    /**
+     * InitialTaskDelayMs is set to 30 seconds for the delete-segment scheduler in Apache Kafka.
+     * Hence, we need to wait at least that amount of time before segments eligible for deletion
+     * get physically removed.
+     */
+    private static final Integer STORAGE_WAIT_TIMEOUT_SEC = 35;
+
+    private TieredStorageTestContext context;
+
+    protected int numMetadataPartition() {
+        return 5;
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public Seq<KafkaConfig> generateConfigs() {

Review Comment:
   I don't think we need to override the entire method. IntegrationTestHarness provides an alternative way to override configs via the `modifyConfigs()` method. You can see how it is used in other examples, such as: https://github.com/apache/kafka/blob/7a2e11cae739f3391f61e2e29b148d3a3ebea8b3/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala#L46
   
   The advantage of using `modifyConfigs` over the current approach is that we don't lose the additional configuration that `IntegrationTestHarness#generateConfigs` applies, such as automatically adding:
   ```
       if (isZkMigrationTest()) {
         cfgs.foreach(_.setProperty(KafkaConfig.MigrationEnabledProp, "true"))
       }
   ```
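   
   A rough sketch of the shape this could take, assuming `modifyConfigs` keeps its current Scala signature and using the `overrideProps()` rename suggested above:
   ```java
   @Override
   public void modifyConfigs(Seq<Properties> props) {
       super.modifyConfigs(props);
       // Layer the tiered storage overrides on top of the harness-generated configs.
       JavaConverters.seqAsJavaList(props).forEach(config -> config.putAll(overrideProps()));
   }
   ```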
   
   



##########
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
+import org.apache.kafka.tiered.storage.specs.TopicSpec;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+import kafka.log.UnifiedLog;
+import kafka.server.KafkaBroker;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.BrokerState;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot;
+import scala.Function0;
+import scala.Function1;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.collection.Seq;
+
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
+
+public final class TieredStorageTestContext {
+
+    private final Seq<KafkaBroker> brokers;
+    private final Properties producerConfig;
+    private final Properties consumerConfig;
+    private final ListenerName listenerName;
+
+    private final Serializer<String> ser;
+    private final Deserializer<String> de;
+
+    private final Map<String, TopicSpec> topicSpecs;
+    private final TieredStorageTestReport testReport;
+
+    private volatile KafkaProducer<String, String> producer;
+    private volatile KafkaConsumer<String, String> consumer;
+    private volatile Admin admin;
+    private volatile List<LocalTieredStorage> tieredStorages;
+    private volatile List<BrokerLocalStorage> localStorages;
+
+    public TieredStorageTestContext(Seq<KafkaBroker> brokers,
+                                    Properties producerConfig,
+                                    Properties consumerConfig,
+                                    ListenerName listenerName) {
+        this.brokers = brokers;
+        this.producerConfig = producerConfig;
+        this.consumerConfig = consumerConfig;
+        this.listenerName = listenerName;
+        this.ser = Serdes.String().serializer();
+        this.de = Serdes.String().deserializer();
+        this.topicSpecs = new HashMap<>();
+        this.testReport = new TieredStorageTestReport(this);
+        initContext();
+    }
+
+    private void initContext() {
+        String bootstrapServers = TestUtils.bootstrapServers(brokers, listenerName);
+        producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        // Set a producer linger of 60 seconds, in order to optimistically generate batches of
+        // records with a pre-determined size.
+        producerConfig.put(LINGER_MS_CONFIG, String.valueOf(TimeUnit.SECONDS.toMillis(60)));
+        producer = new KafkaProducer<>(producerConfig, ser, ser);
+
+        consumerConfig.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        consumer = new KafkaConsumer<>(consumerConfig, de, de);
+
+        Properties adminConfig = new Properties();
+        adminConfig.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        admin = Admin.create(adminConfig);
+
+        tieredStorages = TieredStorageTestHarness.getTieredStorages(brokers);
+        localStorages = TieredStorageTestHarness.getLocalStorages(brokers);
+    }
+
+    public void createTopic(TopicSpec spec) throws ExecutionException, InterruptedException {
+        NewTopic newTopic;
+        if (spec.getAssignment() == null || spec.getAssignment().isEmpty()) {
+            newTopic = new NewTopic(spec.getTopicName(), spec.getPartitionCount(), (short) spec.getReplicationFactor());
+        } else {
+            Map<Integer, List<Integer>> replicasAssignments = spec.getAssignment();
+            newTopic = new NewTopic(spec.getTopicName(), replicasAssignments);
+        }
+        newTopic.configs(spec.getProperties());
+        admin.createTopics(Collections.singletonList(newTopic)).all().get();

Review Comment:
   Please use `KafkaServerTestHarness.createTopic()`, which is available in `TieredStorageTestHarness` since `KafkaServerTestHarness` is a superclass of `IntegrationTestHarness`.
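   
   Roughly, assuming the `createTopic(topic, numPartitions, replicationFactor, topicConfig)` variant (when calling from Java, any Scala default arguments beyond these have to be passed explicitly):
   ```java
   Properties topicConfig = new Properties();
   topicConfig.putAll(spec.getProperties());
   createTopic(spec.getTopicName(), spec.getPartitionCount(), spec.getReplicationFactor(), topicConfig);
   ```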



##########
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
+import org.apache.kafka.tiered.storage.specs.TopicSpec;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+import kafka.log.UnifiedLog;
+import kafka.server.KafkaBroker;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.BrokerState;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot;
+import scala.Function0;
+import scala.Function1;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.collection.Seq;
+
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
+
+public final class TieredStorageTestContext {

Review Comment:
   I think a lot of the functionality in this class already exists in `IntegrationTestHarness` or its parent classes. I left comments on a few examples, but I think we can simply re-use the existing methods, and the ones we cannot re-use we can fold into `TieredStorageTestHarness`. We can refactor this class back out of `TieredStorageTestHarness` later if required.



##########
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java:
##########
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.actions.BounceBrokerAction;
+import org.apache.kafka.tiered.storage.actions.ConsumeAction;
+import org.apache.kafka.tiered.storage.actions.CreatePartitionsAction;
+import org.apache.kafka.tiered.storage.actions.CreateTopicAction;
+import org.apache.kafka.tiered.storage.actions.DeleteRecordsAction;
+import org.apache.kafka.tiered.storage.actions.DeleteTopicAction;
+import org.apache.kafka.tiered.storage.actions.EraseBrokerStorageAction;
+import org.apache.kafka.tiered.storage.actions.ExpectBrokerInISRAction;
+import org.apache.kafka.tiered.storage.actions.ExpectEmptyRemoteStorageAction;
+import org.apache.kafka.tiered.storage.actions.ExpectLeaderAction;
+import org.apache.kafka.tiered.storage.actions.ExpectLeaderEpochCheckpointAction;
+import org.apache.kafka.tiered.storage.actions.ExpectListOffsetsAction;
+import org.apache.kafka.tiered.storage.actions.ExpectTopicIdToMatchInRemoteStorageAction;
+import org.apache.kafka.tiered.storage.actions.ExpectUserTopicMappedToMetadataPartitionsAction;
+import org.apache.kafka.tiered.storage.actions.ProduceAction;
+import org.apache.kafka.tiered.storage.actions.ReassignReplicaAction;
+import org.apache.kafka.tiered.storage.actions.ShrinkReplicaAction;
+import org.apache.kafka.tiered.storage.actions.StartBrokerAction;
+import org.apache.kafka.tiered.storage.actions.StopBrokerAction;
+import org.apache.kafka.tiered.storage.actions.UpdateBrokerConfigAction;
+import org.apache.kafka.tiered.storage.actions.UpdateTopicConfigAction;
+import org.apache.kafka.tiered.storage.specs.ConsumableSpec;
+import org.apache.kafka.tiered.storage.specs.DeletableSpec;
+import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
+import org.apache.kafka.tiered.storage.specs.FetchableSpec;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+import org.apache.kafka.tiered.storage.specs.OffloadableSpec;
+import org.apache.kafka.tiered.storage.specs.OffloadedSegmentSpec;
+import org.apache.kafka.tiered.storage.specs.ProducableSpec;
+import org.apache.kafka.tiered.storage.specs.RemoteDeleteSegmentSpec;
+import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec;
+import org.apache.kafka.tiered.storage.specs.TopicSpec;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public final class TieredStorageTestBuilder {
+
+    private final int defaultProducedBatchSize = 1;
+    private final long defaultEarliestLocalOffsetExpectedInLogDirectory = 0;
+
+    private Map<TopicPartition, ProducableSpec> producables = new HashMap<>();
+    private Map<TopicPartition, List<OffloadableSpec>> offloadables = new HashMap<>();
+    private Map<TopicPartition, ConsumableSpec> consumables = new HashMap<>();
+    private Map<TopicPartition, FetchableSpec> fetchables = new HashMap<>();
+    private Map<TopicPartition, List<DeletableSpec>> deletables = new HashMap<>();
+    private List<TieredStorageTestAction> actions = new ArrayList<>();
+
+    public TieredStorageTestBuilder() {
+    }
+
+    public TieredStorageTestBuilder createTopic(String topic,
+                                                Integer partitionCount,
+                                                Integer replicationFactor,
+                                                Integer maxBatchCountPerSegment,
+                                                Map<Integer, List<Integer>> replicaAssignment,
+                                                Boolean enableRemoteLogStorage) {
+        assert maxBatchCountPerSegment >= 1 : "Segments size for topic " + topic + " needs to be >= 1";
+        assert partitionCount >= 1 : "Partition count for topic " + topic + " needs to be >= 1";
+        assert replicationFactor >= 1 : "Replication factor for topic " + topic + " needs to be >= 1";
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        Map<String, String> properties = new HashMap<>();
+        properties.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, enableRemoteLogStorage.toString());
+        TopicSpec topicSpec = new TopicSpec(topic, partitionCount, replicationFactor, maxBatchCountPerSegment,
+                replicaAssignment, properties);
+        actions.add(new CreateTopicAction(topicSpec));
+        return this;
+    }
+
+    public TieredStorageTestBuilder createPartitions(String topic,
+                                                     Integer partitionCount,
+                                                     Map<Integer, List<Integer>> replicaAssignment) {
+        assert partitionCount >= 1 : "Partition count for topic " + topic + " needs to be >= 1";
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        ExpandPartitionCountSpec spec = new ExpandPartitionCountSpec(topic, partitionCount, replicaAssignment);
+        actions.add(new CreatePartitionsAction(spec));
+        return this;
+    }
+
+    public TieredStorageTestBuilder updateTopicConfig(String topic,
+                                                      Map<String, String> configsToBeAdded,
+                                                      List<String> configsToBeDeleted) {
+        assert !configsToBeAdded.isEmpty() || !configsToBeDeleted.isEmpty()
+                : "Topic " + topic + " configs shouldn't be empty";
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        actions.add(new UpdateTopicConfigAction(topic, configsToBeAdded, configsToBeDeleted));
+        return this;
+    }
+
+    public TieredStorageTestBuilder updateBrokerConfig(Integer brokerId,
+                                                       Map<String, String> configsToBeAdded,
+                                                       List<String> configsToBeDeleted) {
+        assert !configsToBeAdded.isEmpty() || !configsToBeDeleted.isEmpty()
+                : "Broker " + brokerId + " configs shouldn't be empty";
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        actions.add(new UpdateBrokerConfigAction(brokerId, configsToBeAdded, configsToBeDeleted));
+        return this;
+    }
+
+    public TieredStorageTestBuilder deleteTopic(List<String> topics) {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        topics.forEach(topic -> actions.add(buildDeleteTopicAction(topic, true)));
+        return this;
+    }
+
+    public TieredStorageTestBuilder produce(String topic,

Review Comment:
   When I call this method, it doesn't add produce actions, but it does add consume actions for any pending consumables. Why? I don't understand why we aren't adding produce actions at the end of this method, AND why consume actions registered by some previous calls are added when this method is invoked. Why do we have the concept of producables or consumables?



##########
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java:
##########
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.actions.BounceBrokerAction;
+import org.apache.kafka.tiered.storage.actions.ConsumeAction;
+import org.apache.kafka.tiered.storage.actions.CreatePartitionsAction;
+import org.apache.kafka.tiered.storage.actions.CreateTopicAction;
+import org.apache.kafka.tiered.storage.actions.DeleteRecordsAction;
+import org.apache.kafka.tiered.storage.actions.DeleteTopicAction;
+import org.apache.kafka.tiered.storage.actions.EraseBrokerStorageAction;
+import org.apache.kafka.tiered.storage.actions.ExpectBrokerInISRAction;
+import org.apache.kafka.tiered.storage.actions.ExpectEmptyRemoteStorageAction;
+import org.apache.kafka.tiered.storage.actions.ExpectLeaderAction;
+import org.apache.kafka.tiered.storage.actions.ExpectLeaderEpochCheckpointAction;
+import org.apache.kafka.tiered.storage.actions.ExpectListOffsetsAction;
+import org.apache.kafka.tiered.storage.actions.ExpectTopicIdToMatchInRemoteStorageAction;
+import org.apache.kafka.tiered.storage.actions.ExpectUserTopicMappedToMetadataPartitionsAction;
+import org.apache.kafka.tiered.storage.actions.ProduceAction;
+import org.apache.kafka.tiered.storage.actions.ReassignReplicaAction;
+import org.apache.kafka.tiered.storage.actions.ShrinkReplicaAction;
+import org.apache.kafka.tiered.storage.actions.StartBrokerAction;
+import org.apache.kafka.tiered.storage.actions.StopBrokerAction;
+import org.apache.kafka.tiered.storage.actions.UpdateBrokerConfigAction;
+import org.apache.kafka.tiered.storage.actions.UpdateTopicConfigAction;
+import org.apache.kafka.tiered.storage.specs.ConsumableSpec;
+import org.apache.kafka.tiered.storage.specs.DeletableSpec;
+import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
+import org.apache.kafka.tiered.storage.specs.FetchableSpec;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+import org.apache.kafka.tiered.storage.specs.OffloadableSpec;
+import org.apache.kafka.tiered.storage.specs.OffloadedSegmentSpec;
+import org.apache.kafka.tiered.storage.specs.ProducableSpec;
+import org.apache.kafka.tiered.storage.specs.RemoteDeleteSegmentSpec;
+import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec;
+import org.apache.kafka.tiered.storage.specs.TopicSpec;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public final class TieredStorageTestBuilder {
+
+    private final int defaultProducedBatchSize = 1;
+    private final long defaultEarliestLocalOffsetExpectedInLogDirectory = 0;
+
+    private Map<TopicPartition, ProducableSpec> producables = new HashMap<>();
+    private Map<TopicPartition, List<OffloadableSpec>> offloadables = new 
HashMap<>();
+    private Map<TopicPartition, ConsumableSpec> consumables = new HashMap<>();
+    private Map<TopicPartition, FetchableSpec> fetchables = new HashMap<>();
+    private Map<TopicPartition, List<DeletableSpec>> deletables = new HashMap<>();
+    private List<TieredStorageTestAction> actions = new ArrayList<>();
+
+    public TieredStorageTestBuilder() {
+    }
+
+    public TieredStorageTestBuilder createTopic(String topic,
+                                                Integer partitionCount,
+                                                Integer replicationFactor,
+                                                Integer maxBatchCountPerSegment,
+                                                Map<Integer, List<Integer>> replicaAssignment,
+                                                Boolean enableRemoteLogStorage) {
+        assert maxBatchCountPerSegment >= 1 : "Segment size for topic " + topic + " needs to be >= 1";
+        assert partitionCount >= 1 : "Partition count for topic " + topic + " needs to be >= 1";
+        assert replicationFactor >= 1 : "Replication factor for topic " + topic + " needs to be >= 1";
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        Map<String, String> properties = new HashMap<>();
+        properties.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, enableRemoteLogStorage.toString());
+        TopicSpec topicSpec = new TopicSpec(topic, partitionCount, replicationFactor, maxBatchCountPerSegment,
+                replicaAssignment, properties);
+        actions.add(new CreateTopicAction(topicSpec));
+        return this;
+    }
+
+    public TieredStorageTestBuilder createPartitions(String topic,
+                                                     Integer partitionCount,
+                                                     Map<Integer, List<Integer>> replicaAssignment) {
+        assert partitionCount >= 1 : "Partition count for topic " + topic + " needs to be >= 1";
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        ExpandPartitionCountSpec spec = new ExpandPartitionCountSpec(topic, partitionCount, replicaAssignment);
+        actions.add(new CreatePartitionsAction(spec));
+        return this;
+    }
+
+    public TieredStorageTestBuilder updateTopicConfig(String topic,
+                                                      Map<String, String> configsToBeAdded,
+                                                      List<String> configsToBeDeleted) {
+        assert !configsToBeAdded.isEmpty() || !configsToBeDeleted.isEmpty()
+                : "Topic " + topic + " configs shouldn't be empty";
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        actions.add(new UpdateTopicConfigAction(topic, configsToBeAdded, configsToBeDeleted));
+        return this;
+    }
+
+    public TieredStorageTestBuilder updateBrokerConfig(Integer brokerId,
+                                                       Map<String, String> configsToBeAdded,
+                                                       List<String> configsToBeDeleted) {
+        assert !configsToBeAdded.isEmpty() || !configsToBeDeleted.isEmpty()
+                : "Broker " + brokerId + " configs shouldn't be empty";
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        actions.add(new UpdateBrokerConfigAction(brokerId, configsToBeAdded, configsToBeDeleted));
+        return this;
+    }
+
+    public TieredStorageTestBuilder deleteTopic(List<String> topics) {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        topics.forEach(topic -> actions.add(buildDeleteTopicAction(topic, true)));
+        return this;
+    }
+
+    public TieredStorageTestBuilder produce(String topic,
+                                            Integer partition,
+                                            KeyValueSpec... keyValues) {
+        assert partition >= 0 : "Partition must be >= 0";
+        maybeCreateConsumeActions();
+        ProducableSpec spec = getOrCreateProducable(topic, partition);
+        for (KeyValueSpec kv : keyValues) {
+            spec.getRecords().add(new ProducerRecord<>(topic, partition, kv.getKey(), kv.getValue()));
+        }
+        return this;
+    }
+
+    public TieredStorageTestBuilder produceWithTimestamp(String topic,
+                                                         Integer partition,
+                                                         KeyValueSpec... keyValues) {
+        assert partition >= 0 : "Partition must be >= 0";
+        maybeCreateConsumeActions();
+        ProducableSpec spec = getOrCreateProducable(topic, partition);
+        for (KeyValueSpec kv : keyValues) {
+            spec.getRecords()
+                    .add(new ProducerRecord<>(topic, partition, kv.getTimestamp(), kv.getKey(), kv.getValue()));
+        }
+        return this;
+    }
+
+    public TieredStorageTestBuilder withBatchSize(String topic,
+                                                  Integer partition,
+                                                  Integer batchSize) {
+        assert batchSize >= 1 : "The size of a batch of produced records must be >= 1";
+        getOrCreateProducable(topic, partition).setBatchSize(batchSize);
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectEarliestLocalOffsetInLogDirectory(String topic,
+                                                                            Integer partition,
+                                                                            Long earliestLocalOffset) {
+        assert earliestLocalOffset >= 0 : "Record offset must be >= 0";
+        getOrCreateProducable(topic, partition).setEarliestLocalLogOffset(earliestLocalOffset);
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectSegmentToBeOffloaded(Integer fromBroker,
+                                                               String topic,
+                                                               Integer partition,
+                                                               Integer baseOffset,
+                                                               KeyValueSpec... keyValues) {
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        List<ProducerRecord<String, String>> records = new ArrayList<>();
+        for (KeyValueSpec kv: keyValues) {
+            records.add(new ProducerRecord<>(topic, partition, kv.getKey(), kv.getValue()));
+        }
+        offloadables.computeIfAbsent(topicPartition, k -> new ArrayList<>())
+                .add(new OffloadableSpec(fromBroker, baseOffset, records));
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectTopicIdToMatchInRemoteStorage(String topic) {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        actions.add(new ExpectTopicIdToMatchInRemoteStorageAction(topic));
+        return this;
+    }
+
+    public TieredStorageTestBuilder maybeEnqueueActions() {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        return this;
+    }
+
+    public TieredStorageTestBuilder consume(String topic,
+                                            Integer partition,
+                                            Long fetchOffset,
+                                            Integer expectedTotalRecord,
+                                            Integer expectedRecordsFromSecondTier) {
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        assert partition >= 0 : "Partition must be >= 0";
+        assert fetchOffset >= 0 : "Fetch offset must be >= 0";
+        assert expectedTotalRecord >= 1 : "Must read at least one record";
+        assert expectedRecordsFromSecondTier >= 0 : "Expected read cannot be < 0";
+        assert expectedRecordsFromSecondTier <= expectedTotalRecord : "Cannot fetch more records than consumed";
+        assert !consumables.containsKey(topicPartition) : "Consume already in progress for " + topicPartition;
+        maybeCreateProduceAction();
+        consumables.put(
+                topicPartition, new ConsumableSpec(fetchOffset, expectedTotalRecord, expectedRecordsFromSecondTier));
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectLeader(String topic,
+                                                 Integer partition,
+                                                 Integer brokerId,
+                                                 Boolean electLeader) {
+        actions.add(new ExpectLeaderAction(new TopicPartition(topic, partition), brokerId, electLeader));
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectInIsr(String topic,
+                                                Integer partition,
+                                                Integer brokerId) {
+        actions.add(new ExpectBrokerInISRAction(new TopicPartition(topic, partition), brokerId));
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectFetchFromTieredStorage(Integer fromBroker,
+                                                                 String topic,
+                                                                 Integer partition,
+                                                                 Integer remoteFetchRequestCount) {
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        assert partition >= 0 : "Partition must be >= 0";
+        assert remoteFetchRequestCount >= 0 : "Expected fetch count from tiered storage must be >= 0";
+        assert !fetchables.containsKey(topicPartition) : "Fetch already in progress for " + topicPartition;
+        fetchables.put(topicPartition, new FetchableSpec(fromBroker, remoteFetchRequestCount));
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectDeletionInRemoteStorage(Integer fromBroker,
+                                                                  String topic,
+                                                                  Integer partition,
+                                                                  LocalTieredStorageEvent.EventType eventType,
+                                                                  Integer eventCount) {
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        deletables.computeIfAbsent(topicPartition, k -> new ArrayList<>())
+                .add(new DeletableSpec(fromBroker, eventType, eventCount));
+        return this;
+    }
+
+    public TieredStorageTestBuilder waitForRemoteLogSegmentDeletion(String topic) {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        actions.add(buildDeleteTopicAction(topic, false));
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectLeaderEpochCheckpoint(Integer brokerId,
+                                                                String topic,
+                                                                Integer partition,
+                                                                Integer beginEpoch,
+                                                                Long startOffset) {
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        actions.add(new ExpectLeaderEpochCheckpointAction(brokerId, topicPartition, beginEpoch, startOffset));
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectListOffsets(String topic,
+                                                      Integer partition,
+                                                      OffsetSpec offsetSpec,
+                                                      EpochEntry epochEntry) {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        actions.add(new ExpectListOffsetsAction(topicPartition, offsetSpec, epochEntry));
+        return this;
+    }
+
+    public TieredStorageTestBuilder bounce(Integer brokerId) {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        actions.add(new BounceBrokerAction(brokerId));
+        return this;
+    }
+
+    public TieredStorageTestBuilder stop(Integer brokerId) {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        actions.add(new StopBrokerAction(brokerId));
+        return this;
+    }
+
+    public TieredStorageTestBuilder start(Integer brokerId) {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        actions.add(new StartBrokerAction(brokerId));
+        return this;
+    }
+
+    public TieredStorageTestBuilder eraseBrokerStorage(Integer brokerId) {
+        actions.add(new EraseBrokerStorageAction(brokerId));
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectEmptyRemoteStorage(String topic,
+                                                             Integer partition) {
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        actions.add(new ExpectEmptyRemoteStorageAction(topicPartition));
+        return this;
+    }
+
+    public TieredStorageTestBuilder shrinkReplica(String topic,
+                                                  Integer partition,
+                                                  List<Integer> replicaIds) {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        actions.add(new ShrinkReplicaAction(topicPartition, replicaIds));
+        return this;
+    }
+
+    public TieredStorageTestBuilder reassignReplica(String topic,
+                                                    Integer partition,
+                                                    List<Integer> replicaIds) {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        actions.add(new ReassignReplicaAction(topicPartition, replicaIds));
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectUserTopicMappedToMetadataPartitions(String topic,
+                                                                              List<Integer> metadataPartitions) {
+        actions.add(new ExpectUserTopicMappedToMetadataPartitionsAction(topic, metadataPartitions));
+        return this;
+    }
+
+    public TieredStorageTestBuilder deleteRecords(String topic,

Review Comment:
   suggestion - (for this and other such methods)
   
   Change to take an optional argument of expected exception.
   `def deleteRecords(topic: String, partition: Int, beforeOffset: Int, expectedException: Option[Class[_]] = None)`
   
   and deleteRecordsAction should do:
   ```
   try {
     context.admin().deleteRecords(recordsToDelete).all().get
     expectedException.map(exp => fail("Expected exception: " + exp))
   } catch {
     case e: Throwable =>
       if (expectedException.isDefined) {
         assertEquals(expectedException.get, e.getCause.getClass, "Unexpected exception cause " + e.getCause)
       }
   }
   ```
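   
   In the Java builder the same shape could look like the sketch below; the `DeleteRecordsAction` constructor is assumed here, and a `null` class plays the role of `None`:
   
   ```
   // Builder side (assumed shape):
   public TieredStorageTestBuilder deleteRecords(String topic,
                                                 Integer partition,
                                                 Long beforeOffset,
                                                 Class<? extends Throwable> expectedException) {
       // A null expectedException means the deletion is expected to succeed.
       actions.add(new DeleteRecordsAction(new TopicPartition(topic, partition), beforeOffset, expectedException));
       return this;
   }
   
   // Action side, mirroring the Scala above (uses JUnit's fail/assertEquals and
   // java.util.concurrent.ExecutionException):
   try {
       context.admin().deleteRecords(recordsToDelete).all().get();
       if (expectedException != null) {
           fail("Expected exception: " + expectedException);
       }
   } catch (ExecutionException e) {
       if (expectedException != null) {
           assertEquals(expectedException, e.getCause().getClass(), "Unexpected exception cause " + e.getCause());
       } else {
           throw e;
       }
   }
   ```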



##########
storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test Cases (A):
+ *    Elementary offloads and fetches from tiered storage.
+ */
+public final class OffloadAndConsumeFromLeaderTest extends TieredStorageTestHarness {
+
+    /**
+     * Cluster of one broker
+     * @return number of brokers in the cluster
+     */
+    @Override
+    public int brokerCount() {
+        return 1;
+    }
+
+    @Override
+    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+        final Integer broker = 0;
+        final String topicA = "topicA";
+        final String topicB = "topicB";
+        final Integer p0 = 0;
+        final Integer partitionCount = 1;
+        final Integer replicationFactor = 1;
+        final Integer maxBatchCountPerSegment = 1;
+        final Map<Integer, List<Integer>> replicaAssignment = null;
+        final boolean enableRemoteLogStorage = true;
+        final Integer batchSize = 1;
+
+        builder
+                /*
+                 * (A.1) Create a topic whose segments contain only one batch and produce three records
+                 *       with a batch size of 1.
+                 *
+                 *       The topic and broker are configured so that the two rolled segments are offloaded
+                 *       to the tiered storage and are not present in the first-tier broker storage.
+                 *
+                 *       Acceptance:
+                 *       -----------
+                 *       State of the storages after production of the records and propagation of the log
+                 *       segment lifecycles to peer subsystems (log cleaner, remote log manager).
+                 *
+                 *         - First-tier storage -            - Second-tier storage -
+                 *           Log tA-p0                         Log tA-p0
+                 *          *-------------------*             *-------------------*
+                 *          | base offset = 2   |             |  base offset = 0  |
+                 *          | (k3, v3)          |             |  (k1, v1)         |
+                 *          *-------------------*             *-------------------*
+                 *                                            *-------------------*
+                 *                                            |  base offset = 1  |
+                 *                                            |  (k2, v2)         |
+                 *                                            *-------------------*
+                 */
+                .createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment, replicaAssignment,
+                        enableRemoteLogStorage)
+                .produce(topicA, p0, new KeyValueSpec("k1", "v1"), new KeyValueSpec("k2", "v2"),
+                        new KeyValueSpec("k3", "v3"))
+                .withBatchSize(topicA, p0, batchSize)
+                .expectSegmentToBeOffloaded(broker, topicA, p0, 0, new KeyValueSpec("k1", "v1"))
+                .expectSegmentToBeOffloaded(broker, topicA, p0, 1, new KeyValueSpec("k2", "v2"))
+
+                /*
+                 * (A.2) Similar scenario as above, but with segments of two records.
+                 *
+                 *       Acceptance:
+                 *       -----------
+                 *       State of the storages after production of the records and propagation of the log
+                 *       segment lifecycles to peer subsystems (log cleaner, remote log manager).
+                 *
+                 *         - First-tier storage -            - Second-tier storage -
+                 *           Log tB-p0                         Log tB-p0
+                 *          *-------------------*             *-------------------*
+                 *          | base offset = 4   |             |  base offset = 0  |
+                 *          | (k5, v5)          |             |  (k1, v1)         |
+                 *          *-------------------*             |  (k2, v2)         |
+                 *                                            *-------------------*
+                 *                                            *-------------------*
+                 *                                            |  base offset = 2  |
+                 *                                            |  (k3, v3)         |
+                 *                                            |  (k4, v4)         |
+                 *                                            *-------------------*
+                 */
+                .createTopic(topicB, partitionCount, replicationFactor, 2, replicaAssignment,
+                        enableRemoteLogStorage)
+                .produce(topicB, p0, new KeyValueSpec("k1", "v1"), new KeyValueSpec("k2", "v2"),
+                        new KeyValueSpec("k3", "v3"), new KeyValueSpec("k4", "v4"), new KeyValueSpec("k5", "v5"))
+                .withBatchSize(topicB, p0, batchSize)
+                .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 4L)
+                .expectSegmentToBeOffloaded(broker, topicB, p0, 0,
+                        new KeyValueSpec("k1", "v1"), new KeyValueSpec("k2", "v2"))
+                .expectSegmentToBeOffloaded(broker, topicB, p0, 2,
+                        new KeyValueSpec("k3", "v3"), new KeyValueSpec("k4", "v4"))
+
+                /*
+                 * (A.3) Stops and restarts the broker. The purpose of this test is to a) exercise consumption
+                 *       from a given offset and b) verify that upon broker start, existing remote log segment
+                 *       metadata is loaded by Kafka and these log segments are available.
+                 *
+                 *       Acceptance:
+                 *       -----------
+                 *       - For topic A, this offset is defined such that only the second segment is fetched from
+                 *         the tiered storage.
+                 *       - For topic B, only one segment is present in the tiered storage, as asserted by the
+                 *         previous sub-test-case.
+                 */
+                //.bounce(broker)

Review Comment:
   Is this intentional?
   
   I assume we will need it to replicate the "Stops and restarts the broker" scenario.



##########
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
+import org.apache.kafka.tiered.storage.specs.TopicSpec;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+import kafka.log.UnifiedLog;
+import kafka.server.KafkaBroker;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.BrokerState;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot;
+import scala.Function0;
+import scala.Function1;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.collection.Seq;
+
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
+
+public final class TieredStorageTestContext {
+
+    private final Seq<KafkaBroker> brokers;
+    private final Properties producerConfig;
+    private final Properties consumerConfig;
+    private final ListenerName listenerName;
+
+    private final Serializer<String> ser;
+    private final Deserializer<String> de;
+
+    private final Map<String, TopicSpec> topicSpecs;
+    private final TieredStorageTestReport testReport;
+
+    private volatile KafkaProducer<String, String> producer;
+    private volatile KafkaConsumer<String, String> consumer;
+    private volatile Admin admin;
+    private volatile List<LocalTieredStorage> tieredStorages;
+    private volatile List<BrokerLocalStorage> localStorages;
+
+    public TieredStorageTestContext(Seq<KafkaBroker> brokers,
+                                    Properties producerConfig,
+                                    Properties consumerConfig,
+                                    ListenerName listenerName) {
+        this.brokers = brokers;
+        this.producerConfig = producerConfig;
+        this.consumerConfig = consumerConfig;
+        this.listenerName = listenerName;
+        this.ser = Serdes.String().serializer();
+        this.de = Serdes.String().deserializer();
+        this.topicSpecs = new HashMap<>();
+        this.testReport = new TieredStorageTestReport(this);
+        initContext();
+    }
+
+    private void initContext() {
+        String bootstrapServers = TestUtils.bootstrapServers(brokers, listenerName);
+        producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        // Set a producer linger of 60 seconds, in order to optimistically generate batches of
+        // records with a pre-determined size.
+        producerConfig.put(LINGER_MS_CONFIG, String.valueOf(TimeUnit.SECONDS.toMillis(60)));
+        producer = new KafkaProducer<>(producerConfig, ser, ser);

Review Comment:
   please use the `IntegrationTestHarness.createProducer()` method to initialize the producer in `TieredStorageTestHarness`
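   
   For example (a sketch only: `createProducer` is the inherited Scala method, so its default arguments have to be supplied explicitly from Java):
   
   ```
   // Hypothetical helper in TieredStorageTestHarness; the inherited createProducer
   // already points bootstrap.servers at the running brokers.
   private KafkaProducer<String, String> newLingeringProducer() {
       Properties overrides = new Properties();
       // Keep the 60-second linger so batches retain their pre-determined record count.
       overrides.put(ProducerConfig.LINGER_MS_CONFIG, String.valueOf(TimeUnit.SECONDS.toMillis(60)));
       return createProducer(Serdes.String().serializer(), Serdes.String().serializer(), overrides);
   }
   ```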



##########
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##########
@@ -216,6 +216,14 @@ object TestUtils extends Logging {
     (new Broker(id, host, port, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol), epoch)
   }
 
+  /**
+   * Create a test config for the provided parameters.
+   */
+  def createServerConfigs(numConfigs: Int,

Review Comment:
   why do we need to add this? Can't we directly use `createBrokerConfigs`?



##########
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java:
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+import kafka.api.IntegrationTestHarness;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.KafkaBroker;
+import kafka.server.KafkaConfig;
+import kafka.utils.TestUtils;
+import org.apache.kafka.common.replica.ReplicaSelector;
+import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;
+import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import scala.collection.Seq;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConverters;
+
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP;
+
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP;
+
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.DELETE_ON_CLOSE_CONFIG;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.STORAGE_DIR_CONFIG;
+
+/**
+ * Base class for integration tests exercising the tiered storage functionality in Apache Kafka.
+ */
+public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
+
+    /**
+     * InitialTaskDelayMs is set to 30 seconds for the delete-segment scheduler in Apache Kafka.
+     * Hence, we need to wait at least that amount of time before segments eligible for deletion
+     * get physically removed.
+     */
+    private static final Integer STORAGE_WAIT_TIMEOUT_SEC = 35;
+
+    private TieredStorageTestContext context;
+
+    protected int numMetadataPartition() {
+        return 5;
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public Seq<KafkaConfig> generateConfigs() {
+        Properties overridingProps = getOverridingProps();
+        List<KafkaConfig> configs =
+                JavaConverters.seqAsJavaList(TestUtils.createServerConfigs(brokerCount(), zkConnectOrNull()))
+                        .stream()
+                        .map(config -> KafkaConfig.fromProps(config, overridingProps))
+                        .collect(Collectors.toList());
+        return JavaConverters.asScalaBuffer(configs).toSeq();
+    }
+
+    public Properties getOverridingProps() {
+        Properties overridingProps = new Properties();
+        // Configure the tiered storage in Kafka. Set an interval of 1 second for the remote log manager background
+        // activity to ensure the tiered storage has enough room to be exercised within the lifetime of a test.
+        //
+        // The replication factor of the remote log metadata topic needs to be chosen so that in resiliency
+        // tests, metadata can survive the loss of one replica for its topic-partitions.
+        //
+        // The second-tier storage system is mocked via the LocalTieredStorage instance which persists transferred
+        // data files on the local file system.
+        overridingProps.setProperty(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
+        overridingProps.setProperty(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, LocalTieredStorage.class.getName());
+        overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
+                TopicBasedRemoteLogMetadataManager.class.getName());
+        overridingProps.setProperty(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, "1000");
+
+        overridingProps.setProperty(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP, storageConfigPrefix(""));
+        overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP, metadataConfigPrefix(""));
+
+        overridingProps.setProperty(
+                metadataConfigPrefix(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP),
+                String.valueOf(numMetadataPartition()));
+        overridingProps.setProperty(
+                metadataConfigPrefix(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP),
+                String.valueOf(brokerCount()));
+        // This configuration ensures inactive log segments are deleted fast enough so that
+        // the integration tests can confirm a given log segment is present only in the second-tier storage.
+        // Note that this does not impact the eligibility of a log segment to be offloaded to the
+        // second-tier storage.
+        overridingProps.setProperty(KafkaConfig.LogCleanupIntervalMsProp(), "1000");
+        // This can be customized to read remote log segments from followers.
+        readReplicaSelectorClass()
+                .ifPresent(c -> overridingProps.put(KafkaConfig.ReplicaSelectorClassProp(), c.getName()));
+        // The directory of the second-tier storage needs to be constant across all instances of storage managers
+        // in every broker and throughout the test, since brokers are restarted during the test.
+        // You can override this property with a fixed path of your choice if you wish to use a non-temporary
+        // directory to access its content after a test has terminated.
+        overridingProps.setProperty(storageConfigPrefix(STORAGE_DIR_CONFIG), TestUtils.tempDir().getAbsolutePath());
+        overridingProps.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp(), "true");

Review Comment:
   this should default to false to match the default value of Kafka 3.6.0. Is there a reason we are setting this as true here?
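   
   If no test here relies on unclean elections, a minimal change would be (sketch of the single affected line):
   
   ```
   // Match the Kafka 3.6.0 default; flip to "true" only in tests that deliberately
   // exercise unclean leader election.
   overridingProps.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp(), "false");
   ```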



##########
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
+import org.apache.kafka.tiered.storage.specs.TopicSpec;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+import kafka.log.UnifiedLog;
+import kafka.server.KafkaBroker;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.BrokerState;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot;
+import scala.Function0;
+import scala.Function1;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.collection.Seq;
+
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
+
+public final class TieredStorageTestContext {
+
+    private final Seq<KafkaBroker> brokers;
+    private final Properties producerConfig;
+    private final Properties consumerConfig;
+    private final ListenerName listenerName;
+
+    private final Serializer<String> ser;
+    private final Deserializer<String> de;
+
+    private final Map<String, TopicSpec> topicSpecs;
+    private final TieredStorageTestReport testReport;
+
+    private volatile KafkaProducer<String, String> producer;
+    private volatile KafkaConsumer<String, String> consumer;
+    private volatile Admin admin;
+    private volatile List<LocalTieredStorage> tieredStorages;
+    private volatile List<BrokerLocalStorage> localStorages;
+
+    public TieredStorageTestContext(Seq<KafkaBroker> brokers,
+                                    Properties producerConfig,
+                                    Properties consumerConfig,
+                                    ListenerName listenerName) {
+        this.brokers = brokers;
+        this.producerConfig = producerConfig;
+        this.consumerConfig = consumerConfig;
+        this.listenerName = listenerName;
+        this.ser = Serdes.String().serializer();
+        this.de = Serdes.String().deserializer();
+        this.topicSpecs = new HashMap<>();
+        this.testReport = new TieredStorageTestReport(this);
+        initContext();
+    }
+
+    private void initContext() {
+        String bootstrapServers = TestUtils.bootstrapServers(brokers, listenerName);
+        producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        // Set a producer linger of 60 seconds, in order to optimistically generate batches of
+        // records with a pre-determined size.
+        producerConfig.put(LINGER_MS_CONFIG, String.valueOf(TimeUnit.SECONDS.toMillis(60)));
+        producer = new KafkaProducer<>(producerConfig, ser, ser);
+
+        consumerConfig.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        consumer = new KafkaConsumer<>(consumerConfig, de, de);
+
+        Properties adminConfig = new Properties();
+        adminConfig.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        admin = Admin.create(adminConfig);

Review Comment:
   please use the `IntegrationTestHarness.createAdminClient()` method to initialize the admin client in `TieredStorageTestHarness`
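   
   For example (sketch; `createAdminClient(listenerName, configOverrides)` is the inherited Scala method with its defaults spelled out):
   
   ```
   // Hypothetical usage in TieredStorageTestHarness; the admin client targets the
   // cluster's listener, so no manual bootstrap.servers handling is needed.
   Admin admin = createAdminClient(listenerName(), new Properties());
   ```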



##########
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
+import org.apache.kafka.tiered.storage.specs.TopicSpec;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+import kafka.log.UnifiedLog;
+import kafka.server.KafkaBroker;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.BrokerState;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot;
+import scala.Function0;
+import scala.Function1;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.collection.Seq;
+
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
+
+public final class TieredStorageTestContext {
+
+    private final Seq<KafkaBroker> brokers;
+    private final Properties producerConfig;
+    private final Properties consumerConfig;
+    private final ListenerName listenerName;
+
+    private final Serializer<String> ser;
+    private final Deserializer<String> de;
+
+    private final Map<String, TopicSpec> topicSpecs;
+    private final TieredStorageTestReport testReport;
+
+    private volatile KafkaProducer<String, String> producer;
+    private volatile KafkaConsumer<String, String> consumer;
+    private volatile Admin admin;
+    private volatile List<LocalTieredStorage> tieredStorages;
+    private volatile List<BrokerLocalStorage> localStorages;
+
+    public TieredStorageTestContext(Seq<KafkaBroker> brokers,
+                                    Properties producerConfig,
+                                    Properties consumerConfig,
+                                    ListenerName listenerName) {
+        this.brokers = brokers;
+        this.producerConfig = producerConfig;
+        this.consumerConfig = consumerConfig;
+        this.listenerName = listenerName;
+        this.ser = Serdes.String().serializer();
+        this.de = Serdes.String().deserializer();
+        this.topicSpecs = new HashMap<>();
+        this.testReport = new TieredStorageTestReport(this);
+        initContext();
+    }
+
+    private void initContext() {
+        String bootstrapServers = TestUtils.bootstrapServers(brokers, listenerName);
+        producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        // Set a producer linger of 60 seconds, in order to optimistically generate batches of
+        // records with a pre-determined size.
+        producerConfig.put(LINGER_MS_CONFIG, String.valueOf(TimeUnit.SECONDS.toMillis(60)));
+        producer = new KafkaProducer<>(producerConfig, ser, ser);
+
+        consumerConfig.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        consumer = new KafkaConsumer<>(consumerConfig, de, de);

Review Comment:
   please use the `IntegrationTestHarness.createConsumer()` method to initialize the consumer in `TieredStorageTestHarness`
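   
   For example (sketch; the inherited signature is assumed to be `createConsumer(keyDeserializer, valueDeserializer, configOverrides, configsToRemove)`):
   
   ```
   // Hypothetical usage in TieredStorageTestHarness; the empty Scala list stands in
   // for the configsToRemove default argument.
   Consumer<String, String> consumer = createConsumer(
           Serdes.String().deserializer(),
           Serdes.String().deserializer(),
           new Properties(),
           JavaConverters.asScalaBuffer(Collections.<String>emptyList()).toList());
   ```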



##########
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
+import org.apache.kafka.tiered.storage.specs.TopicSpec;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+import kafka.log.UnifiedLog;
+import kafka.server.KafkaBroker;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.BrokerState;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot;
+import scala.Function0;
+import scala.Function1;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.collection.Seq;
+
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
+
+public final class TieredStorageTestContext {
+
+    private final Seq<KafkaBroker> brokers;
+    private final Properties producerConfig;
+    private final Properties consumerConfig;
+    private final ListenerName listenerName;
+
+    private final Serializer<String> ser;
+    private final Deserializer<String> de;
+
+    private final Map<String, TopicSpec> topicSpecs;
+    private final TieredStorageTestReport testReport;
+
+    private volatile KafkaProducer<String, String> producer;
+    private volatile KafkaConsumer<String, String> consumer;
+    private volatile Admin admin;
+    private volatile List<LocalTieredStorage> tieredStorages;
+    private volatile List<BrokerLocalStorage> localStorages;
+
+    public TieredStorageTestContext(Seq<KafkaBroker> brokers,
+                                    Properties producerConfig,
+                                    Properties consumerConfig,
+                                    ListenerName listenerName) {
+        this.brokers = brokers;
+        this.producerConfig = producerConfig;
+        this.consumerConfig = consumerConfig;
+        this.listenerName = listenerName;
+        this.ser = Serdes.String().serializer();
+        this.de = Serdes.String().deserializer();
+        this.topicSpecs = new HashMap<>();
+        this.testReport = new TieredStorageTestReport(this);
+        initContext();
+    }
+
+    private void initContext() {
+        String bootstrapServers = TestUtils.bootstrapServers(brokers, listenerName);

Review Comment:
   we don't have to do this manually. IntegrationTestHarness does it for you. When you use TieredStorageTestHarness, you will automatically have brokers initialized.
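   
   For example, the context could be handed harness-provided state instead (sketch; the accessor names are recalled from the Scala harness and should be verified):
   
   ```
   // Inside TieredStorageTestHarness: brokers(), listenerName() and the client
   // Properties all come from IntegrationTestHarness/KafkaServerTestHarness.
   TieredStorageTestContext context = new TieredStorageTestContext(
           brokers(), producerConfig(), consumerConfig(), listenerName());
   ```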



##########
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java:
##########
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
+import org.apache.kafka.tiered.storage.specs.TopicSpec;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+import kafka.log.UnifiedLog;
+import kafka.server.KafkaBroker;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.BrokerState;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot;
+import scala.Function0;
+import scala.Function1;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.collection.Seq;
+
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
+
+public final class TieredStorageTestContext {
+
+    private final Seq<KafkaBroker> brokers;
+    private final Properties producerConfig;
+    private final Properties consumerConfig;
+    private final ListenerName listenerName;
+
+    private final Serializer<String> ser;
+    private final Deserializer<String> de;
+
+    private final Map<String, TopicSpec> topicSpecs;
+    private final TieredStorageTestReport testReport;
+
+    private volatile KafkaProducer<String, String> producer;
+    private volatile KafkaConsumer<String, String> consumer;
+    private volatile Admin admin;
+    private volatile List<LocalTieredStorage> tieredStorages;
+    private volatile List<BrokerLocalStorage> localStorages;
+
+    public TieredStorageTestContext(Seq<KafkaBroker> brokers,
+                                    Properties producerConfig,
+                                    Properties consumerConfig,
+                                    ListenerName listenerName) {
+        this.brokers = brokers;
+        this.producerConfig = producerConfig;
+        this.consumerConfig = consumerConfig;
+        this.listenerName = listenerName;
+        this.ser = Serdes.String().serializer();
+        this.de = Serdes.String().deserializer();
+        this.topicSpecs = new HashMap<>();
+        this.testReport = new TieredStorageTestReport(this);
+        initContext();
+    }
+
+    private void initContext() {
+        String bootstrapServers = TestUtils.bootstrapServers(brokers, listenerName);
+        producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        // Set a producer linger of 60 seconds, in order to optimistically generate batches of
+        // records with a pre-determined size.
+        producerConfig.put(LINGER_MS_CONFIG, String.valueOf(TimeUnit.SECONDS.toMillis(60)));
+        producer = new KafkaProducer<>(producerConfig, ser, ser);
+
+        consumerConfig.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        consumer = new KafkaConsumer<>(consumerConfig, de, de);
+
+        Properties adminConfig = new Properties();
+        adminConfig.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        admin = Admin.create(adminConfig);
+
+        tieredStorages = TieredStorageTestHarness.getTieredStorages(brokers);
+        localStorages = TieredStorageTestHarness.getLocalStorages(brokers);
+    }
+
+    public void createTopic(TopicSpec spec) throws ExecutionException, InterruptedException {
+        NewTopic newTopic;
+        if (spec.getAssignment() == null || spec.getAssignment().isEmpty()) {
+            newTopic = new NewTopic(spec.getTopicName(), spec.getPartitionCount(), (short) spec.getReplicationFactor());
+        } else {
+            Map<Integer, List<Integer>> replicasAssignments = spec.getAssignment();
+            newTopic = new NewTopic(spec.getTopicName(), replicasAssignments);
+        }
+        }
+        newTopic.configs(spec.getProperties());
+        admin.createTopics(Collections.singletonList(newTopic)).all().get();
+        synchronized (this) {
+            topicSpecs.put(spec.getTopicName(), spec);
+        }
+    }
+
+    public void createPartitions(ExpandPartitionCountSpec spec) throws ExecutionException, InterruptedException {
+        NewPartitions newPartitions;
+        if (spec.getAssignment() == null || spec.getAssignment().isEmpty()) {
+            newPartitions = NewPartitions.increaseTo(spec.getPartitionCount());
+        } else {
+            Map<Integer, List<Integer>> assignment = spec.getAssignment();
+            List<List<Integer>> newAssignments = assignment.entrySet().stream()
+                    .sorted(Map.Entry.comparingByKey())
+                    .map(Map.Entry::getValue)
+                    .collect(Collectors.toList());
+            newPartitions = NewPartitions.increaseTo(spec.getPartitionCount(), newAssignments);
+        }
+        Map<String, NewPartitions> partitionsMap = Collections.singletonMap(spec.getTopicName(), newPartitions);
+        admin.createPartitions(partitionsMap).all().get();
+    }
+
+    public void updateTopicConfig(String topic,
+                                  Map<String, String> configsToBeAdded,
+                                  List<String> configsToBeDeleted)
+            throws ExecutionException, InterruptedException, TimeoutException {
+        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, topic);
+        updateResource(configResource, configsToBeAdded, configsToBeDeleted);
+    }
+
+    public void updateBrokerConfig(Integer brokerId,
+                                   Map<String, String> configsToBeAdded,
+                                   List<String> configsToBeDeleted)
+            throws ExecutionException, InterruptedException, TimeoutException {
+        ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId.toString());
+        updateResource(configResource, configsToBeAdded, configsToBeDeleted);
+    }
+
+    private void updateResource(ConfigResource configResource,
+                                Map<String, String> configsToBeAdded,
+                                List<String> configsToBeDeleted)
+            throws ExecutionException, InterruptedException, TimeoutException {
+        List<AlterConfigOp> alterEntries = new ArrayList<>();
+        configsToBeDeleted.forEach(k ->
+                alterEntries.add(new AlterConfigOp(new ConfigEntry(k, ""), AlterConfigOp.OpType.DELETE)));
+        configsToBeAdded.forEach((k, v) ->
+                alterEntries.add(new AlterConfigOp(new ConfigEntry(k, v), AlterConfigOp.OpType.SET)));
+        AlterConfigsOptions alterOptions = new AlterConfigsOptions().timeoutMs(30000);
+        Map<ConfigResource, Collection<AlterConfigOp>> configsMap =
+                Collections.singletonMap(configResource, alterEntries);
+        admin.incrementalAlterConfigs(configsMap, alterOptions).all().get(30, TimeUnit.SECONDS);
+    }
+
+    public void deleteTopic(String topic) throws ExecutionException, InterruptedException, TimeoutException {

Review Comment:
   Again, `KafkaServerTestHarness.deleteTopic()` could be used directly here; we don't need to implement this.
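   
   For illustration, a minimal sketch of what that delegation could look like, assuming the `deleteTopic(String)` helper mentioned above is reachable from this context (the `harness` reference here is hypothetical):
   
   ```java
   // Hypothetical sketch: delegate to the existing harness helper instead of
   // re-implementing topic deletion through the admin client.
   public void deleteTopic(String topic) {
       // `harness` is a hypothetical reference to the enclosing KafkaServerTestHarness.
       harness.deleteTopic(topic);
   }
   ```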



##########
storage/src/test/java/org/apache/kafka/tiered/storage/actions/ProduceAction.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.actions;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageCondition;
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
+import org.apache.kafka.tiered.storage.specs.OffloadedSegmentSpec;
+import org.apache.kafka.tiered.storage.specs.TopicSpec;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+
+import java.io.PrintStream;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageCondition.expectEvent;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.COPY_SEGMENT;
+import static org.apache.kafka.tiered.storage.utils.ActionUtils.getTieredStorageRecords;
+import static org.apache.kafka.tiered.storage.utils.RecordsKeyValueMatcher.correspondTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public final class ProduceAction implements TieredStorageTestAction {
+
+    // How much time to wait for all remote log segments of a topic-partition to be offloaded
+    // to the second-tier storage.
+    private static final int OFFLOAD_WAIT_TIMEOUT_SEC = 20;
+
+    private final TopicPartition topicPartition;
+    private final List<OffloadedSegmentSpec> offloadedSegmentSpecs;
+    private final List<ProducerRecord<String, String>> recordsToProduce;
+    private final Integer batchSize;
+    private final Long expectedEarliestLocalOffset;
+    private final Serde<String> serde = Serdes.String();
+
+    public ProduceAction(TopicPartition topicPartition,
+                         List<OffloadedSegmentSpec> offloadedSegmentSpecs,
+                         List<ProducerRecord<String, String>> recordsToProduce,
+                         Integer batchSize,
+                         Long expectedEarliestLocalOffset) {
+        this.topicPartition = topicPartition;
+        this.offloadedSegmentSpecs = offloadedSegmentSpecs;
+        this.recordsToProduce = recordsToProduce;
+        this.batchSize = batchSize;
+        this.expectedEarliestLocalOffset = expectedEarliestLocalOffset;
+    }
+
+    @Override
+    public void doExecute(TieredStorageTestContext context)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        List<LocalTieredStorage> tieredStorages = context.getTieredStorages();
+        List<BrokerLocalStorage> localStorages = context.getLocalStorages();
+
+        List<LocalTieredStorageCondition> tieredStorageConditions = offloadedSegmentSpecs.stream()
+                .map(spec -> expectEvent(
+                        tieredStorages,
+                        COPY_SEGMENT,
+                        spec.getSourceBrokerId(),
+                        spec.getTopicPartition(),
+                        false))
+                .collect(Collectors.toList());
+
+        // Retrieve the offset of the next record which would be consumed from the topic-partition
+        // before records are produced. This allows consuming only the newly produced records afterwards.
+        long startOffset = context.nextOffset(topicPartition);
+        long beginOffset = context.beginOffset(topicPartition);
+
+        // Records are produced here.
+        context.produce(recordsToProduce, batchSize);
+
+        if (!tieredStorageConditions.isEmpty()) {
+            tieredStorageConditions.stream()
+                    .reduce(LocalTieredStorageCondition::and)
+                    .get()
+                    .waitUntilTrue(OFFLOAD_WAIT_TIMEOUT_SEC, TimeUnit.SECONDS);
+        }
+
+        // At this stage, records were produced, and the expected remote log segments were found in the second-tier storage.
+        // Further steps are:
+        // 1) Verify the local (first-tier) storages contain only the expected log segments - that is to say,
+        //    in the special case of these integration tests, only the active segment.
+        // 2) Consume the records and verify they match the produced records.
+        TopicSpec topicSpec = context.topicSpec(topicPartition.topic());
+        long earliestLocalOffset = expectedEarliestLocalOffset != -1L ? expectedEarliestLocalOffset
+                : startOffset + recordsToProduce.size()
+                - (recordsToProduce.size() % topicSpec.getMaxBatchCountPerSegment()) - 1;
+
+        for (BrokerLocalStorage localStorage : localStorages) {
+            // Select brokers which are assigned a replica of the topic-partition
+            boolean isAssignedReplica = context.isAssignedReplica(topicPartition, localStorage.getBrokerId());
+            if (isAssignedReplica) {
+                // Filter out inactive brokers, which may still contain log segments we would expect
+                // to be deleted based on the retention configuration.
+                boolean isActive = context.isActive(localStorage.getBrokerId());
+                if (isActive) {
+                    // Wait until the broker's local storage has been cleared of the inactive log segments.
+                    localStorage.waitForEarliestLocalOffset(topicPartition, earliestLocalOffset);
+                }
+            }
+        }
+
+        List<ConsumerRecord<String, String>> consumedRecords =
+                context.consume(topicPartition, recordsToProduce.size(), startOffset);

Review Comment:
   Why are we consuming in a produce action? I assume the reason is to validate that the produce was executed correctly, yes? Please add a comment about this here.
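   
   For example, the requested comment could read roughly as follows (the wording is only a suggestion, placed on the existing consume call):
   
   ```java
   // Consume the records that were just produced, to validate end-to-end that the
   // produce step succeeded, including reads served from tiered storage once the
   // expected segments have been offloaded.
   List<ConsumerRecord<String, String>> consumedRecords =
           context.consume(topicPartition, recordsToProduce.size(), startOffset);
   ```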



##########
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java:
##########
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.actions.BounceBrokerAction;
+import org.apache.kafka.tiered.storage.actions.ConsumeAction;
+import org.apache.kafka.tiered.storage.actions.CreatePartitionsAction;
+import org.apache.kafka.tiered.storage.actions.CreateTopicAction;
+import org.apache.kafka.tiered.storage.actions.DeleteRecordsAction;
+import org.apache.kafka.tiered.storage.actions.DeleteTopicAction;
+import org.apache.kafka.tiered.storage.actions.EraseBrokerStorageAction;
+import org.apache.kafka.tiered.storage.actions.ExpectBrokerInISRAction;
+import org.apache.kafka.tiered.storage.actions.ExpectEmptyRemoteStorageAction;
+import org.apache.kafka.tiered.storage.actions.ExpectLeaderAction;
+import org.apache.kafka.tiered.storage.actions.ExpectLeaderEpochCheckpointAction;
+import org.apache.kafka.tiered.storage.actions.ExpectListOffsetsAction;
+import org.apache.kafka.tiered.storage.actions.ExpectTopicIdToMatchInRemoteStorageAction;
+import org.apache.kafka.tiered.storage.actions.ExpectUserTopicMappedToMetadataPartitionsAction;
+import org.apache.kafka.tiered.storage.actions.ProduceAction;
+import org.apache.kafka.tiered.storage.actions.ReassignReplicaAction;
+import org.apache.kafka.tiered.storage.actions.ShrinkReplicaAction;
+import org.apache.kafka.tiered.storage.actions.StartBrokerAction;
+import org.apache.kafka.tiered.storage.actions.StopBrokerAction;
+import org.apache.kafka.tiered.storage.actions.UpdateBrokerConfigAction;
+import org.apache.kafka.tiered.storage.actions.UpdateTopicConfigAction;
+import org.apache.kafka.tiered.storage.specs.ConsumableSpec;
+import org.apache.kafka.tiered.storage.specs.DeletableSpec;
+import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
+import org.apache.kafka.tiered.storage.specs.FetchableSpec;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+import org.apache.kafka.tiered.storage.specs.OffloadableSpec;
+import org.apache.kafka.tiered.storage.specs.OffloadedSegmentSpec;
+import org.apache.kafka.tiered.storage.specs.ProducableSpec;
+import org.apache.kafka.tiered.storage.specs.RemoteDeleteSegmentSpec;
+import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec;
+import org.apache.kafka.tiered.storage.specs.TopicSpec;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public final class TieredStorageTestBuilder {
+
+    private final int defaultProducedBatchSize = 1;
+    private final long defaultEarliestLocalOffsetExpectedInLogDirectory = 0;
+
+    private Map<TopicPartition, ProducableSpec> producables = new HashMap<>();
+    private Map<TopicPartition, List<OffloadableSpec>> offloadables = new HashMap<>();
+    private Map<TopicPartition, ConsumableSpec> consumables = new HashMap<>();
+    private Map<TopicPartition, FetchableSpec> fetchables = new HashMap<>();
+    private Map<TopicPartition, List<DeletableSpec>> deletables = new HashMap<>();
+    private List<TieredStorageTestAction> actions = new ArrayList<>();
+
+    public TieredStorageTestBuilder() {
+    }
+
+    public TieredStorageTestBuilder createTopic(String topic,
+                                                Integer partitionCount,
+                                                Integer replicationFactor,
+                                                Integer maxBatchCountPerSegment,
+                                                Map<Integer, List<Integer>> replicaAssignment,
+                                                Boolean enableRemoteLogStorage) {
+        assert maxBatchCountPerSegment >= 1 : "Segment size for topic " + topic + " needs to be >= 1";
+        assert partitionCount >= 1 : "Partition count for topic " + topic + " needs to be >= 1";
+        assert replicationFactor >= 1 : "Replication factor for topic " + topic + " needs to be >= 1";
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();

Review Comment:
   The addition of these actions is confusing to me. My understanding is that we add these actions because the parent action depends on them? But in this case, for example, createTopicAction does not depend on createProduce/ConsumeAction, so why do we add them before it?
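   
   For reference, my reading is that these `maybeCreate*` calls flush any pending produce/consume specs into actions before the next action is queued, roughly along these lines (a simplified, hypothetical sketch, not the actual implementation; `createProduceActionFrom` is an invented helper):
   
   ```java
   // Hypothetical sketch of the suspected pattern: specs accumulated by earlier
   // builder calls are turned into actions before a new action is appended, so
   // that actions execute in the order the spec was declared.
   private void maybeCreateProduceAction() {
       if (!producables.isEmpty()) {
           producables.forEach((topicPartition, spec) ->
                   actions.add(createProduceActionFrom(topicPartition, spec)));
           producables.clear();
       }
   }
   ```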



##########
storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test Cases (A):
+ *    Elementary offloads and fetches from tiered storage.
+ */
+public final class OffloadAndConsumeFromLeaderTest extends TieredStorageTestHarness {

Review Comment:
   We need a lightweight README.md in the tiered/storage folder. That README should explain the basic construction of a test, along the lines of the steps below (a minimal sketch follows the list).
   
   Step 1: For every test, setup is done via `TieredStorageTestHarness`, which extends `IntegrationTestHarness` and sets up a cluster with TS enabled on it.
   Step 2: The test is written as a specification consisting of sequential actions and assertions. The spec for the complete test is written down first, which creates the "actions" to be executed.
   Step 3: Once we have the test spec in place (which includes assertion actions), we execute the test, which runs each action sequentially.
   Step 4: The test execution stops when any of the actions throws an exception (or an assertion error).
   Step 5: Clean-up for the test is performed on test exit.
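   
   A minimal sketch of a test written in this style could look as follows (the `brokerCount()` and `writeTestSpecifications(...)` hooks are assumptions based on the harness in this PR; the topic name and settings are illustrative only):
   
   ```java
   import java.util.Collections;
   
   import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
   import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
   
   public final class ExampleOffloadTest extends TieredStorageTestHarness {
   
       @Override
       public int brokerCount() {
           return 1; // Step 1: the harness sets up a TS-enabled cluster.
       }
   
       @Override
       protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
           // Step 2: declare the spec; each builder call queues an action.
           builder.createTopic("exampleTopic", 1, 1, 2, Collections.emptyMap(), true);
           // Steps 3-5: the harness executes the queued actions sequentially,
           // stops at the first exception or assertion error, and cleans up on exit.
       }
   }
   ```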


