divijvaidya commented on code in PR #14116:
URL: https://github.com/apache/kafka/pull/14116#discussion_r1286748068
########## storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java: ##########
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.actions.BounceBrokerAction;
+import org.apache.kafka.tiered.storage.actions.ConsumeAction;
+import org.apache.kafka.tiered.storage.actions.CreatePartitionsAction;
+import org.apache.kafka.tiered.storage.actions.CreateTopicAction;
+import org.apache.kafka.tiered.storage.actions.DeleteRecordsAction;
+import org.apache.kafka.tiered.storage.actions.DeleteTopicAction;
+import org.apache.kafka.tiered.storage.actions.EraseBrokerStorageAction;
+import org.apache.kafka.tiered.storage.actions.ExpectBrokerInISRAction;
+import org.apache.kafka.tiered.storage.actions.ExpectEmptyRemoteStorageAction;
+import org.apache.kafka.tiered.storage.actions.ExpectLeaderAction;
+import org.apache.kafka.tiered.storage.actions.ExpectLeaderEpochCheckpointAction;
+import org.apache.kafka.tiered.storage.actions.ExpectListOffsetsAction;
+import org.apache.kafka.tiered.storage.actions.ExpectTopicIdToMatchInRemoteStorageAction;
+import org.apache.kafka.tiered.storage.actions.ExpectUserTopicMappedToMetadataPartitionsAction;
+import org.apache.kafka.tiered.storage.actions.ProduceAction;
+import org.apache.kafka.tiered.storage.actions.ReassignReplicaAction;
+import org.apache.kafka.tiered.storage.actions.ShrinkReplicaAction;
+import org.apache.kafka.tiered.storage.actions.StartBrokerAction;
+import org.apache.kafka.tiered.storage.actions.StopBrokerAction;
+import org.apache.kafka.tiered.storage.actions.UpdateBrokerConfigAction;
+import org.apache.kafka.tiered.storage.actions.UpdateTopicConfigAction;
+import org.apache.kafka.tiered.storage.specs.ConsumableSpec;
+import org.apache.kafka.tiered.storage.specs.DeletableSpec;
+import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
+import org.apache.kafka.tiered.storage.specs.FetchableSpec;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+import org.apache.kafka.tiered.storage.specs.OffloadableSpec;
+import org.apache.kafka.tiered.storage.specs.OffloadedSegmentSpec;
+import org.apache.kafka.tiered.storage.specs.ProducableSpec;
+import org.apache.kafka.tiered.storage.specs.RemoteDeleteSegmentSpec;
+import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec;
+import org.apache.kafka.tiered.storage.specs.TopicSpec;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public final class TieredStorageTestBuilder {
+
+    private final int defaultProducedBatchSize = 1;
+    private final long defaultEarliestLocalOffsetExpectedInLogDirectory = 0;
+
+    private Map<TopicPartition, ProducableSpec> producables = new HashMap<>();
+    private Map<TopicPartition, List<OffloadableSpec>> offloadables = new HashMap<>();
+    private Map<TopicPartition, ConsumableSpec> consumables = new HashMap<>();
+    private Map<TopicPartition, FetchableSpec> fetchables = new HashMap<>();
+    private Map<TopicPartition, List<DeletableSpec>> deletables = new HashMap<>();
+    private List<TieredStorageTestAction> actions = new ArrayList<>();
+
+    public TieredStorageTestBuilder() {
+    }
+
+    public TieredStorageTestBuilder createTopic(String topic,
+            Integer partitionCount,
+            Integer replicationFactor,
+            Integer maxBatchCountPerSegment,
+            Map<Integer, List<Integer>> replicaAssignment,
+            Boolean enableRemoteLogStorage) {
+        assert maxBatchCountPerSegment >= 1 : "Segments size for topic " + topic + " needs to be >= 1";
+        assert partitionCount >= 1 : "Partition count for topic " + topic + " needs to be >= 1";
+        assert replicationFactor >= 1 : "Replication factor for topic " + topic + " needs to be >= 1";
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        Map<String, String> properties = new HashMap<>();
+        properties.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, enableRemoteLogStorage.toString());
+        TopicSpec topicSpec = new TopicSpec(topic, partitionCount, replicationFactor, maxBatchCountPerSegment,
+                replicaAssignment, properties);
+        actions.add(new CreateTopicAction(topicSpec));
+        return this;
+    }
+
+    public TieredStorageTestBuilder createPartitions(String topic,
+            Integer partitionCount,
+            Map<Integer, List<Integer>> replicaAssignment) {
+        assert partitionCount >= 1 : "Partition count for topic " + topic + " needs to be >= 1";
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        ExpandPartitionCountSpec spec = new ExpandPartitionCountSpec(topic, partitionCount, replicaAssignment);
+        actions.add(new CreatePartitionsAction(spec));
+        return this;
+    }
+
+    public TieredStorageTestBuilder updateTopicConfig(String topic,
+            Map<String, String> configsToBeAdded,
+            List<String> configsToBeDeleted) {
+        assert !configsToBeAdded.isEmpty() || !configsToBeDeleted.isEmpty()
+                : "Topic " + topic + " configs shouldn't be empty";
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        actions.add(new UpdateTopicConfigAction(topic, configsToBeAdded, configsToBeDeleted));
+        return this;
+    }
+
+    public TieredStorageTestBuilder updateBrokerConfig(Integer brokerId,
+            Map<String, String> configsToBeAdded,
+            List<String> configsToBeDeleted) {
+        assert !configsToBeAdded.isEmpty() || !configsToBeDeleted.isEmpty()
+                : "Broker " + brokerId + " configs shouldn't be empty";
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        actions.add(new UpdateBrokerConfigAction(brokerId, configsToBeAdded, configsToBeDeleted));
+        return this;
+    }
+
+    public TieredStorageTestBuilder deleteTopic(List<String> topics) {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        topics.forEach(topic -> actions.add(buildDeleteTopicAction(topic, true)));
+        return this;
+    }
+
+    public TieredStorageTestBuilder produce(String topic,
+            Integer partition,
+            KeyValueSpec... keyValues) {
+        assert partition >= 0 : "Partition must be >= 0";
+        maybeCreateConsumeActions();
+        ProducableSpec spec = getOrCreateProducable(topic, partition);
+        for (KeyValueSpec kv : keyValues) {
+            spec.getRecords().add(new ProducerRecord<>(topic, partition, kv.getKey(), kv.getValue()));
+        }
+        return this;
+    }
+
+    public TieredStorageTestBuilder produceWithTimestamp(String topic,
+            Integer partition,
+            KeyValueSpec... keyValues) {
+        assert partition >= 0 : "Partition must be >= 0";
+        maybeCreateConsumeActions();
+        ProducableSpec spec = getOrCreateProducable(topic, partition);
+        for (KeyValueSpec kv : keyValues) {
+            spec.getRecords()
+                    .add(new ProducerRecord<>(topic, partition, kv.getTimestamp(), kv.getKey(), kv.getValue()));
+        }
+        return this;
+    }
+
+    public TieredStorageTestBuilder withBatchSize(String topic,
+            Integer partition,
+            Integer batchSize) {
+        assert batchSize >= 1 : "The size of a batch of produced records must >= 1";
+        getOrCreateProducable(topic, partition).setBatchSize(batchSize);
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectEarliestLocalOffsetInLogDirectory(String topic,
+            Integer partition,
+            Long earliestLocalOffset) {
+        assert earliestLocalOffset >= 0 : "Record offset must be >= 0";
+        getOrCreateProducable(topic, partition).setEarliestLocalLogOffset(earliestLocalOffset);
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectSegmentToBeOffloaded(Integer fromBroker,
+            String topic,
+            Integer partition,
+            Integer baseOffset,
+            KeyValueSpec... keyValues) {
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        List<ProducerRecord<String, String>> records = new ArrayList<>();
+        for (KeyValueSpec kv: keyValues) {
+            records.add(new ProducerRecord<>(topic, partition, kv.getKey(), kv.getValue()));
+        }
+        offloadables.computeIfAbsent(topicPartition, k -> new ArrayList<>())
+                .add(new OffloadableSpec(fromBroker, baseOffset, records));
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectTopicIdToMatchInRemoteStorage(String topic) {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        actions.add(new ExpectTopicIdToMatchInRemoteStorageAction(topic));
+        return this;
+    }
+
+    public TieredStorageTestBuilder maybeEnqueueActions() {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        return this;
+    }
+
+    public TieredStorageTestBuilder consume(String topic,
+            Integer partition,
+            Long fetchOffset,
+            Integer expectedTotalRecord,
+            Integer expectedRecordsFromSecondTier) {
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        assert partition >= 0 : "Partition must be >= 0";
+        assert fetchOffset >= 0 : "Fetch offset must be >=0";
+        assert expectedTotalRecord >= 1 : "Must read at least one record";
+        assert expectedRecordsFromSecondTier >= 0 : "Expected read cannot be < 0";
+        assert expectedRecordsFromSecondTier <= expectedTotalRecord : "Cannot fetch more records than consumed";
+        assert !consumables.containsKey(topicPartition) : "Consume already in progress for " + topicPartition;
+        maybeCreateProduceAction();
+        consumables.put(
+                topicPartition, new ConsumableSpec(fetchOffset, expectedTotalRecord, expectedRecordsFromSecondTier));
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectLeader(String topic,
+            Integer partition,
+            Integer brokerId,
+            Boolean electLeader) {
+        actions.add(new ExpectLeaderAction(new TopicPartition(topic, partition), brokerId, electLeader));
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectInIsr(String topic,
+            Integer partition,
+            Integer brokerId) {
+        actions.add(new ExpectBrokerInISRAction(new TopicPartition(topic, partition), brokerId));
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectFetchFromTieredStorage(Integer fromBroker,
+            String topic,
+            Integer partition,
+            Integer remoteFetchRequestCount) {
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        assert partition >= 0 : "Partition must be >= 0";
+        assert remoteFetchRequestCount >= 0 : "Expected fetch count from tiered storage must be >= 0";
+        assert !fetchables.containsKey(topicPartition) : "Consume already in progress for " + topicPartition;
+        fetchables.put(topicPartition, new FetchableSpec(fromBroker, remoteFetchRequestCount));
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectDeletionInRemoteStorage(Integer fromBroker,
+            String topic,
+            Integer partition,
+            LocalTieredStorageEvent.EventType eventType,
+            Integer eventCount) {
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        deletables.computeIfAbsent(topicPartition, k -> new ArrayList<>())
+                .add(new DeletableSpec(fromBroker, eventType, eventCount));
+        return this;
+    }
+
+    public TieredStorageTestBuilder waitForRemoteLogSegmentDeletion(String topic) {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        actions.add(buildDeleteTopicAction(topic, false));
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectLeaderEpochCheckpoint(Integer brokerId,
+            String topic,
+            Integer partition,
+            Integer beginEpoch,
+            Long startOffset) {
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        actions.add(new ExpectLeaderEpochCheckpointAction(brokerId, topicPartition, beginEpoch, startOffset));
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectListOffsets(String topic,
+            Integer partition,
+            OffsetSpec offsetSpec,
+            EpochEntry epochEntry) {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        actions.add(new ExpectListOffsetsAction(topicPartition, offsetSpec, epochEntry));
+        return this;
+    }
+
+    public TieredStorageTestBuilder bounce(Integer brokerId) {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        actions.add(new BounceBrokerAction(brokerId));
+        return this;
+    }
+
+    public TieredStorageTestBuilder stop(Integer brokerId) {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        actions.add(new StopBrokerAction(brokerId));
+        return this;
+    }
+
+    public TieredStorageTestBuilder start(Integer brokerId) {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        actions.add(new StartBrokerAction(brokerId));
+        return this;
+    }
+
+    public TieredStorageTestBuilder eraseBrokerStorage(Integer brokerId) {
+        actions.add(new EraseBrokerStorageAction(brokerId));
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectEmptyRemoteStorage(String topic,
+            Integer partition) {
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        actions.add(new ExpectEmptyRemoteStorageAction(topicPartition));
+        return this;
+    }
+
+    public TieredStorageTestBuilder shrinkReplica(String topic,
+            Integer partition,
+            List<Integer> replicaIds) {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        actions.add(new ShrinkReplicaAction(topicPartition, replicaIds));
+        return this;
+    }
+
+    public TieredStorageTestBuilder reassignReplica(String topic,
+            Integer partition,
+            List<Integer> replicaIds) {
+        maybeCreateProduceAction();
+        maybeCreateConsumeActions();
+        TopicPartition topicPartition = new TopicPartition(topic, partition);
+        actions.add(new ReassignReplicaAction(topicPartition, replicaIds));
+        return this;
+    }
+
+    public TieredStorageTestBuilder expectUserTopicMappedToMetadataPartitions(String topic,
+            List<Integer> metadataPartitions) {
+        actions.add(new ExpectUserTopicMappedToMetadataPartitionsAction(topic, metadataPartitions));
+        return this;
+    }
+
+    public TieredStorageTestBuilder deleteRecords(String topic,

Review Comment:
   I mentioned this as an example of how this interface can help write tests where we want to assert an exception on the client. DeleteRecords was probably not the best choice here, but the idea was to demonstrate how we can change the harness to expect exceptions. We can do this later outside this PR if a need arises. This is optional.


--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
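[Editor's note] The review comment above sketches an idea rather than code in this PR: letting the test harness assert that a client-side operation fails. Below is a minimal, hypothetical illustration of what the body of such an "expect failure" check could look like, using only the real Admin client API (Admin.deleteRecords, RecordsToDelete.beforeOffset) and JUnit 5 assertions. The class and method names (ExpectDeleteRecordsFailureExample, assertDeleteRecordsFails) are invented for this sketch and are not part of the PR or the existing builder.

import java.util.Collections;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.common.TopicPartition;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

// Hypothetical helper, not part of this PR: issues deleteRecords() and asserts that the
// call fails on the client with the expected broker-side error (e.g. an out-of-range offset).
final class ExpectDeleteRecordsFailureExample {

    static void assertDeleteRecordsFails(Admin admin,
            TopicPartition partition,
            long beforeOffset,
            Class<? extends Throwable> expectedCause) {
        // The admin client surfaces broker errors as the cause of an ExecutionException
        // thrown when the returned future is resolved.
        ExecutionException e = assertThrows(ExecutionException.class,
            () -> admin.deleteRecords(
                    Collections.singletonMap(partition, RecordsToDelete.beforeOffset(beforeOffset)))
                .all()
                .get());
        assertTrue(expectedCause.isInstance(e.getCause()),
            "Expected cause of type " + expectedCause.getName() + " but got " + e.getCause());
    }
}

A builder method along the lines of the existing expect*() methods could then enqueue an action that performs this check, which is the kind of harness change the comment suggests deferring until a concrete test needs it.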