This is an automated email from the ASF dual-hosted git repository. gwenshap pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 219c221 MINOR: Optimize ConnectionStressWorker 219c221 is described below commit 219c22113ee3e8c2b9763d8577c50e481fb1ba31 Author: Colin P. Mccabe <cmcc...@confluent.io> AuthorDate: Fri Mar 29 15:02:10 2019 -0700 MINOR: Optimize ConnectionStressWorker Optimize ConnectionStressWorker by avoiding creating a new ChannelBuilder each time we want to open a new connection. Author: Colin P. Mccabe <cmcc...@confluent.io> Reviewers: Gwen Shapira Closes #6518 from cmccabe/optimize-connection-stress-worker --- .../trogdor/workload/ConnectionStressWorker.java | 160 +++++++++++++-------- 1 file changed, 100 insertions(+), 60 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java index 9a9439a..d42367e 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java @@ -36,6 +36,7 @@ import org.apache.kafka.common.network.ChannelBuilder; import org.apache.kafka.common.network.Selector; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.trogdor.common.JsonUtil; import org.apache.kafka.trogdor.common.Platform; import org.apache.kafka.trogdor.common.ThreadUtils; @@ -131,70 +132,61 @@ public class ConnectionStressWorker implements TaskWorker { } } - public class ConnectLoop implements Runnable { - @Override - public void run() { - try { - Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers()); - WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.commonClientConf()); - AdminClientConfig conf = new AdminClientConfig(props); - List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses( - conf.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), - conf.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG)); - ManualMetadataUpdater updater = new ManualMetadataUpdater(Cluster.bootstrap(addresses).nodes()); - while (!doneFuture.isDone()) { - throttle.increment(); - long lastTimeMs = throttle.lastTimeMs(); - boolean success = false; - switch (spec.action()) { - case CONNECT: - success = attemptConnection(conf, updater); - break; - case FETCH_METADATA: - success = attemptMetadataFetch(props); - break; - } - synchronized (ConnectionStressWorker.this) { - totalConnections++; - if (!success) { - totalFailedConnections++; - } - if (lastTimeMs > nextReportTime) - updateStatus(lastTimeMs); - } - } - } catch (Exception e) { - WorkerUtils.abort(log, "ConnectionStressRunnable", e, doneFuture); + interface Stressor extends AutoCloseable { + static Stressor fromSpec(ConnectionStressSpec spec) { + switch (spec.action()) { + case CONNECT: + return new ConnectStressor(spec); + case FETCH_METADATA: + return new FetchMetadataStressor(spec); } + throw new RuntimeException("invalid spec.action " + spec.action()); } - private boolean attemptConnection(AdminClientConfig conf, - ManualMetadataUpdater updater) { + boolean tryConnect(); + } + + static class ConnectStressor implements Stressor { + private final AdminClientConfig conf; + private final ManualMetadataUpdater updater; + private final ChannelBuilder channelBuilder; + + ConnectStressor(ConnectionStressSpec spec) { + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers()); + WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.commonClientConf()); + this.conf = new AdminClientConfig(props); + List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses( + conf.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), + conf.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG)); + this.updater = new ManualMetadataUpdater(Cluster.bootstrap(addresses).nodes()); + this.channelBuilder = ClientUtils.createChannelBuilder(conf, TIME); + } + + @Override + public boolean tryConnect() { try { List<Node> nodes = updater.fetchNodes(); Node targetNode = nodes.get(ThreadLocalRandom.current().nextInt(nodes.size())); - try (ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(conf, TIME)) { - try (Metrics metrics = new Metrics()) { - LogContext logContext = new LogContext(); - try (Selector selector = new Selector(conf.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), - metrics, TIME, "", channelBuilder, logContext)) { - try (NetworkClient client = new NetworkClient(selector, - updater, - "ConnectionStressWorker", - 1, - 1000, - 1000, - 4096, - 4096, - 1000, - ClientDnsLookup.forConfig(conf.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG)), - TIME, - false, - new ApiVersions(), - logContext)) { - NetworkClientUtils.awaitReady(client, targetNode, TIME, 100); - } + try (Metrics metrics = new Metrics()) { + LogContext logContext = new LogContext(); + try (Selector selector = new Selector(conf.getLong(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), + metrics, TIME, "", channelBuilder, logContext)) { + try (NetworkClient client = new NetworkClient(selector, + updater, + "ConnectionStressWorker", + 1, + 1000, + 1000, + 4096, + 4096, + 1000, + ClientDnsLookup.forConfig(conf.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG)), + TIME, + false, + new ApiVersions(), + logContext)) { + NetworkClientUtils.awaitReady(client, targetNode, TIME, 100); } } } @@ -204,8 +196,25 @@ public class ConnectionStressWorker implements TaskWorker { } } - private boolean attemptMetadataFetch(Properties conf) { - try (AdminClient client = AdminClient.create(conf)) { + @Override + public void close() throws Exception { + Utils.closeQuietly(updater, "ManualMetadataUpdater"); + Utils.closeQuietly(channelBuilder, "ChannelBuilder"); + } + } + + static class FetchMetadataStressor implements Stressor { + private final Properties props; + + FetchMetadataStressor(ConnectionStressSpec spec) { + this.props = new Properties(); + this.props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, spec.bootstrapServers()); + WorkerUtils.addConfigsToProperties(this.props, spec.commonClientConf(), spec.commonClientConf()); + } + + @Override + public boolean tryConnect() { + try (AdminClient client = AdminClient.create(this.props)) { client.describeCluster().nodes().get(); } catch (RuntimeException e) { return false; @@ -214,6 +223,37 @@ public class ConnectionStressWorker implements TaskWorker { } return true; } + + @Override + public void close() throws Exception { + } + } + + public class ConnectLoop implements Runnable { + @Override + public void run() { + Stressor stressor = Stressor.fromSpec(spec); + try { + while (!doneFuture.isDone()) { + throttle.increment(); + long lastTimeMs = throttle.lastTimeMs(); + boolean success = stressor.tryConnect(); + synchronized (ConnectionStressWorker.this) { + totalConnections++; + if (!success) { + totalFailedConnections++; + } + if (lastTimeMs > nextReportTime) + updateStatus(lastTimeMs); + } + } + } catch (Exception e) { + WorkerUtils.abort(log, "ConnectionStressRunnable", e, doneFuture); + } finally { + Utils.closeQuietly(stressor, "stressor"); + } + } + } public static class StatusData {