nizhikov commented on code in PR #14588:
URL: https://github.com/apache/kafka/pull/14588#discussion_r1387612584


##########
tools/src/test/java/org/apache/kafka/tools/other/ReplicationQuotasTestRig.java:
##########
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.other;
+
+import kafka.log.UnifiedLog;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.server.QuorumTestHarness;
+import kafka.server.QuotaType;
+import kafka.utils.EmptyTestInfo;
+import kafka.utils.Exit;
+import kafka.utils.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.tools.reassign.ReassignPartitionsCommand;
+import org.apache.log4j.PropertyConfigurator;
+import org.jfree.chart.ChartFactory;
+import org.jfree.chart.ChartFrame;
+import org.jfree.chart.JFreeChart;
+import org.jfree.chart.plot.PlotOrientation;
+import org.jfree.data.xy.XYSeries;
+import org.jfree.data.xy.XYSeriesCollection;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import javax.imageio.ImageIO;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+import java.util.function.IntSupplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static java.nio.file.StandardOpenOption.APPEND;
+import static java.nio.file.StandardOpenOption.CREATE;
+import static org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS;
+
+/**
+ * Test rig for measuring throttling performance. Configure the parameters for 
a set of experiments, then execute them
+ * and view the html output file, with charts, that are produced. You can also 
render the charts to the screen if
+ * you wish.
+ *
+ * Currently you'll need about 40GB of disk space to run these experiments 
(largest data written x2). Tune the msgSize
+ * & #partitions and throttle to adjust this.
+ */
+public class ReplicationQuotasTestRig {
+    public static final Logger LOGGER = 
LoggerFactory.getLogger(ReplicationQuotasTestRig.class);
+
+    public static final int K = 1000 * 1000;
+
+    private static final String DIR;
+
+    static {
+        
PropertyConfigurator.configure("core/src/test/resources/log4j.properties");
+
+        new File("Experiments").mkdir();
+        DIR = "Experiments/Run" + 
Long.valueOf(System.currentTimeMillis()).toString().substring(8);
+        new File(DIR).mkdir();
+    }
+
+    public static void main(String[] args) {
+        boolean displayChartsOnScreen = args.length > 0 && 
Objects.equals(args[0], "show-gui");
+        Journal journal = new Journal();
+
+        List<ExperimentDef> experiments = Arrays.asList(
+            //1GB total data written, will take 210s
+            new ExperimentDef("Experiment1", 5, 20, 1 * K, 500, 100 * 1000),
+            //5GB total data written, will take 110s
+            new ExperimentDef("Experiment2", 5, 50, 10 * K, 1000, 100 * 1000),
+            //5GB total data written, will take 110s
+            new ExperimentDef("Experiment3", 50, 50, 2 * K, 1000, 100 * 1000),
+            //10GB total data written, will take 110s
+            new ExperimentDef("Experiment4", 25, 100, 4 * K, 1000, 100 * 1000),
+            //10GB total data written, will take 80s
+            new ExperimentDef("Experiment5", 5, 50, 50 * K, 4000, 100 * 1000)
+        );
+        experiments.forEach(def -> run(def, journal, displayChartsOnScreen));
+
+        if (!displayChartsOnScreen)
+            Exit.exit(0, Option.empty());
+    }
+
+    static void run(ExperimentDef config, Journal journal, boolean 
displayChartsOnScreen) {
+        Experiment experiment = new Experiment();
+        try {
+            experiment.setUp(new EmptyTestInfo());
+            experiment.run(config, journal, displayChartsOnScreen);
+            journal.footer();
+        } catch (Exception e) {
+            e.printStackTrace();
+        } finally {
+            experiment.tearDown();
+        }
+    }
+
+    static class ExperimentDef {
+        String name;
+        int brokers;
+        int partitions;
+        long throttle;
+        int msgsPerPartition;
+        int msgSize;
+
+        public ExperimentDef(String name, int brokers, int partitions, long 
throttle, int msgsPerPartition, int msgSize) {
+            this.name = name;
+            this.brokers = brokers;
+            this.partitions = partitions;
+            this.throttle = throttle;
+            this.msgsPerPartition = msgsPerPartition;
+            this.msgSize = msgSize;
+        }
+
+        long targetBytesPerBrokerMB() {
+            return (long) msgsPerPartition * (long) msgSize * (long) 
partitions / brokers / 1_000_000;
+        }
+    }
+
+    static class Experiment extends QuorumTestHarness {
+        static final String TOPIC_NAME = "my-topic";
+
+        String experimentName = "unset";
+        List<KafkaServer> servers;
+        Map<Integer, List<Double>> leaderRates = new HashMap<>();
+        Map<Integer, List<Double>> followerRates = new HashMap<>();
+        Admin adminClient;
+
+        void startBrokers(List<Integer> brokerIds) {
+            System.out.println("Starting Brokers");
+            servers = brokerIds.stream().map(i -> createBrokerConfig(i, 
zkConnect()))
+                .map(c -> TestUtils.createServer(KafkaConfig.fromProps(c), 
Time.SYSTEM))
+                .collect(Collectors.toList());
+
+            TestUtils.waitUntilBrokerMetadataIsPropagated(seq(servers), 
DEFAULT_MAX_WAIT_MS);
+            String brokerList = 
TestUtils.plaintextBootstrapServers(seq(servers));
+            adminClient = Admin.create(Collections.singletonMap(
+                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList
+            ));
+        }
+
+        @Override public void tearDown() {
+            Utils.closeQuietly(adminClient, "adminClient");
+            TestUtils.shutdownServers(seq(servers), true);
+            super.tearDown();
+        }
+
+        @SuppressWarnings({"unchecked", "deprecation"})
+        public void run(ExperimentDef config, Journal journal, boolean 
displayChartsOnScreen) throws Exception {
+            experimentName = config.name;
+            List<Integer> brokers = IntStream.rangeClosed(100, 100 + 
config.brokers).boxed().collect(Collectors.toList());

Review Comment:
   The `boxed` invocation is required to convert the `int` primitives to `Integer` wrappers,
   because only objects, not primitives, can be added to a `List`.
   
   So it seems that the choice between `range` and `rangeClosed` is not related to the `boxed` invocation.
   Am I missing something?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to