[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework

2023-08-29 Thread via GitHub


kamalcph commented on code in PR #14116:
URL: https://github.com/apache/kafka/pull/14116#discussion_r1308574000


##
storage/src/test/java/org/apache/kafka/tiered/storage/actions/CreateTopicAction.java:
##
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.actions;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
+import org.apache.kafka.tiered.storage.specs.TopicSpec;
+import org.apache.kafka.common.config.TopicConfig;
+
+import java.io.PrintStream;
+import java.util.concurrent.ExecutionException;
+
+public final class CreateTopicAction implements TieredStorageTestAction {
+
+private final TopicSpec spec;
+
+public CreateTopicAction(TopicSpec spec) {
+this.spec = spec;
+}
+
+@Override
+public void doExecute(TieredStorageTestContext context) throws ExecutionException, InterruptedException {
+// Ensure offset and time indexes are generated for every record.
+spec.getProperties().put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1");
+// Leverage the use of the segment index size to create a log-segment accepting one and only one record.
+// The minimum size of the indexes is that of an entry, which is 8 for the offset index and 12 for the
+// time index. Hence, since the topic is configured to generate index entries for every record with, for
+// a "small" number of records (i.e. such that the average record size times the number of records is
+// much less than the segment size), the number of records which hold in a segment is the multiple of 12
+// defined below.
+if (spec.getMaxBatchCountPerSegment() != -1) {
+spec.getProperties().put(
+TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, String.valueOf(12 * spec.getMaxBatchCountPerSegment()));

Review Comment:
   This is the size allocated for each of the indexes. The multiplier is 12 because
   the time index requires 12 bytes per record batch, which is more than the 8 bytes
   required by the offset index.
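
To make the arithmetic behind that multiplier concrete, here is a minimal sketch in plain Java. The class and method names (`SegmentIndexSizing`, `segmentIndexBytes`, `topicProperties`) are hypothetical and not part of the PR. The entry sizes (8 and 12 bytes) are the ones quoted in the diff comment, and the two property keys are the string values of `TopicConfig.INDEX_INTERVAL_BYTES_CONFIG` and `TopicConfig.SEGMENT_INDEX_BYTES_CONFIG`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, for illustration only; not part of the PR.
final class SegmentIndexSizing {

    // Entry sizes as quoted in the diff comment.
    static final int OFFSET_INDEX_ENTRY_BYTES = 8;
    static final int TIME_INDEX_ENTRY_BYTES = 12;

    // Index size that leaves room for exactly maxBatchCountPerSegment
    // entries in the larger-entry (time) index.
    static int segmentIndexBytes(int maxBatchCountPerSegment) {
        if (maxBatchCountPerSegment <= 0) {
            throw new IllegalArgumentException("maxBatchCountPerSegment must be positive");
        }
        int largestEntry = Math.max(OFFSET_INDEX_ENTRY_BYTES, TIME_INDEX_ENTRY_BYTES);
        return largestEntry * maxBatchCountPerSegment;
    }

    // Topic-level properties mirroring what the quoted action sets.
    static Map<String, String> topicProperties(int maxBatchCountPerSegment) {
        Map<String, String> props = new LinkedHashMap<>();
        // Generate an index entry for every record batch.
        props.put("index.interval.bytes", "1");
        props.put("segment.index.bytes", String.valueOf(segmentIndexBytes(maxBatchCountPerSegment)));
        return props;
    }

    public static void main(String[] args) {
        System.out.println(topicProperties(1));
    }
}
```

With a limit of N batches per segment, the time index has room for N entries while the offset index has room for more (12N / 8), so the time index is presumably the one that fills up first.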



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework

2023-08-29 Thread via GitHub


kamalcph commented on code in PR #14116:
URL: https://github.com/apache/kafka/pull/14116#discussion_r1308568867


##
storage/src/test/java/org/apache/kafka/tiered/storage/actions/ShrinkReplicaAction.java:
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.actions;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestAction;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
+import org.apache.kafka.clients.admin.NewPartitionReassignment;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.test.TestUtils;
+
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.kafka.tiered.storage.utils.ActionUtils.describeTopic;
+
+public final class ShrinkReplicaAction implements TieredStorageTestAction {

Review Comment:
   Kafka does not support decreasing the number of partitions, but shrinking the
   replication factor is supported via the `ALTER_PARTITION_REASSIGNMENTS` API.
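
As an illustration of what "shrinking" means for a replica assignment, the sketch below computes a reduced replica list in plain Java. The class and method names are hypothetical, and keeping the first N replicas of the current assignment is an assumption of this sketch, not necessarily what `ShrinkReplicaAction` does. In a real client the resulting list would be wrapped in a `NewPartitionReassignment` (imported in the diff above) and submitted through the admin client's `alterPartitionReassignments` call; that step is omitted here to keep the example free of dependencies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, for illustration only; not part of the PR.
final class ReplicaShrinkSketch {

    // Returns a new replica list holding the first targetReplicationFactor
    // broker ids of the current assignment (assumption: earlier replicas are kept).
    static List<Integer> shrinkTo(List<Integer> currentReplicas, int targetReplicationFactor) {
        if (targetReplicationFactor < 1 || targetReplicationFactor > currentReplicas.size()) {
            throw new IllegalArgumentException(
                "target replication factor must be between 1 and " + currentReplicas.size());
        }
        return new ArrayList<>(currentReplicas.subList(0, targetReplicationFactor));
    }

    public static void main(String[] args) {
        // A partition replicated on brokers 0, 1 and 2, shrunk to two replicas.
        System.out.println(shrinkTo(Arrays.asList(0, 1, 2), 2));
    }
}
```

The partition count itself is never touched: only the list of brokers hosting each existing partition gets shorter.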






[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework

2023-08-29 Thread via GitHub


kamalcph commented on code in PR #14116:
URL: https://github.com/apache/kafka/pull/14116#discussion_r1308567665


##
storage/src/test/java/org/apache/kafka/tiered/storage/utils/BrokerLocalStorage.java:
##
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.utils;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.storage.internals.log.LogFileUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public final class BrokerLocalStorage {
+
+private final Integer brokerId;
+private final File brokerStorageDirectory;
+private final Integer storageWaitTimeoutSec;
+
+private final int storagePollPeriodSec = 1;
+private final Time time = Time.SYSTEM;
+
+public BrokerLocalStorage(Integer brokerId,
+  String storageDirname,
+  Integer storageWaitTimeoutSec) {
+this.brokerId = brokerId;
+this.brokerStorageDirectory = new File(storageDirname);
+this.storageWaitTimeoutSec = storageWaitTimeoutSec;
+}
+
+public Integer getBrokerId() {
+return brokerId;
+}
+
+/**
+ * Wait until the first segment offset in Apache Kafka storage for the given topic-partition is
+ * equal or greater to the provided offset.

Review Comment:
   Ack, that is a typo. Will update the comment in the next PR.
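
For context, the wait described in the Javadoc under review (block until the earliest local offset is equal to or greater than an expected value) can be pictured as a simple poll loop with a timeout. This is a minimal sketch under stated assumptions: the class and method names are hypothetical, the offset is read through a plain `LongSupplier` rather than from the broker's log directory, and `System.nanoTime()` stands in for the Kafka `Time`/`Timer` utilities that the quoted class imports.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical helper, for illustration only; not part of the PR.
final class EarliestOffsetWaiter {

    // Polls earliestOffset until it is equal to or greater than expectedOffset.
    // Returns true on success, false if the timeout elapses or the thread is interrupted.
    static boolean waitUntilAtLeast(LongSupplier earliestOffset,
                                    long expectedOffset,
                                    long timeoutMs,
                                    long pollPeriodMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (true) {
            if (earliestOffset.getAsLong() >= expectedOffset) {
                return true;
            }
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false;
            }
            long remainingMs = Math.max(1L, TimeUnit.NANOSECONDS.toMillis(remainingNanos));
            try {
                // Never sleep past the deadline.
                Thread.sleep(Math.min(pollPeriodMs, remainingMs));
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // A source that never advances: the wait gives up after about 50 ms.
        System.out.println(waitUntilAtLeast(() -> 0L, 10L, 50L, 10L));
    }
}
```

The quoted class uses a poll period of 1 second (`storagePollPeriodSec`) and takes the timeout as a constructor argument (`storageWaitTimeoutSec`); the sketch takes both as parameters in milliseconds.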






[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework

2023-08-15 Thread via GitHub


kamalcph commented on code in PR #14116:
URL: https://github.com/apache/kafka/pull/14116#discussion_r1294653156


##
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestReport.java:
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.utils.DumpLocalTieredStorage;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
+
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+
+public final class TieredStorageTestReport {
+
+private final TieredStorageTestContext context;
+private final List<TieredStorageTestAction> successfulActions = new ArrayList<>();
+private final List<TieredStorageTestAction> failedActions = new ArrayList<>();
+
+public TieredStorageTestReport(TieredStorageTestContext context) {
+this.context = context;
+}
+
+public void addSucceeded(TieredStorageTestAction action) {
+synchronized (this) {
+successfulActions.add(action);
+}
+}
+
+public void addFailed(TieredStorageTestAction action) {
+synchronized (this) {
+failedActions.add(action);
+}
+}
+
+public void print(PrintStream output) {
+output.println();
+int seqNo = 0;
+List<List<TieredStorageTestAction>> actionsLists = new ArrayList<>();
+actionsLists.add(successfulActions);
+actionsLists.add(failedActions);
+
+List<String> statusList = new ArrayList<>();
+statusList.add("SUCCESS");
+statusList.add("FAILURE");
+
+for (int i = 0; i < actionsLists.size(); i++) {
+List<TieredStorageTestAction> actions = actionsLists.get(i);
+String ident = statusList.get(i);
+for (TieredStorageTestAction action : actions) {
+seqNo++;
+output.print("[" + ident + "] (" + seqNo + ") ");
+action.describe(output);
+output.println();
+}
+}
+String lts = "";
+if (!context.getTieredStorages().isEmpty()) {

Review Comment:
   If the test is set up correctly, `getTieredStorages` (or `remoteStorageManagers`)
   won't be empty. I added the `isEmpty` check to avoid an NPE.
   We can remove or refactor the code later.






[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework

2023-08-15 Thread via GitHub


kamalcph commented on code in PR #14116:
URL: https://github.com/apache/kafka/pull/14116#discussion_r1294640475


##
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java:
##
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+import kafka.api.IntegrationTestHarness;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.KafkaBroker;
+import kafka.server.KafkaConfig;
+import kafka.utils.TestUtils;
+import org.apache.kafka.common.replica.ReplicaSelector;
+import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;
+import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import scala.collection.Seq;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConverters;
+
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP;
+
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP;
+
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.DELETE_ON_CLOSE_CONFIG;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.STORAGE_DIR_CONFIG;
+
+/**
+ * Base class for integration tests exercising the tiered storage functionality in Apache Kafka.
+ */
+@Tag("integration")
+public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
+
+/**
+ * InitialTaskDelayMs is set to 30 seconds for the delete-segment scheduler in Apache Kafka.
+ * Hence, we need to wait at least that amount of time before segments eligible for deletion
+ * gets physically removed.
+ */
+private static final Integer STORAGE_WAIT_TIMEOUT_SEC = 35;
+
+private TieredStorageTestContext context;
+
+protected int numRemoteLogMetadataPartitions() {
+return 5;
+}
+
+@SuppressWarnings("deprecation")
+@Override
+public void modifyConfigs(Seq<Properties> props) {
+for (Properties p : JavaConverters.seqAsJavaList(props)) {
+p.putAll(overridingProps());
+}
+}
+
+public Properties overridingProps() {
+Properties overridingProps = new Properties();
+// Configure the tiered storage in Kafka. Set an interval of 1 second for the remote log manager background
+// activity to ensure the tiered storage has enough room to be exercised within the lifetime of a test.
+//
+// The replication factor of the remote log metadata topic needs to be chosen so that in resiliency
+// tests, metadata can survive the loss of one replica for its topic-partitions.
+//
+// The second-tier storage system is mocked via the LocalTieredStorage instance which persists transferred
+// data files on the local file system.
+overridingProps.setProperty(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
+

[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework

2023-08-15 Thread via GitHub


kamalcph commented on code in PR #14116:
URL: https://github.com/apache/kafka/pull/14116#discussion_r1294609562


##
storage/src/test/java/org/apache/kafka/tiered/storage/README.md:
##
@@ -0,0 +1,9 @@
+Step 1: For every test, setup is done via TieredStorageTestHarness which extends IntegrationTestHarness and sets up a cluster with TS enabled on it.

Review Comment:
   Will add detailed steps later once we start to enable the tests.






[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework

2023-08-15 Thread via GitHub


kamalcph commented on code in PR #14116:
URL: https://github.com/apache/kafka/pull/14116#discussion_r1294583210


##
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java:
##
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
+import org.apache.kafka.tiered.storage.specs.TopicSpec;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+import kafka.log.UnifiedLog;
+import kafka.server.KafkaBroker;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.BrokerState;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot;
+import scala.Function0;
+import scala.Function1;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.collection.Seq;
+
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
+
+public final class TieredStorageTestContext {
+
+private final Seq brokers;
+private final Properties producerConfig;
+private final Properties consumerConfig;
+private final ListenerName listenerName;
+
+private final Serializer ser;
+private final Deserializer de;
+
+private final Map topicSpecs;
+private final TieredStorageTestReport testReport;
+
+private volatile KafkaProducer producer;
+private volatile KafkaConsumer consumer;
+private volatile Admin admin;
+private volatile List tieredStorages;
+private volatile List localStorages;
+
+public TieredStorageTestContext(Seq brokers,
+Properties producerConfig,
+Properties consumerConfig,
+ListenerName listenerName) {
+this.brokers = brokers;
+this.producerConfig = producerConfig;
+this.consumerConfig = consumerConfig;
+this.listenerName = listenerName;
+this.ser = Serdes.String().serializer();
+this.de = Serdes.String().deserializer();
+this.topicSpecs = new HashMap<>();
+this.testReport = new TieredStorageTestReport(this);
+initContext();
+}
+
+private void initContext() {
+String bootstrapServers = TestUtils.bootstrapServers(brokers, listenerName);
+producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+// Set a producer linger of 

[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework

2023-08-15 Thread via GitHub


kamalcph commented on code in PR #14116:
URL: https://github.com/apache/kafka/pull/14116#discussion_r1294503554


##
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java:
##
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
+import org.apache.kafka.tiered.storage.specs.TopicSpec;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+import kafka.log.UnifiedLog;
+import kafka.server.KafkaBroker;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.BrokerState;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot;
+import scala.Function0;
+import scala.Function1;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.collection.Seq;
+
+import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
+
+public final class TieredStorageTestContext {
+
+private final Seq brokers;
+private final Properties producerConfig;
+private final Properties consumerConfig;
+private final Properties adminConfig;
+
+private final Serializer ser;
+private final Deserializer de;
+
+private final Map topicSpecs;
+private final TieredStorageTestReport testReport;
+
+private volatile KafkaProducer producer;
+private volatile KafkaConsumer consumer;
+private volatile Admin admin;
+private volatile List tieredStorages;
+private volatile List localStorages;
+
+public TieredStorageTestContext(Seq brokers,
+Properties producerConfig,
+Properties consumerConfig,
+Properties adminConfig) {
+this.brokers = brokers;
+this.producerConfig = producerConfig;
+this.consumerConfig = consumerConfig;
+this.adminConfig = adminConfig;
+this.ser = Serdes.String().serializer();
+this.de = Serdes.String().deserializer();
+this.topicSpecs = new HashMap<>();
+this.testReport = new TieredStorageTestReport(this);
+initContext();
+}
+
+private void initContext() {
+// Set a producer linger of 60 seconds, in order to optimistically generate batches of
+// records with a pre-determined size.
+producerConfig.put(LINGER_MS_CONFIG, String.valueOf(TimeUnit.SECONDS.toMillis(60)));
+producer = new KafkaProducer<>(producerConfig, ser, ser);
+consumer = new 

[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework

2023-08-07 Thread via GitHub


kamalcph commented on code in PR #14116:
URL: https://github.com/apache/kafka/pull/14116#discussion_r1286059074


##
core/src/main/scala/kafka/server/KafkaBroker.scala:
##
@@ -19,6 +19,7 @@ package kafka.server
 
 import com.yammer.metrics.core.MetricName
 import kafka.log.LogManager
+import kafka.log.remote.RemoteLogManager

Review Comment:
   This import is used on L83.
   
   I removed the seemingly unused import `scala.collection.Seq` from this class, but
   that caused the compilation errors below, so I reverted it, since the change is
   unrelated to this PR:
   
   ```
   ❯ ./gradlew :storage:compileTestJava -PscalaVersion=2.13
   
   > Configure project :
   Starting build with version 3.6.0-SNAPSHOT (commit id d109f88d) using Gradle 8.2.1, Java 11 and Scala 2.13.11
   Build properties: maxParallelForks=12, maxScalacThreads=8, maxTestRetries=0
   
   > Task :core:compileScala
   [Error] /Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:910:69: type mismatch;
    found   : scala.collection.mutable.Buffer[org.apache.kafka.common.metrics.MetricsReporter]
    required: Seq[AnyRef]
   [Error] /Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:915:61: type mismatch;
    found   : scala.collection.mutable.Buffer[org.apache.kafka.common.metrics.MetricsReporter]
    required: Seq[AnyRef]
   [Error] /Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/KafkaServer.scala:267:77: type mismatch;
    found   : Seq[Object] (in scala.collection)
    required: Seq[AnyRef] (in scala.collection.immutable)
   [Error] /Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/KafkaServer.scala:117:15: private val kafkaMetricsReporters in class KafkaServer is never used
   four errors found
   ```






[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework

2023-08-07 Thread via GitHub


kamalcph commented on code in PR #14116:
URL: https://github.com/apache/kafka/pull/14116#discussion_r1286072822


##
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java:
##
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.actions.BounceBrokerAction;
+import org.apache.kafka.tiered.storage.actions.ConsumeAction;
+import org.apache.kafka.tiered.storage.actions.CreatePartitionsAction;
+import org.apache.kafka.tiered.storage.actions.CreateTopicAction;
+import org.apache.kafka.tiered.storage.actions.DeleteRecordsAction;
+import org.apache.kafka.tiered.storage.actions.DeleteTopicAction;
+import org.apache.kafka.tiered.storage.actions.EraseBrokerStorageAction;
+import org.apache.kafka.tiered.storage.actions.ExpectBrokerInISRAction;
+import org.apache.kafka.tiered.storage.actions.ExpectEmptyRemoteStorageAction;
+import org.apache.kafka.tiered.storage.actions.ExpectLeaderAction;
+import org.apache.kafka.tiered.storage.actions.ExpectLeaderEpochCheckpointAction;
+import org.apache.kafka.tiered.storage.actions.ExpectListOffsetsAction;
+import org.apache.kafka.tiered.storage.actions.ExpectTopicIdToMatchInRemoteStorageAction;
+import org.apache.kafka.tiered.storage.actions.ExpectUserTopicMappedToMetadataPartitionsAction;
+import org.apache.kafka.tiered.storage.actions.ProduceAction;
+import org.apache.kafka.tiered.storage.actions.ReassignReplicaAction;
+import org.apache.kafka.tiered.storage.actions.ShrinkReplicaAction;
+import org.apache.kafka.tiered.storage.actions.StartBrokerAction;
+import org.apache.kafka.tiered.storage.actions.StopBrokerAction;
+import org.apache.kafka.tiered.storage.actions.UpdateBrokerConfigAction;
+import org.apache.kafka.tiered.storage.actions.UpdateTopicConfigAction;
+import org.apache.kafka.tiered.storage.specs.ConsumableSpec;
+import org.apache.kafka.tiered.storage.specs.DeletableSpec;
+import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
+import org.apache.kafka.tiered.storage.specs.FetchableSpec;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+import org.apache.kafka.tiered.storage.specs.OffloadableSpec;
+import org.apache.kafka.tiered.storage.specs.OffloadedSegmentSpec;
+import org.apache.kafka.tiered.storage.specs.ProducableSpec;
+import org.apache.kafka.tiered.storage.specs.RemoteDeleteSegmentSpec;
+import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec;
+import org.apache.kafka.tiered.storage.specs.TopicSpec;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public final class TieredStorageTestBuilder {
+
+private final int defaultProducedBatchSize = 1;
+private final long defaultEarliestLocalOffsetExpectedInLogDirectory = 0;
+
+private Map producables = new HashMap<>();
+private Map> offloadables = new HashMap<>();
+private Map consumables = new HashMap<>();
+private Map fetchables = new HashMap<>();
+private Map> deletables = new HashMap<>();
+private List actions = new ArrayList<>();
+
+public TieredStorageTestBuilder() {
+}
+
+public TieredStorageTestBuilder createTopic(String topic,
+Integer partitionCount,
+Integer replicationFactor,
+Integer maxBatchCountPerSegment,
+Map> replicaAssignment,
+Boolean enableRemoteLogStorage) {
+assert maxBatchCountPerSegment >= 1 : "Segments size for topic " + topic + " needs to be >= 1";
+assert partitionCount >= 1 : "Partition count for topic " + topic + " needs to be >= 

[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework

2023-08-07 Thread via GitHub


kamalcph commented on code in PR #14116:
URL: https://github.com/apache/kafka/pull/14116#discussion_r1286074651


##
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestBuilder.java:
##
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.actions.BounceBrokerAction;
+import org.apache.kafka.tiered.storage.actions.ConsumeAction;
+import org.apache.kafka.tiered.storage.actions.CreatePartitionsAction;
+import org.apache.kafka.tiered.storage.actions.CreateTopicAction;
+import org.apache.kafka.tiered.storage.actions.DeleteRecordsAction;
+import org.apache.kafka.tiered.storage.actions.DeleteTopicAction;
+import org.apache.kafka.tiered.storage.actions.EraseBrokerStorageAction;
+import org.apache.kafka.tiered.storage.actions.ExpectBrokerInISRAction;
+import org.apache.kafka.tiered.storage.actions.ExpectEmptyRemoteStorageAction;
+import org.apache.kafka.tiered.storage.actions.ExpectLeaderAction;
+import org.apache.kafka.tiered.storage.actions.ExpectLeaderEpochCheckpointAction;
+import org.apache.kafka.tiered.storage.actions.ExpectListOffsetsAction;
+import org.apache.kafka.tiered.storage.actions.ExpectTopicIdToMatchInRemoteStorageAction;
+import org.apache.kafka.tiered.storage.actions.ExpectUserTopicMappedToMetadataPartitionsAction;
+import org.apache.kafka.tiered.storage.actions.ProduceAction;
+import org.apache.kafka.tiered.storage.actions.ReassignReplicaAction;
+import org.apache.kafka.tiered.storage.actions.ShrinkReplicaAction;
+import org.apache.kafka.tiered.storage.actions.StartBrokerAction;
+import org.apache.kafka.tiered.storage.actions.StopBrokerAction;
+import org.apache.kafka.tiered.storage.actions.UpdateBrokerConfigAction;
+import org.apache.kafka.tiered.storage.actions.UpdateTopicConfigAction;
+import org.apache.kafka.tiered.storage.specs.ConsumableSpec;
+import org.apache.kafka.tiered.storage.specs.DeletableSpec;
+import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
+import org.apache.kafka.tiered.storage.specs.FetchableSpec;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+import org.apache.kafka.tiered.storage.specs.OffloadableSpec;
+import org.apache.kafka.tiered.storage.specs.OffloadedSegmentSpec;
+import org.apache.kafka.tiered.storage.specs.ProducableSpec;
+import org.apache.kafka.tiered.storage.specs.RemoteDeleteSegmentSpec;
+import org.apache.kafka.tiered.storage.specs.RemoteFetchSpec;
+import org.apache.kafka.tiered.storage.specs.TopicSpec;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public final class TieredStorageTestBuilder {
+
+private final int defaultProducedBatchSize = 1;
+private final long defaultEarliestLocalOffsetExpectedInLogDirectory = 0;
+
+private Map producables = new HashMap<>();
+private Map> offloadables = new HashMap<>();
+private Map consumables = new HashMap<>();
+private Map fetchables = new HashMap<>();
+private Map> deletables = new HashMap<>();
+private List actions = new ArrayList<>();
+
+public TieredStorageTestBuilder() {
+}
+
+public TieredStorageTestBuilder createTopic(String topic,
+Integer partitionCount,
+Integer replicationFactor,
+Integer maxBatchCountPerSegment,
+Map> replicaAssignment,
+Boolean enableRemoteLogStorage) {
+assert maxBatchCountPerSegment >= 1 : "Segments size for topic " + topic + " needs to be >= 1";
+assert partitionCount >= 1 : "Partition count for topic " + topic + " needs to be >= 

[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework

2023-08-07 Thread via GitHub


kamalcph commented on code in PR #14116:
URL: https://github.com/apache/kafka/pull/14116#discussion_r1286066810


##
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java:
##
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
+import org.apache.kafka.tiered.storage.specs.TopicSpec;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+import kafka.log.UnifiedLog;
+import kafka.server.KafkaBroker;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.BrokerState;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot;
+import scala.Function0;
+import scala.Function1;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.collection.Seq;
+
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
+
+public final class TieredStorageTestContext {

Review Comment:
   `IntegrationTestHarness` creates an instance for each action invocation. We can do the refactoring later if required.




[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework

2023-08-07 Thread via GitHub


kamalcph commented on code in PR #14116:
URL: https://github.com/apache/kafka/pull/14116#discussion_r1286064263


##
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java:
##
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
+import org.apache.kafka.tiered.storage.specs.TopicSpec;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+import kafka.log.UnifiedLog;
+import kafka.server.KafkaBroker;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.BrokerState;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot;
+import scala.Function0;
+import scala.Function1;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.collection.Seq;
+
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
+
+public final class TieredStorageTestContext {
+
+private final Seq brokers;
+private final Properties producerConfig;
+private final Properties consumerConfig;
+private final ListenerName listenerName;
+
+private final Serializer ser;
+private final Deserializer de;
+
+private final Map topicSpecs;
+private final TieredStorageTestReport testReport;
+
+private volatile KafkaProducer producer;
+private volatile KafkaConsumer consumer;
+private volatile Admin admin;
+private volatile List tieredStorages;
+private volatile List localStorages;
+
+public TieredStorageTestContext(Seq brokers,
+Properties producerConfig,
+Properties consumerConfig,
+ListenerName listenerName) {
+this.brokers = brokers;
+this.producerConfig = producerConfig;
+this.consumerConfig = consumerConfig;
+this.listenerName = listenerName;
+this.ser = Serdes.String().serializer();
+this.de = Serdes.String().deserializer();
+this.topicSpecs = new HashMap<>();
+this.testReport = new TieredStorageTestReport(this);
+initContext();
+}
+
+private void initContext() {
+String bootstrapServers = TestUtils.bootstrapServers(brokers, listenerName);
+producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+// Set a producer linger of 

[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework

2023-08-07 Thread via GitHub


kamalcph commented on code in PR #14116:
URL: https://github.com/apache/kafka/pull/14116#discussion_r1286060966


##
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java:
##
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
+import org.apache.kafka.tiered.storage.specs.TopicSpec;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+import kafka.log.UnifiedLog;
+import kafka.server.KafkaBroker;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.BrokerState;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot;
+import scala.Function0;
+import scala.Function1;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.collection.Seq;
+
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
+
+public final class TieredStorageTestContext {
+
+private final Seq brokers;
+private final Properties producerConfig;
+private final Properties consumerConfig;
+private final ListenerName listenerName;
+
+private final Serializer ser;
+private final Deserializer de;
+
+private final Map topicSpecs;
+private final TieredStorageTestReport testReport;
+
+private volatile KafkaProducer producer;
+private volatile KafkaConsumer consumer;
+private volatile Admin admin;
+private volatile List tieredStorages;
+private volatile List localStorages;
+
+public TieredStorageTestContext(Seq brokers,
+Properties producerConfig,
+Properties consumerConfig,
+ListenerName listenerName) {
+this.brokers = brokers;
+this.producerConfig = producerConfig;
+this.consumerConfig = consumerConfig;
+this.listenerName = listenerName;
+this.ser = Serdes.String().serializer();
+this.de = Serdes.String().deserializer();
+this.topicSpecs = new HashMap<>();
+this.testReport = new TieredStorageTestReport(this);
+initContext();
+}
+
+private void initContext() {
+String bootstrapServers = TestUtils.bootstrapServers(brokers, listenerName);
+producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+// Set a producer linger of 

[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework

2023-08-07 Thread via GitHub


kamalcph commented on code in PR #14116:
URL: https://github.com/apache/kafka/pull/14116#discussion_r1286060584


##
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java:
##
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
+import org.apache.kafka.tiered.storage.specs.TopicSpec;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+import kafka.log.UnifiedLog;
+import kafka.server.KafkaBroker;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.BrokerState;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot;
+import scala.Function0;
+import scala.Function1;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.collection.Seq;
+
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
+
+public final class TieredStorageTestContext {
+
+private final Seq brokers;
+private final Properties producerConfig;
+private final Properties consumerConfig;
+private final ListenerName listenerName;
+
+private final Serializer ser;
+private final Deserializer de;
+
+private final Map topicSpecs;
+private final TieredStorageTestReport testReport;
+
+private volatile KafkaProducer producer;
+private volatile KafkaConsumer consumer;
+private volatile Admin admin;
+private volatile List tieredStorages;
+private volatile List localStorages;
+
+public TieredStorageTestContext(Seq brokers,
+Properties producerConfig,
+Properties consumerConfig,
+ListenerName listenerName) {
+this.brokers = brokers;
+this.producerConfig = producerConfig;
+this.consumerConfig = consumerConfig;
+this.listenerName = listenerName;
+this.ser = Serdes.String().serializer();
+this.de = Serdes.String().deserializer();
+this.topicSpecs = new HashMap<>();
+this.testReport = new TieredStorageTestReport(this);
+initContext();
+}
+
+private void initContext() {
+String bootstrapServers = TestUtils.bootstrapServers(brokers, 
listenerName);
+producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+// Set a producer linger of 

[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework

2023-08-07 Thread via GitHub


kamalcph commented on code in PR #14116:
URL: https://github.com/apache/kafka/pull/14116#discussion_r1286060074


##
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java:
##
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
+import org.apache.kafka.tiered.storage.specs.TopicSpec;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+import kafka.log.UnifiedLog;
+import kafka.server.KafkaBroker;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.BrokerState;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageHistory;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorageSnapshot;
+import scala.Function0;
+import scala.Function1;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import scala.Option;
+import scala.collection.Seq;
+
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
+
+public final class TieredStorageTestContext {
+
+    private final Seq<KafkaBroker> brokers;
+    private final Properties producerConfig;
+    private final Properties consumerConfig;
+    private final ListenerName listenerName;
+
+    private final Serializer<String> ser;
+    private final Deserializer<String> de;
+
+    private final Map<String, TopicSpec> topicSpecs;
+    private final TieredStorageTestReport testReport;
+
+    private volatile KafkaProducer<String, String> producer;
+    private volatile KafkaConsumer<String, String> consumer;
+    private volatile Admin admin;
+    private volatile List<LocalTieredStorage> tieredStorages;
+    private volatile List<BrokerLocalStorage> localStorages;
+
+    public TieredStorageTestContext(Seq<KafkaBroker> brokers,
+                                    Properties producerConfig,
+                                    Properties consumerConfig,
+                                    ListenerName listenerName) {
+        this.brokers = brokers;
+        this.producerConfig = producerConfig;
+        this.consumerConfig = consumerConfig;
+        this.listenerName = listenerName;
+        this.ser = Serdes.String().serializer();
+        this.de = Serdes.String().deserializer();
+        this.topicSpecs = new HashMap<>();
+        this.testReport = new TieredStorageTestReport(this);
+        initContext();
+    }
+
+    private void initContext() {
+        String bootstrapServers = TestUtils.bootstrapServers(brokers, listenerName);
+        producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        // Set a producer linger of 

[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework

2023-08-07 Thread via GitHub


kamalcph commented on code in PR #14116:
URL: https://github.com/apache/kafka/pull/14116#discussion_r1286059074


##
core/src/main/scala/kafka/server/KafkaBroker.scala:
##
@@ -19,6 +19,7 @@ package kafka.server
 
 import com.yammer.metrics.core.MetricName
 import kafka.log.LogManager
+import kafka.log.remote.RemoteLogManager

Review Comment:
   This import is used at L83.
   
   I tried removing the apparently unused import `scala.collection.Seq` from this class, but compilation then failed with the errors below, so I reverted that change:
   
   ```
   ❯ ./gradlew :storage:compileTestJava -PscalaVersion=2.13
   
   > Configure project :
   Starting build with version 3.6.0-SNAPSHOT (commit id d109f88d) using Gradle 8.2.1, Java 11 and Scala 2.13.11
   Build properties: maxParallelForks=12, maxScalacThreads=8, maxTestRetries=0
   
   > Task :core:compileScala
   [Error] /Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:910:69: type mismatch;
    found   : scala.collection.mutable.Buffer[org.apache.kafka.common.metrics.MetricsReporter]
    required: Seq[AnyRef]
   [Error] /Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:915:61: type mismatch;
    found   : scala.collection.mutable.Buffer[org.apache.kafka.common.metrics.MetricsReporter]
    required: Seq[AnyRef]
   [Error] /Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/KafkaServer.scala:267:77: type mismatch;
    found   : Seq[Object] (in scala.collection)
    required: Seq[AnyRef] (in scala.collection.immutable)
   [Error] /Users/kchandraprakash/projects/kafka/core/src/main/scala/kafka/server/KafkaServer.scala:117:15: private val kafkaMetricsReporters in class KafkaServer is never used
   four errors found
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework

2023-08-07 Thread via GitHub


kamalcph commented on code in PR #14116:
URL: https://github.com/apache/kafka/pull/14116#discussion_r1285716790


##
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##
@@ -216,6 +216,14 @@ object TestUtils extends Logging {
 (new Broker(id, host, port, ListenerName.forSecurityProtocol(securityProtocol), securityProtocol), epoch)
   }
 
+  /**
+   * Create a test config for the provided parameters.
+   */
+  def createServerConfigs(numConfigs: Int,

Review Comment:
   Removed this method. With `createBrokerConfigs`, we had to supply 18 
parameters from Java code.
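
For context on the 18-parameter remark: Scala default arguments are not applied for Java callers, so a Java call site has to pass every argument explicitly. One common way to keep such call sites short is a small Java-side helper that carries the defaults. The sketch below is illustrative only: `BrokerConfigOptions`, its fields, and its default values are hypothetical and are not part of Kafka's `TestUtils`; only the property keys are real broker config names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * Hypothetical helper (not part of Kafka): holds optional broker settings
 * with defaults so Java call sites override only what they need.
 */
final class BrokerConfigOptions {
    // Illustrative defaults; assumptions, not the values TestUtils uses.
    private boolean enableControlledShutdown = true;
    private boolean enableDeleteTopic = true;
    private int numPartitions = 1;
    private short defaultReplicationFactor = 1;

    BrokerConfigOptions enableControlledShutdown(boolean value) {
        this.enableControlledShutdown = value;
        return this;
    }

    BrokerConfigOptions enableDeleteTopic(boolean value) {
        this.enableDeleteTopic = value;
        return this;
    }

    BrokerConfigOptions numPartitions(int value) {
        this.numPartitions = value;
        return this;
    }

    BrokerConfigOptions defaultReplicationFactor(short value) {
        this.defaultReplicationFactor = value;
        return this;
    }

    /** Builds one Properties object per broker, with ids 0..brokerCount-1. */
    List<Properties> build(int brokerCount) {
        List<Properties> configs = new ArrayList<>();
        for (int id = 0; id < brokerCount; id++) {
            Properties props = new Properties();
            props.setProperty("broker.id", Integer.toString(id));
            props.setProperty("controlled.shutdown.enable", Boolean.toString(enableControlledShutdown));
            props.setProperty("delete.topic.enable", Boolean.toString(enableDeleteTopic));
            props.setProperty("num.partitions", Integer.toString(numPartitions));
            props.setProperty("default.replication.factor", Short.toString(defaultReplicationFactor));
            configs.add(props);
        }
        return configs;
    }
}
```

A call such as `new BrokerConfigOptions().numPartitions(3).build(2)` then overrides a single setting and leaves the rest at their defaults.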






[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework

2023-08-07 Thread via GitHub


kamalcph commented on code in PR #14116:
URL: https://github.com/apache/kafka/pull/14116#discussion_r1285715433


##
storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java:
##
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage;
+
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+import kafka.api.IntegrationTestHarness;
+import kafka.log.remote.RemoteLogManager;
+import kafka.server.KafkaBroker;
+import kafka.server.KafkaConfig;
+import kafka.utils.TestUtils;
+import org.apache.kafka.common.replica.ReplicaSelector;
+import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;
+import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import scala.collection.Seq;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import scala.collection.JavaConverters;
+
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP;
+
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP;
+
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.DELETE_ON_CLOSE_CONFIG;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.STORAGE_DIR_CONFIG;
+
+/**
+ * Base class for integration tests exercising the tiered storage functionality in Apache Kafka.
+ */
+public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
+
+    /**
+     * InitialTaskDelayMs is set to 30 seconds for the delete-segment scheduler in Apache Kafka.
+     * Hence, we need to wait at least that amount of time before segments eligible for deletion
+     * get physically removed.
+     */
+    private static final Integer STORAGE_WAIT_TIMEOUT_SEC = 35;
+
+    private TieredStorageTestContext context;
+
+    protected int numMetadataPartition() {
+        return 5;
+    }
+
+    @SuppressWarnings("deprecation")
+    @Override
+    public Seq<KafkaConfig> generateConfigs() {
+        Properties overridingProps = getOverridingProps();
+        List<KafkaConfig> configs =
+                JavaConverters.seqAsJavaList(TestUtils.createServerConfigs(brokerCount(), zkConnectOrNull()))
+                        .stream()
+                        .map(config -> KafkaConfig.fromProps(config, overridingProps))
+                        .collect(Collectors.toList());
+        return JavaConverters.asScalaBuffer(configs).toSeq();
+    }
+
+    public Properties getOverridingProps() {
+        Properties overridingProps = new Properties();
+        // Configure the tiered storage in Kafka. Set an interval of 1 second for the remote log manager background
+        // activity to ensure the tiered storage has enough room to be exercised within the lifetime of a test.
+        //
+        // The replication factor of the remote log metadata topic needs to be chosen so that in resiliency
+        // tests, metadata can survive the loss of one replica for its topic-partitions.
+        //
+        // The second-tier storage system is mocked via the LocalTieredStorage instance 

[GitHub] [kafka] kamalcph commented on a diff in pull request #14116: KAFKA-15167: Tiered Storage Test Harness Framework

2023-08-07 Thread via GitHub


kamalcph commented on code in PR #14116:
URL: https://github.com/apache/kafka/pull/14116#discussion_r1285713904


##
storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java:
##
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test Cases (A):
+ *Elementary offloads and fetches from tiered storage.
+ */
+public final class OffloadAndConsumeFromLeaderTest extends TieredStorageTestHarness {

Review Comment:
   Disabled the `TieredStorageTestHarness` tests until the trunk build is stable enough to run them.


