This is an automated email from the ASF dual-hosted git repository.

gwenshap pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 219c221  MINOR: Optimize ConnectionStressWorker
219c221 is described below

commit 219c22113ee3e8c2b9763d8577c50e481fb1ba31
Author: Colin P. Mccabe <cmcc...@confluent.io>
AuthorDate: Fri Mar 29 15:02:10 2019 -0700

    MINOR: Optimize ConnectionStressWorker
    
    Optimize ConnectionStressWorker by avoiding creating a new
    ChannelBuilder each time we want to open a new connection.
    
    Author: Colin P. Mccabe <cmcc...@confluent.io>
    
    Reviewers: Gwen Shapira
    
    Closes #6518 from cmccabe/optimize-connection-stress-worker
---
 .../trogdor/workload/ConnectionStressWorker.java   | 160 +++++++++++++--------
 1 file changed, 100 insertions(+), 60 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
index 9a9439a..d42367e 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
@@ -36,6 +36,7 @@ import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.trogdor.common.JsonUtil;
 import org.apache.kafka.trogdor.common.Platform;
 import org.apache.kafka.trogdor.common.ThreadUtils;
@@ -131,70 +132,61 @@ public class ConnectionStressWorker implements TaskWorker {
         }
     }
 
-    public class ConnectLoop implements Runnable {
-        @Override
-        public void run() {
-            try {
-                Properties props = new Properties();
-                props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
spec.bootstrapServers());
-                WorkerUtils.addConfigsToProperties(props, 
spec.commonClientConf(), spec.commonClientConf());
-                AdminClientConfig conf = new AdminClientConfig(props);
-                List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(
-                        
conf.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
-                        
conf.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG));
-                ManualMetadataUpdater updater = new 
ManualMetadataUpdater(Cluster.bootstrap(addresses).nodes());
-                while (!doneFuture.isDone()) {
-                    throttle.increment();
-                    long lastTimeMs = throttle.lastTimeMs();
-                    boolean success = false;
-                    switch (spec.action()) {
-                        case CONNECT:
-                            success = attemptConnection(conf, updater);
-                            break;
-                        case FETCH_METADATA:
-                            success = attemptMetadataFetch(props);
-                            break;
-                    }
-                    synchronized (ConnectionStressWorker.this) {
-                        totalConnections++;
-                        if (!success) {
-                            totalFailedConnections++;
-                        }
-                        if (lastTimeMs > nextReportTime)
-                            updateStatus(lastTimeMs);
-                    }
-                }
-            } catch (Exception e) {
-                WorkerUtils.abort(log, "ConnectionStressRunnable", e, 
doneFuture);
+    interface Stressor extends AutoCloseable {
+        static Stressor fromSpec(ConnectionStressSpec spec) {
+            switch (spec.action()) {
+                case CONNECT:
+                    return new ConnectStressor(spec);
+                case FETCH_METADATA:
+                    return new FetchMetadataStressor(spec);
             }
+            throw new RuntimeException("invalid spec.action " + spec.action());
         }
 
-        private boolean attemptConnection(AdminClientConfig conf,
-                                          ManualMetadataUpdater updater) {
+        boolean tryConnect();
+    }
+
+    static class ConnectStressor implements Stressor {
+        private final AdminClientConfig conf;
+        private final ManualMetadataUpdater updater;
+        private final ChannelBuilder channelBuilder;
+
+        ConnectStressor(ConnectionStressSpec spec) {
+            Properties props = new Properties();
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
spec.bootstrapServers());
+            WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), 
spec.commonClientConf());
+            this.conf = new AdminClientConfig(props);
+            List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(
+                conf.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                conf.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG));
+            this.updater = new 
ManualMetadataUpdater(Cluster.bootstrap(addresses).nodes());
+            this.channelBuilder = ClientUtils.createChannelBuilder(conf, TIME);
+        }
+
+        @Override
+        public boolean tryConnect() {
             try {
                 List<Node> nodes = updater.fetchNodes();
                 Node targetNode = 
nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()));
-                try (ChannelBuilder channelBuilder = 
ClientUtils.createChannelBuilder(conf, TIME)) {
-                    try (Metrics metrics = new Metrics()) {
-                        LogContext logContext = new LogContext();
-                        try (Selector selector = new 
Selector(conf.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
-                            metrics, TIME, "", channelBuilder, logContext)) {
-                            try (NetworkClient client = new 
NetworkClient(selector,
-                                    updater,
-                                    "ConnectionStressWorker",
-                                    1,
-                                    1000,
-                                    1000,
-                                    4096,
-                                    4096,
-                                    1000,
-                                    
ClientDnsLookup.forConfig(conf.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG)),
-                                    TIME,
-                                    false,
-                                    new ApiVersions(),
-                                    logContext)) {
-                                NetworkClientUtils.awaitReady(client, 
targetNode, TIME, 100);
-                            }
+                try (Metrics metrics = new Metrics()) {
+                    LogContext logContext = new LogContext();
+                    try (Selector selector = new 
Selector(conf.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
+                        metrics, TIME, "", channelBuilder, logContext)) {
+                        try (NetworkClient client = new NetworkClient(selector,
+                            updater,
+                            "ConnectionStressWorker",
+                            1,
+                            1000,
+                            1000,
+                            4096,
+                            4096,
+                            1000,
+                            
ClientDnsLookup.forConfig(conf.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG)),
+                            TIME,
+                            false,
+                            new ApiVersions(),
+                            logContext)) {
+                            NetworkClientUtils.awaitReady(client, targetNode, 
TIME, 100);
                         }
                     }
                 }
@@ -204,8 +196,25 @@ public class ConnectionStressWorker implements TaskWorker {
             }
         }
 
