DL-102: Add routing service to the write proxy server side. This change adds a getOwner RPC to the write proxy, so that we can change the client side to get the owner from the write proxy first for the routing service. In this way, we can start experimenting with any resource placement algorithms.
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/16d73c35 Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/16d73c35 Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/16d73c35 Branch: refs/heads/master Commit: 16d73c35ee45b99594bc7c840dc028337b7de859 Parents: 9ee7d01 Author: Leigh Stewart <[email protected]> Authored: Thu Jul 28 21:36:49 2016 -0700 Committer: Sijie Guo <[email protected]> Committed: Tue Dec 27 16:49:27 2016 -0800 ---------------------------------------------------------------------- .../routing/SingleHostRoutingService.java | 9 ++- .../proxy/MockDistributedLogServices.java | 5 ++ .../distributedlog/BKDistributedLogManager.java | 1 + .../src/main/thrift/service.thrift | 7 +- .../service/DistributedLogCluster.java | 40 ++++++++++-- .../service/DistributedLogServer.java | 20 +++++- .../service/DistributedLogServerApp.java | 18 +++++- .../service/DistributedLogServiceImpl.java | 68 ++++++++++++++++++++ .../distributedlog/service/ResponseUtils.java | 4 ++ .../client/routing/LocalRoutingService.java | 3 +- .../service/DistributedLogServerTestCase.java | 7 +- .../service/TestDistributedLogService.java | 49 ++++++++++++++ 12 files changed, 214 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/SingleHostRoutingService.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/SingleHostRoutingService.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/SingleHostRoutingService.java index 15356ff..e526868 100644 --- 
a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/SingleHostRoutingService.java +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/SingleHostRoutingService.java @@ -29,9 +29,8 @@ import java.util.concurrent.CopyOnWriteArraySet; /** * Single Host Routing Service. */ -class SingleHostRoutingService implements RoutingService { +public class SingleHostRoutingService implements RoutingService { - @Deprecated public static SingleHostRoutingService of(SocketAddress address) { return new SingleHostRoutingService(address); } @@ -71,7 +70,7 @@ class SingleHostRoutingService implements RoutingService { } } - private final SocketAddress address; + private SocketAddress address; private final CopyOnWriteArraySet<RoutingListener> listeners = new CopyOnWriteArraySet<RoutingListener>(); @@ -79,6 +78,10 @@ class SingleHostRoutingService implements RoutingService { this.address = address; } + public void setAddress(SocketAddress address) { + this.address = address; + } + @Override public Set<SocketAddress> getHosts() { return Sets.newHashSet(address); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockDistributedLogServices.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockDistributedLogServices.java b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockDistributedLogServices.java index 13ba044..f088c0d 100644 --- a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockDistributedLogServices.java +++ b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockDistributedLogServices.java @@ -105,6 +105,11 @@ public class MockDistributedLogServices { } @Override + public Future<WriteResponse> getOwner(String stream, 
WriteContext ctx) { + return Future.value(new WriteResponse()); + } + + @Override public Future<Void> setAcceptNewStream(boolean enabled) { return Future.value(null); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java index 75a5b83..ae8ae12 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java @@ -75,6 +75,7 @@ import org.apache.bookkeeper.stats.AlertStatsLogger; import org.apache.bookkeeper.feature.FeatureProvider; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.versioning.Versioned; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZKUtil; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-protocol/src/main/thrift/service.thrift ---------------------------------------------------------------------- diff --git a/distributedlog-protocol/src/main/thrift/service.thrift b/distributedlog-protocol/src/main/thrift/service.thrift index 4c0eaf1..a25af63 100644 --- a/distributedlog-protocol/src/main/thrift/service.thrift +++ b/distributedlog-protocol/src/main/thrift/service.thrift @@ -94,7 +94,7 @@ enum StatusCode { CHECKSUM_FAILED = 523, /* Overcapacity: too many streams */ TOO_MANY_STREAMS = 524, - // Log Segment Not Found + /* Log Segment Not Found */ LOG_SEGMENT_NOT_FOUND = 525, /* 6xx: unexpected */ @@ -167,14 +167,17 @@ struct ClientInfo { service 
DistributedLogService { + /* Deprecated */ ServerInfo handshake(); ServerInfo handshakeWithClientInfo(ClientInfo clientInfo); + /* Deprecated */ WriteResponse heartbeat(string stream, WriteContext ctx); WriteResponse heartbeatWithOptions(string stream, WriteContext ctx, HeartbeatOptions options); + /* Deprecated */ WriteResponse write(string stream, binary data); WriteResponse writeWithContext(string stream, binary data, WriteContext ctx); @@ -189,6 +192,8 @@ service DistributedLogService { WriteResponse delete(string stream, WriteContext ctx); + WriteResponse getOwner(string stream, WriteContext ctx); + /* Admin Methods */ void setAcceptNewStream(bool enabled); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java index 3e7948d..0ce335b 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java @@ -19,6 +19,7 @@ package com.twitter.distributedlog.service; import com.twitter.distributedlog.DistributedLogConfiguration; import com.twitter.distributedlog.LocalDLMEmulator; +import com.twitter.distributedlog.client.routing.SingleHostRoutingService; import com.twitter.distributedlog.metadata.BKDLConfig; import com.twitter.distributedlog.metadata.DLMetadata; import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter; @@ -63,6 +64,7 @@ public class DistributedLogCluster { int _zkPort = 0; boolean _shouldStartProxy = true; int _proxyPort = 7000; + boolean _thriftmux = false; 
DistributedLogConfiguration _dlConf = new DistributedLogConfiguration() .setLockTimeout(10) .setOutputBufferSize(0) @@ -165,6 +167,17 @@ public class DistributedLogCluster { return this; } + /** + * Enable thriftmux for the dl server + * + * @param enabled flag to enable thriftmux + * @return builder + */ + public Builder thriftmux(boolean enabled) { + this._thriftmux = enabled; + return this; + } + public DistributedLogCluster build() throws Exception { // build the cluster return new DistributedLogCluster( @@ -175,7 +188,8 @@ public class DistributedLogCluster { _zkHost, _zkPort, _shouldStartProxy, - _proxyPort); + _proxyPort, + _thriftmux); } } @@ -189,8 +203,12 @@ public class DistributedLogCluster { public final InetSocketAddress address; public final Pair<DistributedLogServiceImpl, Server> dlServer; + private final SingleHostRoutingService routingService = SingleHostRoutingService.of(null); - protected DLServer(DistributedLogConfiguration dlConf, URI uri, int basePort) throws Exception { + protected DLServer(DistributedLogConfiguration dlConf, + URI uri, + int basePort, + boolean thriftmux) throws Exception { proxyPort = basePort; boolean success = false; @@ -207,8 +225,12 @@ public class DistributedLogCluster { dlConf, uri, new IdentityStreamPartitionConverter(), + routingService, new NullStatsProvider(), - proxyPort); + proxyPort, + thriftmux); + routingService.setAddress(DLSocketAddress.getSocketAddress(proxyPort)); + routingService.startService(); success = true; } catch (BindException be) { retries++; @@ -234,6 +256,7 @@ public class DistributedLogCluster { public void shutdown() { DistributedLogServer.closeServer(dlServer, 0, TimeUnit.MILLISECONDS); + routingService.stopService(); } } @@ -243,6 +266,7 @@ public class DistributedLogCluster { private DLServer dlServer; private final boolean shouldStartProxy; private final int proxyPort; + private final boolean thriftmux; private final List<File> tmpDirs = new ArrayList<File>(); private 
DistributedLogCluster(DistributedLogConfiguration dlConf, @@ -252,7 +276,8 @@ public class DistributedLogCluster { String zkServers, int zkPort, boolean shouldStartProxy, - int proxyPort) throws Exception { + int proxyPort, + boolean thriftmux) throws Exception { this.dlConf = dlConf; if (shouldStartZK) { File zkTmpDir = IOUtils.createTempDir("zookeeper", "distrlog"); @@ -276,6 +301,7 @@ public class DistributedLogCluster { .build(); this.shouldStartProxy = shouldStartProxy; this.proxyPort = proxyPort; + this.thriftmux = thriftmux; } public void start() throws Exception { @@ -283,7 +309,11 @@ public class DistributedLogCluster { BKDLConfig bkdlConfig = new BKDLConfig(this.dlmEmulator.getZkServers(), "/ledgers").setACLRootPath(".acl"); DLMetadata.create(bkdlConfig).update(this.dlmEmulator.getUri()); if (shouldStartProxy) { - this.dlServer = new DLServer(dlConf, this.dlmEmulator.getUri(), proxyPort); + this.dlServer = new DLServer( + dlConf, + this.dlmEmulator.getUri(), + proxyPort, + thriftmux); } else { this.dlServer = null; } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java index 6ef99b8..185ea82 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java @@ -20,6 +20,7 @@ package com.twitter.distributedlog.service; import com.google.common.base.Optional; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.twitter.distributedlog.DistributedLogConfiguration; +import 
com.twitter.distributedlog.client.routing.RoutingService; import com.twitter.distributedlog.config.DynamicConfigurationFactory; import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; import com.twitter.distributedlog.service.announcer.Announcer; @@ -73,6 +74,7 @@ public class DistributedLogServer { private DistributedLogServiceImpl dlService = null; private Server server = null; + private RoutingService routingService; private StatsProvider statsProvider; private Announcer announcer = null; private ScheduledExecutorService configExecutorService; @@ -97,6 +99,7 @@ public class DistributedLogServer { Optional<Integer> shardId, Optional<Boolean> announceServerSet, Optional<Boolean> thriftmux, + RoutingService routingService, StatsReceiver statsReceiver, StatsProvider statsProvider) { this.uri = uri; @@ -107,6 +110,7 @@ public class DistributedLogServer { this.shardId = shardId; this.announceServerSet = announceServerSet; this.thriftmux = thriftmux; + this.routingService = routingService; this.statsReceiver = statsReceiver; this.statsProvider = statsProvider; } @@ -183,6 +187,7 @@ public class DistributedLogServer { dynDlConf, dlUri, converter, + routingService, statsProvider, port.or(0), keepAliveLatch, @@ -195,6 +200,9 @@ public class DistributedLogServer { // announce the service announcer.announce(); + // start the routing service after announced + routingService.startService(); + logger.info("Started the routing service."); } protected void preRun(DistributedLogConfiguration conf, ServerConfiguration serverConf) { @@ -245,19 +253,22 @@ public class DistributedLogServer { DistributedLogConfiguration dlConf, URI dlUri, StreamPartitionConverter converter, + RoutingService routingService, StatsProvider provider, - int port) throws IOException { + int port, + boolean thriftmux) throws IOException { return runServer(serverConf, dlConf, ConfUtils.getConstDynConf(dlConf), dlUri, converter, + routingService, provider, port, new CountDownLatch(0), new 
NullStatsReceiver(), - false, + thriftmux, new NullStreamConfigProvider()); } @@ -267,6 +278,7 @@ public class DistributedLogServer { DynamicDistributedLogConfiguration dynDlConf, URI dlUri, StreamPartitionConverter partitionConverter, + RoutingService routingService, StatsProvider provider, int port, CountDownLatch keepAliveLatch, @@ -291,6 +303,7 @@ public class DistributedLogServer { streamConfProvider, dlUri, partitionConverter, + routingService, provider.getStatsLogger(""), perStreamStatsLogger, keepAliveLatch); @@ -358,6 +371,7 @@ public class DistributedLogServer { announcer.close(); } closeServer(Pair.of(dlService, server), gracefulShutdownMs, TimeUnit.MILLISECONDS); + routingService.stopService(); if (null != statsProvider) { statsProvider.stop(); } @@ -396,6 +410,7 @@ public class DistributedLogServer { Optional<Integer> shardId, Optional<Boolean> announceServerSet, Optional<Boolean> thriftmux, + RoutingService routingService, StatsReceiver statsReceiver, StatsProvider statsProvider) throws ConfigurationException, IllegalArgumentException, IOException { @@ -409,6 +424,7 @@ public class DistributedLogServer { shardId, announceServerSet, thriftmux, + routingService, statsReceiver, statsProvider); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java index a339261..af36307 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java @@ -19,7 +19,11 @@ package com.twitter.distributedlog.service; 
import com.google.common.base.Function; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.twitter.distributedlog.DistributedLogConfiguration; +import com.twitter.distributedlog.client.routing.RoutingService; +import com.twitter.distributedlog.client.routing.RoutingUtils; +import com.twitter.distributedlog.client.serverset.DLZkServerSet; import com.twitter.finagle.stats.NullStatsReceiver; import com.twitter.finagle.stats.StatsReceiver; import org.apache.bookkeeper.stats.NullStatsProvider; @@ -38,7 +42,9 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.net.MalformedURLException; +import java.net.URI; import java.util.Arrays; +import java.util.concurrent.TimeUnit; import static com.twitter.distributedlog.util.CommandLineUtils.*; @@ -119,8 +125,17 @@ public class DistributedLogServerApp { } }).or(new NullStatsProvider()); + final Optional<String> uriOption = getOptionalStringArg(cmdline, "u"); + Preconditions.checkArgument(uriOption.isPresent(), "No distributedlog uri provided."); + URI dlUri = URI.create(uriOption.get()); + + DLZkServerSet serverSet = DLZkServerSet.of(dlUri, (int) TimeUnit.SECONDS.toMillis(60)); + RoutingService routingService = RoutingUtils.buildRoutingService(serverSet.getServerSet()) + .statsReceiver(statsReceiver.scope("routing")) + .build(); + final DistributedLogServer server = DistributedLogServer.runServer( - getOptionalStringArg(cmdline, "u"), + uriOption, confOptional, getOptionalStringArg(cmdline, "sc"), getOptionalIntegerArg(cmdline, "p"), @@ -128,6 +143,7 @@ public class DistributedLogServerApp { getOptionalIntegerArg(cmdline, "si"), getOptionalBooleanArg(cmdline, "a"), getOptionalBooleanArg(cmdline, "mx"), + routingService, statsReceiver, statsProvider); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java 
---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java index f529927..12820d9 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java @@ -24,6 +24,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.twitter.distributedlog.DLSN; import com.twitter.distributedlog.DistributedLogConfiguration; import com.twitter.distributedlog.acl.AccessControlManager; +import com.twitter.distributedlog.client.resolver.RegionResolver; +import com.twitter.distributedlog.client.resolver.TwitterRegionResolver; +import com.twitter.distributedlog.client.routing.RoutingService; import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; import com.twitter.distributedlog.exceptions.RegionUnavailableException; import com.twitter.distributedlog.exceptions.ServiceUnavailableException; @@ -67,6 +70,7 @@ import com.twitter.distributedlog.rate.MovingAverageRate; import com.twitter.distributedlog.util.ConfUtils; import com.twitter.distributedlog.util.OrderedScheduler; import com.twitter.distributedlog.util.SchedulerUtils; +import com.twitter.finagle.NoBrokersAvailableException; import com.twitter.util.Await; import com.twitter.util.Duration; import com.twitter.util.Function0; @@ -85,6 +89,8 @@ import org.slf4j.LoggerFactory; import scala.runtime.BoxedUnit; import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; import java.net.URI; import java.nio.ByteBuffer; import java.util.List; @@ -116,6 +122,8 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI private final StreamConfigProvider 
streamConfigProvider; private final StreamManager streamManager; private final StreamFactory streamFactory; + private final RoutingService routingService; + private final RegionResolver regionResolver; private final MovingAverageRateFactory movingAvgFactory; private final MovingAverageRate windowedRps; private final MovingAverageRate windowedBps; @@ -149,6 +157,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI StreamConfigProvider streamConfigProvider, URI uri, StreamPartitionConverter converter, + RoutingService routingService, StatsLogger statsLogger, StatsLogger perStreamStatsLogger, CountDownLatch keepAliveLatch) @@ -224,6 +233,8 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI converter, streamConfigProvider, dlNamespace); + this.routingService = routingService; + this.regionResolver = new TwitterRegionResolver(); // Service features this.featureRegionStopAcceptNewStream = this.featureProvider.getFeature( @@ -467,6 +478,53 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI return op.result(); } + // + // Ownership RPC + // + + @Override + public Future<WriteResponse> getOwner(String streamName, WriteContext ctx) { + if (streamManager.isAcquired(streamName)) { + // the stream is already acquired + return Future.value(new WriteResponse(ResponseUtils.ownerToHeader(clientId))); + } + + Stream stream = streamManager.getStream(streamName); + String owner; + if (null != stream && null != (owner = stream.getOwner())) { + return Future.value(new WriteResponse(ResponseUtils.ownerToHeader(owner))); + } + + RoutingService.RoutingContext routingContext = RoutingService.RoutingContext.of(regionResolver); + + if (ctx.isSetTriedHosts()) { + for (String triedHost : ctx.getTriedHosts()) { + routingContext.addTriedHost( + DLSocketAddress.parseSocketAddress(triedHost), StatusCode.STREAM_UNAVAILABLE); + } + } + + try { + SocketAddress host = routingService.getHost(streamName, 
routingContext); + if (host instanceof InetSocketAddress) { + // use shard id '-1' as the shard id here won't be used for redirection + return Future.value(new WriteResponse( + ResponseUtils.ownerToHeader(DLSocketAddress.toLockId((InetSocketAddress) host, -1)))); + } else { + return Future.value(new WriteResponse( + ResponseUtils.streamUnavailableHeader())); + } + } catch (NoBrokersAvailableException e) { + return Future.value(new WriteResponse( + ResponseUtils.streamUnavailableHeader())); + } + } + + + // + // Admin RPCs + // + @Override public Future<Void> setAcceptNewStream(boolean enabled) { closeLock.writeLock().lock(); @@ -656,6 +714,16 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI return newWriteOp(stream, data, checksum, false); } + @VisibleForTesting + RoutingService getRoutingService() { + return this.routingService; + } + + @VisibleForTesting + DLSocketAddress getServiceAddress() throws IOException { + return DLSocketAddress.deserialize(clientId); + } + WriteOp newWriteOp(String stream, ByteBuffer data, Long checksum, http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ResponseUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ResponseUtils.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ResponseUtils.java index 0bceec5..cee9dba 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ResponseUtils.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ResponseUtils.java @@ -32,6 +32,10 @@ public class ResponseUtils { return new ResponseHeader(StatusCode.REQUEST_DENIED); } + public static ResponseHeader streamUnavailableHeader() { + return new ResponseHeader(StatusCode.STREAM_UNAVAILABLE); + } + public static 
ResponseHeader successHeader() { return new ResponseHeader(StatusCode.SUCCESS); } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java index 475755b..10941ba 100644 --- a/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java @@ -92,7 +92,7 @@ public class LocalRoutingService implements RoutingService { return this; } - public void addHost(String stream, SocketAddress address) { + public LocalRoutingService addHost(String stream, SocketAddress address) { boolean notify = false; synchronized (this) { LinkedHashSet<SocketAddress> addresses = localAddresses.get(stream); @@ -109,6 +109,7 @@ public class LocalRoutingService implements RoutingService { listener.onServerJoin(address); } } + return this; } @Override http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java index 486a106..c37248c 100644 --- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java +++ 
b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java @@ -34,7 +34,6 @@ import org.junit.After; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -149,7 +148,7 @@ public abstract class DistributedLogServerTestCase { public void setupNoAdHocCluster() throws Exception { noAdHocCluster = createCluster(noAdHocConf); noAdHocCluster.start(); - noAdHocServer = new DLServer(noAdHocConf, noAdHocCluster.getUri(), 7002); + noAdHocServer = new DLServer(noAdHocConf, noAdHocCluster.getUri(), 7002, false); noAdHocClient = createDistributedLogClient("no-ad-hoc-client"); } @@ -193,12 +192,12 @@ public abstract class DistributedLogServerTestCase { } protected DLServer createDistributedLogServer(int port) throws Exception { - return new DLServer(conf, dlCluster.getUri(), port); + return new DLServer(conf, dlCluster.getUri(), port, false); } protected DLServer createDistributedLogServer(DistributedLogConfiguration conf, int port) throws Exception { - return new DLServer(conf, dlCluster.getUri(), port); + return new DLServer(conf, dlCluster.getUri(), port, false); } protected DLClient createDistributedLogClient(String clientName) throws Exception { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java index 4195ed3..d7a0ba6 100644 --- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java +++ 
b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java @@ -23,6 +23,7 @@ import com.twitter.distributedlog.DistributedLogConfiguration; import com.twitter.distributedlog.util.ProtocolUtils; import com.twitter.distributedlog.TestDistributedLogBase; import com.twitter.distributedlog.acl.DefaultAccessControlManager; +import com.twitter.distributedlog.client.routing.LocalRoutingService; import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException; import com.twitter.distributedlog.exceptions.StreamUnavailableException; import com.twitter.distributedlog.service.config.NullStreamConfigProvider; @@ -145,6 +146,7 @@ public class TestDistributedLogService extends TestDistributedLogBase { new NullStreamConfigProvider(), uri, converter, + new LocalRoutingService(), NullStatsLogger.INSTANCE, NullStatsLogger.INSTANCE, latch); @@ -769,4 +771,51 @@ public class TestDistributedLogService extends TestDistributedLogBase { streamManager.getAcquiredStreams().isEmpty()); } + @Test(timeout = 60000) + public void testGetOwner() throws Exception { + ((LocalRoutingService) service.getRoutingService()) + .addHost("stream-0", service.getServiceAddress().getSocketAddress()) + .setAllowRetrySameHost(false); + + // routing service doesn't know 'stream-1' + WriteResponse response = FutureUtils.result(service.getOwner("stream-1", new WriteContext())); + assertEquals(StatusCode.STREAM_UNAVAILABLE, response.getHeader().getCode()); + + // service cache "stream-2" but not acquire + StreamImpl stream = (StreamImpl) service.getStreamManager().getOrCreateStream("stream-2", false); + response = FutureUtils.result(service.getOwner("stream-2", new WriteContext())); + assertEquals(StatusCode.STREAM_UNAVAILABLE, response.getHeader().getCode()); + + // create write ops to stream-2 to make service acquire the stream + WriteOp op = createWriteOp(service, "stream-2", 0L); + stream.submit(op); + stream.start(); + WriteResponse wr = 
Await.result(op.result()); + assertEquals("Op should succeed", + StatusCode.SUCCESS, wr.getHeader().getCode()); + assertEquals("Service should acquire stream", + StreamStatus.INITIALIZED, stream.getStatus()); + assertNotNull(stream.getManager()); + assertNotNull(stream.getWriter()); + assertNull(stream.getLastException()); + + // the stream is acquired + response = FutureUtils.result(service.getOwner("stream-2", new WriteContext())); + assertEquals(StatusCode.FOUND, response.getHeader().getCode()); + assertEquals(service.getServiceAddress().toString(), + response.getHeader().getLocation()); + + // find the stream from the routing service + response = FutureUtils.result(service.getOwner("stream-0", new WriteContext())); + assertEquals(StatusCode.FOUND, response.getHeader().getCode()); + assertEquals(service.getServiceAddress().toString(), + response.getHeader().getLocation()); + + // add the tried host + WriteContext ctx = new WriteContext(); + ctx.addToTriedHosts(DLSocketAddress.toString(service.getServiceAddress().getSocketAddress())); + response = FutureUtils.result(service.getOwner("stream-0", ctx)); + assertEquals(StatusCode.STREAM_UNAVAILABLE, response.getHeader().getCode()); + } + }
