DL-103: DL client can use a server-side routing service to retrieve ownerships for streams
If the dl client configures a routing service finagle name, the client will use this to retrieve ownerships for streams. Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/ddbd7716 Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/ddbd7716 Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/ddbd7716 Branch: refs/heads/master Commit: ddbd7716a895f060a6764624f65ffa45420167b8 Parents: 16d73c3 Author: Sijie Guo <[email protected]> Authored: Thu Jul 28 22:02:09 2016 -0700 Committer: Sijie Guo <[email protected]> Committed: Tue Dec 27 16:49:27 2016 -0800 ---------------------------------------------------------------------- .../client/DistributedLogClientImpl.java | 115 ++- .../client/proxy/ClusterClient.java | 51 ++ .../service/DistributedLogClientBuilder.java | 85 ++- .../service/DistributedLogServiceImpl.java | 4 +- .../service/DistributedLogServerTestCase.java | 46 +- .../service/TestDistributedLogServer.java | 730 ------------------- .../service/TestDistributedLogServerBase.java | 714 ++++++++++++++++++ .../TestDistributedLogServerClientRouting.java | 59 ++ .../TestDistributedLogServerServerRouting.java | 28 + .../service/TestRegionUnavailable.java | 1 + .../service/balancer/TestClusterBalancer.java | 3 +- .../service/balancer/TestSimpleBalancer.java | 6 +- .../service/balancer/TestStreamMover.java | 7 +- 13 files changed, 1085 insertions(+), 764 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/ddbd7716/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java 
b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java index d3edb74..5125f28 100644 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java @@ -19,13 +19,14 @@ package com.twitter.distributedlog.client; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; +import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.twitter.distributedlog.DLSN; import com.twitter.distributedlog.LogRecordSetBuffer; -import com.twitter.distributedlog.client.monitor.MonitorServiceClient; import com.twitter.distributedlog.client.ownership.OwnershipCache; +import com.twitter.distributedlog.client.proxy.ClusterClient; import com.twitter.distributedlog.client.proxy.HostProvider; import com.twitter.distributedlog.client.proxy.ProxyClient; import com.twitter.distributedlog.client.proxy.ProxyClientManager; @@ -38,6 +39,7 @@ import com.twitter.distributedlog.client.stats.OpStats; import com.twitter.distributedlog.exceptions.DLClientClosedException; import com.twitter.distributedlog.exceptions.DLException; import com.twitter.distributedlog.exceptions.ServiceUnavailableException; +import com.twitter.distributedlog.exceptions.StreamUnavailableException; import com.twitter.distributedlog.service.DLSocketAddress; import com.twitter.distributedlog.service.DistributedLogClient; import com.twitter.distributedlog.thrift.service.BulkWriteResponse; @@ -66,6 +68,8 @@ import com.twitter.util.Function0; import com.twitter.util.Future; import com.twitter.util.FutureEventListener; import com.twitter.util.Promise; +import com.twitter.util.Return; +import com.twitter.util.Throw; import java.io.IOException; import 
java.net.SocketAddress; import java.nio.ByteBuffer; @@ -118,6 +122,8 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe private final OwnershipCache ownershipCache; // Channel/Client management private final ProxyClientManager clientManager; + // Cluster Client (for routing service) + private final Optional<ClusterClient> clusterClient; // Close Status private boolean closed = false; @@ -140,6 +146,16 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe this.opStats = opStats; } + boolean shouldTimeout() { + long elapsedMs = stopwatch.elapsed(TimeUnit.MILLISECONDS); + return shouldTimeout(elapsedMs); + } + + boolean shouldTimeout(long elapsedMs) { + return clientConfig.getRequestTimeoutMs() > 0 + && elapsedMs >= clientConfig.getRequestTimeoutMs(); + } + void send(SocketAddress address) { long elapsedMs = stopwatch.elapsed(TimeUnit.MILLISECONDS); if (clientConfig.getMaxRedirects() > 0 @@ -147,8 +163,7 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe fail(address, new RequestTimeoutException(Duration.fromMilliseconds(elapsedMs), "Exhausted max redirects in " + elapsedMs + " ms")); return; - } else if (clientConfig.getRequestTimeoutMs() > 0 - && elapsedMs >= clientConfig.getRequestTimeoutMs()) { + } else if (shouldTimeout(elapsedMs)) { fail(address, new RequestTimeoutException(Duration.fromMilliseconds(elapsedMs), "Exhausted max request timeout " + clientConfig.getRequestTimeoutMs() + " in " + elapsedMs + " ms")); @@ -225,6 +240,7 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe // to go so large for other reasons though. 
this.results = new ArrayList<Promise<DLSN>>(data.size()); for (int i = 0; i < data.size(); i++) { + Preconditions.checkNotNull(data.get(i)); this.results.add(new Promise<DLSN>()); } } @@ -549,6 +565,7 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe RoutingService routingService, ClientBuilder clientBuilder, ClientConfig clientConfig, + Optional<ClusterClient> clusterClient, StatsReceiver statsReceiver, StatsReceiver streamStatsReceiver, RegionResolver regionResolver, @@ -579,6 +596,7 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe this.dlTimer, // timer this, // host provider clientStats); // client stats + this.clusterClient = clusterClient; this.clientManager.registerProxyListener(this); // Cache Stats @@ -614,8 +632,10 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe @Override public Set<SocketAddress> getHosts() { Set<SocketAddress> hosts = Sets.newHashSet(); - // use both routing service and ownership cache for the handshaking source - hosts.addAll(this.routingService.getHosts()); + // if using server side routing, we only handshake with the hosts in ownership cache. + if (!clusterClient.isPresent()) { + hosts.addAll(this.routingService.getHosts()); + } hosts.addAll(this.ownershipCache.getStreamOwnershipDistribution().keySet()); return hosts; } @@ -675,7 +695,12 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe @Override public void onServerJoin(SocketAddress address) { - clientManager.createClient(address); + // we only pre-create connection for client-side routing + // if it is server side routing, we only know the exact proxy address + // when #getOwner. 
+ if (!clusterClient.isPresent()) { + clientManager.createClient(address); + } } public void close() { @@ -807,17 +832,77 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe op.routingContext.addTriedHost(previousAddr, StatusCode.WRITE_EXCEPTION); } // Get host first - SocketAddress address = ownershipCache.getOwner(op.stream); + final SocketAddress address = ownershipCache.getOwner(op.stream); if (null == address || op.routingContext.isTriedHost(address)) { - // pickup host by hashing - try { - address = routingService.getHost(op.stream, op.routingContext); - } catch (NoBrokersAvailableException nbae) { - op.fail(null, nbae); - return; - } + getOwner(op).addEventListener(new FutureEventListener<SocketAddress>() { + @Override + public void onFailure(Throwable cause) { + op.fail(null, cause); + } + + @Override + public void onSuccess(SocketAddress ownerAddr) { + op.send(ownerAddr); + } + }); + } else { + op.send(address); + } + } + + private void retryGetOwnerFromRoutingServer(final StreamOp op, + final Promise<SocketAddress> getOwnerPromise, + final Throwable cause) { + if (op.shouldTimeout()) { + op.fail(null, cause); + return; + } + getOwnerFromRoutingServer(op, getOwnerPromise); + } + + private void getOwnerFromRoutingServer(final StreamOp op, + final Promise<SocketAddress> getOwnerPromise) { + clusterClient.get().getService().getOwner(op.stream, op.ctx) + .addEventListener(new FutureEventListener<WriteResponse>() { + @Override + public void onFailure(Throwable cause) { + getOwnerPromise.updateIfEmpty(new Throw<SocketAddress>(cause)); + } + + @Override + public void onSuccess(WriteResponse value) { + if (StatusCode.FOUND == value.getHeader().getCode() && + null != value.getHeader().getLocation()) { + SocketAddress addr; + try { + addr = DLSocketAddress.deserialize(value.getHeader().getLocation()).getSocketAddress(); + } catch (IOException e) { + // retry from the routing server again + retryGetOwnerFromRoutingServer(op, 
getOwnerPromise, e); + return; + } + getOwnerPromise.updateIfEmpty(new Return<SocketAddress>(addr)); + } else { + // retry from the routing server again + retryGetOwnerFromRoutingServer(op, getOwnerPromise, + new StreamUnavailableException("Stream " + op.stream + "'s owner is unknown")); + } + } + }); + } + + private Future<SocketAddress> getOwner(final StreamOp op) { + if (clusterClient.isPresent()) { + final Promise<SocketAddress> getOwnerPromise = new Promise<SocketAddress>(); + getOwnerFromRoutingServer(op, getOwnerPromise); + return getOwnerPromise; + } + // pickup host by hashing + try { + return Future.value(routingService.getHost(op.stream, op.routingContext)); + } catch (NoBrokersAvailableException nbae) { + return Future.exception(nbae); } - op.send(address); } private void sendWriteRequest(final SocketAddress addr, final StreamOp op) { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/ddbd7716/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ClusterClient.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ClusterClient.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ClusterClient.java new file mode 100644 index 0000000..bb95a97 --- /dev/null +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ClusterClient.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.client.proxy; + +import com.twitter.distributedlog.thrift.service.DistributedLogService; +import com.twitter.finagle.Service; +import com.twitter.finagle.thrift.ThriftClientRequest; +import com.twitter.util.Future; +import scala.runtime.BoxedUnit; + +/** + * Cluster client + */ +public class ClusterClient { + + private final Service<ThriftClientRequest, byte[]> client; + private final DistributedLogService.ServiceIface service; + + public ClusterClient(Service<ThriftClientRequest, byte[]> client, + DistributedLogService.ServiceIface service) { + this.client = client; + this.service = service; + } + + public Service<ThriftClientRequest, byte[]> getClient() { + return client; + } + + public DistributedLogService.ServiceIface getService() { + return service; + } + + public Future<BoxedUnit> close() { + return client.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/ddbd7716/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java index 48a229b..ec9a7c6 100644 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java +++ 
b/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java @@ -20,35 +20,52 @@ package com.twitter.distributedlog.service; import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Optional; import com.twitter.common.zookeeper.ServerSet; import com.twitter.distributedlog.client.ClientConfig; import com.twitter.distributedlog.client.DistributedLogClientImpl; import com.twitter.distributedlog.client.monitor.MonitorServiceClient; +import com.twitter.distributedlog.client.proxy.ClusterClient; import com.twitter.distributedlog.client.resolver.DefaultRegionResolver; import com.twitter.distributedlog.client.resolver.RegionResolver; import com.twitter.distributedlog.client.routing.RegionsRoutingService; import com.twitter.distributedlog.client.routing.RoutingService; import com.twitter.distributedlog.client.routing.RoutingUtils; +import com.twitter.distributedlog.thrift.service.DistributedLogService; +import com.twitter.finagle.Name; +import com.twitter.finagle.Resolver$; +import com.twitter.finagle.Service; +import com.twitter.finagle.ThriftMux; import com.twitter.finagle.builder.ClientBuilder; import com.twitter.finagle.stats.NullStatsReceiver; import com.twitter.finagle.stats.StatsReceiver; import com.twitter.finagle.thrift.ClientId; +import com.twitter.finagle.thrift.ThriftClientFramedCodec; +import com.twitter.finagle.thrift.ThriftClientRequest; +import com.twitter.util.Duration; import java.net.SocketAddress; import java.net.URI; import java.util.Random; import org.apache.commons.lang.StringUtils; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; /** * Builder to build {@link DistributedLogClient}. 
*/ public final class DistributedLogClientBuilder { + private static final Logger logger = LoggerFactory.getLogger(DistributedLogClientBuilder.class); + private static final Random random = new Random(System.currentTimeMillis()); private String name = null; private ClientId clientId = null; private RoutingService.Builder routingServiceBuilder = null; private ClientBuilder clientBuilder = null; + private String serverRoutingServiceFinagleName = null; private StatsReceiver statsReceiver = new NullStatsReceiver(); private StatsReceiver streamStatsReceiver = new NullStatsReceiver(); private ClientConfig clientConfig = new ClientConfig(); @@ -478,6 +495,18 @@ public final class DistributedLogClientBuilder { return newBuilder; } + /** + * Configure the finagle name string for the server-side routing service. + * + * @param nameStr name string of the server-side routing service + * @return client builder + */ + public DistributedLogClientBuilder serverRoutingServiceFinagleNameStr(String nameStr) { + DistributedLogClientBuilder newBuilder = newBuilder(this); + newBuilder.serverRoutingServiceFinagleName = nameStr; + return newBuilder; + } + DistributedLogClientBuilder clientConfig(ClientConfig clientConfig) { DistributedLogClientBuilder newBuilder = newBuilder(this); newBuilder.clientConfig = ClientConfig.newConfig(clientConfig); @@ -499,9 +528,47 @@ public final class DistributedLogClientBuilder { * @return monitor service client. 
*/ public MonitorServiceClient buildMonitorClient() { + return buildClient(); } + @SuppressWarnings("unchecked") + ClusterClient buildServerRoutingServiceClient(String serverRoutingServiceFinagleName) { + ClientBuilder builder = _clientBuilder; + if (null == builder) { + builder = ClientBuilder.get() + .tcpConnectTimeout(Duration.fromMilliseconds(200)) + .connectTimeout(Duration.fromMilliseconds(200)) + .requestTimeout(Duration.fromSeconds(1)) + .retries(20); + if (!_clientConfig.getThriftMux()) { + builder = builder.hostConnectionLimit(1); + } + } + if (_clientConfig.getThriftMux()) { + builder = builder.stack(ThriftMux.client().withClientId(_clientId)); + } else { + builder = builder.codec(ThriftClientFramedCodec.apply(Option.apply(_clientId))); + } + + Name name; + try { + name = Resolver$.MODULE$.eval(serverRoutingServiceFinagleName); + } catch (Exception exc) { + logger.error("Exception in Resolver.eval for name {}", serverRoutingServiceFinagleName, exc); + throw new RuntimeException(exc); + } + + // builder the client + Service<ThriftClientRequest, byte[]> client = + ClientBuilder.safeBuildFactory( + builder.dest(name).reportTo(_statsReceiver.scope("routing")) + ).toService(); + DistributedLogService.ServiceIface service = + new DistributedLogService.ServiceToClient(client, new TBinaryProtocol.Factory()); + return new ClusterClient(client, service); + } + DistributedLogClientImpl buildClient() { checkNotNull(name, "No name provided."); checkNotNull(clientId, "No client id provided."); @@ -511,13 +578,27 @@ public final class DistributedLogClientBuilder { streamStatsReceiver = new NullStatsReceiver(); } + Optional<ClusterClient> serverRoutingServiceClient = Optional.absent(); + if (null != serverRoutingServiceFinagleName) { + serverRoutingServiceClient = Optional.of( + buildServerRoutingServiceClient(serverRoutingServiceFinagleName)); + } + RoutingService routingService = routingServiceBuilder .statsReceiver(statsReceiver.scope("routing")) .build(); 
DistributedLogClientImpl clientImpl = new DistributedLogClientImpl( - name, clientId, routingService, clientBuilder, clientConfig, - statsReceiver, streamStatsReceiver, regionResolver, enableRegionStats); + name, + clientId, + routingService, + clientBuilder, + clientConfig, + serverRoutingServiceClient, + statsReceiver, + streamStatsReceiver, + regionResolver, + enableRegionStats); routingService.startService(); clientImpl.handshake(); return clientImpl; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/ddbd7716/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java index 12820d9..74da34f 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java @@ -24,8 +24,8 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.twitter.distributedlog.DLSN; import com.twitter.distributedlog.DistributedLogConfiguration; import com.twitter.distributedlog.acl.AccessControlManager; +import com.twitter.distributedlog.client.resolver.DefaultRegionResolver; import com.twitter.distributedlog.client.resolver.RegionResolver; -import com.twitter.distributedlog.client.resolver.TwitterRegionResolver; import com.twitter.distributedlog.client.routing.RoutingService; import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration; import com.twitter.distributedlog.exceptions.RegionUnavailableException; @@ -234,7 +234,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI streamConfigProvider, dlNamespace); 
this.routingService = routingService; - this.regionResolver = new TwitterRegionResolver(); + this.regionResolver = new DefaultRegionResolver(); // Service features this.featureRegionStopAcceptNewStream = this.featureProvider.getFeature( http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/ddbd7716/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java index c37248c..4d29f21 100644 --- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java @@ -17,6 +17,7 @@ */ package com.twitter.distributedlog.service; +import com.google.common.base.Optional; import com.google.common.collect.Sets; import com.twitter.distributedlog.DistributedLogConfiguration; import com.twitter.distributedlog.client.DistributedLogClientImpl; @@ -60,14 +61,12 @@ public abstract class DistributedLogServerTestCase { protected static class DLClient { public final LocalRoutingService routingService; - public final DistributedLogClientBuilder dlClientBuilder; + public DistributedLogClientBuilder dlClientBuilder; public final DistributedLogClientImpl dlClient; - protected DLClient(String name) { - this(name, ".*"); - } - - protected DLClient(String name, String streamNameRegex) { + protected DLClient(String name, + String streamNameRegex, + Optional<String> serverSideRoutingFinagleName) { routingService = LocalRoutingService.newBuilder().build(); dlClientBuilder = DistributedLogClientBuilder.newBuilder() .name(name) @@ -79,6 +78,10 @@ public abstract class DistributedLogServerTestCase 
{ .hostConnectionLimit(1) .connectionTimeout(Duration.fromSeconds(1)) .requestTimeout(Duration.fromSeconds(60))); + if (serverSideRoutingFinagleName.isPresent()) { + dlClientBuilder = + dlClientBuilder.serverRoutingServiceFinagleNameStr(serverSideRoutingFinagleName.get()); + } dlClient = (DistributedLogClientImpl) dlClientBuilder.build(); } @@ -123,6 +126,7 @@ public abstract class DistributedLogServerTestCase { } } + private final boolean clientSideRouting; protected DLServer dlServer; protected DLClient dlClient; protected DLServer noAdHocServer; @@ -149,7 +153,12 @@ public abstract class DistributedLogServerTestCase { noAdHocCluster = createCluster(noAdHocConf); noAdHocCluster.start(); noAdHocServer = new DLServer(noAdHocConf, noAdHocCluster.getUri(), 7002, false); - noAdHocClient = createDistributedLogClient("no-ad-hoc-client"); + Optional<String> serverSideRoutingFinagleName = Optional.absent(); + if (!clientSideRouting) { + serverSideRoutingFinagleName = + Optional.of("inet!" + DLSocketAddress.toString(noAdHocServer.getAddress())); + } + noAdHocClient = createDistributedLogClient("no-ad-hoc-client", serverSideRoutingFinagleName); } public void tearDownNoAdHocCluster() throws Exception { @@ -175,10 +184,19 @@ public abstract class DistributedLogServerTestCase { return dlCluster.getUri(); } + protected DistributedLogServerTestCase(boolean clientSideRouting) { + this.clientSideRouting = clientSideRouting; + } + @Before public void setup() throws Exception { dlServer = createDistributedLogServer(7001); - dlClient = createDistributedLogClient("test"); + Optional<String> serverSideRoutingFinagleName = Optional.absent(); + if (!clientSideRouting) { + serverSideRoutingFinagleName = + Optional.of("inet!" 
+ DLSocketAddress.toString(dlServer.getAddress())); + } + dlClient = createDistributedLogClient("test", serverSideRoutingFinagleName); } @After @@ -200,13 +218,17 @@ public abstract class DistributedLogServerTestCase { return new DLServer(conf, dlCluster.getUri(), port, false); } - protected DLClient createDistributedLogClient(String clientName) throws Exception { - return createDistributedLogClient(clientName, ".*"); + protected DLClient createDistributedLogClient(String clientName, + Optional<String> serverSideRoutingFinagleName) + throws Exception { + return createDistributedLogClient(clientName, ".*", serverSideRoutingFinagleName); } - protected DLClient createDistributedLogClient(String clientName, String streamNameRegex) + protected DLClient createDistributedLogClient(String clientName, + String streamNameRegex, + Optional<String> serverSideRoutingFinagleName) throws Exception { - return new DLClient(clientName, streamNameRegex); + return new DLClient(clientName, streamNameRegex, serverSideRoutingFinagleName); } protected TwoRegionDLClient createTwoRegionDLClient(String clientName, http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/ddbd7716/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServer.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServer.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServer.java deleted file mode 100644 index 63723ef..0000000 --- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServer.java +++ /dev/null @@ -1,730 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.service; - -import com.twitter.distributedlog.AsyncLogReader; -import com.twitter.distributedlog.DLMTestUtil; -import com.twitter.distributedlog.DLSN; -import com.twitter.distributedlog.DistributedLogManager; -import com.twitter.distributedlog.TestZooKeeperClientBuilder; -import com.twitter.distributedlog.annotations.DistributedLogAnnotations; -import com.twitter.distributedlog.exceptions.LogNotFoundException; -import com.twitter.distributedlog.LogReader; -import com.twitter.distributedlog.LogRecord; -import com.twitter.distributedlog.LogRecordWithDLSN; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.acl.AccessControlManager; -import com.twitter.distributedlog.acl.ZKAccessControl; -import com.twitter.distributedlog.client.DistributedLogClientImpl; -import com.twitter.distributedlog.client.routing.LocalRoutingService; -import com.twitter.distributedlog.exceptions.DLException; -import com.twitter.distributedlog.metadata.BKDLConfig; -import com.twitter.distributedlog.namespace.DistributedLogNamespace; -import com.twitter.distributedlog.service.stream.StreamManagerImpl; -import com.twitter.distributedlog.thrift.AccessControlEntry; -import com.twitter.distributedlog.thrift.service.BulkWriteResponse; -import com.twitter.distributedlog.thrift.service.HeartbeatOptions; -import 
com.twitter.distributedlog.thrift.service.StatusCode; -import com.twitter.distributedlog.thrift.service.WriteContext; -import com.twitter.distributedlog.util.FailpointUtils; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.finagle.NoBrokersAvailableException; -import com.twitter.finagle.builder.ClientBuilder; -import com.twitter.finagle.thrift.ClientId$; -import com.twitter.util.Await; -import com.twitter.util.Duration; -import com.twitter.util.Future; -import com.twitter.util.Futures; -import org.junit.Ignore; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TestName; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.SocketAddress; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import static com.google.common.base.Charsets.UTF_8; -import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORD_SIZE; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -public class TestDistributedLogServer extends DistributedLogServerTestCase { - static final Logger logger = LoggerFactory.getLogger(TestDistributedLogServer.class); - - @Rule - public TestName testName = new TestName(); - - /** - * {@link https://issues.apache.org/jira/browse/DL-27} - */ - @DistributedLogAnnotations.FlakyTest - @Ignore - @Test(timeout = 60000) - public void testBasicWrite() throws Exception { - String name = "dlserver-basic-write"; - - dlClient.routingService.addHost(name, dlServer.getAddress()); - - for (long i = 1; i <= 10; i++) { - logger.debug("Write entry {} to stream {}.", i, name); - Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes()))); - } - - HeartbeatOptions hbOptions = new 
HeartbeatOptions(); - hbOptions.setSendHeartBeatToReader(true); - // make sure the first log segment of each stream created - FutureUtils.result(dlClient.dlClient.heartbeat(name)); - - DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri()); - LogReader reader = dlm.getInputStream(1); - int numRead = 0; - LogRecord r = reader.readNext(false); - while (null != r) { - ++numRead; - int i = Integer.parseInt(new String(r.getPayload())); - assertEquals(numRead, i); - r = reader.readNext(false); - } - assertEquals(10, numRead); - reader.close(); - dlm.close(); - } - - /** - * Sanity check to make sure both checksum flag values work. - */ - @Test(timeout = 60000) - public void testChecksumFlag() throws Exception { - String name = "testChecksumFlag"; - LocalRoutingService routingService = LocalRoutingService.newBuilder().build(); - routingService.addHost(name, dlServer.getAddress()); - DistributedLogClientBuilder dlClientBuilder = DistributedLogClientBuilder.newBuilder() - .name(name) - .clientId(ClientId$.MODULE$.apply("test")) - .routingService(routingService) - .handshakeWithClientInfo(true) - .clientBuilder(ClientBuilder.get() - .hostConnectionLimit(1) - .connectionTimeout(Duration.fromSeconds(1)) - .requestTimeout(Duration.fromSeconds(60))) - .checksum(false); - DistributedLogClient dlClient = dlClientBuilder.build(); - Await.result(dlClient.write(name, ByteBuffer.wrap(("1").getBytes()))); - dlClient.close(); - - dlClient = dlClientBuilder.checksum(true).build(); - Await.result(dlClient.write(name, ByteBuffer.wrap(("2").getBytes()))); - dlClient.close(); - } - - private void runSimpleBulkWriteTest(int writeCount) throws Exception { - String name = String.format("dlserver-bulk-write-%d", writeCount); - - dlClient.routingService.addHost(name, dlServer.getAddress()); - - List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount); - for (long i = 1; i <= writeCount; i++) { - writes.add(ByteBuffer.wrap(("" + i).getBytes())); - } - - 
logger.debug("Write {} entries to stream {}.", writeCount, name); - List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes); - assertEquals(futures.size(), writeCount); - for (Future<DLSN> future : futures) { - // No throw == pass. - DLSN dlsn = Await.result(future, Duration.fromSeconds(10)); - } - - DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri()); - LogReader reader = dlm.getInputStream(1); - int numRead = 0; - LogRecord r = reader.readNext(false); - while (null != r) { - int i = Integer.parseInt(new String(r.getPayload())); - assertEquals(numRead + 1, i); - ++numRead; - r = reader.readNext(false); - } - assertEquals(writeCount, numRead); - reader.close(); - dlm.close(); - } - - @Test(timeout = 60000) - public void testBulkWrite() throws Exception { - runSimpleBulkWriteTest(100); - } - - @Test(timeout = 60000) - public void testBulkWriteSingleWrite() throws Exception { - runSimpleBulkWriteTest(1); - } - - @Test(timeout = 60000) - public void testBulkWriteEmptyList() throws Exception { - String name = String.format("dlserver-bulk-write-%d", 0); - - dlClient.routingService.addHost(name, dlServer.getAddress()); - - List<ByteBuffer> writes = new ArrayList<ByteBuffer>(); - List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes); - - assertEquals(0, futures.size()); - } - - @Test(timeout = 60000) - public void testBulkWriteNullArg() throws Exception { - - String name = String.format("dlserver-bulk-write-%s", "null"); - - dlClient.routingService.addHost(name, dlServer.getAddress()); - - List<ByteBuffer> writes = new ArrayList<ByteBuffer>(); - writes.add(null); - - try { - List<Future<DLSN>> futureResult = dlClient.dlClient.writeBulk(name, writes); - fail("should not have succeeded"); - } catch (NullPointerException npe) { - ; // expected - } - } - - @Test(timeout = 60000) - public void testBulkWriteEmptyBuffer() throws Exception { - String name = String.format("dlserver-bulk-write-%s", "empty"); - - 
dlClient.routingService.addHost(name, dlServer.getAddress()); - - List<ByteBuffer> writes = new ArrayList<ByteBuffer>(); - writes.add(ByteBuffer.wrap(("").getBytes())); - writes.add(ByteBuffer.wrap(("").getBytes())); - List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes); - assertEquals(2, futures.size()); - for (Future<DLSN> future : futures) { - // No throw == pass - DLSN dlsn = Await.result(future, Duration.fromSeconds(10)); - } - } - - void failDueToWrongException(Exception ex) { - logger.info("testBulkWritePartialFailure: ", ex); - fail(String.format("failed with wrong exception %s", ex.getClass().getName())); - } - - int validateAllFailedAsCancelled(List<Future<DLSN>> futures, int start, int finish) { - int failed = 0; - for (int i = start; i < finish; i++) { - Future<DLSN> future = futures.get(i); - try { - DLSN dlsn = Await.result(future, Duration.fromSeconds(10)); - fail("future should have failed!"); - } catch (DLException cre) { - ++failed; - } catch (Exception ex) { - failDueToWrongException(ex); - } - } - return failed; - } - - void validateFailedAsLogRecordTooLong(Future<DLSN> future) { - try { - DLSN dlsn = Await.result(future, Duration.fromSeconds(10)); - fail("should have failed"); - } catch (DLException dle) { - assertEquals(StatusCode.TOO_LARGE_RECORD, dle.getCode()); - } catch (Exception ex) { - failDueToWrongException(ex); - } - } - - @Test(timeout = 60000) - public void testBulkWritePartialFailure() throws Exception { - String name = String.format("dlserver-bulk-write-%s", "partial-failure"); - - dlClient.routingService.addHost(name, dlServer.getAddress()); - - final int writeCount = 100; - - List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount*2 + 1); - for (long i = 1; i <= writeCount; i++) { - writes.add(ByteBuffer.wrap(("" + i).getBytes())); - } - // Too big, will cause partial failure. 
- ByteBuffer buf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1); - writes.add(buf); - for (long i = 1; i <= writeCount; i++) { - writes.add(ByteBuffer.wrap(("" + i).getBytes())); - } - - // Count succeeded. - List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes); - int succeeded = 0; - for (int i = 0; i < writeCount; i++) { - Future<DLSN> future = futures.get(i); - try { - DLSN dlsn = Await.result(future, Duration.fromSeconds(10)); - ++succeeded; - } catch (Exception ex) { - failDueToWrongException(ex); - } - } - - validateFailedAsLogRecordTooLong(futures.get(writeCount)); - FutureUtils.result(Futures.collect(futures.subList(writeCount + 1, 2 * writeCount + 1))); - assertEquals(writeCount, succeeded); - } - - @Test(timeout = 60000) - public void testBulkWriteTotalFailureFirstWriteFailed() throws Exception { - String name = String.format("dlserver-bulk-write-%s", "first-write-failed"); - - dlClient.routingService.addHost(name, dlServer.getAddress()); - - final int writeCount = 100; - List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount + 1); - ByteBuffer buf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1); - writes.add(buf); - for (long i = 1; i <= writeCount; i++) { - writes.add(ByteBuffer.wrap(("" + i).getBytes())); - } - - List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes); - validateFailedAsLogRecordTooLong(futures.get(0)); - FutureUtils.result(Futures.collect(futures.subList(1, writeCount + 1))); - } - - @Test(timeout = 60000) - public void testBulkWriteTotalFailureLostLock() throws Exception { - String name = String.format("dlserver-bulk-write-%s", "lost-lock"); - - dlClient.routingService.addHost(name, dlServer.getAddress()); - - final int writeCount = 8; - List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount + 1); - ByteBuffer buf = ByteBuffer.allocate(8); - writes.add(buf); - for (long i = 1; i <= writeCount; i++) { - writes.add(ByteBuffer.wrap(("" + i).getBytes())); - } - // Warm it up with a write. 
- Await.result(dlClient.dlClient.write(name, ByteBuffer.allocate(8))); - - // Failpoint a lost lock, make sure the failure gets promoted to an operation failure. - DistributedLogServiceImpl svcImpl = (DistributedLogServiceImpl) dlServer.dlServer.getLeft(); - try { - FailpointUtils.setFailpoint( - FailpointUtils.FailPointName.FP_WriteInternalLostLock, - FailpointUtils.FailPointActions.FailPointAction_Default - ); - Future<BulkWriteResponse> futures = svcImpl.writeBulkWithContext(name, writes, new WriteContext()); - assertEquals(StatusCode.LOCKING_EXCEPTION, Await.result(futures).header.code); - } finally { - FailpointUtils.removeFailpoint( - FailpointUtils.FailPointName.FP_WriteInternalLostLock - ); - } - } - - @Test(timeout = 60000) - public void testHeartbeat() throws Exception { - String name = "dlserver-heartbeat"; - - dlClient.routingService.addHost(name, dlServer.getAddress()); - - for (long i = 1; i <= 10; i++) { - logger.debug("Send heartbeat {} to stream {}.", i, name); - dlClient.dlClient.check(name).get(); - } - - logger.debug("Write entry one to stream {}.", name); - dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes())).get(); - - Thread.sleep(1000); - - DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri()); - LogReader reader = dlm.getInputStream(DLSN.InitialDLSN); - int numRead = 0; - // eid=0 => control records - // other 9 heartbeats will not trigger writing any control records. 
- // eid=1 => user entry - long startEntryId = 1; - LogRecordWithDLSN r = reader.readNext(false); - while (null != r) { - int i = Integer.parseInt(new String(r.getPayload())); - assertEquals(numRead + 1, i); - assertEquals(r.getDlsn().compareTo(new DLSN(1, startEntryId, 0)), 0); - ++numRead; - ++startEntryId; - r = reader.readNext(false); - } - assertEquals(1, numRead); - } - - @Test(timeout = 60000) - public void testFenceWrite() throws Exception { - String name = "dlserver-fence-write"; - - dlClient.routingService.addHost(name, dlServer.getAddress()); - - for (long i = 1; i <= 10; i++) { - logger.debug("Write entry {} to stream {}.", i, name); - dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes())).get(); - } - - Thread.sleep(1000); - - logger.info("Fencing stream {}.", name); - DLMTestUtil.fenceStream(conf, getUri(), name); - logger.info("Fenced stream {}.", name); - - for (long i = 11; i <= 20; i++) { - logger.debug("Write entry {} to stream {}.", i, name); - dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes())).get(); - } - - DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri()); - LogReader reader = dlm.getInputStream(1); - int numRead = 0; - LogRecord r = reader.readNext(false); - while (null != r) { - int i = Integer.parseInt(new String(r.getPayload())); - assertEquals(numRead + 1, i); - ++numRead; - r = reader.readNext(false); - } - assertEquals(20, numRead); - reader.close(); - dlm.close(); - } - - @Test(timeout = 60000) - public void testDeleteStream() throws Exception { - String name = "dlserver-delete-stream"; - - dlClient.routingService.addHost(name, dlServer.getAddress()); - - long txid = 101; - for (long i = 1; i <= 10; i++) { - long curTxId = txid++; - logger.debug("Write entry {} to stream {}.", curTxId, name); - dlClient.dlClient.write(name, - ByteBuffer.wrap(("" + curTxId).getBytes())).get(); - } - - checkStream(1, 1, 1, name, dlServer.getAddress(), true, true); - - 
dlClient.dlClient.delete(name).get(); - - checkStream(0, 0, 0, name, dlServer.getAddress(), false, false); - - Thread.sleep(1000); - - DistributedLogManager dlm101 = DLMTestUtil.createNewDLM(name, conf, getUri()); - AsyncLogReader reader101 = FutureUtils.result(dlm101.openAsyncLogReader(DLSN.InitialDLSN)); - try { - FutureUtils.result(reader101.readNext()); - fail("Should fail with LogNotFoundException since the stream is deleted"); - } catch (LogNotFoundException lnfe) { - // expected - } - FutureUtils.result(reader101.asyncClose()); - dlm101.close(); - - txid = 201; - for (long i = 1; i <= 10; i++) { - long curTxId = txid++; - logger.debug("Write entry {} to stream {}.", curTxId, name); - DLSN dlsn = dlClient.dlClient.write(name, - ByteBuffer.wrap(("" + curTxId).getBytes())).get(); - } - Thread.sleep(1000); - - DistributedLogManager dlm201 = DLMTestUtil.createNewDLM(name, conf, getUri()); - LogReader reader201 = dlm201.getInputStream(1); - int numRead = 0; - int curTxId = 201; - LogRecord r = reader201.readNext(false); - while (null != r) { - int i = Integer.parseInt(new String(r.getPayload())); - assertEquals(curTxId++, i); - ++numRead; - r = reader201.readNext(false); - } - assertEquals(10, numRead); - reader201.close(); - dlm201.close(); - } - - @Test(timeout = 60000) - public void testCreateStream() throws Exception { - try { - setupNoAdHocCluster(); - final String name = "dlserver-create-stream"; - - noAdHocClient.routingService.addHost("dlserver-create-stream", noAdHocServer.getAddress()); - assertFalse(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name)); - assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn()); - - long txid = 101; - for (long i = 1; i <= 10; i++) { - long curTxId = txid++; - logger.debug("Write entry {} to stream {}.", curTxId, name); - noAdHocClient.dlClient.write(name, - ByteBuffer.wrap(("" + curTxId).getBytes())).get(); - } - - 
assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name)); - } finally { - tearDownNoAdHocCluster(); - } - } - - /** This tests that create has touch like behavior in that trying to create the stream twice, simply does nothing */ - @Test(timeout = 60000) - public void testCreateStreamTwice() throws Exception { - try { - setupNoAdHocCluster(); - final String name = "dlserver-create-stream-twice"; - - noAdHocClient.routingService.addHost("dlserver-create-stream-twice", noAdHocServer.getAddress()); - assertFalse(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name)); - assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn()); - - long txid = 101; - for (long i = 1; i <= 10; i++) { - long curTxId = txid++; - logger.debug("Write entry {} to stream {}.", curTxId, name); - noAdHocClient.dlClient.write(name, - ByteBuffer.wrap(("" + curTxId).getBytes())).get(); - } - - assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name)); - - // create again - assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn()); - assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name)); - } finally { - tearDownNoAdHocCluster(); - } - } - - - - @Test(timeout = 60000) - public void testTruncateStream() throws Exception { - String name = "dlserver-truncate-stream"; - - dlClient.routingService.addHost(name, dlServer.getAddress()); - - long txid = 1; - Map<Long, DLSN> txid2DLSN = new HashMap<Long, DLSN>(); - for (int s = 1; s <= 3; s++) { - for (long i = 1; i <= 10; i++) { - long curTxId = txid++; - logger.debug("Write entry {} to stream {}.", curTxId, name); - DLSN dlsn = dlClient.dlClient.write(name, - ByteBuffer.wrap(("" + curTxId).getBytes())).get(); - txid2DLSN.put(curTxId, dlsn); - } - if (s <= 2) { - dlClient.dlClient.release(name).get(); - } - } - - DLSN dlsnToDelete = txid2DLSN.get(21L); - dlClient.dlClient.truncate(name, dlsnToDelete).get(); - - DistributedLogManager readDLM = 
DLMTestUtil.createNewDLM(name, conf, getUri()); - LogReader reader = readDLM.getInputStream(1); - int numRead = 0; - int curTxId = 11; - LogRecord r = reader.readNext(false); - while (null != r) { - int i = Integer.parseInt(new String(r.getPayload())); - assertEquals(curTxId++, i); - ++numRead; - r = reader.readNext(false); - } - assertEquals(20, numRead); - reader.close(); - readDLM.close(); - } - - @Test(timeout = 60000) - public void testRequestDenied() throws Exception { - String name = "request-denied"; - - dlClient.routingService.addHost(name, dlServer.getAddress()); - - AccessControlEntry ace = new AccessControlEntry(); - ace.setDenyWrite(true); - ZooKeeperClient zkc = TestZooKeeperClientBuilder - .newBuilder() - .uri(getUri()) - .connectionTimeoutMs(60000) - .sessionTimeoutMs(60000) - .build(); - DistributedLogNamespace dlNamespace = dlServer.dlServer.getLeft().getDistributedLogNamespace(); - BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(zkc, getUri()); - String zkPath = getUri().getPath() + "/" + bkdlConfig.getACLRootPath() + "/" + name; - ZKAccessControl accessControl = new ZKAccessControl(ace, zkPath); - accessControl.create(zkc); - - AccessControlManager acm = dlNamespace.createAccessControlManager(); - while (acm.allowWrite(name)) { - Thread.sleep(100); - } - - try { - Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8)))); - fail("Should fail with request denied exception"); - } catch (DLException dle) { - assertEquals(StatusCode.REQUEST_DENIED, dle.getCode()); - } - } - - @Test(timeout = 60000) - public void testNoneStreamNameRegex() throws Exception { - String streamNamePrefix = "none-stream-name-regex-"; - int numStreams = 5; - Set<String> streams = new HashSet<String>(); - - for (int i = 0; i < numStreams; i++) { - streams.add(streamNamePrefix + i); - } - testStreamNameRegex(streams, ".*", streams); - } - - @Test(timeout = 60000) - public void testStreamNameRegex() throws Exception { - String streamNamePrefix = 
"stream-name-regex-"; - int numStreams = 5; - Set<String> streams = new HashSet<String>(); - Set<String> expectedStreams = new HashSet<String>(); - String streamNameRegex = streamNamePrefix + "1"; - - for (int i = 0; i < numStreams; i++) { - streams.add(streamNamePrefix + i); - } - expectedStreams.add(streamNamePrefix + "1"); - - testStreamNameRegex(streams, streamNameRegex, expectedStreams); - } - - private void testStreamNameRegex(Set<String> streams, String streamNameRegex, - Set<String> expectedStreams) - throws Exception { - for (String streamName : streams) { - dlClient.routingService.addHost(streamName, dlServer.getAddress()); - Await.result(dlClient.dlClient.write(streamName, - ByteBuffer.wrap(streamName.getBytes(UTF_8)))); - } - - DLClient client = createDistributedLogClient("test-stream-name-regex", streamNameRegex); - try { - client.routingService.addHost("unknown", dlServer.getAddress()); - client.handshake(); - Map<SocketAddress, Set<String>> distribution = - client.dlClient.getStreamOwnershipDistribution(); - assertEquals(1, distribution.size()); - Set<String> cachedStreams = distribution.values().iterator().next(); - assertNotNull(cachedStreams); - assertEquals(expectedStreams.size(), cachedStreams.size()); - - for (String streamName : cachedStreams) { - assertTrue(expectedStreams.contains(streamName)); - } - } finally { - client.shutdown(); - } - } - - @Test(timeout = 60000) - public void testReleaseStream() throws Exception { - String name = "dlserver-release-stream"; - - dlClient.routingService.addHost(name, dlServer.getAddress()); - - Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8)))); - checkStream(1, 1, 1, name, dlServer.getAddress(), true, true); - - // release the stream - Await.result(dlClient.dlClient.release(name)); - checkStream(0, 0, 0, name, dlServer.getAddress(), false, false); - } - - @Test(timeout = 60000) - public void testAcceptNewStream() throws Exception { - String name = 
"dlserver-accept-new-stream"; - - dlClient.routingService.addHost(name, dlServer.getAddress()); - dlClient.routingService.setAllowRetrySameHost(false); - - Await.result(dlClient.dlClient.setAcceptNewStream(false)); - - try { - Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8)))); - fail("Should fail because the proxy couldn't accept new stream"); - } catch (NoBrokersAvailableException nbae) { - // expected - } - checkStream(0, 0, 0, name, dlServer.getAddress(), false, false); - - Await.result(dlServer.dlServer.getLeft().setAcceptNewStream(true)); - Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8)))); - checkStream(1, 1, 1, name, dlServer.getAddress(), true, true); - } - - private void checkStream(int expectedNumProxiesInClient, int expectedClientCacheSize, int expectedServerCacheSize, - String name, SocketAddress owner, boolean existedInServer, boolean existedInClient) { - Map<SocketAddress, Set<String>> distribution = dlClient.dlClient.getStreamOwnershipDistribution(); - assertEquals(expectedNumProxiesInClient, distribution.size()); - - if (expectedNumProxiesInClient > 0) { - Map.Entry<SocketAddress, Set<String>> localEntry = - distribution.entrySet().iterator().next(); - assertEquals(owner, localEntry.getKey()); - assertEquals(expectedClientCacheSize, localEntry.getValue().size()); - assertEquals(existedInClient, localEntry.getValue().contains(name)); - } - - - StreamManagerImpl streamManager = (StreamManagerImpl) dlServer.dlServer.getKey().getStreamManager(); - Set<String> cachedStreams = streamManager.getCachedStreams().keySet(); - Set<String> acquiredStreams = streamManager.getCachedStreams().keySet(); - - assertEquals(expectedServerCacheSize, cachedStreams.size()); - assertEquals(existedInServer, cachedStreams.contains(name)); - assertEquals(expectedServerCacheSize, acquiredStreams.size()); - assertEquals(existedInServer, acquiredStreams.contains(name)); - } - -} 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/ddbd7716/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java new file mode 100644 index 0000000..a0853e4 --- /dev/null +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java @@ -0,0 +1,714 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.twitter.distributedlog.service; + +import com.google.common.base.Optional; +import com.twitter.distributedlog.AsyncLogReader; +import com.twitter.distributedlog.DLMTestUtil; +import com.twitter.distributedlog.DLSN; +import com.twitter.distributedlog.DistributedLogManager; +import com.twitter.distributedlog.TestZooKeeperClientBuilder; +import com.twitter.distributedlog.annotations.DistributedLogAnnotations; +import com.twitter.distributedlog.exceptions.LogNotFoundException; +import com.twitter.distributedlog.LogReader; +import com.twitter.distributedlog.LogRecord; +import com.twitter.distributedlog.LogRecordWithDLSN; +import com.twitter.distributedlog.ZooKeeperClient; +import com.twitter.distributedlog.acl.AccessControlManager; +import com.twitter.distributedlog.acl.ZKAccessControl; +import com.twitter.distributedlog.client.routing.LocalRoutingService; +import com.twitter.distributedlog.exceptions.DLException; +import com.twitter.distributedlog.metadata.BKDLConfig; +import com.twitter.distributedlog.namespace.DistributedLogNamespace; +import com.twitter.distributedlog.service.stream.StreamManagerImpl; +import com.twitter.distributedlog.thrift.AccessControlEntry; +import com.twitter.distributedlog.thrift.service.BulkWriteResponse; +import com.twitter.distributedlog.thrift.service.HeartbeatOptions; +import com.twitter.distributedlog.thrift.service.StatusCode; +import com.twitter.distributedlog.thrift.service.WriteContext; +import com.twitter.distributedlog.util.FailpointUtils; +import com.twitter.distributedlog.util.FutureUtils; +import com.twitter.finagle.builder.ClientBuilder; +import com.twitter.finagle.thrift.ClientId$; +import com.twitter.util.Await; +import com.twitter.util.Duration; +import com.twitter.util.Future; +import com.twitter.util.Futures; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import 
java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static com.google.common.base.Charsets.UTF_8; +import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORD_SIZE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public abstract class TestDistributedLogServerBase extends DistributedLogServerTestCase { + static final Logger logger = LoggerFactory.getLogger(TestDistributedLogServerBase.class); + + @Rule + public TestName testName = new TestName(); + + protected TestDistributedLogServerBase(boolean clientSideRouting) { + super(clientSideRouting); + } + + /** + * {@link https://issues.apache.org/jira/browse/DL-27} + */ + @DistributedLogAnnotations.FlakyTest + @Ignore + @Test(timeout = 60000) + public void testBasicWrite() throws Exception { + String name = "dlserver-basic-write"; + + dlClient.routingService.addHost(name, dlServer.getAddress()); + + for (long i = 1; i <= 10; i++) { + logger.debug("Write entry {} to stream {}.", i, name); + Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes()))); + } + + HeartbeatOptions hbOptions = new HeartbeatOptions(); + hbOptions.setSendHeartBeatToReader(true); + // make sure the first log segment of each stream created + FutureUtils.result(dlClient.dlClient.heartbeat(name)); + + DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri()); + LogReader reader = dlm.getInputStream(1); + int numRead = 0; + LogRecord r = reader.readNext(false); + while (null != r) { + ++numRead; + int i = Integer.parseInt(new String(r.getPayload())); + assertEquals(numRead, i); + r = reader.readNext(false); + } + assertEquals(10, numRead); + reader.close(); + dlm.close(); + } 
+ + /** + * Sanity check to make sure both checksum flag values work. + */ + @Test(timeout = 60000) + public void testChecksumFlag() throws Exception { + String name = "testChecksumFlag"; + LocalRoutingService routingService = LocalRoutingService.newBuilder().build(); + routingService.addHost(name, dlServer.getAddress()); + DistributedLogClientBuilder dlClientBuilder = DistributedLogClientBuilder.newBuilder() + .name(name) + .clientId(ClientId$.MODULE$.apply("test")) + .routingService(routingService) + .handshakeWithClientInfo(true) + .clientBuilder(ClientBuilder.get() + .hostConnectionLimit(1) + .connectionTimeout(Duration.fromSeconds(1)) + .requestTimeout(Duration.fromSeconds(60))) + .checksum(false); + DistributedLogClient dlClient = dlClientBuilder.build(); + Await.result(dlClient.write(name, ByteBuffer.wrap(("1").getBytes()))); + dlClient.close(); + + dlClient = dlClientBuilder.checksum(true).build(); + Await.result(dlClient.write(name, ByteBuffer.wrap(("2").getBytes()))); + dlClient.close(); + } + + private void runSimpleBulkWriteTest(int writeCount) throws Exception { + String name = String.format("dlserver-bulk-write-%d", writeCount); + + dlClient.routingService.addHost(name, dlServer.getAddress()); + + List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount); + for (long i = 1; i <= writeCount; i++) { + writes.add(ByteBuffer.wrap(("" + i).getBytes())); + } + + logger.debug("Write {} entries to stream {}.", writeCount, name); + List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes); + assertEquals(futures.size(), writeCount); + for (Future<DLSN> future : futures) { + // No throw == pass. 
+ DLSN dlsn = Await.result(future, Duration.fromSeconds(10)); + } + + DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri()); + LogReader reader = dlm.getInputStream(1); + int numRead = 0; + LogRecord r = reader.readNext(false); + while (null != r) { + int i = Integer.parseInt(new String(r.getPayload())); + assertEquals(numRead + 1, i); + ++numRead; + r = reader.readNext(false); + } + assertEquals(writeCount, numRead); + reader.close(); + dlm.close(); + } + + @Test(timeout = 60000) + public void testBulkWrite() throws Exception { + runSimpleBulkWriteTest(100); + } + + @Test(timeout = 60000) + public void testBulkWriteSingleWrite() throws Exception { + runSimpleBulkWriteTest(1); + } + + @Test(timeout = 60000) + public void testBulkWriteEmptyList() throws Exception { + String name = String.format("dlserver-bulk-write-%d", 0); + + dlClient.routingService.addHost(name, dlServer.getAddress()); + + List<ByteBuffer> writes = new ArrayList<ByteBuffer>(); + List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes); + + assertEquals(0, futures.size()); + } + + @Test(timeout = 60000) + public void testBulkWriteNullArg() throws Exception { + + String name = String.format("dlserver-bulk-write-%s", "null"); + + dlClient.routingService.addHost(name, dlServer.getAddress()); + + List<ByteBuffer> writes = new ArrayList<ByteBuffer>(); + writes.add(null); + + try { + List<Future<DLSN>> futureResult = dlClient.dlClient.writeBulk(name, writes); + fail("should not have succeeded"); + } catch (NullPointerException npe) { + ; // expected + } + } + + @Test(timeout = 60000) + public void testBulkWriteEmptyBuffer() throws Exception { + String name = String.format("dlserver-bulk-write-%s", "empty"); + + dlClient.routingService.addHost(name, dlServer.getAddress()); + + List<ByteBuffer> writes = new ArrayList<ByteBuffer>(); + writes.add(ByteBuffer.wrap(("").getBytes())); + writes.add(ByteBuffer.wrap(("").getBytes())); + List<Future<DLSN>> futures = 
dlClient.dlClient.writeBulk(name, writes); + assertEquals(2, futures.size()); + for (Future<DLSN> future : futures) { + // No throw == pass + DLSN dlsn = Await.result(future, Duration.fromSeconds(10)); + } + } + + void failDueToWrongException(Exception ex) { + logger.info("testBulkWritePartialFailure: ", ex); + fail(String.format("failed with wrong exception %s", ex.getClass().getName())); + } + + int validateAllFailedAsCancelled(List<Future<DLSN>> futures, int start, int finish) { + int failed = 0; + for (int i = start; i < finish; i++) { + Future<DLSN> future = futures.get(i); + try { + DLSN dlsn = Await.result(future, Duration.fromSeconds(10)); + fail("future should have failed!"); + } catch (DLException cre) { + ++failed; + } catch (Exception ex) { + failDueToWrongException(ex); + } + } + return failed; + } + + void validateFailedAsLogRecordTooLong(Future<DLSN> future) { + try { + DLSN dlsn = Await.result(future, Duration.fromSeconds(10)); + fail("should have failed"); + } catch (DLException dle) { + assertEquals(StatusCode.TOO_LARGE_RECORD, dle.getCode()); + } catch (Exception ex) { + failDueToWrongException(ex); + } + } + + @Test(timeout = 60000) + public void testBulkWritePartialFailure() throws Exception { + String name = String.format("dlserver-bulk-write-%s", "partial-failure"); + + dlClient.routingService.addHost(name, dlServer.getAddress()); + + final int writeCount = 100; + + List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount*2 + 1); + for (long i = 1; i <= writeCount; i++) { + writes.add(ByteBuffer.wrap(("" + i).getBytes())); + } + // Too big, will cause partial failure. + ByteBuffer buf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1); + writes.add(buf); + for (long i = 1; i <= writeCount; i++) { + writes.add(ByteBuffer.wrap(("" + i).getBytes())); + } + + // Count succeeded. 
+ List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes); + int succeeded = 0; + for (int i = 0; i < writeCount; i++) { + Future<DLSN> future = futures.get(i); + try { + DLSN dlsn = Await.result(future, Duration.fromSeconds(10)); + ++succeeded; + } catch (Exception ex) { + failDueToWrongException(ex); + } + } + + validateFailedAsLogRecordTooLong(futures.get(writeCount)); + FutureUtils.result(Futures.collect(futures.subList(writeCount + 1, 2 * writeCount + 1))); + assertEquals(writeCount, succeeded); + } + + @Test(timeout = 60000) + public void testBulkWriteTotalFailureFirstWriteFailed() throws Exception { + String name = String.format("dlserver-bulk-write-%s", "first-write-failed"); + + dlClient.routingService.addHost(name, dlServer.getAddress()); + + final int writeCount = 100; + List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount + 1); + ByteBuffer buf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1); + writes.add(buf); + for (long i = 1; i <= writeCount; i++) { + writes.add(ByteBuffer.wrap(("" + i).getBytes())); + } + + List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes); + validateFailedAsLogRecordTooLong(futures.get(0)); + FutureUtils.result(Futures.collect(futures.subList(1, writeCount + 1))); + } + + @Test(timeout = 60000) + public void testBulkWriteTotalFailureLostLock() throws Exception { + String name = String.format("dlserver-bulk-write-%s", "lost-lock"); + + dlClient.routingService.addHost(name, dlServer.getAddress()); + + final int writeCount = 8; + List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount + 1); + ByteBuffer buf = ByteBuffer.allocate(8); + writes.add(buf); + for (long i = 1; i <= writeCount; i++) { + writes.add(ByteBuffer.wrap(("" + i).getBytes())); + } + // Warm it up with a write. + Await.result(dlClient.dlClient.write(name, ByteBuffer.allocate(8))); + + // Failpoint a lost lock, make sure the failure gets promoted to an operation failure. 
+ DistributedLogServiceImpl svcImpl = (DistributedLogServiceImpl) dlServer.dlServer.getLeft(); + try { + FailpointUtils.setFailpoint( + FailpointUtils.FailPointName.FP_WriteInternalLostLock, + FailpointUtils.FailPointActions.FailPointAction_Default + ); + Future<BulkWriteResponse> futures = svcImpl.writeBulkWithContext(name, writes, new WriteContext()); + assertEquals(StatusCode.LOCKING_EXCEPTION, Await.result(futures).header.code); + } finally { + FailpointUtils.removeFailpoint( + FailpointUtils.FailPointName.FP_WriteInternalLostLock + ); + } + } + + @Test(timeout = 60000) + public void testHeartbeat() throws Exception { + String name = "dlserver-heartbeat"; + + dlClient.routingService.addHost(name, dlServer.getAddress()); + + for (long i = 1; i <= 10; i++) { + logger.debug("Send heartbeat {} to stream {}.", i, name); + dlClient.dlClient.check(name).get(); + } + + logger.debug("Write entry one to stream {}.", name); + dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes())).get(); + + Thread.sleep(1000); + + DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri()); + LogReader reader = dlm.getInputStream(DLSN.InitialDLSN); + int numRead = 0; + // eid=0 => control records + // other 9 heartbeats will not trigger writing any control records. 
+ // eid=1 => user entry + long startEntryId = 1; + LogRecordWithDLSN r = reader.readNext(false); + while (null != r) { + int i = Integer.parseInt(new String(r.getPayload())); + assertEquals(numRead + 1, i); + assertEquals(r.getDlsn().compareTo(new DLSN(1, startEntryId, 0)), 0); + ++numRead; + ++startEntryId; + r = reader.readNext(false); + } + assertEquals(1, numRead); + } + + @Test(timeout = 60000) + public void testFenceWrite() throws Exception { + String name = "dlserver-fence-write"; + + dlClient.routingService.addHost(name, dlServer.getAddress()); + + for (long i = 1; i <= 10; i++) { + logger.debug("Write entry {} to stream {}.", i, name); + dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes())).get(); + } + + Thread.sleep(1000); + + logger.info("Fencing stream {}.", name); + DLMTestUtil.fenceStream(conf, getUri(), name); + logger.info("Fenced stream {}.", name); + + for (long i = 11; i <= 20; i++) { + logger.debug("Write entry {} to stream {}.", i, name); + dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes())).get(); + } + + DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri()); + LogReader reader = dlm.getInputStream(1); + int numRead = 0; + LogRecord r = reader.readNext(false); + while (null != r) { + int i = Integer.parseInt(new String(r.getPayload())); + assertEquals(numRead + 1, i); + ++numRead; + r = reader.readNext(false); + } + assertEquals(20, numRead); + reader.close(); + dlm.close(); + } + + @Test(timeout = 60000) + public void testDeleteStream() throws Exception { + String name = "dlserver-delete-stream"; + + dlClient.routingService.addHost(name, dlServer.getAddress()); + + long txid = 101; + for (long i = 1; i <= 10; i++) { + long curTxId = txid++; + logger.debug("Write entry {} to stream {}.", curTxId, name); + dlClient.dlClient.write(name, + ByteBuffer.wrap(("" + curTxId).getBytes())).get(); + } + + checkStream(1, 1, 1, name, dlServer.getAddress(), true, true); + + 
dlClient.dlClient.delete(name).get(); + + checkStream(0, 0, 0, name, dlServer.getAddress(), false, false); + + Thread.sleep(1000); + + DistributedLogManager dlm101 = DLMTestUtil.createNewDLM(name, conf, getUri()); + AsyncLogReader reader101 = FutureUtils.result(dlm101.openAsyncLogReader(DLSN.InitialDLSN)); + try { + FutureUtils.result(reader101.readNext()); + fail("Should fail with LogNotFoundException since the stream is deleted"); + } catch (LogNotFoundException lnfe) { + // expected + } + FutureUtils.result(reader101.asyncClose()); + dlm101.close(); + + txid = 201; + for (long i = 1; i <= 10; i++) { + long curTxId = txid++; + logger.debug("Write entry {} to stream {}.", curTxId, name); + DLSN dlsn = dlClient.dlClient.write(name, + ByteBuffer.wrap(("" + curTxId).getBytes())).get(); + } + Thread.sleep(1000); + + DistributedLogManager dlm201 = DLMTestUtil.createNewDLM(name, conf, getUri()); + LogReader reader201 = dlm201.getInputStream(1); + int numRead = 0; + int curTxId = 201; + LogRecord r = reader201.readNext(false); + while (null != r) { + int i = Integer.parseInt(new String(r.getPayload())); + assertEquals(curTxId++, i); + ++numRead; + r = reader201.readNext(false); + } + assertEquals(10, numRead); + reader201.close(); + dlm201.close(); + } + + @Test(timeout = 60000) + public void testCreateStream() throws Exception { + try { + setupNoAdHocCluster(); + final String name = "dlserver-create-stream"; + + noAdHocClient.routingService.addHost("dlserver-create-stream", noAdHocServer.getAddress()); + assertFalse(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name)); + assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn()); + + long txid = 101; + for (long i = 1; i <= 10; i++) { + long curTxId = txid++; + logger.debug("Write entry {} to stream {}.", curTxId, name); + noAdHocClient.dlClient.write(name, + ByteBuffer.wrap(("" + curTxId).getBytes())).get(); + } + + 
assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name)); + } finally { + tearDownNoAdHocCluster(); + } + } + + /** This tests that create has touch like behavior in that trying to create the stream twice, simply does nothing */ + @Test(timeout = 60000) + public void testCreateStreamTwice() throws Exception { + try { + setupNoAdHocCluster(); + final String name = "dlserver-create-stream-twice"; + + noAdHocClient.routingService.addHost("dlserver-create-stream-twice", noAdHocServer.getAddress()); + assertFalse(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name)); + assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn()); + + long txid = 101; + for (long i = 1; i <= 10; i++) { + long curTxId = txid++; + logger.debug("Write entry {} to stream {}.", curTxId, name); + noAdHocClient.dlClient.write(name, + ByteBuffer.wrap(("" + curTxId).getBytes())).get(); + } + + assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name)); + + // create again + assertTrue(Await.ready(noAdHocClient.dlClient.create(name)).isReturn()); + assertTrue(noAdHocServer.dlServer.getKey().getStreamManager().isAcquired(name)); + } finally { + tearDownNoAdHocCluster(); + } + } + + + + @Test(timeout = 60000) + public void testTruncateStream() throws Exception { + String name = "dlserver-truncate-stream"; + + dlClient.routingService.addHost(name, dlServer.getAddress()); + + long txid = 1; + Map<Long, DLSN> txid2DLSN = new HashMap<Long, DLSN>(); + for (int s = 1; s <= 3; s++) { + for (long i = 1; i <= 10; i++) { + long curTxId = txid++; + logger.debug("Write entry {} to stream {}.", curTxId, name); + DLSN dlsn = dlClient.dlClient.write(name, + ByteBuffer.wrap(("" + curTxId).getBytes())).get(); + txid2DLSN.put(curTxId, dlsn); + } + if (s <= 2) { + dlClient.dlClient.release(name).get(); + } + } + + DLSN dlsnToDelete = txid2DLSN.get(21L); + dlClient.dlClient.truncate(name, dlsnToDelete).get(); + + DistributedLogManager readDLM = 
DLMTestUtil.createNewDLM(name, conf, getUri()); + LogReader reader = readDLM.getInputStream(1); + int numRead = 0; + int curTxId = 11; + LogRecord r = reader.readNext(false); + while (null != r) { + int i = Integer.parseInt(new String(r.getPayload())); + assertEquals(curTxId++, i); + ++numRead; + r = reader.readNext(false); + } + assertEquals(20, numRead); + reader.close(); + readDLM.close(); + } + + @Test(timeout = 60000) + public void testRequestDenied() throws Exception { + String name = "request-denied"; + + dlClient.routingService.addHost(name, dlServer.getAddress()); + + AccessControlEntry ace = new AccessControlEntry(); + ace.setDenyWrite(true); + ZooKeeperClient zkc = TestZooKeeperClientBuilder + .newBuilder() + .uri(getUri()) + .connectionTimeoutMs(60000) + .sessionTimeoutMs(60000) + .build(); + DistributedLogNamespace dlNamespace = dlServer.dlServer.getLeft().getDistributedLogNamespace(); + BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(zkc, getUri()); + String zkPath = getUri().getPath() + "/" + bkdlConfig.getACLRootPath() + "/" + name; + ZKAccessControl accessControl = new ZKAccessControl(ace, zkPath); + accessControl.create(zkc); + + AccessControlManager acm = dlNamespace.createAccessControlManager(); + while (acm.allowWrite(name)) { + Thread.sleep(100); + } + + try { + Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8)))); + fail("Should fail with request denied exception"); + } catch (DLException dle) { + assertEquals(StatusCode.REQUEST_DENIED, dle.getCode()); + } + } + + @Test(timeout = 60000) + public void testNoneStreamNameRegex() throws Exception { + String streamNamePrefix = "none-stream-name-regex-"; + int numStreams = 5; + Set<String> streams = new HashSet<String>(); + + for (int i = 0; i < numStreams; i++) { + streams.add(streamNamePrefix + i); + } + testStreamNameRegex(streams, ".*", streams); + } + + @Test(timeout = 60000) + public void testStreamNameRegex() throws Exception { + String streamNamePrefix = 
"stream-name-regex-"; + int numStreams = 5; + Set<String> streams = new HashSet<String>(); + Set<String> expectedStreams = new HashSet<String>(); + String streamNameRegex = streamNamePrefix + "1"; + + for (int i = 0; i < numStreams; i++) { + streams.add(streamNamePrefix + i); + } + expectedStreams.add(streamNamePrefix + "1"); + + testStreamNameRegex(streams, streamNameRegex, expectedStreams); + } + + private void testStreamNameRegex(Set<String> streams, String streamNameRegex, + Set<String> expectedStreams) + throws Exception { + for (String streamName : streams) { + dlClient.routingService.addHost(streamName, dlServer.getAddress()); + Await.result(dlClient.dlClient.write(streamName, + ByteBuffer.wrap(streamName.getBytes(UTF_8)))); + } + + DLClient client = createDistributedLogClient( + "test-stream-name-regex", + streamNameRegex, + Optional.<String>absent()); + try { + client.routingService.addHost("unknown", dlServer.getAddress()); + client.handshake(); + Map<SocketAddress, Set<String>> distribution = + client.dlClient.getStreamOwnershipDistribution(); + assertEquals(1, distribution.size()); + Set<String> cachedStreams = distribution.values().iterator().next(); + assertNotNull(cachedStreams); + assertEquals(expectedStreams.size(), cachedStreams.size()); + + for (String streamName : cachedStreams) { + assertTrue(expectedStreams.contains(streamName)); + } + } finally { + client.shutdown(); + } + } + + @Test(timeout = 60000) + public void testReleaseStream() throws Exception { + String name = "dlserver-release-stream"; + + dlClient.routingService.addHost(name, dlServer.getAddress()); + + Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap("1".getBytes(UTF_8)))); + checkStream(1, 1, 1, name, dlServer.getAddress(), true, true); + + // release the stream + Await.result(dlClient.dlClient.release(name)); + checkStream(0, 0, 0, name, dlServer.getAddress(), false, false); + } + + protected void checkStream(int expectedNumProxiesInClient, int expectedClientCacheSize, 
int expectedServerCacheSize, + String name, SocketAddress owner, boolean existedInServer, boolean existedInClient) { + Map<SocketAddress, Set<String>> distribution = dlClient.dlClient.getStreamOwnershipDistribution(); + assertEquals(expectedNumProxiesInClient, distribution.size()); + + if (expectedNumProxiesInClient > 0) { + Map.Entry<SocketAddress, Set<String>> localEntry = + distribution.entrySet().iterator().next(); + assertEquals(owner, localEntry.getKey()); + assertEquals(expectedClientCacheSize, localEntry.getValue().size()); + assertEquals(existedInClient, localEntry.getValue().contains(name)); + } + + + StreamManagerImpl streamManager = (StreamManagerImpl) dlServer.dlServer.getKey().getStreamManager(); + Set<String> cachedStreams = streamManager.getCachedStreams().keySet(); + Set<String> acquiredStreams = streamManager.getCachedStreams().keySet(); + + assertEquals(expectedServerCacheSize, cachedStreams.size()); + assertEquals(existedInServer, cachedStreams.contains(name)); + assertEquals(expectedServerCacheSize, acquiredStreams.size()); + assertEquals(existedInServer, acquiredStreams.contains(name)); + } + +}
