dl: add flag to enable thrift mux on DL Client

RB_ID=839555
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/d3a97bc0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/d3a97bc0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/d3a97bc0 Branch: refs/heads/merge/DL-98 Commit: d3a97bc0dde0c25516840725599ac46fa03601ab Parents: 98dc9ab Author: Dave Rusek <dru...@twitter.com> Authored: Mon Jun 6 16:50:25 2016 -0700 Committer: Sijie Guo <sij...@twitter.com> Committed: Mon Dec 12 16:39:04 2016 -0800 ---------------------------------------------------------------------- .../distributedlog/service/MonitorService.java | 28 ++++++++++++++------ .../service/MonitorServiceApp.java | 1 + 2 files changed, 21 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d3a97bc0/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java index 2683b47..6b58eff 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java @@ -85,6 +85,7 @@ public class MonitorService implements NamespaceListener { private int heartbeatEveryChecks = 0; private int instanceId = -1; private int totalInstances = -1; + private boolean isThriftMux = false; // Options private final Optional<String> uriArg; @@ -98,6 +99,7 @@ public class MonitorService implements NamespaceListener { private final Optional<Integer> heartbeatEveryChecksArg; private final Optional<Boolean> 
handshakeWithClientInfoArg; private final Optional<Boolean> watchNamespaceChangesArg; + private final Optional<Boolean> isThriftMuxArg; // Stats private final StatsProvider statsProvider; @@ -224,6 +226,7 @@ public class MonitorService implements NamespaceListener { Optional<Integer> heartbeatEveryChecksArg, Optional<Boolean> handshakeWithClientInfoArg, Optional<Boolean> watchNamespaceChangesArg, + Optional<Boolean> isThriftMuxArg, StatsReceiver statsReceiver, StatsProvider statsProvider) { // options @@ -238,6 +241,7 @@ public class MonitorService implements NamespaceListener { this.heartbeatEveryChecksArg = heartbeatEveryChecksArg; this.handshakeWithClientInfoArg = handshakeWithClientInfoArg; this.watchNamespaceChangesArg = watchNamespaceChangesArg; + this.isThriftMuxArg = isThriftMuxArg; // Stats this.statsReceiver = statsReceiver; @@ -275,6 +279,7 @@ public class MonitorService implements NamespaceListener { } handshakeWithClientInfo = handshakeWithClientInfoArg.isPresent(); watchNamespaceChanges = watchNamespaceChangesArg.isPresent(); + isThriftMux = isThriftMuxArg.isPresent(); URI uri = URI.create(uriArg.get()); DistributedLogConfiguration dlConf = new DistributedLogConfiguration(); if (confFileArg.isPresent()) { @@ -300,8 +305,22 @@ public class MonitorService implements NamespaceListener { ServerSet[] remotes = new ServerSet[serverSets.length - 1]; System.arraycopy(serverSets, 1, remotes, 0, remotes.length); + ClientBuilder finagleClientBuilder = ClientBuilder.get() + .connectTimeout(Duration.fromSeconds(1)) + .tcpConnectTimeout(Duration.fromSeconds(1)) + .requestTimeout(Duration.fromSeconds(2)) + .keepAlive(true) + .failFast(false); + + if (!isThriftMux) { + finagleClientBuilder = finagleClientBuilder + .hostConnectionLimit(2) + .hostConnectionCoresize(2); + } + dlClient = DistributedLogClientBuilder.newBuilder() .name("monitor") + .thriftmux(isThriftMux) .clientId(ClientId$.MODULE$.apply("monitor")) .redirectBackoffMaxMs(50) .redirectBackoffStartMs(100) 
@@ -310,14 +329,7 @@ public class MonitorService implements NamespaceListener { .serverSets(local, remotes) .streamNameRegex(streamRegex) .handshakeWithClientInfo(handshakeWithClientInfo) - .clientBuilder(ClientBuilder.get() - .connectTimeout(Duration.fromSeconds(1)) - .tcpConnectTimeout(Duration.fromSeconds(1)) - .requestTimeout(Duration.fromSeconds(2)) - .hostConnectionLimit(2) - .hostConnectionCoresize(2) - .keepAlive(true) - .failFast(false)) + .clientBuilder(finagleClientBuilder) .statsReceiver(monitorReceiver.scope("client")) .buildMonitorClient(); runMonitor(dlConf, uri); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d3a97bc0/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java index 90d3566..a51a6a9 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java @@ -99,6 +99,7 @@ public class MonitorServiceApp { getOptionalIntegerArg(cmdline, "hck"), getOptionalBooleanArg(cmdline, "hsci"), getOptionalBooleanArg(cmdline, "w"), + getOptionalBooleanArg(cmdline, "mx"), statsReceiver, statsProvider);