-        private boolean attemptMetadataFetch(Properties conf) {
-            try (AdminClient client = AdminClient.create(conf)) {
+        @Override
+        public void close() throws Exception {
+            Utils.closeQuietly(updater, "ManualMetadataUpdater");
+            Utils.closeQuietly(channelBuilder, "ChannelBuilder");
+        }
+    }
+
+    static class FetchMetadataStressor  implements Stressor {
+        private final Properties props;
+
+        FetchMetadataStressor(ConnectionStressSpec spec) {
+            this.props = new Properties();
+            this.props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
spec.bootstrapServers());
+            WorkerUtils.addConfigsToProperties(this.props, 
spec.commonClientConf(), spec.commonClientConf());
+        }
+
+        @Override
+        public boolean tryConnect() {
+            try (AdminClient client = AdminClient.create(this.props)) {
                 client.describeCluster().nodes().get();
             } catch (RuntimeException e) {
                 return false;
@@ -214,6 +223,37 @@ public class ConnectionStressWorker implements TaskWorker {
             }
             return true;
         }
+
+        @Override
+        public void close() throws Exception {
+        }
+    }
+
+    public class ConnectLoop implements Runnable {
+        @Override
+        public void run() {
+            Stressor stressor = Stressor.fromSpec(spec);
+            try {
+                while (!doneFuture.isDone()) {
+                    throttle.increment();
+                    long lastTimeMs = throttle.lastTimeMs();
+                    boolean success = stressor.tryConnect();
+                    synchronized (ConnectionStressWorker.this) {
+                        totalConnections++;
+                        if (!success) {
+                            totalFailedConnections++;
+                        }
+                        if (lastTimeMs > nextReportTime)
+                            updateStatus(lastTimeMs);
+                    }
+                }
+            } catch (Exception e) {
+                WorkerUtils.abort(log, "ConnectionStressRunnable", e, 
doneFuture);
+            } finally {
+                Utils.closeQuietly(stressor, "stressor");
+            }
+        }
+
     }
 
     public static class StatusData {

Reply via email to