Repository: incubator-distributedlog Updated Branches: refs/heads/master b9d21568c -> 05a8daa2a
DL-39: Use a distributedlog uri to configure write proxy routing address Author: Jon Derrick <[email protected]> Reviewers: Sijie Guo <[email protected]> Closes #18 from jderrickk/jd/uri_resolver Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/05a8daa2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/05a8daa2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/05a8daa2 Branch: refs/heads/master Commit: 05a8daa2aa468fe82f6ca919f7d93f0bab86c522 Parents: b9d2156 Author: Jon Derrick <[email protected]> Authored: Wed Sep 7 01:40:17 2016 -0700 Committer: Sijie Guo <[email protected]> Committed: Wed Sep 7 01:40:17 2016 -0700 ---------------------------------------------------------------------- distributedlog-benchmark/bin/dbench | 2 -- distributedlog-benchmark/conf/dlogenv.sh | 3 --- .../distributedlog/benchmark/Benchmarker.java | 11 +++++--- .../distributedlog/benchmark/ReaderWorker.java | 5 +++- .../distributedlog/benchmark/WriterWorker.java | 9 +++++-- .../service/DistributedLogClientBuilder.java | 28 ++++++++++++++++++++ docs/operations/deployment.rst | 4 +-- 7 files changed, 47 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/05a8daa2/distributedlog-benchmark/bin/dbench ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/bin/dbench b/distributedlog-benchmark/bin/dbench index 667c6a5..b84e133 100755 --- a/distributedlog-benchmark/bin/dbench +++ b/distributedlog-benchmark/bin/dbench @@ -182,7 +182,6 @@ elif [ $COMMAND == "write" ]; then --max-rate ${MAX_RATE} \\ --change-rate ${CHANGE_RATE} \\ --change-interval ${CHANGE_RATE_INTERVAL} \\ - --finagle-name ${DL_WP_FINAGLE_NAME} """ BENCH_ARGS="${BENCH_ARGS} \\ ${BENCH_WRITE_ARGS} \\ --mode write \\" exec java $OPTS $JMX_ARGS com.twitter.distributedlog.benchmark.Benchmarker $BENCH_ARGS $@ @@ -191,7 +190,6 @@ elif [ $COMMAND == "read" ]; then --readers-per-stream ${NUM_READERS_PER_STREAM} \\ --max-stream-id ${MAX_STREAM_ID} \\ --truncation-interval ${TRUNCATION_INTERVAL} \\ - --finagle-name ${DL_WP_FINAGLE_NAME} """ BENCH_ARGS="${BENCH_ARGS} \\ ${BENCH_READ_ARGS} \\ --mode read \\" exec java $OPTS $JMX_ARGS com.twitter.distributedlog.benchmark.Benchmarker $BENCH_ARGS $@ http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/05a8daa2/distributedlog-benchmark/conf/dlogenv.sh ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/conf/dlogenv.sh b/distributedlog-benchmark/conf/dlogenv.sh index 608294e..205606b 100644 --- a/distributedlog-benchmark/conf/dlogenv.sh +++ b/distributedlog-benchmark/conf/dlogenv.sh @@ -76,9 +76,6 @@ CHANGE_RATE=100 # Rate change interval, in seconds CHANGE_RATE_INTERVAL=300 -# DL Write Proxy Finagle Name -DL_WP_FINAGLE_NAME='zk!127.0.0.1:2181!/messaging/distributedlog/mynamespace/.write_proxy' - ########## # Reader ########## http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/05a8daa2/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java index 25b70a4..7114dbb 100644 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java +++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java @@ -265,8 +265,8 @@ public class Benchmarker { } Worker runWriter() { - Preconditions.checkArgument(!finagleNames.isEmpty() || !serversetPaths.isEmpty(), - "either serverset paths or finagle-names required"); + Preconditions.checkArgument(!finagleNames.isEmpty() || !serversetPaths.isEmpty() || null != dlUri, + "either serverset paths, finagle-names or uri required"); Preconditions.checkArgument(msgSize > 0, "messagesize must be greater than 0"); Preconditions.checkArgument(rate > 0, "rate must be greater than 0"); Preconditions.checkArgument(maxRate >= rate, "max rate must be greater than rate"); @@ -278,6 +278,7 @@ public class Benchmarker { new ShiftableRateLimiter(rate, maxRate, changeRate, changeRateSeconds, TimeUnit.SECONDS); return createWriteWorker( streamPrefix, + dlUri, null == startStreamId ? shardId * numStreams : startStreamId, null == endStreamId ? (shardId + 1) * numStreams : endStreamId, rateLimiter, @@ -299,6 +300,7 @@ public class Benchmarker { protected WriterWorker createWriteWorker( String streamPrefix, + URI uri, int startStreamId, int endStreamId, ShiftableRateLimiter rateLimiter, @@ -318,6 +320,7 @@ public class Benchmarker { boolean enableBatching) { return new WriterWorker( streamPrefix, + uri, startStreamId, endStreamId, rateLimiter, @@ -360,8 +363,8 @@ public class Benchmarker { } Worker runReader() throws IOException { - Preconditions.checkArgument(!finagleNames.isEmpty() || !serversetPaths.isEmpty(), - "either serverset paths or finagle-names required"); + Preconditions.checkArgument(!finagleNames.isEmpty() || !serversetPaths.isEmpty() || null != dlUri, + "either serverset paths, finagle-names or dlUri required"); Preconditions.checkArgument(concurrency > 0, "concurrency must be greater than 0"); Preconditions.checkArgument(truncationInterval > 0, "truncation interval should be greater than 0"); return runReaderInternal(serversetPaths, finagleNames, truncationInterval); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/05a8daa2/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java index 4bc55d2..5b34939 100644 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java +++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java @@ -254,7 +254,7 @@ public class ReaderWorker implements Worker { builder = builder.finagleNameStrs(local, remotes); LOG.info("Initialized distributedlog client for truncation @ {}.", finagleNames); - } else { + } else if (serverSets.length != 0){ ServerSet local = this.serverSets[0].getServerSet(); ServerSet[] remotes = new ServerSet[this.serverSets.length - 1]; for (int i = 1; i < serverSets.length; i++) { @@ -263,6 +263,9 @@ public class ReaderWorker implements Worker { builder = builder.serverSets(local, remotes); LOG.info("Initialized distributedlog client for truncation @ {}.", serverSetPaths); + } else { + builder = builder.uri(uri); + LOG.info("Initialized distributedlog client for namespace {}", uri); } dlc = builder.build(); } else { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/05a8daa2/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java index 5a961df..92fc090 100644 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java +++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java @@ -65,6 +65,7 @@ public class WriterWorker implements Worker { final int hostConnectionLimit; final ExecutorService executorService; final ShiftableRateLimiter rateLimiter; + final URI dlUri; final DLZkServerSet[] serverSets; final List<String> finagleNames; final Random random; @@ -86,6 +87,7 @@ public class WriterWorker implements Worker { final StatsLogger dlErrorCodeLogger; public WriterWorker(String streamPrefix, + URI uri, int startStreamId, int endStreamId, ShiftableRateLimiter rateLimiter, @@ -106,6 +108,7 @@ public class WriterWorker implements Worker { Preconditions.checkArgument(startStreamId <= endStreamId); Preconditions.checkArgument(!finagleNames.isEmpty() || !serverSetPaths.isEmpty()); this.streamPrefix = streamPrefix; + this.dlUri = uri; this.startStreamId = startStreamId; this.endStreamId = endStreamId; this.rateLimiter = rateLimiter; @@ -184,19 +187,21 @@ public class WriterWorker implements Worker { .handshakeTracing(true) .name("writer"); - if (serverSets.length == 0) { + if (!finagleNames.isEmpty()) { String local = finagleNames.get(0); String[] remotes = new String[finagleNames.size() - 1]; finagleNames.subList(1, finagleNames.size()).toArray(remotes); builder = builder.finagleNameStrs(local, remotes); - } else { + } else if (serverSets.length != 0){ ServerSet local = serverSets[0].getServerSet(); ServerSet[] remotes = new ServerSet[serverSets.length - 1]; for (int i = 1; i < serverSets.length; i++) { remotes[i-1] = serverSets[i].getServerSet(); } builder = builder.serverSets(local, remotes); + } else { + builder = builder.uri(dlUri); } return builder.build(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/05a8daa2/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java index c4a82e5..5c6a54b 100644 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java @@ -32,11 +32,16 @@ import com.twitter.finagle.builder.ClientBuilder; import com.twitter.finagle.stats.NullStatsReceiver; import com.twitter.finagle.stats.StatsReceiver; import com.twitter.finagle.thrift.ClientId; +import org.apache.commons.lang.StringUtils; import java.net.SocketAddress; +import java.net.URI; +import java.util.Random; public final class DistributedLogClientBuilder { + private static final Random random = new Random(System.currentTimeMillis()); + private String _name = null; private ClientId _clientId = null; private RoutingService.Builder _routingServiceBuilder = null; @@ -171,6 +176,29 @@ public final class DistributedLogClientBuilder { } /** + * URI to access proxy services. Assuming the write proxies are announced under `.write_proxy` of + * the provided namespace uri. + * <p> + * The builder will convert the dl uri (e.g. distributedlog://{zkserver}/path/to/namespace) to + * zookeeper serverset based finagle name str (`zk!{zkserver}!/path/to/namespace/.write_proxy`) + * + * @param uri namespace uri to access the serverset of write proxies + * @return distributedlog builder + */ + public DistributedLogClientBuilder uri(URI uri) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + String zkServers = uri.getAuthority().replace(";", ","); + String[] zkServerList = StringUtils.split(zkServers, ','); + String finagleNameStr = String.format( + "zk!%s!%s/.write_proxy", + zkServerList[random.nextInt(zkServerList.length)], // zk server + uri.getPath()); + newBuilder._routingServiceBuilder = RoutingUtils.buildRoutingService(finagleNameStr); + newBuilder._enableRegionStats = false; + return newBuilder; + } + + /** * Address of write proxy to connect. * * @param address http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/05a8daa2/docs/operations/deployment.rst ---------------------------------------------------------------------- diff --git a/docs/operations/deployment.rst b/docs/operations/deployment.rst index 08191f4..41240db 100644 --- a/docs/operations/deployment.rst +++ b/docs/operations/deployment.rst @@ -464,9 +464,7 @@ Write Proxy Naming ++++++++++++++++++ The `dlog-daemon.sh` script starts the write proxy by announcing it to the `.write_proxy` path under -the dl namespace. So you could use `zk!<zkservers>!/<namespace_path>/.write_proxy` as the finagle name -to access the write proxy cluster. It is `zk!127.0.0.1:2181!/messaging/distributedlog/mynamespace/.write_proxy` -in the above example. +the dl namespace. So you could use uri in the distributedlog client builder to access the write proxy cluster. Verify the setup ++++++++++++++++
