DL-102: Add routing service to write proxy server side

This change adds a getOwner RPC to the write proxy, so that the client 
side can be changed to get the owner from the write proxy first for the 
routing service. In this way, we can start experimenting with different 
resource placement algorithms.


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/16d73c35
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/16d73c35
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/16d73c35

Branch: refs/heads/master
Commit: 16d73c35ee45b99594bc7c840dc028337b7de859
Parents: 9ee7d01
Author: Leigh Stewart <[email protected]>
Authored: Thu Jul 28 21:36:49 2016 -0700
Committer: Sijie Guo <[email protected]>
Committed: Tue Dec 27 16:49:27 2016 -0800

----------------------------------------------------------------------
 .../routing/SingleHostRoutingService.java       |  9 ++-
 .../proxy/MockDistributedLogServices.java       |  5 ++
 .../distributedlog/BKDistributedLogManager.java |  1 +
 .../src/main/thrift/service.thrift              |  7 +-
 .../service/DistributedLogCluster.java          | 40 ++++++++++--
 .../service/DistributedLogServer.java           | 20 +++++-
 .../service/DistributedLogServerApp.java        | 18 +++++-
 .../service/DistributedLogServiceImpl.java      | 68 ++++++++++++++++++++
 .../distributedlog/service/ResponseUtils.java   |  4 ++
 .../client/routing/LocalRoutingService.java     |  3 +-
 .../service/DistributedLogServerTestCase.java   |  7 +-
 .../service/TestDistributedLogService.java      | 49 ++++++++++++++
 12 files changed, 214 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/SingleHostRoutingService.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/SingleHostRoutingService.java
 
b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/SingleHostRoutingService.java
index 15356ff..e526868 100644
--- 
a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/SingleHostRoutingService.java
+++ 
b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/SingleHostRoutingService.java
@@ -29,9 +29,8 @@ import java.util.concurrent.CopyOnWriteArraySet;
 /**
  * Single Host Routing Service.
  */
