[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework
kamalcph commented on code in PR #14116: URL: https://github.com/apache/kafka/pull/14116#discussion_r1308574000 ## storage/src/test/java/org/apache/kafka/tiered/storage/actions/CreateTopicAction.java: ## @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tiered.storage.actions; + +import org.apache.kafka.tiered.storage.TieredStorageTestAction; +import org.apache.kafka.tiered.storage.TieredStorageTestContext; +import org.apache.kafka.tiered.storage.specs.TopicSpec; +import org.apache.kafka.common.config.TopicConfig; + +import java.io.PrintStream; +import java.util.concurrent.ExecutionException; + +public final class CreateTopicAction implements TieredStorageTestAction { + +private final TopicSpec spec; + +public CreateTopicAction(TopicSpec spec) { +this.spec = spec; +} + +@Override +public void doExecute(TieredStorageTestContext context) throws ExecutionException, InterruptedException { +// Ensure offset and time indexes are generated for every record. +spec.getProperties().put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1"); +// Leverage the use of the segment index size to create a log-segment accepting one and only one record. 
+// The minimum size of the indexes is that of an entry, which is 8 for the offset index and 12 for the +// time index. Hence, since the topic is configured to generate index entries for every record with, for +// a "small" number of records (i.e. such that the average record size times the number of records is +// much less than the segment size), the number of records which hold in a segment is the multiple of 12 +// defined below. +if (spec.getMaxBatchCountPerSegment() != -1) { +spec.getProperties().put( +TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, String.valueOf(12 * spec.getMaxBatchCountPerSegment())); Review Comment: This is the allocated size for all the indexes. The time index requires 12 bytes per record batch, which is more than the 8 bytes the offset index requires, so the larger entry size is used as the multiplier. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
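To make the arithmetic in the comment above concrete, here is a minimal sketch of how the `12 * maxBatchCountPerSegment` value relates to the capacity of each index. The class and method names are hypothetical and not part of the PR, and the capacity calculation is a simplified model (allocated bytes divided by entry size) rather than the broker's actual index code.

```java
// Hypothetical helper, not part of the PR: models the index sizing discussed above.
final class SegmentIndexSizing {
    // Entry sizes quoted in the code comment under review.
    static final int OFFSET_INDEX_ENTRY_BYTES = 8;
    static final int TIME_INDEX_ENTRY_BYTES = 12;

    /** Index size chosen so that the time index has room for exactly maxBatches entries. */
    static int segmentIndexBytes(int maxBatches) {
        if (maxBatches < 1) {
            throw new IllegalArgumentException("maxBatches must be >= 1");
        }
        return TIME_INDEX_ENTRY_BYTES * maxBatches;
    }

    /** Simplified model: number of whole time index entries that fit in indexBytes. */
    static int timeIndexCapacity(int indexBytes) {
        return indexBytes / TIME_INDEX_ENTRY_BYTES;
    }

    /** Simplified model: number of whole offset index entries that fit in indexBytes. */
    static int offsetIndexCapacity(int indexBytes) {
        return indexBytes / OFFSET_INDEX_ENTRY_BYTES;
    }

    public static void main(String[] args) {
        int indexBytes = segmentIndexBytes(3);
        System.out.println("segment.index.bytes = " + indexBytes);
        System.out.println("time index entries   = " + timeIndexCapacity(indexBytes));
        System.out.println("offset index entries = " + offsetIndexCapacity(indexBytes));
    }
}
```

Under this model the time index is the one that fills up first, which is why its entry size, and not the offset index's, determines the multiplier.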
[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework
kamalcph commented on code in PR #14116: URL: https://github.com/apache/kafka/pull/14116#discussion_r1308568867 ## storage/src/test/java/org/apache/kafka/tiered/storage/actions/ShrinkReplicaAction.java: ## @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage.actions; + +import org.apache.kafka.tiered.storage.TieredStorageTestAction; +import org.apache.kafka.tiered.storage.TieredStorageTestContext; +import org.apache.kafka.clients.admin.NewPartitionReassignment; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.test.TestUtils; + +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutionException; + +import static org.apache.kafka.tiered.storage.utils.ActionUtils.describeTopic; + +public final class ShrinkReplicaAction implements TieredStorageTestAction { Review Comment: Kafka does not support decreasing the number of partitions, but shrinking the replication factor is supported via the `ALTER_PARTITION_REASSIGNMENTS` API.
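As an illustration of what shrinking the replication factor amounts to, the sketch below computes a target replica list by dropping brokers from the current assignment. It is a stdlib-only sketch with hypothetical names, not the code of `ShrinkReplicaAction`. In a real test, the resulting list would presumably be wrapped in the `NewPartitionReassignment` class imported in the quoted diff and submitted through `Admin.alterPartitionReassignments`; that step is left out here so the example runs without Kafka on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of the PR: derives the target replica list for a shrink.
final class ReplicaShrinkSketch {

    /** Returns the current replicas minus the brokers to remove, preserving order. */
    static List<Integer> shrink(List<Integer> currentReplicas, Set<Integer> brokersToRemove) {
        List<Integer> target = new ArrayList<>();
        for (Integer brokerId : currentReplicas) {
            if (!brokersToRemove.contains(brokerId)) {
                target.add(brokerId);
            }
        }
        if (target.isEmpty()) {
            throw new IllegalArgumentException("A partition needs at least one replica");
        }
        if (target.size() == currentReplicas.size()) {
            throw new IllegalArgumentException("No replica would be removed");
        }
        return target;
    }

    public static void main(String[] args) {
        List<Integer> current = Arrays.asList(0, 1, 2);
        Set<Integer> remove = new HashSet<>(Arrays.asList(2));
        // The partition count is untouched; only the replica list gets shorter.
        System.out.println("target replicas: " + shrink(current, remove));
    }
}
```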
[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework
kamalcph commented on code in PR #14116: URL: https://github.com/apache/kafka/pull/14116#discussion_r1308567665 ## storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java: ## @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage.utils; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.storage.internals.log.LogFileUtils; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; + +public final class BrokerLocalStorage { + +private final Integer brokerId; +private final File brokerStorageDirectory; +private final Integer storageWaitTimeoutSec; + +private final int storagePollPeriodSec = 1; +private final Time time = Time.SYSTEM; + +public BrokerLocalStorage(Integer brokerId, + String storageDirname, + Integer storageWaitTimeoutSec) { +this.brokerId = brokerId; +this.brokerStorageDirectory = new File(storageDirname); +this.storageWaitTimeoutSec = storageWaitTimeoutSec; +} + +public Integer getBrokerId() { +return brokerId; +} + +/** + * Wait until the first segment offset in Apache Kafka storage for the given topic-partition is + * equal or greater to the provided offset. Review Comment: Ack, that is a typo. I will update the comment in the next PR.
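The Javadoc quoted above describes a poll-until-condition wait bounded by a timeout. A minimal sketch of that pattern follows, assuming a caller-supplied function that reports the first segment offset. The names are hypothetical, and the code uses plain `Thread.sleep` and `System.nanoTime` rather than the `Time` and `Timer` utilities that `BrokerLocalStorage` imports.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical helper, not part of the PR: polls until an offset reaches a threshold.
final class PollUntilSketch {

    /**
     * Polls firstSegmentOffset until it is greater than or equal to expectedOffset.
     * Returns true if the condition was met, false on timeout or interruption.
     */
    static boolean waitForOffsetAtLeast(LongSupplier firstSegmentOffset,
                                        long expectedOffset,
                                        long timeoutMs,
                                        long pollPeriodMs) {
        long start = System.nanoTime();
        long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            if (firstSegmentOffset.getAsLong() >= expectedOffset) {
                return true;
            }
            if (System.nanoTime() - start >= timeoutNanos) {
                return false;
            }
            try {
                Thread.sleep(pollPeriodMs);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long[] offset = {0};
        // Simulated storage whose first segment offset advances on every poll.
        boolean reached = waitForOffsetAtLeast(() -> ++offset[0], 3, 1_000, 1);
        System.out.println("condition reached: " + reached);
    }
}
```

In the harness, the timeout and poll period would correspond to `storageWaitTimeoutSec` and `storagePollPeriodSec` from the quoted class.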
[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework
kamalcph commented on code in PR #14116: URL: https://github.com/apache/kafka/pull/14116#discussion_r1294653156 ## storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestReport.java: ## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage; + +import org.apache.kafka.tiered.storage.utils.DumpLocalTieredStorage; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorage; + +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.List; + +public final class TieredStorageTestReport { + +private final TieredStorageTestContext context; +private final List<TieredStorageTestAction> successfulActions = new ArrayList<>(); +private final List<TieredStorageTestAction> failedActions = new ArrayList<>(); + +public TieredStorageTestReport(TieredStorageTestContext context) { +this.context = context; +} + +public void addSucceeded(TieredStorageTestAction action) { +synchronized (this) { +successfulActions.add(action); +} +} + +public void addFailed(TieredStorageTestAction action) { +synchronized (this) { +failedActions.add(action); +} +} + +public void print(PrintStream output) { +output.println(); +int seqNo = 0; +List<List<TieredStorageTestAction>> actionsLists = new ArrayList<>(); +actionsLists.add(successfulActions); +actionsLists.add(failedActions); + +List<String> statusList = new ArrayList<>(); +statusList.add("SUCCESS"); +statusList.add("FAILURE"); + +for (int i = 0; i < actionsLists.size(); i++) { +List<TieredStorageTestAction> actions = actionsLists.get(i); +String ident = statusList.get(i); +for (TieredStorageTestAction action : actions) { +seqNo++; +output.print("[" + ident + "] (" + seqNo + ") "); +action.describe(output); +output.println(); +} +} +String lts = ""; +if (!context.getTieredStorages().isEmpty()) { Review Comment: If the test is set up correctly, `getTieredStorages` (or `remoteStorageManagers`) won't be empty. I added the `isEmpty` check to avoid an NPE. We can remove or refactor the code later.
[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework
kamalcph commented on code in PR #14116: URL: https://github.com/apache/kafka/pull/14116#discussion_r1294640475 ## storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java: ## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage; + +import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage; +import kafka.api.IntegrationTestHarness; +import kafka.log.remote.RemoteLogManager; +import kafka.server.KafkaBroker; +import kafka.server.KafkaConfig; +import kafka.utils.TestUtils; +import org.apache.kafka.common.replica.ReplicaSelector; +import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager; +import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig; +import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorage; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import scala.collection.Seq; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.stream.Collectors; + +import scala.collection.JavaConverters; + +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP; +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP; +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP; +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP; + +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP; +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP; + 
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.DELETE_ON_CLOSE_CONFIG; +import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.STORAGE_DIR_CONFIG; + +/** + * Base class for integration tests exercising the tiered storage functionality in Apache Kafka. + */ +@Tag("integration") +public abstract class TieredStorageTestHarness extends IntegrationTestHarness { + +/** + * InitialTaskDelayMs is set to 30 seconds for the delete-segment scheduler in Apache Kafka. + * Hence, we need to wait at least that amount of time before segments eligible for deletion + * gets physically removed. + */ +private static final Integer STORAGE_WAIT_TIMEOUT_SEC = 35; + +private TieredStorageTestContext context; + +protected int numRemoteLogMetadataPartitions() { +return 5; +} + +@SuppressWarnings("deprecation") +@Override +public void modifyConfigs(Seq props) { +for (Properties p : JavaConverters.seqAsJavaList(props)) { +p.putAll(overridingProps()); +} +} + +public Properties overridingProps() { +Properties overridingProps = new Properties(); +// Configure the tiered storage in Kafka. Set an interval of 1 second for the remote log manager background +// activity to ensure the tiered storage has enough room to be exercised within the lifetime of a test. +// +// The replication factor of the remote log metadata topic needs to be chosen so that in resiliency +// tests, metadata can survive the loss of one replica for its topic-partitions. +// +// The second-tier storage system is mocked via the LocalTieredStorage instance which persists transferred +// data files on the local file system. +overridingProps.setProperty(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true"); +
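The comment block in the quoted `overridingProps()` describes three choices: enable tiered storage, run the remote log manager task every second, and back the remote tier with `LocalTieredStorage`. The sketch below expresses those choices with plain `java.util.Properties`. The property keys are written out as string literals on the assumption that they match the `RemoteLogManagerConfig` constants imported in the diff, and every value other than the `true` flag and the one-second interval is illustrative, because the quoted diff is cut off before the remaining settings appear.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical sketch, not the harness code: broker overrides for a tiered storage test.
final class TieredStorageOverridesSketch {

    static Properties overridingProps() {
        Properties props = new Properties();
        // Turn the tiered storage subsystem on (stated in the quoted diff).
        props.setProperty("remote.log.storage.system.enable", "true");
        // One-second interval for remote log manager background tasks (stated in the quoted comment).
        props.setProperty("remote.log.manager.task.interval.ms", "1000");
        // Assumed wiring: class names taken from the imports of the quoted file.
        props.setProperty("remote.log.storage.manager.class.name",
            "org.apache.kafka.server.log.remote.storage.LocalTieredStorage");
        props.setProperty("remote.log.metadata.manager.class.name",
            "org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager");
        return props;
    }

    /** Mirrors the shape of modifyConfigs: apply the overrides to every broker's properties. */
    static void applyTo(List<Properties> brokerConfigs) {
        for (Properties p : brokerConfigs) {
            p.putAll(overridingProps());
        }
    }

    public static void main(String[] args) {
        List<Properties> brokers = Arrays.asList(new Properties(), new Properties());
        applyTo(brokers);
        System.out.println(brokers.get(0).getProperty("remote.log.storage.system.enable"));
    }
}
```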
[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework
kamalcph commented on code in PR #14116: URL: https://github.com/apache/kafka/pull/14116#discussion_r1294609562 ## storage/src/test/java/org/apache/kafka/tiered/storage/README.md: ## @@ -0,0 +1,9 @@ +Step 1: For every test, setup is done via TieredStorageTestHarness which extends IntegrationTestHarness and sets up a cluster with TS enabled on it. Review Comment: Will add detailed steps later once we start to enable the tests.
[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework
kamalcph commented on code in PR #14116: URL: https://github.com/apache/kafka/pull/14116#discussion_r1294583210 ## storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java: ## @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage; + +import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec; +import org.apache.kafka.tiered.storage.specs.TopicSpec; +import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage; +import kafka.log.UnifiedLog; +import kafka.server.KafkaBroker; +import kafka.utils.TestUtils; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.AlterConfigsOptions; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.metadata.BrokerState; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorage; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot; +import scala.Function0; +import scala.Function1; + +import java.io.IOException; +import java.io.PrintStream; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import 
java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +import scala.Option; +import scala.collection.Seq; + +import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG; + +public final class TieredStorageTestContext { + +private final Seq brokers; +private final Properties producerConfig; +private final Properties consumerConfig; +private final ListenerName listenerName; + +private final Serializer ser; +private final Deserializer de; + +private final Map topicSpecs; +private final TieredStorageTestReport testReport; + +private volatile KafkaProducer producer; +private volatile KafkaConsumer consumer; +private volatile Admin admin; +private volatile List tieredStorages; +private volatile List localStorages; + +public TieredStorageTestContext(Seq brokers, +Properties producerConfig, +Properties consumerConfig, +ListenerName listenerName) { +this.brokers = brokers; +this.producerConfig = producerConfig; +this.consumerConfig = consumerConfig; +this.listenerName = listenerName; +this.ser = Serdes.String().serializer(); +this.de = Serdes.String().deserializer(); +this.topicSpecs = new HashMap<>(); +this.testReport = new TieredStorageTestReport(this); +initContext(); +} + +private void initContext() { +String bootstrapServers = TestUtils.bootstrapServers(brokers, listenerName); +producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); +// Set a producer linger of
[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework
kamalcph commented on code in PR #14116: URL: https://github.com/apache/kafka/pull/14116#discussion_r1294503554 ## storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java: ## @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage; + +import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec; +import org.apache.kafka.tiered.storage.specs.TopicSpec; +import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage; +import kafka.log.UnifiedLog; +import kafka.server.KafkaBroker; +import kafka.utils.TestUtils; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.AlterConfigsOptions; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.metadata.BrokerState; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorage; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot; +import scala.Function0; +import scala.Function1; + +import java.io.IOException; +import java.io.PrintStream; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import 
java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +import scala.Option; +import scala.collection.Seq; + +import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG; + +public final class TieredStorageTestContext { + +private final Seq brokers; +private final Properties producerConfig; +private final Properties consumerConfig; +private final Properties adminConfig; + +private final Serializer ser; +private final Deserializer de; + +private final Map topicSpecs; +private final TieredStorageTestReport testReport; + +private volatile KafkaProducer producer; +private volatile KafkaConsumer consumer; +private volatile Admin admin; +private volatile List tieredStorages; +private volatile List localStorages; + +public TieredStorageTestContext(Seq brokers, +Properties producerConfig, +Properties consumerConfig, +Properties adminConfig) { +this.brokers = brokers; +this.producerConfig = producerConfig; +this.consumerConfig = consumerConfig; +this.adminConfig = adminConfig; +this.ser = Serdes.String().serializer(); +this.de = Serdes.String().deserializer(); +this.topicSpecs = new HashMap<>(); +this.testReport = new TieredStorageTestReport(this); +initContext(); +} + +private void initContext() { +// Set a producer linger of 60 seconds, in order to optimistically generate batches of +// records with a pre-determined size. +producerConfig.put(LINGER_MS_CONFIG, String.valueOf(TimeUnit.SECONDS.toMillis(60))); +producer = new KafkaProducer<>(producerConfig, ser, ser); +consumer = new
[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework
kamalcph commented on code in PR #14116: URL: https://github.com/apache/kafka/pull/14116#discussion_r1286059074 ## core/src/main/scala/kafka/server/KafkaBroker.scala: ## @@ -19,6 +19,7 @@ package kafka.server import com.yammer.metrics.core.MetricName import kafka.log.LogManager +import kafka.log.remote.RemoteLogManager Review Comment: This import is used at L83. I removed the seemingly unused import `scala.collection.Seq` from this class, but that produced the errors below, so I reverted it, since the change is unrelated to this PR:
```
❯ ./gradlew :storage:compileTestJava -PscalaVersion=2.13
> Configure project :
Starting build with version 3.6.0-SNAPSHOT (commit id d109f88d) using Gradle 8.2.1, Java 11 and Scala 2.13.11
Build properties: maxParallelForks=12, maxScalacThreads=8, maxTestRetries=0
> Task :core:compileScala
[Error] /Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:910:69: type mismatch;
 found : scala.collection.mutable.Buffer[org.apache.kafka.common.metrics.MetricsReporter]
 required: Seq[AnyRef]
[Error] /Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:915:61: type mismatch;
 found : scala.collection.mutable.Buffer[org.apache.kafka.common.metrics.MetricsReporter]
 required: Seq[AnyRef]
[Error] /Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/KafkaServer.scala:267:77: type mismatch;
 found : Seq[Object] (in scala.collection)
 required: Seq[AnyRef] (in scala.collection.immutable)
[Error] /Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/KafkaServer.scala:117:15: private val kafkaMetricsReporters in class KafkaServer is never used
four errors found
```
[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework
kamalcph commented on code in PR #14116: URL: https://github.com/apache/kafka/pull/14116#discussion_r1286072822 ## storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java: ## @@ -0,0 +1,427 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage; + +import org.apache.kafka.tiered.storage.actions.BounceBrokerAction; +import org.apache.kafka.tiered.storage.actions.ConsumeAction; +import org.apache.kafka.tiered.storage.actions.CreatePartitionsAction; +import org.apache.kafka.tiered.storage.actions.CreateTopicAction; +import org.apache.kafka.tiered.storage.actions.DeleteRecordsAction; +import org.apache.kafka.tiered.storage.actions.DeleteTopicAction; +import org.apache.kafka.tiered.storage.actions.EraseBrokerStorageAction; +import org.apache.kafka.tiered.storage.actions.ExpectBrokerInISRAction; +import org.apache.kafka.tiered.storage.actions.ExpectEmptyRemoteStorageAction; +import org.apache.kafka.tiered.storage.actions.ExpectLeaderAction; +import org.apache.kafka.tiered.storage.actions.ExpectLeaderEpochCheckpointAction; +import org.apache.kafka.tiered.storage.actions.ExpectListOffsetsAction; +import org.apache.kafka.tiered.storage.actions.ExpectTopicIdToMatchInRemoteStorageAction; +import org.apache.kafka.tiered.storage.actions.ExpectUserTopicMappedToMetadataPartitionsAction; +import org.apache.kafka.tiered.storage.actions.ProduceAction; +import org.apache.kafka.tiered.storage.actions.ReassignReplicaAction; +import org.apache.kafka.tiered.storage.actions.ShrinkReplicaAction; +import org.apache.kafka.tiered.storage.actions.StartBrokerAction; +import org.apache.kafka.tiered.storage.actions.StopBrokerAction; +import org.apache.kafka.tiered.storage.actions.UpdateBrokerConfigAction; +import org.apache.kafka.tiered.storage.actions.UpdateTopicConfigAction; +import org.apache.kafka.tiered.storage.specs.ConsumableSpec; +import org.apache.kafka.tiered.storage.specs.DeletableSpec; +import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec; +import org.apache.kafka.tiered.storage.specs.FetchableSpec; +import org.apache.kafka.tiered.storage.specs.KeyValueSpec; +import org.apache.kafka.tiered.storage.specs.OffloadableSpec; +import 
org.apache.kafka.tiered.storage.specs.OffloadedSegmentSpec; +import org.apache.kafka.tiered.storage.specs.ProducableSpec; +import org.apache.kafka.tiered.storage.specs.RemoteDeleteSegmentSpec; +import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec; +import org.apache.kafka.tiered.storage.specs.TopicSpec; +import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent; +import org.apache.kafka.storage.internals.log.EpochEntry; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public final class TieredStorageTestBuilder { + +private final int defaultProducedBatchSize = 1; +private final long defaultEarliestLocalOffsetExpectedInLogDirectory = 0; + +private Map producables = new HashMap<>(); +private Map> offloadables = new HashMap<>(); +private Map consumables = new HashMap<>(); +private Map fetchables = new HashMap<>(); +private Map> deletables = new HashMap<>(); +private List actions = new ArrayList<>(); + +public TieredStorageTestBuilder() { +} + +public TieredStorageTestBuilder createTopic(String topic, +Integer partitionCount, +Integer replicationFactor, +Integer maxBatchCountPerSegment, +Map> replicaAssignment, +Boolean enableRemoteLogStorage) { +assert maxBatchCountPerSegment >= 1 : "Segments size for topic " + topic + " needs to be >= 1"; +assert partitionCount >= 1 : "Partition count for topic " + topic + " needs to be >=
[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework
kamalcph commented on code in PR #14116: URL: https://github.com/apache/kafka/pull/14116#discussion_r1286074651 ## storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java: ## @@ -0,0 +1,427 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage; + +import org.apache.kafka.tiered.storage.actions.BounceBrokerAction; +import org.apache.kafka.tiered.storage.actions.ConsumeAction; +import org.apache.kafka.tiered.storage.actions.CreatePartitionsAction; +import org.apache.kafka.tiered.storage.actions.CreateTopicAction; +import org.apache.kafka.tiered.storage.actions.DeleteRecordsAction; +import org.apache.kafka.tiered.storage.actions.DeleteTopicAction; +import org.apache.kafka.tiered.storage.actions.EraseBrokerStorageAction; +import org.apache.kafka.tiered.storage.actions.ExpectBrokerInISRAction; +import org.apache.kafka.tiered.storage.actions.ExpectEmptyRemoteStorageAction; +import org.apache.kafka.tiered.storage.actions.ExpectLeaderAction; +import org.apache.kafka.tiered.storage.actions.ExpectLeaderEpochCheckpointAction; +import org.apache.kafka.tiered.storage.actions.ExpectListOffsetsAction; +import org.apache.kafka.tiered.storage.actions.ExpectTopicIdToMatchInRemoteStorageAction; +import org.apache.kafka.tiered.storage.actions.ExpectUserTopicMappedToMetadataPartitionsAction; +import org.apache.kafka.tiered.storage.actions.ProduceAction; +import org.apache.kafka.tiered.storage.actions.ReassignReplicaAction; +import org.apache.kafka.tiered.storage.actions.ShrinkReplicaAction; +import org.apache.kafka.tiered.storage.actions.StartBrokerAction; +import org.apache.kafka.tiered.storage.actions.StopBrokerAction; +import org.apache.kafka.tiered.storage.actions.UpdateBrokerConfigAction; +import org.apache.kafka.tiered.storage.actions.UpdateTopicConfigAction; +import org.apache.kafka.tiered.storage.specs.ConsumableSpec; +import org.apache.kafka.tiered.storage.specs.DeletableSpec; +import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec; +import org.apache.kafka.tiered.storage.specs.FetchableSpec; +import org.apache.kafka.tiered.storage.specs.KeyValueSpec; +import org.apache.kafka.tiered.storage.specs.OffloadableSpec; +import 
org.apache.kafka.tiered.storage.specs.OffloadedSegmentSpec; +import org.apache.kafka.tiered.storage.specs.ProducableSpec; +import org.apache.kafka.tiered.storage.specs.RemoteDeleteSegmentSpec; +import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec; +import org.apache.kafka.tiered.storage.specs.TopicSpec; +import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent; +import org.apache.kafka.storage.internals.log.EpochEntry; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public final class TieredStorageTestBuilder { + +private final int defaultProducedBatchSize = 1; +private final long defaultEarliestLocalOffsetExpectedInLogDirectory = 0; + +private Map producables = new HashMap<>(); +private Map> offloadables = new HashMap<>(); +private Map consumables = new HashMap<>(); +private Map fetchables = new HashMap<>(); +private Map> deletables = new HashMap<>(); +private List actions = new ArrayList<>(); + +public TieredStorageTestBuilder() { +} + +public TieredStorageTestBuilder createTopic(String topic, +Integer partitionCount, +Integer replicationFactor, +Integer maxBatchCountPerSegment, +Map> replicaAssignment, +Boolean enableRemoteLogStorage) { +assert maxBatchCountPerSegment >= 1 : "Segments size for topic " + topic + " needs to be >= 1"; +assert partitionCount >= 1 : "Partition count for topic " + topic + " needs to be >=
[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework
kamalcph commented on code in PR #14116: URL: https://github.com/apache/kafka/pull/14116#discussion_r1286066810 ## storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java: ## @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage; + +import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec; +import org.apache.kafka.tiered.storage.specs.TopicSpec; +import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage; +import kafka.log.UnifiedLog; +import kafka.server.KafkaBroker; +import kafka.utils.TestUtils; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.AlterConfigsOptions; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.metadata.BrokerState; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorage; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot; +import scala.Function0; +import scala.Function1; + +import java.io.IOException; +import java.io.PrintStream; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import 
java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +import scala.Option; +import scala.collection.Seq; + +import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG; + +public final class TieredStorageTestContext { Review Comment: `IntegrationTestHarness` creates an instance for each action invocation. We can do the refactoring later if required. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework
kamalcph commented on code in PR #14116: URL: https://github.com/apache/kafka/pull/14116#discussion_r1286064263 ## storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java: ## @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage; + +import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec; +import org.apache.kafka.tiered.storage.specs.TopicSpec; +import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage; +import kafka.log.UnifiedLog; +import kafka.server.KafkaBroker; +import kafka.utils.TestUtils; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.AlterConfigsOptions; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.metadata.BrokerState; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorage; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot; +import scala.Function0; +import scala.Function1; + +import java.io.IOException; +import java.io.PrintStream; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import 
java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +import scala.Option; +import scala.collection.Seq; + +import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG; + +public final class TieredStorageTestContext { + +private final Seq brokers; +private final Properties producerConfig; +private final Properties consumerConfig; +private final ListenerName listenerName; + +private final Serializer ser; +private final Deserializer de; + +private final Map topicSpecs; +private final TieredStorageTestReport testReport; + +private volatile KafkaProducer producer; +private volatile KafkaConsumer consumer; +private volatile Admin admin; +private volatile List tieredStorages; +private volatile List localStorages; + +public TieredStorageTestContext(Seq brokers, +Properties producerConfig, +Properties consumerConfig, +ListenerName listenerName) { +this.brokers = brokers; +this.producerConfig = producerConfig; +this.consumerConfig = consumerConfig; +this.listenerName = listenerName; +this.ser = Serdes.String().serializer(); +this.de = Serdes.String().deserializer(); +this.topicSpecs = new HashMap<>(); +this.testReport = new TieredStorageTestReport(this); +initContext(); +} + +private void initContext() { +String bootstrapServers = TestUtils.bootstrapServers(brokers, listenerName); +producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); +// Set a producer linger of
[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework
kamalcph commented on code in PR #14116: URL: https://github.com/apache/kafka/pull/14116#discussion_r1286060966 ## storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java: ## @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage; + +import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec; +import org.apache.kafka.tiered.storage.specs.TopicSpec; +import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage; +import kafka.log.UnifiedLog; +import kafka.server.KafkaBroker; +import kafka.utils.TestUtils; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.AlterConfigsOptions; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.metadata.BrokerState; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorage; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot; +import scala.Function0; +import scala.Function1; + +import java.io.IOException; +import java.io.PrintStream; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import 
java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +import scala.Option; +import scala.collection.Seq; + +import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG; + +public final class TieredStorageTestContext { + +private final Seq brokers; +private final Properties producerConfig; +private final Properties consumerConfig; +private final ListenerName listenerName; + +private final Serializer ser; +private final Deserializer de; + +private final Map topicSpecs; +private final TieredStorageTestReport testReport; + +private volatile KafkaProducer producer; +private volatile KafkaConsumer consumer; +private volatile Admin admin; +private volatile List tieredStorages; +private volatile List localStorages; + +public TieredStorageTestContext(Seq brokers, +Properties producerConfig, +Properties consumerConfig, +ListenerName listenerName) { +this.brokers = brokers; +this.producerConfig = producerConfig; +this.consumerConfig = consumerConfig; +this.listenerName = listenerName; +this.ser = Serdes.String().serializer(); +this.de = Serdes.String().deserializer(); +this.topicSpecs = new HashMap<>(); +this.testReport = new TieredStorageTestReport(this); +initContext(); +} + +private void initContext() { +String bootstrapServers = TestUtils.bootstrapServers(brokers, listenerName); +producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); +// Set a producer linger of
[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework
kamalcph commented on code in PR #14116: URL: https://github.com/apache/kafka/pull/14116#discussion_r1286060584 ## storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java: ## @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage; + +import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec; +import org.apache.kafka.tiered.storage.specs.TopicSpec; +import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage; +import kafka.log.UnifiedLog; +import kafka.server.KafkaBroker; +import kafka.utils.TestUtils; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.AlterConfigsOptions; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.metadata.BrokerState; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorage; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot; +import scala.Function0; +import scala.Function1; + +import java.io.IOException; +import java.io.PrintStream; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import 
java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +import scala.Option; +import scala.collection.Seq; + +import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG; + +public final class TieredStorageTestContext { + +private final Seq brokers; +private final Properties producerConfig; +private final Properties consumerConfig; +private final ListenerName listenerName; + +private final Serializer ser; +private final Deserializer de; + +private final Map topicSpecs; +private final TieredStorageTestReport testReport; + +private volatile KafkaProducer producer; +private volatile KafkaConsumer consumer; +private volatile Admin admin; +private volatile List tieredStorages; +private volatile List localStorages; + +public TieredStorageTestContext(Seq brokers, +Properties producerConfig, +Properties consumerConfig, +ListenerName listenerName) { +this.brokers = brokers; +this.producerConfig = producerConfig; +this.consumerConfig = consumerConfig; +this.listenerName = listenerName; +this.ser = Serdes.String().serializer(); +this.de = Serdes.String().deserializer(); +this.topicSpecs = new HashMap<>(); +this.testReport = new TieredStorageTestReport(this); +initContext(); +} + +private void initContext() { +String bootstrapServers = TestUtils.bootstrapServers(brokers, listenerName); +producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); +// Set a producer linger of
[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework
kamalcph commented on code in PR #14116: URL: https://github.com/apache/kafka/pull/14116#discussion_r1286060074 ## storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java: ## @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage; + +import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec; +import org.apache.kafka.tiered.storage.specs.TopicSpec; +import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage; +import kafka.log.UnifiedLog; +import kafka.server.KafkaBroker; +import kafka.utils.TestUtils; +import org.apache.kafka.clients.admin.Admin; +import org.apache.kafka.clients.admin.AlterConfigOp; +import org.apache.kafka.clients.admin.AlterConfigsOptions; +import org.apache.kafka.clients.admin.ConfigEntry; +import org.apache.kafka.clients.admin.NewPartitions; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.metadata.BrokerState; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorage; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot; +import scala.Function0; +import scala.Function1; + +import java.io.IOException; +import java.io.PrintStream; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import 
java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +import scala.Option; +import scala.collection.Seq; + +import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; +import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG; + +public final class TieredStorageTestContext { + +private final Seq brokers; +private final Properties producerConfig; +private final Properties consumerConfig; +private final ListenerName listenerName; + +private final Serializer ser; +private final Deserializer de; + +private final Map topicSpecs; +private final TieredStorageTestReport testReport; + +private volatile KafkaProducer producer; +private volatile KafkaConsumer consumer; +private volatile Admin admin; +private volatile List tieredStorages; +private volatile List localStorages; + +public TieredStorageTestContext(Seq brokers, +Properties producerConfig, +Properties consumerConfig, +ListenerName listenerName) { +this.brokers = brokers; +this.producerConfig = producerConfig; +this.consumerConfig = consumerConfig; +this.listenerName = listenerName; +this.ser = Serdes.String().serializer(); +this.de = Serdes.String().deserializer(); +this.topicSpecs = new HashMap<>(); +this.testReport = new TieredStorageTestReport(this); +initContext(); +} + +private void initContext() { +String bootstrapServers = TestUtils.bootstrapServers(brokers, listenerName); +producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); +// Set a producer linger of
[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework
kamalcph commented on code in PR #14116: URL: https://github.com/apache/kafka/pull/14116#discussion_r1286059074

## core/src/main/scala/kafka/server/KafkaBroker.scala: ##
@@ -19,6 +19,7 @@ package kafka.server
 import com.yammer.metrics.core.MetricName
 import kafka.log.LogManager
+import kafka.log.remote.RemoteLogManager

Review Comment:
This import is used in L83. I removed the seemingly unused import "scala.collection.Seq" from this class, but the build then failed with the errors below, so I reverted that change:

```
❯ ./gradlew :storage:compileTestJava -PscalaVersion=2.13

> Configure project :
Starting build with version 3.6.0-SNAPSHOT (commit id d109f88d) using Gradle 8.2.1, Java 11 and Scala 2.13.11
Build properties: maxParallelForks=12, maxScalacThreads=8, maxTestRetries=0

> Task :core:compileScala
[Error] /Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:910:69: type mismatch;
 found   : scala.collection.mutable.Buffer[org.apache.kafka.common.metrics.MetricsReporter]
 required: Seq[AnyRef]
[Error] /Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:915:61: type mismatch;
 found   : scala.collection.mutable.Buffer[org.apache.kafka.common.metrics.MetricsReporter]
 required: Seq[AnyRef]
[Error] /Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/KafkaServer.scala:267:77: type mismatch;
 found   : Seq[Object] (in scala.collection)
 required: Seq[AnyRef] (in scala.collection.immutable)
[Error] /Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/KafkaServer.scala:117:15: private val kafkaMetricsReporters in class KafkaServer is never used
four errors found
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
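The type mismatches in the compile log above are consistent with a known Scala 2.13 change: without an explicit `import scala.collection.Seq`, the bare name `Seq` refers to `scala.collection.immutable.Seq`, so a mutable buffer no longer conforms to it. The sketch below illustrates that behaviour in isolation. `SeqAliasDemo`, `countDefault`, and `countGeneral` are hypothetical names chosen for this example and do not appear in the Kafka code base.

```scala
import scala.collection.mutable

// Illustrative only: the object and method names are assumptions, not Kafka code.
object SeqAliasDemo {
  // No Seq import in scope: on Scala 2.13 this parameter type is
  // scala.collection.immutable.Seq, on Scala 2.12 it is scala.collection.Seq.
  def countDefault(xs: Seq[AnyRef]): Int = xs.size

  // Spelling out the general collection type accepts mutable and immutable sequences.
  def countGeneral(xs: scala.collection.Seq[AnyRef]): Int = xs.size

  def main(args: Array[String]): Unit = {
    val reporters = mutable.Buffer[AnyRef]("a", "b")
    // countDefault(reporters)  // Scala 2.13: type mismatch, required: Seq[AnyRef]
    println(countDefault(reporters.toSeq)) // convert the buffer to a Seq first
    println(countGeneral(reporters))       // the buffer is accepted as-is
  }
}
```

Under this reading, the import only looks unused: it changes what `Seq` means for every signature in the file, which would explain why removing it surfaced errors in other classes that call into it.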
[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework
kamalcph commented on code in PR #14116: URL: https://github.com/apache/kafka/pull/14116#discussion_r1285716790 ## core/src/test/scala/unit/kafka/utils/TestUtils.scala: ## @@ -216,6 +216,14 @@ object TestUtils extends Logging { (new Broker(id, host, port, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol), epoch) } + /** + * Create a test config for the provided parameters. + */ + def createServerConfigs(numConfigs: Int, Review Comment: Removed this method. With `createBrokerConfigs`, we had to supply 18 parameters from Java code.
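One plausible background for the "18 parameters" remark above: Scala default arguments are compiled to synthetic accessor methods rather than to overloads, so a Java caller has to pass every parameter explicitly. The sketch below shows the general pattern of a thin wrapper with no defaulted parameters. The parameter list, the defaults, and the returned map are assumptions made for illustration and are not the real `TestUtils` signatures.

```scala
// Illustrative only: a reduced stand-in, not the real kafka.utils.TestUtils API.
object BrokerConfigSketch {
  // A helper with defaulted parameters. Scala callers may omit the last two
  // arguments, but Java callers must supply all four.
  def createBrokerConfigs(numConfigs: Int,
                          zkConnect: String,
                          enableControlledShutdown: Boolean = true,
                          enableDeleteTopic: Boolean = true): Seq[Map[String, String]] =
    (0 until numConfigs).map { id =>
      Map(
        "broker.id" -> id.toString,
        "zookeeper.connect" -> zkConnect,
        "controlled.shutdown.enable" -> enableControlledShutdown.toString,
        "delete.topic.enable" -> enableDeleteTopic.toString
      )
    }

  // A thin wrapper without defaults: Java code can call it with two arguments.
  def createServerConfigs(numConfigs: Int, zkConnect: String): Seq[Map[String, String]] =
    createBrokerConfigs(numConfigs, zkConnect)

  def main(args: Array[String]): Unit =
    createServerConfigs(2, "localhost:2181").foreach(println)
}
```

The trade-off is an extra entry point to maintain in exchange for a shorter call site on the Java side.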
[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework
kamalcph commented on code in PR #14116: URL: https://github.com/apache/kafka/pull/14116#discussion_r1285715433 ## storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java: ## @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.tiered.storage; + +import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage; +import kafka.api.IntegrationTestHarness; +import kafka.log.remote.RemoteLogManager; +import kafka.server.KafkaBroker; +import kafka.server.KafkaConfig; +import kafka.utils.TestUtils; +import org.apache.kafka.common.replica.ReplicaSelector; +import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager; +import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig; +import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager; +import org.apache.kafka.server.log.remote.storage.LocalTieredStorage; +import org.apache.kafka.server.log.remote.storage.RemoteStorageManager; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; +import scala.collection.Seq; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.stream.Collectors; + +import scala.collection.JavaConverters; + +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP; +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP; +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP; +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP; + +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP; +import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP; + +import static 
org.apache.kafka.server.log.remote.storage.LocalTieredStorage.DELETE_ON_CLOSE_CONFIG; +import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.STORAGE_DIR_CONFIG; + +/** + * Base class for integration tests exercising the tiered storage functionality in Apache Kafka. + */ +public abstract class TieredStorageTestHarness extends IntegrationTestHarness { + +/** + * InitialTaskDelayMs is set to 30 seconds for the delete-segment scheduler in Apache Kafka. + * Hence, we need to wait at least that amount of time before segments eligible for deletion + * gets physically removed. + */ +private static final Integer STORAGE_WAIT_TIMEOUT_SEC = 35; + +private TieredStorageTestContext context; + +protected int numMetadataPartition() { +return 5; +} + +@SuppressWarnings("deprecation") +@Override +public Seq generateConfigs() { +Properties overridingProps = getOverridingProps(); +List configs = + JavaConverters.seqAsJavaList(TestUtils.createServerConfigs(brokerCount(), zkConnectOrNull())) +.stream() +.map(config -> KafkaConfig.fromProps(config, overridingProps)) +.collect(Collectors.toList()); +return JavaConverters.asScalaBuffer(configs).toSeq(); +} + +public Properties getOverridingProps() { +Properties overridingProps = new Properties(); +// Configure the tiered storage in Kafka. Set an interval of 1 second for the remote log manager background +// activity to ensure the tiered storage has enough room to be exercised within the lifetime of a test. +// +// The replication factor of the remote log metadata topic needs to be chosen so that in resiliency +// tests, metadata can survive the loss of one replica for its topic-partitions. +// +// The second-tier storage system is mocked via the LocalTieredStorage instance
[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework
kamalcph commented on code in PR #14116: URL: https://github.com/apache/kafka/pull/14116#discussion_r1285713904 ## storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java: ## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tiered.storage.integration; + +import org.apache.kafka.tiered.storage.TieredStorageTestBuilder; +import org.apache.kafka.tiered.storage.TieredStorageTestHarness; +import org.apache.kafka.tiered.storage.specs.KeyValueSpec; + +import java.util.List; +import java.util.Map; + +/** + * Test Cases (A): + *Elementary offloads and fetches from tiered storage. + */ +public final class OffloadAndConsumeFromLeaderTest extends TieredStorageTestHarness { Review Comment: Disabled the `TieredStorageTestHarness` tests until the trunk build is stable enough to run them.