divijvaidya commented on code in PR #14116:
URL: https://github.com/apache/kafka/pull/14116#discussion_r1286748068


##########
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java:
##########
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.actions.BounceBrokerAction;
+import org.apache.kafka.tiered.storage.actions.ConsumeAction;
+import org.apache.kafka.tiered.storage.actions.CreatePartitionsAction;
+import org.apache.kafka.tiered.storage.actions.CreateTopicAction;
+import org.apache.kafka.tiered.storage.actions.DeleteRecordsAction;
+import org.apache.kafka.tiered.storage.actions.DeleteTopicAction;
+import org.apache.kafka.tiered.storage.actions.EraseBrokerStorageAction;
+import org.apache.kafka.tiered.storage.actions.ExpectBrokerInISRAction;
+import org.apache.kafka.tiered.storage.actions.ExpectEmptyRemoteStorageAction;
+import org.apache.kafka.tiered.storage.actions.ExpectLeaderAction;
+import 
org.apache.kafka.tiered.storage.actions.ExpectLeaderEpochCheckpointAction;
+import org.apache.kafka.tiered.storage.actions.ExpectListOffsetsAction;
+import 
org.apache.kafka.tiered.storage.actions.ExpectTopicIdToMatchInRemoteStorageAction;
+import 
org.apache.kafka.tiered.storage.actions.ExpectUserTopicMappedToMetadataPartitionsAction;
+import org.apache.kafka.tiered.storage.actions.ProduceAction;
+import org.apache.kafka.tiered.storage.actions.ReassignReplicaAction;
+import org.apache.kafka.tiered.storage.actions.ShrinkReplicaAction;
+import org.apache.kafka.tiered.storage.actions.StartBrokerAction;
+import org.apache.kafka.tiered.storage.actions.StopBrokerAction;
+import org.apache.kafka.tiered.storage.actions.UpdateBrokerConfigAction;
+import org.apache.kafka.tiered.storage.actions.UpdateTopicConfigAction;
+import org.apache.kafka.tiered.storage.specs.ConsumableSpec;
+import org.apache.kafka.tiered.storage.specs.DeletableSpec;
+import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
+import org.apache.kafka.tiered.storage.specs.FetchableSpec;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+import org.apache.kafka.tiered.storage.specs.OffloadableSpec;
+import org.apache.kafka.tiered.storage.specs.OffloadedSegmentSpec;
+import org.apache.kafka.tiered.storage.specs.ProducableSpec;
+import org.apache.kafka.tiered.storage.specs.RemoteDeleteSegmentSpec;
+import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec;
+import org.apache.kafka.tiered.storage.specs.TopicSpec;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public final class TieredStorageTestBuilder {
+
+    private final int defaultProducedBatchSize = 1;
+    private final long defaultEarliestLocalOffsetExpectedInLogDirectory = 0;
+
+    private Map<TopicPartition, ProducableSpec> producables = new HashMap<>();
+    private Map<TopicPartition, List<OffloadableSpec>> offloadables = new 
HashMap<>();
+    private Map<TopicPartition, ConsumableSpec> consumables = new HashMap<>();
+    private Map<TopicPartition, FetchableSpec> fetchables = new HashMap<>();
+    private Map<TopicPartition, List<DeletableSpec>> deletables = new 
HashMap<>();
+    private List<TieredStorageTestAction> actions = new ArrayList<>();
+
+    public TieredStorageTestBuilder() {
+    }
+
+    public TieredStorageTestBuilder createTopic(String topic,
+                                                Integer partitionCount,
+                                                Integer replicationFactor,
+                                                Integer 
maxBatchCountPerSegment,
+                                                Map<Integer, List<Integer>> 
replicaAssignment,
+                                                Boolean 
enableRemoteLogStorage) {
+        assert maxBatchCountPerSegment >= 1 : "Segments size for topic " + 
topic + " needs to be >= 1";
+        assert partitionCount >= 1 : "Partition count for topic " + topic + " 
needs to be >= 1";
+        assert replicationFactor >= 1 : "Replication factor for topic " + 
topic + " needs to be >= 1";
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        Map<String, String> properties = new HashMap<>();
+        properties.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, 
enableRemoteLogStorage.toString());
+        TopicSpec topicSpec = new TopicSpec(topic, partitionCount, 
replicationFactor, maxBatchCountPerSegment,
+                replicaAssignment, properties);
+        actions.add(new CreateTopicAction(topicSpec));
+        return this;
+    }
+
+    public TieredStorageTestBuilder createPartitions(String topic,
+                                                     Integer partitionCount,
+                                                     Map<Integer, 
List<Integer>> replicaAssignment) {
+        assert partitionCount >= 1 : "Partition count for topic " + topic + " 
needs to be >= 1";
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        ExpandPartitionCountSpec spec = new ExpandPartitionCountSpec(topic, 
partitionCount, replicaAssignment);
+        actions.add(new CreatePartitionsAction(spec));
+        return this;
+    }
+
+    public TieredStorageTestBuilder updateTopicConfig(String topic,
+                                                      Map<String, String> 
configsToBeAdded,
+                                                      List<String> 
configsToBeDeleted) {
+        assert !configsToBeAdded.isEmpty() || !configsToBeDeleted.isEmpty()
+                : "Topic " + topic + " configs shouldn't be empty";
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        actions.add(new UpdateTopicConfigAction(topic, configsToBeAdded, 
configsToBeDeleted));
+        return this;
+    }
+
+    public TieredStorageTestBuilder updateBrokerConfig(Integer brokerId,
+                                                       Map<String, String> 
configsToBeAdded,
+                                                       List<String> 
configsToBeDeleted) {
+        assert !configsToBeAdded.isEmpty() || !configsToBeDeleted.isEmpty()
+                : "Broker " + brokerId + " configs shouldn't be empty";
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        actions.add(new UpdateBrokerConfigAction(brokerId, configsToBeAdded, 
configsToBeDeleted));
+        return this;
+    }
+
+    public TieredStorageTestBuilder deleteTopic(List<String> topics) {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        topics.forEach(topic -> actions.add(buildDeleteTopicAction(topic, 
true)));
+        return this;
+    }
+
+    public TieredStorageTestBuilder produce(String topic,
+                                            Integer partition,
+                                            KeyValueSpec... keyValues) {
+        assert partition >= 0 : "Partition must be >= 0";
+        maybeCreateConsumeActions();
+        ProducableSpec spec = getOrCreateProducable(topic, partition);
+        for (KeyValueSpec kv : keyValues) {
+            spec.getRecords().add(new ProducerRecord<>(topic, partition, 
kv.getKey(), kv.getValue()));
+        }
+        return this;
+    }
+
+    public TieredStorageTestBuilder produceWithTimestamp(String topic,
+                                                         Integer partition,
+                                                         KeyValueSpec... 
keyValues) {
+        assert partition >= 0 : "Partition must be >= 0";
+        maybeCreateConsumeActions();
+        ProducableSpec spec = getOrCreateProducable(topic, partition);
+        for (KeyValueSpec kv : keyValues) {
+            spec.getRecords()
+                    .add(new ProducerRecord<>(topic, partition, 
kv.getTimestamp(), kv.getKey(), kv.getValue()));
+        }
+        return this;
+    }
+
+    public TieredStorageTestBuilder withBatchSize(String topic,
+                                                  Integer partition,
+                                                  Integer batchSize) {
+        assert batchSize >= 1 : "The size of a batch of produced records must 
>= 1";
+        getOrCreateProducable(topic, partition).setBatchSize(batchSize);
+        return this;
+    }
+
+    public TieredStorageTestBuilder 
expectEarliestLocalOffsetInLogDirectory(String topic,
+                                                                            
Integer partition,
+                                                                            
Long earliestLocalOffset) {
+        assert earliestLocalOffset >= 0 : "Record offset must be >= 0";
+        getOrCreateProducable(topic, 
partition).setEarliestLocalLogOffset(earliestLocalOffset);
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectSegmentToBeOffloaded(Integer 
fromBroker,
+                                                               String topic,
+                                                               Integer 
partition,
+                                                               Integer 
baseOffset,
+                                                               KeyValueSpec... 
keyValues) {
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        List<ProducerRecord<String, String>> records = new ArrayList<>();
+        for (KeyValueSpec kv: keyValues) {
+            records.add(new ProducerRecord<>(topic, partition, kv.getKey(), 
kv.getValue()));
+        }
+        offloadables.computeIfAbsent(topicPartition, k -> new ArrayList<>())
+                .add(new OffloadableSpec(fromBroker, baseOffset, records));
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectTopicIdToMatchInRemoteStorage(String 
topic) {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        actions.add(new ExpectTopicIdToMatchInRemoteStorageAction(topic));
+        return this;
+    }
+
+    public TieredStorageTestBuilder maybeEnqueueActions() {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        return this;
+    }
+
+    public TieredStorageTestBuilder consume(String topic,
+                                            Integer partition,
+                                            Long fetchOffset,
+                                            Integer expectedTotalRecord,
+                                            Integer 
expectedRecordsFromSecondTier) {
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        assert partition >= 0 : "Partition must be >= 0";
+        assert fetchOffset >= 0 : "Fetch offset must be >=0";
+        assert expectedTotalRecord >= 1 : "Must read at least one record";
+        assert expectedRecordsFromSecondTier >= 0 : "Expected read cannot be < 
0";
+        assert expectedRecordsFromSecondTier <= expectedTotalRecord : "Cannot 
fetch more records than consumed";
+        assert !consumables.containsKey(topicPartition) : "Consume already in 
progress for " + topicPartition;
+        maybeCreateProduceAction();
+        consumables.put(
+                topicPartition, new ConsumableSpec(fetchOffset, 
expectedTotalRecord, expectedRecordsFromSecondTier));
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectLeader(String topic,
+                                                 Integer partition,
+                                                 Integer brokerId,
+                                                 Boolean electLeader) {
+        actions.add(new ExpectLeaderAction(new TopicPartition(topic, 
partition), brokerId, electLeader));
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectInIsr(String topic,
+                                                Integer partition,
+                                                Integer brokerId) {
+        actions.add(new ExpectBrokerInISRAction(new TopicPartition(topic, 
partition), brokerId));
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectFetchFromTieredStorage(Integer 
fromBroker,
+                                                                 String topic,
+                                                                 Integer 
partition,
+                                                                 Integer 
remoteFetchRequestCount) {
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        assert partition >= 0 : "Partition must be >= 0";
+        assert remoteFetchRequestCount >= 0 : "Expected fetch count from 
tiered storage must be >= 0";
+        assert !fetchables.containsKey(topicPartition) : "Consume already in 
progress for " + topicPartition;
+        fetchables.put(topicPartition, new FetchableSpec(fromBroker, 
remoteFetchRequestCount));
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectDeletionInRemoteStorage(Integer 
fromBroker,
+                                                                  String topic,
+                                                                  Integer 
partition,
+                                                                  
LocalTieredStorageEvent.EventType eventType,
+                                                                  Integer 
eventCount) {
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        deletables.computeIfAbsent(topicPartition, k -> new ArrayList<>())
+                .add(new DeletableSpec(fromBroker, eventType, eventCount));
+        return this;
+    }
+
+    public TieredStorageTestBuilder waitForRemoteLogSegmentDeletion(String 
topic) {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        actions.add(buildDeleteTopicAction(topic, false));
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectLeaderEpochCheckpoint(Integer 
brokerId,
+                                                                String topic,
+                                                                Integer 
partition,
+                                                                Integer 
beginEpoch,
+                                                                Long 
startOffset) {
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        actions.add(new ExpectLeaderEpochCheckpointAction(brokerId, 
topicPartition, beginEpoch, startOffset));
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectListOffsets(String topic,
+                                                      Integer partition,
+                                                      OffsetSpec offsetSpec,
+                                                      EpochEntry epochEntry) {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        actions.add(new ExpectListOffsetsAction(topicPartition, offsetSpec, 
epochEntry));
+        return this;
+    }
+
+    public TieredStorageTestBuilder bounce(Integer brokerId) {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        actions.add(new BounceBrokerAction(brokerId));
+        return this;
+    }
+
+    public TieredStorageTestBuilder stop(Integer brokerId) {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        actions.add(new StopBrokerAction(brokerId));
+        return this;
+    }
+
+    public TieredStorageTestBuilder start(Integer brokerId) {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        actions.add(new StartBrokerAction(brokerId));
+        return this;
+    }
+
+    public TieredStorageTestBuilder eraseBrokerStorage(Integer brokerId) {
+        actions.add(new EraseBrokerStorageAction(brokerId));
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectEmptyRemoteStorage(String topic,
+                                                             Integer 
partition) {
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        actions.add(new ExpectEmptyRemoteStorageAction(topicPartition));
+        return this;
+    }
+
+    public TieredStorageTestBuilder shrinkReplica(String topic,
+                                                  Integer partition,
+                                                  List<Integer> replicaIds) {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        actions.add(new ShrinkReplicaAction(topicPartition, replicaIds));
+        return this;
+    }
+
+    public TieredStorageTestBuilder reassignReplica(String topic,
+                                                    Integer partition,
+                                                    List<Integer> replicaIds) {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        actions.add(new ReassignReplicaAction(topicPartition, replicaIds));
+        return this;
+    }
+
+    public TieredStorageTestBuilder 
expectUserTopicMappedToMetadataPartitions(String topic,
+                                                                              
List<Integer> metadataPartitions) {
+        actions.add(new ExpectUserTopicMappedToMetadataPartitionsAction(topic, 
metadataPartitions));
+        return this;
+    }
+
+    public TieredStorageTestBuilder deleteRecords(String topic,

Review Comment:
   I mentioned this as an example of how this interface can help write tests 
where we want to assert an exception on the client. DeleteRecords was probably 
not the best choice here but the idea was to demonstrate how we can change the 
harness to expect exceptions.
   
   We can do this later outside this PR if a need arises. This is optional.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to