-class SingleHostRoutingService implements RoutingService {
+public class SingleHostRoutingService implements RoutingService {
 
-    @Deprecated
     public static SingleHostRoutingService of(SocketAddress address) {
         return new SingleHostRoutingService(address);
     }
@@ -71,7 +70,7 @@ class SingleHostRoutingService implements RoutingService {
         }
     }
 
-    private final SocketAddress address;
+    private SocketAddress address;
     private final CopyOnWriteArraySet<RoutingListener> listeners =
             new CopyOnWriteArraySet<RoutingListener>();
 
@@ -79,6 +78,10 @@ class SingleHostRoutingService implements RoutingService {
         this.address = address;
     }
 
+    public void setAddress(SocketAddress address) {
+        this.address = address;
+    }
+
     @Override
     public Set<SocketAddress> getHosts() {
         return Sets.newHashSet(address);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockDistributedLogServices.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockDistributedLogServices.java
 
b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockDistributedLogServices.java
index 13ba044..f088c0d 100644
--- 
a/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockDistributedLogServices.java
+++ 
b/distributedlog-client/src/test/java/com/twitter/distributedlog/client/proxy/MockDistributedLogServices.java
@@ -105,6 +105,11 @@ public class MockDistributedLogServices {
         }
 
         @Override
+        public Future<WriteResponse> getOwner(String stream, WriteContext ctx) 
{
+            return Future.value(new WriteResponse());
+        }
+
+        @Override
         public Future<Void> setAcceptNewStream(boolean enabled) {
             return Future.value(null);
         }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
index 75a5b83..ae8ae12 100644
--- 
a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
+++ 
b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java
@@ -75,6 +75,7 @@ import org.apache.bookkeeper.stats.AlertStatsLogger;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZKUtil;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-protocol/src/main/thrift/service.thrift
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/main/thrift/service.thrift 
b/distributedlog-protocol/src/main/thrift/service.thrift
index 4c0eaf1..a25af63 100644
--- a/distributedlog-protocol/src/main/thrift/service.thrift
+++ b/distributedlog-protocol/src/main/thrift/service.thrift
@@ -94,7 +94,7 @@ enum StatusCode {
     CHECKSUM_FAILED = 523,
     /* Overcapacity: too many streams */
     TOO_MANY_STREAMS = 524,
-    // Log Segment Not Found
+    /* Log Segment Not Found */
     LOG_SEGMENT_NOT_FOUND = 525,
 
     /* 6xx: unexpected */
@@ -167,14 +167,17 @@ struct ClientInfo {
 
 service DistributedLogService {
 
+    /* Deprecated */
     ServerInfo handshake();
 
     ServerInfo handshakeWithClientInfo(ClientInfo clientInfo);
 
+    /* Deprecated */
     WriteResponse heartbeat(string stream, WriteContext ctx);
 
     WriteResponse heartbeatWithOptions(string stream, WriteContext ctx, 
HeartbeatOptions options);
 
+    /* Deprecated */
     WriteResponse write(string stream, binary data);
 
     WriteResponse writeWithContext(string stream, binary data, WriteContext 
ctx);
@@ -189,6 +192,8 @@ service DistributedLogService {
 
     WriteResponse delete(string stream, WriteContext ctx);
 
+    WriteResponse getOwner(string stream, WriteContext ctx);
+
     /* Admin Methods */
     void setAcceptNewStream(bool enabled);
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
index 3e7948d..0ce335b 100644
--- 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
+++ 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
@@ -19,6 +19,7 @@ package com.twitter.distributedlog.service;
 
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.LocalDLMEmulator;
+import com.twitter.distributedlog.client.routing.SingleHostRoutingService;
 import com.twitter.distributedlog.metadata.BKDLConfig;
 import com.twitter.distributedlog.metadata.DLMetadata;
 import 
com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter;
@@ -63,6 +64,7 @@ public class DistributedLogCluster {
         int _zkPort = 0;
         boolean _shouldStartProxy = true;
         int _proxyPort = 7000;
+        boolean _thriftmux = false;
         DistributedLogConfiguration _dlConf = new DistributedLogConfiguration()
                 .setLockTimeout(10)
                 .setOutputBufferSize(0)
@@ -165,6 +167,17 @@ public class DistributedLogCluster {
             return this;
         }
 
+        /**
+         * Enable thriftmux for the dl server
+         *
+         * @param enabled flag to enable thriftmux
+         * @return builder
+         */
+        public Builder thriftmux(boolean enabled) {
+            this._thriftmux = enabled;
+            return this;
+        }
+
         public DistributedLogCluster build() throws Exception {
             // build the cluster
             return new DistributedLogCluster(
@@ -175,7 +188,8 @@ public class DistributedLogCluster {
                     _zkHost,
                     _zkPort,
                     _shouldStartProxy,
-                    _proxyPort);
+                    _proxyPort,
+                    _thriftmux);
         }
     }
 
@@ -189,8 +203,12 @@ public class DistributedLogCluster {
 
         public final InetSocketAddress address;
         public final Pair<DistributedLogServiceImpl, Server> dlServer;
+        private final SingleHostRoutingService routingService = 
SingleHostRoutingService.of(null);
 
-        protected DLServer(DistributedLogConfiguration dlConf, URI uri, int 
basePort) throws Exception {
+        protected DLServer(DistributedLogConfiguration dlConf,
+                           URI uri,
+                           int basePort,
+                           boolean thriftmux) throws Exception {
             proxyPort = basePort;
 
             boolean success = false;
@@ -207,8 +225,12 @@ public class DistributedLogCluster {
                             dlConf,
                             uri,
                             new IdentityStreamPartitionConverter(),
+                            routingService,
                             new NullStatsProvider(),
-                            proxyPort);
+                            proxyPort,
+                            thriftmux);
+                    
routingService.setAddress(DLSocketAddress.getSocketAddress(proxyPort));
+                    routingService.startService();
                     success = true;
                 } catch (BindException be) {
                     retries++;
@@ -234,6 +256,7 @@ public class DistributedLogCluster {
 
         public void shutdown() {
             DistributedLogServer.closeServer(dlServer, 0, 
TimeUnit.MILLISECONDS);
+            routingService.stopService();
         }
     }
 
@@ -243,6 +266,7 @@ public class DistributedLogCluster {
     private DLServer dlServer;
     private final boolean shouldStartProxy;
     private final int proxyPort;
+    private final boolean thriftmux;
     private final List<File> tmpDirs = new ArrayList<File>();
 
     private DistributedLogCluster(DistributedLogConfiguration dlConf,
@@ -252,7 +276,8 @@ public class DistributedLogCluster {
                                   String zkServers,
                                   int zkPort,
                                   boolean shouldStartProxy,
-                                  int proxyPort) throws Exception {
+                                  int proxyPort,
+                                  boolean thriftmux) throws Exception {
         this.dlConf = dlConf;
         if (shouldStartZK) {
             File zkTmpDir = IOUtils.createTempDir("zookeeper", "distrlog");
@@ -276,6 +301,7 @@ public class DistributedLogCluster {
                 .build();
         this.shouldStartProxy = shouldStartProxy;
         this.proxyPort = proxyPort;
+        this.thriftmux = thriftmux;
     }
 
     public void start() throws Exception {
@@ -283,7 +309,11 @@ public class DistributedLogCluster {
         BKDLConfig bkdlConfig = new 
BKDLConfig(this.dlmEmulator.getZkServers(), "/ledgers").setACLRootPath(".acl");
         DLMetadata.create(bkdlConfig).update(this.dlmEmulator.getUri());
         if (shouldStartProxy) {
-            this.dlServer = new DLServer(dlConf, this.dlmEmulator.getUri(), 
proxyPort);
+            this.dlServer = new DLServer(
+                    dlConf,
+                    this.dlmEmulator.getUri(),
+                    proxyPort,
+                    thriftmux);
         } else {
             this.dlServer = null;
         }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
index 6ef99b8..185ea82 100644
--- 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
+++ 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
@@ -20,6 +20,7 @@ package com.twitter.distributedlog.service;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.client.routing.RoutingService;
 import com.twitter.distributedlog.config.DynamicConfigurationFactory;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.service.announcer.Announcer;
@@ -73,6 +74,7 @@ public class DistributedLogServer {
 
     private DistributedLogServiceImpl dlService = null;
     private Server server = null;
+    private RoutingService routingService;
     private StatsProvider statsProvider;
     private Announcer announcer = null;
     private ScheduledExecutorService configExecutorService;
@@ -97,6 +99,7 @@ public class DistributedLogServer {
                          Optional<Integer> shardId,
                          Optional<Boolean> announceServerSet,
                          Optional<Boolean> thriftmux,
+                         RoutingService routingService,
                          StatsReceiver statsReceiver,
                          StatsProvider statsProvider) {
         this.uri = uri;
@@ -107,6 +110,7 @@ public class DistributedLogServer {
         this.shardId = shardId;
         this.announceServerSet = announceServerSet;
         this.thriftmux = thriftmux;
+        this.routingService = routingService;
         this.statsReceiver = statsReceiver;
         this.statsProvider = statsProvider;
     }
@@ -183,6 +187,7 @@ public class DistributedLogServer {
                 dynDlConf,
                 dlUri,
                 converter,
+                routingService,
                 statsProvider,
                 port.or(0),
                 keepAliveLatch,
@@ -195,6 +200,9 @@ public class DistributedLogServer {
 
         // announce the service
         announcer.announce();
+        // start the routing service after announced
+        routingService.startService();
+        logger.info("Started the routing service.");
     }
 
     protected void preRun(DistributedLogConfiguration conf, 
ServerConfiguration serverConf) {
@@ -245,19 +253,22 @@ public class DistributedLogServer {
             DistributedLogConfiguration dlConf,
             URI dlUri,
             StreamPartitionConverter converter,
+            RoutingService routingService,
             StatsProvider provider,
-            int port) throws IOException {
+            int port,
+            boolean thriftmux) throws IOException {
 
         return runServer(serverConf,
                 dlConf,
                 ConfUtils.getConstDynConf(dlConf),
                 dlUri,
                 converter,
+                routingService,
                 provider,
                 port,
                 new CountDownLatch(0),
                 new NullStatsReceiver(),
-                false,
+                thriftmux,
                 new NullStreamConfigProvider());
     }
 
@@ -267,6 +278,7 @@ public class DistributedLogServer {
             DynamicDistributedLogConfiguration dynDlConf,
             URI dlUri,
             StreamPartitionConverter partitionConverter,
+            RoutingService routingService,
             StatsProvider provider,
             int port,
             CountDownLatch keepAliveLatch,
@@ -291,6 +303,7 @@ public class DistributedLogServer {
                 streamConfProvider,
                 dlUri,
                 partitionConverter,
+                routingService,
                 provider.getStatsLogger(""),
                 perStreamStatsLogger,
                 keepAliveLatch);
@@ -358,6 +371,7 @@ public class DistributedLogServer {
             announcer.close();
         }
         closeServer(Pair.of(dlService, server), gracefulShutdownMs, 
TimeUnit.MILLISECONDS);
+        routingService.stopService();
         if (null != statsProvider) {
             statsProvider.stop();
         }
@@ -396,6 +410,7 @@ public class DistributedLogServer {
                Optional<Integer> shardId,
                Optional<Boolean> announceServerSet,
                Optional<Boolean> thriftmux,
+               RoutingService routingService,
                StatsReceiver statsReceiver,
                StatsProvider statsProvider)
             throws ConfigurationException, IllegalArgumentException, 
IOException {
@@ -409,6 +424,7 @@ public class DistributedLogServer {
                 shardId,
                 announceServerSet,
                 thriftmux,
+                routingService,
                 statsReceiver,
                 statsProvider);
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
index a339261..af36307 100644
--- 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
+++ 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
@@ -19,7 +19,11 @@ package com.twitter.distributedlog.service;
 
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.client.routing.RoutingService;
+import com.twitter.distributedlog.client.routing.RoutingUtils;
+import com.twitter.distributedlog.client.serverset.DLZkServerSet;
 import com.twitter.finagle.stats.NullStatsReceiver;
 import com.twitter.finagle.stats.StatsReceiver;
 import org.apache.bookkeeper.stats.NullStatsProvider;
@@ -38,7 +42,9 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
+import java.net.URI;
 import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
 
 import static com.twitter.distributedlog.util.CommandLineUtils.*;
 
@@ -119,8 +125,17 @@ public class DistributedLogServerApp {
                     }
                 }).or(new NullStatsProvider());
 
+        final Optional<String> uriOption = getOptionalStringArg(cmdline, "u");
+        Preconditions.checkArgument(uriOption.isPresent(), "No distributedlog 
uri provided.");
+        URI dlUri = URI.create(uriOption.get());
+
+        DLZkServerSet serverSet = DLZkServerSet.of(dlUri, (int) 
TimeUnit.SECONDS.toMillis(60));
+        RoutingService routingService = 
RoutingUtils.buildRoutingService(serverSet.getServerSet())
+                .statsReceiver(statsReceiver.scope("routing"))
+                .build();
+
         final DistributedLogServer server = DistributedLogServer.runServer(
-                getOptionalStringArg(cmdline, "u"),
+                uriOption,
                 confOptional,
                 getOptionalStringArg(cmdline, "sc"),
                 getOptionalIntegerArg(cmdline, "p"),
@@ -128,6 +143,7 @@ public class DistributedLogServerApp {
                 getOptionalIntegerArg(cmdline, "si"),
                 getOptionalBooleanArg(cmdline, "a"),
                 getOptionalBooleanArg(cmdline, "mx"),
+                routingService,
                 statsReceiver,
                 statsProvider);
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
index f529927..12820d9 100644
--- 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
+++ 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
@@ -24,6 +24,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.twitter.distributedlog.DLSN;
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.acl.AccessControlManager;
+import com.twitter.distributedlog.client.resolver.RegionResolver;
+import com.twitter.distributedlog.client.resolver.TwitterRegionResolver;
+import com.twitter.distributedlog.client.routing.RoutingService;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.exceptions.RegionUnavailableException;
 import com.twitter.distributedlog.exceptions.ServiceUnavailableException;
@@ -67,6 +70,7 @@ import com.twitter.distributedlog.rate.MovingAverageRate;
 import com.twitter.distributedlog.util.ConfUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.distributedlog.util.SchedulerUtils;
+import com.twitter.finagle.NoBrokersAvailableException;
 import com.twitter.util.Await;
 import com.twitter.util.Duration;
 import com.twitter.util.Function0;
@@ -85,6 +89,8 @@ import org.slf4j.LoggerFactory;
 import scala.runtime.BoxedUnit;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -116,6 +122,8 @@ public class DistributedLogServiceImpl implements 
DistributedLogService.ServiceI
     private final StreamConfigProvider streamConfigProvider;
     private final StreamManager streamManager;
     private final StreamFactory streamFactory;
+    private final RoutingService routingService;
+    private final RegionResolver regionResolver;
     private final MovingAverageRateFactory movingAvgFactory;
     private final MovingAverageRate windowedRps;
     private final MovingAverageRate windowedBps;
@@ -149,6 +157,7 @@ public class DistributedLogServiceImpl implements 
DistributedLogService.ServiceI
                               StreamConfigProvider streamConfigProvider,
                               URI uri,
                               StreamPartitionConverter converter,
+                              RoutingService routingService,
                               StatsLogger statsLogger,
                               StatsLogger perStreamStatsLogger,
                               CountDownLatch keepAliveLatch)
@@ -224,6 +233,8 @@ public class DistributedLogServiceImpl implements 
DistributedLogService.ServiceI
                 converter,
                 streamConfigProvider,
                 dlNamespace);
+        this.routingService = routingService;
+        this.regionResolver = new TwitterRegionResolver();
 
         // Service features
         this.featureRegionStopAcceptNewStream = 
this.featureProvider.getFeature(
@@ -467,6 +478,53 @@ public class DistributedLogServiceImpl implements 
DistributedLogService.ServiceI
         return op.result();
     }
 
+    //
+    // Ownership RPC
+    //
+
+    @Override
+    public Future<WriteResponse> getOwner(String streamName, WriteContext ctx) 
{
+        if (streamManager.isAcquired(streamName)) {
+            // the stream is already acquired
+            return Future.value(new 
WriteResponse(ResponseUtils.ownerToHeader(clientId)));
+        }
+
+        Stream stream = streamManager.getStream(streamName);
+        String owner;
+        if (null != stream && null != (owner = stream.getOwner())) {
+            return Future.value(new 
WriteResponse(ResponseUtils.ownerToHeader(owner)));
+        }
+
+        RoutingService.RoutingContext routingContext = 
RoutingService.RoutingContext.of(regionResolver);
+
+        if (ctx.isSetTriedHosts()) {
+            for (String triedHost : ctx.getTriedHosts()) {
+                routingContext.addTriedHost(
+                        DLSocketAddress.parseSocketAddress(triedHost), 
StatusCode.STREAM_UNAVAILABLE);
+            }
+        }
+
+        try {
+            SocketAddress host = routingService.getHost(streamName, 
routingContext);
+            if (host instanceof InetSocketAddress) {
+                // use shard id '-1' as the shard id here won't be used for 
redirection
+                return Future.value(new WriteResponse(
+                        
ResponseUtils.ownerToHeader(DLSocketAddress.toLockId((InetSocketAddress) host, 
-1))));
+            } else {
+                return Future.value(new WriteResponse(
+                        ResponseUtils.streamUnavailableHeader()));
+            }
+        } catch (NoBrokersAvailableException e) {
+            return Future.value(new WriteResponse(
+                    ResponseUtils.streamUnavailableHeader()));
+        }
+    }
+
+
+    //
+    // Admin RPCs
+    //
+
     @Override
     public Future<Void> setAcceptNewStream(boolean enabled) {
         closeLock.writeLock().lock();
@@ -656,6 +714,16 @@ public class DistributedLogServiceImpl implements 
DistributedLogService.ServiceI
         return newWriteOp(stream, data, checksum, false);
     }
 
+    @VisibleForTesting
+    RoutingService getRoutingService() {
+        return this.routingService;
+    }
+
+    @VisibleForTesting
+    DLSocketAddress getServiceAddress() throws IOException {
+        return DLSocketAddress.deserialize(clientId);
+    }
+
     WriteOp newWriteOp(String stream,
                        ByteBuffer data,
                        Long checksum,

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ResponseUtils.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ResponseUtils.java
 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ResponseUtils.java
index 0bceec5..cee9dba 100644
--- 
a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ResponseUtils.java
+++ 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ResponseUtils.java
@@ -32,6 +32,10 @@ public class ResponseUtils {
         return new ResponseHeader(StatusCode.REQUEST_DENIED);
     }
 
+    public static ResponseHeader streamUnavailableHeader() {
+        return new ResponseHeader(StatusCode.STREAM_UNAVAILABLE);
+    }
+
     public static ResponseHeader successHeader() {
         return new ResponseHeader(StatusCode.SUCCESS);
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java
 
b/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java
index 475755b..10941ba 100644
--- 
a/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java
+++ 
b/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java
@@ -92,7 +92,7 @@ public class LocalRoutingService implements RoutingService {
         return this;
     }
 
-    public void addHost(String stream, SocketAddress address) {
+    public LocalRoutingService addHost(String stream, SocketAddress address) {
         boolean notify = false;
         synchronized (this) {
             LinkedHashSet<SocketAddress> addresses = 
localAddresses.get(stream);
@@ -109,6 +109,7 @@ public class LocalRoutingService implements RoutingService {
                 listener.onServerJoin(address);
             }
         }
+        return this;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java
 
b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java
index 486a106..c37248c 100644
--- 
a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java
+++ 
b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java
@@ -34,7 +34,6 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -149,7 +148,7 @@ public abstract class DistributedLogServerTestCase {
     public void setupNoAdHocCluster() throws Exception {
         noAdHocCluster = createCluster(noAdHocConf);
         noAdHocCluster.start();
-        noAdHocServer = new DLServer(noAdHocConf, noAdHocCluster.getUri(), 
7002);
+        noAdHocServer = new DLServer(noAdHocConf, noAdHocCluster.getUri(), 
7002, false);
         noAdHocClient = createDistributedLogClient("no-ad-hoc-client");
     }
 
@@ -193,12 +192,12 @@ public abstract class DistributedLogServerTestCase {
     }
 
     protected DLServer createDistributedLogServer(int port) throws Exception {
-        return new DLServer(conf, dlCluster.getUri(), port);
+        return new DLServer(conf, dlCluster.getUri(), port, false);
     }
 
     protected DLServer createDistributedLogServer(DistributedLogConfiguration 
conf, int port)
             throws Exception {
-        return new DLServer(conf, dlCluster.getUri(), port);
+        return new DLServer(conf, dlCluster.getUri(), port, false);
     }
 
     protected DLClient createDistributedLogClient(String clientName) throws 
Exception {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/16d73c35/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
----------------------------------------------------------------------
diff --git 
a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
 
b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
index 4195ed3..d7a0ba6 100644
--- 
a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
+++ 
b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
@@ -23,6 +23,7 @@ import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.util.ProtocolUtils;
 import com.twitter.distributedlog.TestDistributedLogBase;
 import com.twitter.distributedlog.acl.DefaultAccessControlManager;
+import com.twitter.distributedlog.client.routing.LocalRoutingService;
 import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
 import com.twitter.distributedlog.exceptions.StreamUnavailableException;
 import com.twitter.distributedlog.service.config.NullStreamConfigProvider;
@@ -145,6 +146,7 @@ public class TestDistributedLogService extends 
TestDistributedLogBase {
                 new NullStreamConfigProvider(),
                 uri,
                 converter,
+                new LocalRoutingService(),
                 NullStatsLogger.INSTANCE,
                 NullStatsLogger.INSTANCE,
                 latch);
@@ -769,4 +771,51 @@ public class TestDistributedLogService extends 
TestDistributedLogBase {
                 streamManager.getAcquiredStreams().isEmpty());
     }
 
+    @Test(timeout = 60000)
+    public void testGetOwner() throws Exception {
+        ((LocalRoutingService) service.getRoutingService())
+                .addHost("stream-0", 
service.getServiceAddress().getSocketAddress())
+                .setAllowRetrySameHost(false);
+
+        // routing service doesn't know 'stream-1'
+        WriteResponse response = 
FutureUtils.result(service.getOwner("stream-1", new WriteContext()));
+        assertEquals(StatusCode.STREAM_UNAVAILABLE, 
response.getHeader().getCode());
+
+        // service caches "stream-2" but has not acquired it yet
+        StreamImpl stream = (StreamImpl) 
service.getStreamManager().getOrCreateStream("stream-2", false);
+        response = FutureUtils.result(service.getOwner("stream-2", new 
WriteContext()));
+        assertEquals(StatusCode.STREAM_UNAVAILABLE, 
response.getHeader().getCode());
+
+        // create a write op on stream-2 to make the service acquire the stream
+        WriteOp op = createWriteOp(service, "stream-2", 0L);
+        stream.submit(op);
+        stream.start();
+        WriteResponse wr = Await.result(op.result());
+        assertEquals("Op  should succeed",
+                StatusCode.SUCCESS, wr.getHeader().getCode());
+        assertEquals("Service should acquire stream",
+                StreamStatus.INITIALIZED, stream.getStatus());
+        assertNotNull(stream.getManager());
+        assertNotNull(stream.getWriter());
+        assertNull(stream.getLastException());
+
+        // the stream is acquired
+        response = FutureUtils.result(service.getOwner("stream-2", new 
WriteContext()));
+        assertEquals(StatusCode.FOUND, response.getHeader().getCode());
+        assertEquals(service.getServiceAddress().toString(),
+                response.getHeader().getLocation());
+
+        // find the stream from the routing service
+        response = FutureUtils.result(service.getOwner("stream-0", new 
WriteContext()));
+        assertEquals(StatusCode.FOUND, response.getHeader().getCode());
+        assertEquals(service.getServiceAddress().toString(),
+                response.getHeader().getLocation());
+
+        // add this service's own address to the tried hosts; with retry-same-host disabled the lookup is expected to fail
+        WriteContext ctx = new WriteContext();
+        
ctx.addToTriedHosts(DLSocketAddress.toString(service.getServiceAddress().getSocketAddress()));
+        response = FutureUtils.result(service.getOwner("stream-0", ctx));
+        assertEquals(StatusCode.STREAM_UNAVAILABLE, 
response.getHeader().getCode());
+    }
+
 }

Reply via email to