DL-157: resource placement for write proxy
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/0591d067 Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/0591d067 Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/0591d067 Branch: refs/heads/master Commit: 0591d067f05617bc534a662b6f9a014192cbe3a5 Parents: 34fa16b Author: Jordan Bull <jb...@twitter.com> Authored: Tue Dec 13 11:11:03 2016 -0800 Committer: Sijie Guo <sij...@twitter.com> Committed: Thu Dec 29 02:10:32 2016 -0800 ---------------------------------------------------------------------- .../distributedlog/benchmark/Benchmarker.java | 16 +- .../distributedlog/benchmark/WriterWorker.java | 6 +- .../client/DistributedLogClientImpl.java | 25 +-- .../service/DistributedLogClientBuilder.java | 1 + .../BKDistributedLogNamespace.java | 8 +- distributedlog-service/pom.xml | 39 ++++ .../service/DistributedLogCluster.java | 7 +- .../service/DistributedLogServer.java | 96 ++++++---- .../service/DistributedLogServerApp.java | 7 +- .../service/DistributedLogServiceImpl.java | 90 ++++----- .../service/config/ServerConfiguration.java | 12 ++ .../service/placement/EqualLoadAppraiser.java | 37 ++++ .../placement/LeastLoadPlacementPolicy.java | 192 +++++++++++++++++++ .../service/placement/LoadAppraiser.java | 25 +++ .../service/placement/PlacementPolicy.java | 148 ++++++++++++++ .../placement/PlacementStateManager.java | 65 +++++++ .../service/placement/ServerLoad.java | 152 +++++++++++++++ .../service/placement/StreamLoad.java | 109 +++++++++++ .../placement/ZKPlacementStateManager.java | 172 +++++++++++++++++ .../src/main/thrift/metadata.thrift | 29 +++ .../service/TestDistributedLogService.java | 48 ++--- .../placement/TestLeastLoadPlacementPolicy.java | 160 ++++++++++++++++ .../service/placement/TestServerLoad.java | 48 +++++ .../service/placement/TestStreamLoad.java | 35 ++++ 
.../placement/TestZKPlacementStateManager.java | 123 ++++++++++++ 25 files changed, 1516 insertions(+), 134 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java index 87d3b53..5b04a05 100644 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java +++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java @@ -85,6 +85,7 @@ public class Benchmarker { boolean enableBatching = false; int batchBufferSize = 256 * 1024; int batchFlushIntervalMicros = 2000; + String routingServiceFinagleNameString; final DistributedLogConfiguration conf = new DistributedLogConfiguration(); final StatsReceiver statsReceiver = new OstrichStatsReceiver(); @@ -125,6 +126,7 @@ public class Benchmarker { options.addOption("bt", "enable-batch", false, "Enable batching on writers"); options.addOption("bbs", "batch-buffer-size", true, "The batch buffer size in bytes"); options.addOption("bfi", "batch-flush-interval", true, "The batch buffer flush interval in micros"); + options.addOption("rs", "routing-service", true, "The routing service finagle name for server-side routing"); options.addOption("h", "help", false, "Print usage."); } @@ -221,6 +223,9 @@ public class Benchmarker { if (cmdline.hasOption("rb")) { recvBufferSize = Integer.parseInt(cmdline.getOptionValue("rb")); } + if (cmdline.hasOption("rs")) { + routingServiceFinagleNameString = cmdline.getOptionValue("rs"); + } thriftmux = cmdline.hasOption("mx"); handshakeWithClientInfo = 
cmdline.hasOption("hsci"); readFromHead = cmdline.hasOption("rfh"); @@ -311,7 +316,8 @@ public class Benchmarker { recvBufferSize, enableBatching, batchBufferSize, - batchFlushIntervalMicros); + batchFlushIntervalMicros, + routingServiceFinagleNameString); } protected WriterWorker createWriteWorker( @@ -335,7 +341,8 @@ public class Benchmarker { int recvBufferSize, boolean enableBatching, int batchBufferSize, - int batchFlushIntervalMicros) { + int batchFlushIntervalMicros, + String routingServiceFinagleNameString) { return new WriterWorker( streamPrefix, uri, @@ -357,7 +364,8 @@ public class Benchmarker { recvBufferSize, enableBatching, batchBufferSize, - batchFlushIntervalMicros); + batchFlushIntervalMicros, + routingServiceFinagleNameString); } Worker runDLWriter() throws IOException { @@ -453,7 +461,7 @@ public class Benchmarker { try { benchmarker.run(); } catch (Exception e) { - logger.info("Benchmark quitted due to : ", e); + logger.info("Benchmark quit due to : ", e); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java index 46229b3..dc5a6e2 100644 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java +++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java @@ -81,6 +81,7 @@ public class WriterWorker implements Worker { final boolean enableBatching; final int batchBufferSize; final int batchFlushIntervalMicros; + private final String routingServiceFinagleName; volatile boolean running = true; @@ -113,7 +114,8 @@ public class WriterWorker implements Worker { int recvBufferSize, 
boolean enableBatching, int batchBufferSize, - int batchFlushIntervalMicros) { + int batchFlushIntervalMicros, + String routingServiceFinagleName) { checkArgument(startStreamId <= endStreamId); checkArgument(!finagleNames.isEmpty() || !serverSetPaths.isEmpty()); this.streamPrefix = streamPrefix; @@ -143,6 +145,7 @@ public class WriterWorker implements Worker { this.finagleNames = finagleNames; this.serverSets = createServerSets(serverSetPaths); this.executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + this.routingServiceFinagleName = routingServiceFinagleName; // Streams streamNames = new ArrayList<String>(endStreamId - startStreamId); @@ -197,6 +200,7 @@ public class WriterWorker implements Worker { .periodicOwnershipSyncIntervalMs(TimeUnit.MINUTES.toMillis(5)) .periodicDumpOwnershipCache(true) .handshakeTracing(true) + .serverRoutingServiceFinagleNameStr(routingServiceFinagleName) .name("writer"); if (!finagleNames.isEmpty()) { http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java index 634afe1..1077cd0 100644 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/DistributedLogClientImpl.java @@ -73,6 +73,7 @@ import com.twitter.util.Promise; import com.twitter.util.Return; import com.twitter.util.Throw; import java.io.IOException; +import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -852,18 +853,18 @@ public class 
DistributedLogClientImpl implements DistributedLogClient, MonitorSe } } - private void retryGetOwnerFromRoutingServer(final StreamOp op, + private void retryGetOwnerFromResourcePlacementServer(final StreamOp op, final Promise<SocketAddress> getOwnerPromise, final Throwable cause) { if (op.shouldTimeout()) { op.fail(null, cause); return; } - getOwnerFromRoutingServer(op, getOwnerPromise); + getOwnerFromResourcePlacementServer(op, getOwnerPromise); } - private void getOwnerFromRoutingServer(final StreamOp op, - final Promise<SocketAddress> getOwnerPromise) { + private void getOwnerFromResourcePlacementServer(final StreamOp op, + final Promise<SocketAddress> getOwnerPromise) { clusterClient.get().getService().getOwner(op.stream, op.ctx) .addEventListener(new FutureEventListener<WriteResponse>() { @Override @@ -875,18 +876,20 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe public void onSuccess(WriteResponse value) { if (StatusCode.FOUND == value.getHeader().getCode() && null != value.getHeader().getLocation()) { - SocketAddress addr; try { - addr = DLSocketAddress.deserialize(value.getHeader().getLocation()).getSocketAddress(); + InetSocketAddress addr = DLSocketAddress.deserialize( + value.getHeader().getLocation() + ).getSocketAddress(); + getOwnerPromise.updateIfEmpty(new Return<SocketAddress>(addr)); } catch (IOException e) { // retry from the routing server again - retryGetOwnerFromRoutingServer(op, getOwnerPromise, e); + logger.error("ERROR in getOwner", e); + retryGetOwnerFromResourcePlacementServer(op, getOwnerPromise, e); return; } - getOwnerPromise.updateIfEmpty(new Return<SocketAddress>(addr)); } else { // retry from the routing server again - retryGetOwnerFromRoutingServer(op, getOwnerPromise, + retryGetOwnerFromResourcePlacementServer(op, getOwnerPromise, new StreamUnavailableException("Stream " + op.stream + "'s owner is unknown")); } } @@ -896,7 +899,7 @@ public class DistributedLogClientImpl implements 
DistributedLogClient, MonitorSe private Future<SocketAddress> getOwner(final StreamOp op) { if (clusterClient.isPresent()) { final Promise<SocketAddress> getOwnerPromise = new Promise<SocketAddress>(); - getOwnerFromRoutingServer(op, getOwnerPromise); + getOwnerFromResourcePlacementServer(op, getOwnerPromise); return getOwnerPromise; } // pickup host by hashing @@ -1190,7 +1193,7 @@ public class DistributedLogClientImpl implements DistributedLogClient, MonitorSe ownershipCache.updateOwner(stream, ownerAddr); } catch (IOException e) { logger.warn("Invalid ownership {} found for stream {} : ", - new Object[] { location, stream, e }); + new Object[] { location, stream, e }); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java index 44d93ee..3f65aff 100644 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java +++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java @@ -96,6 +96,7 @@ public final class DistributedLogClientBuilder { newBuilder.statsReceiver = builder.statsReceiver; newBuilder.streamStatsReceiver = builder.streamStatsReceiver; newBuilder.enableRegionStats = builder.enableRegionStats; + newBuilder.serverRoutingServiceFinagleName = builder.serverRoutingServiceFinagleName; newBuilder.clientConfig = ClientConfig.newConfig(builder.clientConfig); return newBuilder; } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java 
---------------------------------------------------------------------- diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java index 2c9fe44..e7f29cc 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java @@ -609,10 +609,10 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace { return rootPath; } - private static ZooKeeperClientBuilder createDLZKClientBuilder(String zkcName, - DistributedLogConfiguration conf, - String zkServers, - StatsLogger statsLogger) { + public static ZooKeeperClientBuilder createDLZKClientBuilder(String zkcName, + DistributedLogConfiguration conf, + String zkServers, + StatsLogger statsLogger) { RetryPolicy retryPolicy = null; if (conf.getZKNumRetries() > 0) { retryPolicy = new BoundExponentialBackoffRetryPolicy( http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/pom.xml ---------------------------------------------------------------------- diff --git a/distributedlog-service/pom.xml b/distributedlog-service/pom.xml index b7b6ff8..e74d486 100644 --- a/distributedlog-service/pom.xml +++ b/distributedlog-service/pom.xml @@ -117,10 +117,49 @@ <artifactId>jetty-servlet</artifactId> <version>${jetty.version}</version> </dependency> + <dependency> + <groupId>org.apache.thrift</groupId> + <artifactId>libthrift</artifactId> + <version>0.5.0-1</version> + </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>scrooge-core_2.11</artifactId> + <version>${scrooge.version}</version> + </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-test</artifactId> + <version>2.2.0-incubating</version> + <scope>test</scope> + <exclusions> + 
<exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + </exclusions> + </dependency> </dependencies> <build> <plugins> <plugin> + <groupId>com.twitter</groupId> + <artifactId>scrooge-maven-plugin</artifactId> + <version>${scrooge-maven-plugin.version}</version> + <configuration> + <language>java</language> + </configuration> + <executions> + <execution> + <id>thrift-sources</id> + <phase>generate-sources</phase> + <goals> + <goal>compile</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> <artifactId>maven-assembly-plugin</artifactId> <version>2.2.1</version> <configuration> http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java index 0ce335b..3225ced 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java @@ -22,6 +22,7 @@ import com.twitter.distributedlog.LocalDLMEmulator; import com.twitter.distributedlog.client.routing.SingleHostRoutingService; import com.twitter.distributedlog.metadata.BKDLConfig; import com.twitter.distributedlog.metadata.DLMetadata; +import com.twitter.distributedlog.service.placement.EqualLoadAppraiser; import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter; import com.twitter.finagle.builder.Server; import org.apache.bookkeeper.conf.ServerConfiguration; @@ -228,9 +229,11 @@ public class DistributedLogCluster { routingService, new NullStatsProvider(), proxyPort, - thriftmux); + 
thriftmux, + new EqualLoadAppraiser()); routingService.setAddress(DLSocketAddress.getSocketAddress(proxyPort)); routingService.startService(); + serverPair.getLeft().startPlacementPolicy(); success = true; } catch (BindException be) { retries++; @@ -244,7 +247,7 @@ public class DistributedLogCluster { } } - LOG.info("Runnning DL on port {}", proxyPort); + LOG.info("Running DL on port {}", proxyPort); dlServer = serverPair; address = DLSocketAddress.getSocketAddress(proxyPort); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java index 185ea82..a9ba125 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java @@ -17,8 +17,32 @@ */ package com.twitter.distributedlog.service; +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import scala.Option; +import scala.Tuple2; + import com.google.common.base.Optional; import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import org.apache.bookkeeper.stats.NullStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.stats.StatsProvider; +import org.apache.bookkeeper.util.ReflectionUtils; +import org.apache.commons.configuration.ConfigurationException; 
+import org.apache.commons.lang3.tuple.Pair; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.twitter.distributedlog.DistributedLogConfiguration; import com.twitter.distributedlog.client.routing.RoutingService; import com.twitter.distributedlog.config.DynamicConfigurationFactory; @@ -31,6 +55,8 @@ import com.twitter.distributedlog.service.config.NullStreamConfigProvider; import com.twitter.distributedlog.service.config.ServerConfiguration; import com.twitter.distributedlog.service.config.ServiceStreamConfigProvider; import com.twitter.distributedlog.service.config.StreamConfigProvider; +import com.twitter.distributedlog.service.placement.EqualLoadAppraiser; +import com.twitter.distributedlog.service.placement.LoadAppraiser; import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter; import com.twitter.distributedlog.service.streamset.StreamPartitionConverter; import com.twitter.distributedlog.thrift.service.DistributedLogService; @@ -46,31 +72,11 @@ import com.twitter.finagle.thrift.ClientIdRequiredFilter; import com.twitter.finagle.thrift.ThriftServerFramedCodec; import com.twitter.finagle.transport.Transport; import com.twitter.util.Duration; -import org.apache.bookkeeper.stats.NullStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.stats.StatsProvider; -import org.apache.bookkeeper.util.ReflectionUtils; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.Option; -import scala.Tuple2; - -import java.io.File; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.MalformedURLException; -import java.net.URI; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import 
java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; public class DistributedLogServer { static final Logger logger = LoggerFactory.getLogger(DistributedLogServer.class); + private static final String DEFAULT_LOAD_APPRIASER = EqualLoadAppraiser.class.getCanonicalName(); private DistributedLogServiceImpl dlService = null; private Server server = null; @@ -89,6 +95,7 @@ public class DistributedLogServer { private final Optional<Integer> statsPort; private final Optional<Integer> shardId; private final Optional<Boolean> announceServerSet; + private final Optional<String> loadAppraiserClassStr; private final Optional<Boolean> thriftmux; DistributedLogServer(Optional<String> uri, @@ -98,6 +105,7 @@ public class DistributedLogServer { Optional<Integer> statsPort, Optional<Integer> shardId, Optional<Boolean> announceServerSet, + Optional<String> loadAppraiserClass, Optional<Boolean> thriftmux, RoutingService routingService, StatsReceiver statsReceiver, @@ -113,9 +121,10 @@ public class DistributedLogServer { this.routingService = routingService; this.statsReceiver = statsReceiver; this.statsProvider = statsProvider; + this.loadAppraiserClassStr = loadAppraiserClass; } - public void runServer() throws ConfigurationException, IllegalArgumentException, IOException { + public void runServer() throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException { if (!uri.isPresent()) { throw new IllegalArgumentException("No distributedlog uri provided."); } @@ -174,6 +183,9 @@ public class DistributedLogServer { IdentityStreamPartitionConverter.class.getName()); converter = new IdentityStreamPartitionConverter(); } + Class loadAppraiserClass = Class.forName(loadAppraiserClassStr.or(DEFAULT_LOAD_APPRIASER)); + LoadAppraiser loadAppraiser = (LoadAppraiser) ReflectionUtils.newInstance(loadAppraiserClass); + logger.info("Supplied load appraiser class is " + loadAppraiserClassStr.get() + " Instantiated " + 
loadAppraiser.getClass().getCanonicalName()); StreamConfigProvider streamConfProvider = getStreamConfigProvider(dlConf, converter); @@ -193,7 +205,8 @@ public class DistributedLogServer { keepAliveLatch, statsReceiver, thriftmux.isPresent(), - streamConfProvider); + streamConfProvider, + loadAppraiser); this.dlService = serverPair.getLeft(); this.server = serverPair.getRight(); @@ -203,6 +216,8 @@ public class DistributedLogServer { // start the routing service after announced routingService.startService(); logger.info("Started the routing service."); + dlService.startPlacementPolicy(); + logger.info("Started the placement policy."); } protected void preRun(DistributedLogConfiguration conf, ServerConfiguration serverConf) { @@ -256,7 +271,8 @@ public class DistributedLogServer { RoutingService routingService, StatsProvider provider, int port, - boolean thriftmux) throws IOException { + boolean thriftmux, + LoadAppraiser loadAppraiser) throws IOException { return runServer(serverConf, dlConf, @@ -269,7 +285,8 @@ public class DistributedLogServer { new CountDownLatch(0), new NullStatsReceiver(), thriftmux, - new NullStreamConfigProvider()); + new NullStreamConfigProvider(), + loadAppraiser); } static Pair<DistributedLogServiceImpl, Server> runServer( @@ -284,7 +301,8 @@ public class DistributedLogServer { CountDownLatch keepAliveLatch, StatsReceiver statsReceiver, boolean thriftmux, - StreamConfigProvider streamConfProvider) throws IOException { + StreamConfigProvider streamConfProvider, + LoadAppraiser loadAppraiser) throws IOException { logger.info("Running server @ uri {}.", dlUri); boolean perStreamStatsEnabled = serverConf.isPerStreamStatEnabled(); @@ -297,16 +315,17 @@ public class DistributedLogServer { // dl service DistributedLogServiceImpl dlService = new DistributedLogServiceImpl( - serverConf, - dlConf, - dynDlConf, - streamConfProvider, - dlUri, - partitionConverter, - routingService, - provider.getStatsLogger(""), - perStreamStatsLogger, - 
keepAliveLatch); + serverConf, + dlConf, + dynDlConf, + streamConfProvider, + dlUri, + partitionConverter, + routingService, + provider.getStatsLogger(""), + perStreamStatsLogger, + keepAliveLatch, + loadAppraiser); StatsReceiver serviceStatsReceiver = statsReceiver.scope("service"); StatsLogger serviceStatsLogger = provider.getStatsLogger("service"); @@ -400,6 +419,7 @@ public class DistributedLogServer { * @throws ConfigurationException * @throws IllegalArgumentException * @throws IOException + * @throws ClassNotFoundException */ public static DistributedLogServer runServer( Optional<String> uri, @@ -409,11 +429,12 @@ public class DistributedLogServer { Optional<Integer> statsPort, Optional<Integer> shardId, Optional<Boolean> announceServerSet, + Optional<String> loadAppraiserClass, Optional<Boolean> thriftmux, RoutingService routingService, StatsReceiver statsReceiver, StatsProvider statsProvider) - throws ConfigurationException, IllegalArgumentException, IOException { + throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException { final DistributedLogServer server = new DistributedLogServer( uri, @@ -423,6 +444,7 @@ public class DistributedLogServer { statsPort, shardId, announceServerSet, + loadAppraiserClass, thriftmux, routingService, statsReceiver, http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java index af36307..1c3d8d4 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java +++ 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java @@ -68,6 +68,7 @@ public class DistributedLogServerApp { options.addOption("pd", "stats-provider", true, "DistributedLog Stats Provider"); options.addOption("si", "shard-id", true, "DistributedLog Shard ID"); options.addOption("a", "announce", false, "ServerSet Path to Announce"); + options.addOption("la", "load-appraiser", true, "LoadAppraiser Implementation to Use"); options.addOption("mx", "thriftmux", false, "Is thriftmux enabled"); } @@ -97,10 +98,13 @@ public class DistributedLogServerApp { } catch (IOException ie) { logger.error("Failed to start distributedlog server : ", ie); Runtime.getRuntime().exit(-1); + } catch (ClassNotFoundException cnf) { + logger.error("Failed to start distributedlog server : ", cnf); + Runtime.getRuntime().exit(-1); } } - private void runCmd(CommandLine cmdline) throws IllegalArgumentException, IOException, ConfigurationException { + private void runCmd(CommandLine cmdline) throws IllegalArgumentException, IOException, ConfigurationException, ClassNotFoundException { final StatsReceiver statsReceiver = NullStatsReceiver.get(); Optional<String> confOptional = getOptionalStringArg(cmdline, "c"); DistributedLogConfiguration dlConf = new DistributedLogConfiguration(); @@ -142,6 +146,7 @@ public class DistributedLogServerApp { getOptionalIntegerArg(cmdline, "sp"), getOptionalIntegerArg(cmdline, "si"), getOptionalBooleanArg(cmdline, "a"), + getOptionalStringArg(cmdline, "la"), getOptionalBooleanArg(cmdline, "mx"), routingService, statsReceiver, http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java 
b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java index 5c5b5af..e7974c7 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java @@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Stopwatch; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.twitter.common.net.InetSocketAddressHelper; import com.twitter.distributedlog.DLSN; import com.twitter.distributedlog.DistributedLogConfiguration; import com.twitter.distributedlog.acl.AccessControlManager; @@ -36,8 +37,14 @@ import com.twitter.distributedlog.exceptions.TooManyStreamsException; import com.twitter.distributedlog.feature.AbstractFeatureProvider; import com.twitter.distributedlog.namespace.DistributedLogNamespace; import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder; +import com.twitter.distributedlog.rate.MovingAverageRate; +import com.twitter.distributedlog.rate.MovingAverageRateFactory; import com.twitter.distributedlog.service.config.ServerConfiguration; import com.twitter.distributedlog.service.config.StreamConfigProvider; +import com.twitter.distributedlog.service.placement.LeastLoadPlacementPolicy; +import com.twitter.distributedlog.service.placement.LoadAppraiser; +import com.twitter.distributedlog.service.placement.PlacementPolicy; +import com.twitter.distributedlog.service.placement.ZKPlacementStateManager; import com.twitter.distributedlog.service.stream.BulkWriteOp; import com.twitter.distributedlog.service.stream.DeleteOp; import com.twitter.distributedlog.service.stream.admin.CreateOp; @@ -67,32 +74,19 @@ import com.twitter.distributedlog.thrift.service.ServerStatus; import com.twitter.distributedlog.thrift.service.StatusCode; import 
com.twitter.distributedlog.thrift.service.WriteContext; import com.twitter.distributedlog.thrift.service.WriteResponse; -import com.twitter.distributedlog.rate.MovingAverageRateFactory; -import com.twitter.distributedlog.rate.MovingAverageRate; import com.twitter.distributedlog.util.ConfUtils; import com.twitter.distributedlog.util.OrderedScheduler; import com.twitter.distributedlog.util.SchedulerUtils; -import com.twitter.finagle.NoBrokersAvailableException; import com.twitter.util.Await; import com.twitter.util.Duration; +import com.twitter.util.Function; import com.twitter.util.Function0; import com.twitter.util.Future; import com.twitter.util.FutureEventListener; -import com.twitter.util.Timer; import com.twitter.util.ScheduledThreadPoolTimer; -import org.apache.bookkeeper.feature.Feature; -import org.apache.bookkeeper.feature.FeatureProvider; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.Gauge; -import org.apache.bookkeeper.stats.StatsLogger; -import org.jboss.netty.util.HashedWheelTimer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.runtime.BoxedUnit; +import com.twitter.util.Timer; import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.net.URI; import java.nio.ByteBuffer; import java.util.List; @@ -102,6 +96,17 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.bookkeeper.feature.Feature; +import org.apache.bookkeeper.feature.FeatureProvider; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.Gauge; +import org.apache.bookkeeper.stats.StatsLogger; +import org.jboss.netty.util.HashedWheelTimer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.runtime.BoxedUnit; + public class DistributedLogServiceImpl implements DistributedLogService.ServiceIface, FatalErrorHandler { @@ -113,6 
+118,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI private final DistributedLogConfiguration dlConfig; private final DistributedLogNamespace dlNamespace; private final int serverRegionId; + private final PlacementPolicy placementPolicy; private ServerStatus serverStatus = ServerStatus.WRITE_AND_ACCEPT; private final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock(); @@ -157,6 +163,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI private final Gauge<Number> movingAvgBpsGauge; private final Gauge<Number> streamAcquiredGauge; private final Gauge<Number> streamCachedGauge; + private final int shard; DistributedLogServiceImpl(ServerConfiguration serverConf, DistributedLogConfiguration dlConf, @@ -167,7 +174,8 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI RoutingService routingService, StatsLogger statsLogger, StatsLogger perStreamStatsLogger, - CountDownLatch keepAliveLatch) + CountDownLatch keepAliveLatch, + LoadAppraiser loadAppraiser) throws IOException { // Configuration. 
this.serverConfig = serverConf; @@ -177,7 +185,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI this.serverRegionId = serverConf.getRegionId(); this.streamPartitionConverter = converter; int serverPort = serverConf.getServerPort(); - int shard = serverConf.getServerShardId(); + this.shard = serverConf.getServerShardId(); int numThreads = serverConf.getServerThreads(); this.clientId = DLSocketAddress.toLockId(DLSocketAddress.getSocketAddress(serverPort), shard); String allocatorPoolName = ServerUtils.getLedgerAllocatorPoolName( @@ -264,6 +272,15 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI streamManager, limiterDisabledFeature); + this.placementPolicy = new LeastLoadPlacementPolicy( + loadAppraiser, + routingService, + dlNamespace, + new ZKPlacementStateManager(uri, dlConf, statsLogger), + Duration.fromSeconds(serverConf.getResourcePlacementRefreshInterval()), + statsLogger); + logger.info("placement started"); + // Stats this.statsLogger = statsLogger; @@ -501,35 +518,13 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI return Future.value(new WriteResponse(ResponseUtils.ownerToHeader(clientId))); } - Stream stream = streamManager.getStream(streamName); - String owner; - if (null != stream && null != (owner = stream.getOwner())) { - return Future.value(new WriteResponse(ResponseUtils.ownerToHeader(owner))); - } - - RoutingService.RoutingContext routingContext = RoutingService.RoutingContext.of(regionResolver); - - if (ctx.isSetTriedHosts()) { - for (String triedHost : ctx.getTriedHosts()) { - routingContext.addTriedHost( - DLSocketAddress.parseSocketAddress(triedHost), StatusCode.STREAM_UNAVAILABLE); - } - } - - try { - SocketAddress host = routingService.getHost(streamName, routingContext); - if (host instanceof InetSocketAddress) { - // use shard id '-1' as the shard id here won't be used for redirection - return Future.value(new WriteResponse( - 
ResponseUtils.ownerToHeader(DLSocketAddress.toLockId((InetSocketAddress) host, -1)))); - } else { - return Future.value(new WriteResponse( - ResponseUtils.streamUnavailableHeader())); + return placementPolicy.placeStream(streamName).map(new Function<String, WriteResponse>() { + @Override + public WriteResponse apply(String server) { + String host = DLSocketAddress.toLockId(InetSocketAddressHelper.parse(server), -1); + return new WriteResponse(ResponseUtils.ownerToHeader(host)); } - } catch (NoBrokersAvailableException e) { - return Future.value(new WriteResponse( - ResponseUtils.streamUnavailableHeader())); - } + }); } @@ -689,6 +684,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI // Stop the timer. timer.stop(); + placementPolicy.close(); // clean up gauge unregisterGauge(); @@ -704,6 +700,10 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI } } + protected void startPlacementPolicy() { + this.placementPolicy.start(shard == 0); + } + @Override public void notifyFatalError() { triggerShutdown(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java index 9a9e83c..5b19f6c 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java @@ -95,6 +95,9 @@ public class ServerConfiguration extends CompositeConfiguration { protected static final String SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME = 
"server_use_hostname_as_allocator_pool_name"; protected static final boolean SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME_DEFAULT = false; + //Configure refresh interval for calculating resource placement in seconds + public static final String SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S = "server_resource_placement_refresh_interval_sec"; + public static final int SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_DEFAULT = 120; public ServerConfiguration() { super(); @@ -399,6 +402,15 @@ public class ServerConfiguration extends CompositeConfiguration { SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME_DEFAULT); } + public ServerConfiguration setResourcePlacementRefreshInterval(int refreshIntervalSecs) { + setProperty(SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S, refreshIntervalSecs); + return this; + } + + public int getResourcePlacementRefreshInterval() { + return getInt(SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S, SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_DEFAULT); + } + /** * Validate the configuration */ http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java new file mode 100644 index 0000000..144e358 --- /dev/null +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import com.twitter.util.Future;
+
+/**
+ * A {@link LoadAppraiser} that appraises every stream at the same load of 1.
+ *
+ * Created for those who hold these truths to be self-evident, that all streams are created equal,
+ * that they are endowed by their creator with certain unalienable loads, that among these are
+ * Uno, Eins, and One.
+ */
+public class EqualLoadAppraiser implements LoadAppraiser {
+  /**
+   * Appraises the given stream.
+   *
+   * @param stream name of the stream to appraise
+   * @return an already-completed future holding a StreamLoad of 1 for the stream
+   */
+  @Override
+  public Future<StreamLoad> getStreamLoad(String stream) {
+    StreamLoad unitLoad = new StreamLoad(stream, 1);
+    return Future.value(unitLoad);
+  }
+
+  /**
+   * This appraiser keeps no cache, so there is nothing to refresh.
+   *
+   * @return an already-completed future holding null
+   */
+  @Override
+  public Future<Void> refreshCache() {
+    Void nothing = null;
+    return Future.value(nothing);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java
new file mode 100644
index 0000000..e4c8128
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import scala.Function1;
+import scala.runtime.BoxedUnit;
+
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.Stats;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+
+import com.twitter.distributedlog.client.routing.RoutingService;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.util.Duration;
+import com.twitter.util.Function;
+import com.twitter.util.Future;
+import com.twitter.util.Futures;
+
+/**
+ * A LoadPlacementPolicy that attempts to place streams in such a way that the load is balanced as
+ * evenly as possible across all shards. The LoadAppraiser remains responsible for determining what
+ * the load of a server would be. This placement policy then distributes these streams across the
+ * servers.
+ *
+ * All reads and writes of the current mapping (serverLoads / streamToServer) go through
+ * methods synchronized on this instance.
+ */
+public class LeastLoadPlacementPolicy extends PlacementPolicy {
+  // Servers ordered by ascending load (ties broken by server name); first() is the least loaded.
+  private TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>();
+  // Stream name -> server name, derived from the last mapping passed to load()/updateServerLoads().
+  private Map<String, String> streamToServer = new HashMap<String, String>();
+
+  public LeastLoadPlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
+                                  DistributedLogNamespace namespace, PlacementStateManager placementStateManager,
+                                  Duration refreshInterval, StatsLogger statsLogger) {
+    super(loadAppraiser, routingService, namespace, placementStateManager, refreshInterval, statsLogger);
+    statsLogger.registerGauge("placement/load.diff", new Gauge<Number>() {
+      @Override
+      public Number getDefaultValue() {
+        return 0;
+      }
+
+      @Override
+      public Number getSample() {
+        return getLoadDiff();
+      }
+    });
+  }
+
+  /**
+   * Returns the difference between the highest and lowest server load, or 0 when no servers are
+   * known. Synchronized because placeStreamSynchronized() temporarily removes the first element,
+   * so an unsynchronized first()/last() could observe an empty set after a size check.
+   */
+  private synchronized long getLoadDiff() {
+    if (serverLoads.isEmpty()) {
+      return 0L;
+    }
+    return serverLoads.last().getLoad() - serverLoads.first().getLoad();
+  }
+
+  /**
+   * Returns the server recorded for the stream in the current mapping, or null if there is none.
+   */
+  private synchronized String getStreamOwner(String stream) {
+    return streamToServer.get(stream);
+  }
+
+  /**
+   * Returns the server that owns the stream. A stream already present in the current mapping
+   * resolves immediately; otherwise the stream is appraised and added to the least loaded server.
+   *
+   * NOTE(review): a newly placed stream is not recorded in streamToServer, so repeated calls
+   * before the next refresh may pick a different server -- confirm whether that is intended.
+   */
+  @Override
+  public Future<String> placeStream(String stream) {
+    String owner = getStreamOwner(stream);
+    if (null != owner) {
+      return Future.value(owner);
+    }
+    Future<StreamLoad> streamLoadFuture = loadAppraiser.getStreamLoad(stream);
+    return streamLoadFuture.map(new Function<StreamLoad, String>() {
+      @Override
+      public String apply(StreamLoad streamLoad) {
+        return placeStreamSynchronized(streamLoad);
+      }
+    });
+  }
+
+  /**
+   * Adds the stream to the least loaded server and re-inserts that server so the set stays sorted.
+   *
+   * @throws IllegalStateException if no servers are known
+   */
+  private synchronized String placeStreamSynchronized(StreamLoad streamLoad) {
+    // The element must be removed before its load changes; mutating it in place would corrupt
+    // the TreeSet ordering.
+    ServerLoad serverLoad = serverLoads.pollFirst();
+    if (null == serverLoad) {
+      throw new IllegalStateException(
+          "No servers available to place stream " + streamLoad.getStream());
+    }
+    serverLoad.addStream(streamLoad);
+    serverLoads.add(serverLoad);
+    return serverLoad.getServer();
+  }
+
+  /**
+   * Refreshes the appraiser cache, recalculates the whole mapping for the current servers and
+   * streams, persists it, and only then starts using it.
+   */
+  @Override
+  public void refresh() {
+    logger.info("Refreshing server loads.");
+    Future<Void> refresh = loadAppraiser.refreshCache().onFailure(new Function<Throwable, BoxedUnit>() {
+      @Override
+      public BoxedUnit apply(Throwable t) {
+        // Without this the failure is dropped: nothing else observes the refresh future.
+        logger.error("Failure refreshing the load appraiser cache", t);
+        return BoxedUnit.UNIT;
+      }
+    });
+    final Set<String> servers = getServers();
+    final Set<String> allStreams = getStreams();
+    Future<TreeSet<ServerLoad>> serverLoadsFuture = refresh.flatMap(new Function<Void, Future<TreeSet<ServerLoad>>>() {
+      @Override
+      public Future<TreeSet<ServerLoad>> apply(Void v1) {
+        return calculate(servers, allStreams);
+      }
+    });
+    serverLoadsFuture.map(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
+      @Override
+      public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) {
+        try {
+          updateServerLoads(serverLoads);
+        } catch (PlacementStateManager.StateManagerSaveException e) {
+          logger.error("The refreshed mapping could not be persisted and will not be used.", e);
+        }
+        return BoxedUnit.UNIT;
+      }
+    });
+  }
+
+  /**
+   * Persists the mapping first; the in-memory state is replaced only if the save succeeds.
+   */
+  private synchronized void updateServerLoads(TreeSet<ServerLoad> serverLoads) throws PlacementStateManager.StateManagerSaveException {
+    this.placementStateManager.saveOwnership(serverLoads);
+    this.streamToServer = serverLoadsToMap(serverLoads);
+    this.serverLoads = serverLoads;
+  }
+
+  @Override
+  public synchronized void load(TreeSet<ServerLoad> serverLoads) {
+    this.serverLoads = serverLoads;
+    this.streamToServer = serverLoadsToMap(serverLoads);
+  }
+
+  /**
+   * Appraises every stream and greedily assigns them, largest load first, to the currently least
+   * loaded server.
+   *
+   * @param servers server names to place streams on
+   * @param streams stream names to place
+   * @return a future holding the resulting servers sorted by load
+   */
+  public Future<TreeSet<ServerLoad>> calculate(final Set<String> servers, Set<String> streams) {
+    logger.info("Calculating server loads");
+    final long startTime = System.currentTimeMillis();
+    ArrayList<Future<StreamLoad>> futures = new ArrayList<Future<StreamLoad>>(streams.size());
+
+    for (String stream : streams) {
+      Future<StreamLoad> streamLoad = loadAppraiser.getStreamLoad(stream);
+      futures.add(streamLoad);
+    }
+
+    return Futures.collect(futures).map(new Function<List<StreamLoad>, TreeSet<ServerLoad>>() {
+      @Override
+      public TreeSet<ServerLoad> apply(List<StreamLoad> streamLoads) {
+        /* Sort streamLoads so largest streams are placed first for better balance */
+        TreeSet<StreamLoad> streamQueue = new TreeSet<StreamLoad>();
+        for (StreamLoad streamLoad : streamLoads) {
+          streamQueue.add(streamLoad);
+        }
+
+        // StreamLoad sorts ascending by load, so the largest remaining stream is pollLast().
+        TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>();
+        for (String server : servers) {
+          ServerLoad serverLoad = new ServerLoad(server);
+          if (!streamQueue.isEmpty()) {
+            serverLoad.addStream(streamQueue.pollLast());
+          }
+          serverLoads.add(serverLoad);
+        }
+
+        while (!streamQueue.isEmpty()) {
+          ServerLoad serverLoad = serverLoads.pollFirst();
+          serverLoad.addStream(streamQueue.pollLast());
+          serverLoads.add(serverLoad);
+        }
+        return serverLoads;
+      }
+    }).onSuccess(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
+      @Override
+      public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) {
+        placementCalcStats.registerSuccessfulEvent(System.currentTimeMillis() - startTime);
+        return BoxedUnit.UNIT;
+      }
+    }).onFailure(new Function<Throwable, BoxedUnit>() {
+      @Override
+      public BoxedUnit apply(Throwable t) {
+        logger.error("Failure calculating loads", t);
+        placementCalcStats.registerFailedEvent(System.currentTimeMillis() - startTime);
+        return BoxedUnit.UNIT;
+      }
+    });
+  }
+
+  /**
+   * Flattens the per-server stream sets into a stream name -> server name map.
+   */
+  private static Map<String, String> serverLoadsToMap(Collection<ServerLoad> serverLoads) {
+    HashMap<String, String> streamToServer = new HashMap<String, String>(serverLoads.size());
+    for (ServerLoad serverLoad : serverLoads) {
+      for (StreamLoad streamLoad : serverLoad.getStreamLoads()) {
+        streamToServer.put(streamLoad.getStream(), serverLoad.getServer());
+      }
+    }
+    return streamToServer;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java
new file mode 100644
index 0000000..784f106
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import com.twitter.util.Future;
+
+/**
+ * Appraises the load that a stream contributes, for use by a placement policy.
+ */
+public interface LoadAppraiser {
+  /**
+   * Appraises a single stream.
+   *
+   * @param stream name of the stream to appraise
+   * @return a future holding the StreamLoad appraised for the stream
+   */
+  Future<StreamLoad> getStreamLoad(String stream);
+
+  /**
+   * Refreshes the appraiser's cached data.
+   * NOTE(review): what is cached is implementation specific -- confirm against implementations.
+   *
+   * @return a future that completes once the refresh has finished
+   */
+  Future<Void> refreshCache();
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java
new file mode 100644
index 0000000..2044428
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeSet;
+
+import scala.runtime.BoxedUnit;
+
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.twitter.distributedlog.client.routing.RoutingService;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.distributedlog.service.DLSocketAddress;
+import com.twitter.util.Duration;
+import com.twitter.util.Function0;
+import com.twitter.util.Future;
+import com.twitter.util.ScheduledThreadPoolTimer;
+import com.twitter.util.Time;
+import com.twitter.util.Timer;
+
+/**
+ * A PlacementPolicy assigns streams to servers given an appraisal of the load that the stream
+ * contains. The load of a stream is determined by the LoadAppraiser used. The PlacementPolicy
+ * then distributes these StreamLoads to the available servers in a manner defined by the
+ * implementation, creating ServerLoad objects. It then saves this assignment via the
+ * PlacementStateManager.
+ */
+public abstract class PlacementPolicy {
+  protected final LoadAppraiser loadAppraiser;
+  protected final RoutingService routingService;
+  protected final DistributedLogNamespace namespace;
+  protected final PlacementStateManager placementStateManager;
+  private final Duration refreshInterval;
+
+  protected static final Logger logger = LoggerFactory.getLogger(PlacementPolicy.class);
+  protected final OpStatsLogger placementCalcStats;
+  // Only created on the leader shard; stays null otherwise (see start()).
+  private Timer placementRefreshTimer;
+
+  public PlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
+                         DistributedLogNamespace namespace, PlacementStateManager placementStateManager,
+                         Duration refreshInterval, StatsLogger statsLogger) {
+    this.loadAppraiser = loadAppraiser;
+    this.routingService = routingService;
+    this.namespace = namespace;
+    this.placementStateManager = placementStateManager;
+    this.refreshInterval = refreshInterval;
+    placementCalcStats = statsLogger.getOpStatsLogger("placement");
+  }
+
+  /**
+   * Returns the hosts known to the routing service as DLSocketAddress strings. Hosts that are
+   * not InetSocketAddresses cannot be rendered that way and are skipped with a warning.
+   */
+  public Set<String> getServers() {
+    Set<SocketAddress> hosts = routingService.getHosts();
+    Set<String> servers = new HashSet<String>(hosts.size());
+    for (SocketAddress address : hosts) {
+      if (address instanceof InetSocketAddress) {
+        servers.add(DLSocketAddress.toString((InetSocketAddress) address));
+      } else {
+        logger.warn("Ignoring host {} for placement: it is not an InetSocketAddress.", address);
+      }
+    }
+    return servers;
+  }
+
+  /**
+   * Returns the names of all logs in the namespace. If listing fails the error is logged and
+   * whatever was collected so far (possibly nothing) is returned.
+   */
+  public Set<String> getStreams() {
+    Set<String> streams = new HashSet<String>();
+    try {
+      Iterator<String> logs = namespace.getLogs();
+      while (logs.hasNext()) {
+        streams.add(logs.next());
+      }
+    } catch (IOException e) {
+      logger.error("Could not get streams for placement policy.", e);
+    }
+    return streams;
+  }
+
+  /**
+   * Starts the policy. The leader periodically recalculates the placement via refresh(); every
+   * other shard watches the PlacementStateManager and loads each non-empty mapping it reports.
+   *
+   * @param leader whether this shard is the leader shard
+   */
+  public void start(boolean leader) {
+    logger.info("Starting placement policy");
+
+    TreeSet<ServerLoad> emptyServerLoads = new TreeSet<ServerLoad>();
+    for (String server : getServers()) {
+      emptyServerLoads.add(new ServerLoad(server));
+    }
+    load(emptyServerLoads); //Pre-Load so streams don't NPE
+    if (leader) { //this is the leader shard
+      logger.info("Shard is leader. Scheduling timed refresh.");
+      placementRefreshTimer = new ScheduledThreadPoolTimer(1, "timer", true);
+      placementRefreshTimer.schedule(Time.now(), refreshInterval, new Function0<BoxedUnit>() {
+        @Override
+        public BoxedUnit apply() {
+          refresh();
+          return BoxedUnit.UNIT;
+        }
+      });
+    } else {
+      logger.info("Shard is not leader. Watching for server load changes.");
+      placementStateManager.watch(new PlacementStateManager.PlacementCallback() {
+        @Override
+        public void callback(TreeSet<ServerLoad> serverLoads) {
+          if (!serverLoads.isEmpty()) {
+            load(serverLoads);
+          }
+        }
+      });
+    }
+  }
+
+  /**
+   * Stops the refresh timer if one was started.
+   */
+  public void close() {
+    if (placementRefreshTimer != null) {
+      placementRefreshTimer.stop();
+    }
+  }
+
+  /**
+   * Places the stream on a server according to the policy and returns a future containing the
+   * host that owns the stream upon completion
+   */
+  public abstract Future<String> placeStream(String stream);
+
+  /**
+   * Recalculates the entire placement mapping and stores it using the PlacementStateManager
+   */
+  public abstract void refresh();
+
+  /**
+   * Loads the placement mapping into the node from a TreeSet of ServerLoads
+   */
+  public abstract void load(TreeSet<ServerLoad> serverLoads);
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java
new file mode 100644
index 0000000..cd0d906
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license
agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.util.TreeSet;
+
+/**
+ * The PlacementStateManager handles persistence of calculated resource placements: storing a
+ * placement once it has been calculated, and retrieving it on the other shards.
+ */
+public interface PlacementStateManager {
+
+  /**
+   * Saves the ownership mapping as a TreeSet of ServerLoads to persistent storage
+   *
+   * @throws StateManagerSaveException if the mapping could not be saved
+   */
+  void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException;
+
+  /**
+   * Loads the ownership mapping as TreeSet of ServerLoads from persistent storage
+   *
+   * @throws StateManagerLoadException if the mapping could not be loaded
+   */
+  TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException;
+
+  /**
+   * Watches the persistent storage for changes to the ownership mapping and calls
+   * placementCallback with the new mapping when a change occurs
+   */
+  void watch(PlacementCallback placementCallback);
+
+  /**
+   * Receives the ownership mapping passed to it by watch().
+   */
+  interface PlacementCallback {
+    void callback(TreeSet<ServerLoad> serverLoads);
+  }
+
+  /**
+   * Base type for failures of the state manager; always wraps the underlying cause.
+   */
+  abstract class StateManagerException extends Exception {
+    public StateManagerException(String message, Exception e) {
+      super(message, e);
+    }
+  }
+
+  /**
+   * Raised when the ownership mapping could not be loaded.
+   */
+  class StateManagerLoadException extends StateManagerException {
+    public StateManagerLoadException(Exception e) {
+      super("Load of Ownership failed", e);
+    }
+  }
+
+  /**
+   * Raised when the ownership mapping could not be saved.
+   */
+  class StateManagerSaveException extends StateManagerException {
+    public StateManagerSaveException(Exception e) {
+      super("Save of Ownership failed", e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java
new file mode 100644
index 0000000..d7fbcf2
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.placement;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Set;
+import com.twitter.distributedlog.service.placement.thrift.*;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TJSONProtocol;
+import org.apache.thrift.transport.TMemoryBuffer;
+import org.apache.thrift.transport.TMemoryInputTransport;
+
+import static com.google.common.base.Charsets.UTF_8;
+
+/**
+ * A comparable data object containing the identifier of the server, total appraised load on the
+ * server, and all streams assigned to the server by the resource placement mapping. This is
+ * comparable first by load and then by server so that a sorted data structure of these will be
+ * consistent across multiple calculations.
+ *
+ * Because the ordering depends on the mutable load, an instance held in a sorted collection must
+ * be removed before addStream/removeStream is called and re-inserted afterwards.
+ */
+public class ServerLoad implements Comparable {
+  private static final int BUFFER_SIZE = 4096000;
+  private final String server;
+  private final HashSet<StreamLoad> streamLoads = new HashSet<StreamLoad>();
+  // Invariant: equals the sum of getLoad() over streamLoads.
+  private long load = 0L;
+
+  public ServerLoad(String server) {
+    this.server = server;
+  }
+
+  /**
+   * Assigns the stream to this server.
+   *
+   * @return the total load after the addition; unchanged if an equal StreamLoad was already
+   *         assigned, so that the same stream is never counted twice
+   */
+  public synchronized long addStream(StreamLoad stream) {
+    if (streamLoads.add(stream)) {
+      this.load += stream.getLoad();
+    }
+    return this.load;
+  }
+
+  /**
+   * Removes the first assigned stream with the given name and subtracts its load.
+   *
+   * @return the total load after the removal; unchanged if no such stream is assigned
+   */
+  public synchronized long removeStream(String stream) {
+    for (StreamLoad streamLoad : streamLoads) {
+      if (streamLoad.stream.equals(stream)) {
+        // Subtract the stream's own load (not the server total).
+        this.load -= streamLoad.getLoad();
+        // Safe despite the enclosing for-each: we return before the iterator is used again.
+        streamLoads.remove(streamLoad);
+        return this.load;
+      }
+    }
+    return this.load; //Throwing an exception wouldn't help us as our logic should never reach here
+  }
+
+  public long getLoad() {
+    return load;
+  }
+
+  public Set<StreamLoad> getStreamLoads() {
+    return streamLoads;
+  }
+
+  public String getServer() {
+    return server;
+  }
+
+  /**
+   * Converts this object, including all assigned streams, to its thrift representation.
+   */
+  protected com.twitter.distributedlog.service.placement.thrift.ServerLoad toThrift() {
+    com.twitter.distributedlog.service.placement.thrift.ServerLoad tServerLoad
+      = new com.twitter.distributedlog.service.placement.thrift.ServerLoad();
+    tServerLoad.setServer(server);
+    tServerLoad.setLoad(load);
+    ArrayList<com.twitter.distributedlog.service.placement.thrift.StreamLoad> tStreamLoads
+      = new ArrayList<com.twitter.distributedlog.service.placement.thrift.StreamLoad>();
+    for (StreamLoad streamLoad : streamLoads) {
+      tStreamLoads.add(streamLoad.toThrift());
+    }
+    tServerLoad.setStreams(tStreamLoads);
+    return tServerLoad;
+  }
+
+  /**
+   * Serializes this object as thrift JSON encoded in UTF-8.
+   *
+   * @throws IOException if thrift serialization fails
+   */
+  public byte[] serialize() throws IOException {
+    TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
+    TJSONProtocol protocol = new TJSONProtocol(transport);
+    try {
+      toThrift().write(protocol);
+      transport.flush();
+      return transport.toString(UTF_8.name()).getBytes(UTF_8);
+    } catch (TException e) {
+      throw new IOException("Failed to serialize server load : ", e);
+    } catch (UnsupportedEncodingException uee) {
+      throw new IOException("Failed to serialize server load : ", uee);
+    }
+  }
+
+  /**
+   * Rebuilds a ServerLoad from the bytes produced by serialize(). The total load is recomputed
+   * from the deserialized streams rather than read from the serialized load field.
+   *
+   * @throws IOException if thrift deserialization fails
+   */
+  public static ServerLoad deserialize(byte[] data) throws IOException {
+    com.twitter.distributedlog.service.placement.thrift.ServerLoad tServerLoad
+      = new com.twitter.distributedlog.service.placement.thrift.ServerLoad();
+    TMemoryInputTransport transport = new TMemoryInputTransport(data);
+    TJSONProtocol protocol = new TJSONProtocol(transport);
+    try {
+      tServerLoad.read(protocol);
+      ServerLoad serverLoad = new ServerLoad(tServerLoad.getServer());
+      if (tServerLoad.isSetStreams()) {
+        for (com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad : tServerLoad.getStreams()) {
+          serverLoad.addStream(new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad()));
+        }
+      }
+      return serverLoad;
+    } catch (TException e) {
+      throw new IOException("Failed to deserialize server load : ", e);
+    }
+  }
+
+  /**
+   * Orders by load, then by server name.
+   *
+   * @throws ClassCastException if o is not a ServerLoad
+   */
+  @Override
+  public int compareTo(Object o) {
+    ServerLoad other = (ServerLoad) o;
+    if (load == other.load) {
+      return server.compareTo(other.getServer());
+    } else {
+      return Long.compare(load, other.getLoad());
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    // Also rejects null; equals must return false rather than throw for foreign arguments.
+    if (!(o instanceof ServerLoad)) {
+      return false;
+    }
+    ServerLoad other = (ServerLoad) o;
+    return server.equals(other.getServer()) && load == other.getLoad() && streamLoads.equals(other.getStreamLoads());
+  }
+
+  @Override
+  public String toString() {
+    return String.format("ServerLoad<Server: %s, Load: %d, Streams: %s>", server, load, streamLoads);
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(server).append(load).append(streamLoads).build();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java
new file mode 100644
index 0000000..4f3dc71
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.service.placement; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; + +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TJSONProtocol; +import org.apache.thrift.transport.TMemoryBuffer; +import org.apache.thrift.transport.TMemoryInputTransport; + +import static com.google.common.base.Charsets.UTF_8; + +/** + * A comparable data object containing the identifier of the stream and the appraised load produced + * by the stream. + */ +public class StreamLoad implements Comparable { + private static final int BUFFER_SIZE = 4096; + public final String stream; + private final int load; + + public StreamLoad(String stream, int load) { + this.stream = stream; + this.load = load; + } + + public int getLoad() { + return load; + } + + public String getStream() { + return stream; + } + + protected com.twitter.distributedlog.service.placement.thrift.StreamLoad toThrift() { + com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad = new com.twitter.distributedlog.service.placement.thrift.StreamLoad(); + return tStreamLoad.setStream(stream).setLoad(load); + } + + public byte[] serialize() throws IOException { + TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE); + TJSONProtocol protocol = new TJSONProtocol(transport); + try { + toThrift().write(protocol); + transport.flush(); + return transport.toString(UTF_8.name()).getBytes(UTF_8); + } catch (TException e) { + throw new IOException("Failed to serialize stream load : ", e); + } catch (UnsupportedEncodingException uee) { + throw new IOException("Failed to serialize stream load : ", uee); + } + } + + public static StreamLoad deserialize(byte[] data) throws IOException { + com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad = new 
com.twitter.distributedlog.service.placement.thrift.StreamLoad(); + TMemoryInputTransport transport = new TMemoryInputTransport(data); + TJSONProtocol protocol = new TJSONProtocol(transport); + try { + tStreamLoad.read(protocol); + return new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad()); + } catch (TException e) { + throw new IOException("Failed to deserialize stream load : ", e); + } + } + + @Override + public int compareTo(Object o) { + StreamLoad other = (StreamLoad) o; + if (load == other.getLoad()) { + return stream.compareTo(other.getStream()); + } else { + return Long.compare(load, other.getLoad()); + } + } + + @Override + public boolean equals(Object o) { + StreamLoad other = (StreamLoad) o; + return stream.equals(other.getStream()) && load == other.getLoad(); + } + + @Override + public String toString() { + return String.format("StreamLoad<Stream: %s, Load: %d>", stream, load); + } + + @Override + public int hashCode() { + return new HashCodeBuilder().append(stream).append(load).build(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java new file mode 100644 index 0000000..18b9d1f --- /dev/null +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.twitter.distributedlog.service.placement; + +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.HashSet; +import java.util.List; +import java.util.TreeSet; + +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Transaction; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.twitter.distributedlog.BKDistributedLogNamespace; +import com.twitter.distributedlog.DistributedLogConfiguration; +import com.twitter.distributedlog.ZooKeeperClient; +import com.twitter.distributedlog.util.DLUtils; +import com.twitter.distributedlog.util.Utils; + +/** + * An implementation of the PlacementStateManager that saves data to and loads from Zookeeper to + * avoid necessitating an additional system for the resource placement. 
+ */ +public class ZKPlacementStateManager implements PlacementStateManager { + static final Logger logger = LoggerFactory.getLogger(ZKPlacementStateManager.class); + private static final String SERVER_LOAD_DIR = "/.server-load"; + + private final String serverLoadPath; + private final ZooKeeperClient zkClient; + + private boolean watching = false; + + public ZKPlacementStateManager(URI uri, DistributedLogConfiguration conf, StatsLogger statsLogger) { + zkClient = BKDistributedLogNamespace.createDLZKClientBuilder( + String.format("dlzk:%s:factory_writer_shared", uri), + conf, + DLUtils.getZKServersFromDLUri(uri), + statsLogger.scope("dlzk_factory_writer_shared")).build(); + serverLoadPath = uri.getPath() + SERVER_LOAD_DIR; + } + + private void createServerLoadPathIfNoExists(byte[] data) + throws ZooKeeperClient.ZooKeeperConnectionException, KeeperException, InterruptedException { + try { + Utils.zkCreateFullPathOptimistic(zkClient, serverLoadPath, data, zkClient.getDefaultACL(), CreateMode.PERSISTENT); + } catch (KeeperException.NodeExistsException nee) { + logger.debug("the server load path {} is already created by others", serverLoadPath, nee); + } + } + + @Override + public void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException { + logger.info("saving ownership"); + try { + ZooKeeper zk = zkClient.get(); + // use timestamp as data so watchers will see any changes + byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array(); + + if (zk.exists(serverLoadPath, false) == null) { //create path to rootnode if it does not yet exist + createServerLoadPathIfNoExists(timestamp); + } + + Transaction tx = zk.transaction(); + List<String> children = zk.getChildren(serverLoadPath, false); + HashSet<String> servers = new HashSet<String>(children); + tx.setData(serverLoadPath, timestamp, -1); // trigger the watcher that data has been updated + for (ServerLoad serverLoad : serverLoads) { + String server = 
serverToZkFormat(serverLoad.getServer()); + String serverPath = serverPath(server); + if (servers.contains(server)) { + servers.remove(server); + tx.setData(serverPath, serverLoad.serialize(), -1); + } else { + tx.create(serverPath, serverLoad.serialize(), zkClient.getDefaultACL(), CreateMode.PERSISTENT); + } + } + for (String server : servers) { + tx.delete(serverPath(server), -1); + } + tx.commit(); + } catch (InterruptedException | IOException | KeeperException e) { + throw new StateManagerSaveException(e); + } + } + + @Override + public TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException { + TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>(); + try { + ZooKeeper zk = zkClient.get(); + List<String> children = zk.getChildren(serverLoadPath, false); + for (String server : children) { + ownerships.add(ServerLoad.deserialize(zk.getData(serverPath(server), false, new Stat()))); + } + return ownerships; + } catch (InterruptedException | IOException | KeeperException e) { + throw new StateManagerLoadException(e); + } + } + + @Override + synchronized public void watch(final PlacementCallback callback) { + if (watching) { + return; // do not double watch + } + watching = true; + + try { + ZooKeeper zk = zkClient.get(); + try { + zk.getData(serverLoadPath, new Watcher() { + @Override + public void process(WatchedEvent watchedEvent) { + try { + callback.callback(loadOwnership()); + } catch (StateManagerLoadException e) { + logger.error("Watch of Ownership failed", e); + } finally { + watching = false; + watch(callback); + } + } + }, new Stat()); + } catch (KeeperException.NoNodeException nee) { + byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array(); + createServerLoadPathIfNoExists(timestamp); + watching = false; + watch(callback); + } + } catch (ZooKeeperClient.ZooKeeperConnectionException | InterruptedException | KeeperException e) { + logger.error("Watch of Ownership failed", e); + watching = false; + 
watch(callback); + } + } + + public String serverPath(String server) { + return String.format("%s/%s", serverLoadPath, server); + } + + protected String serverToZkFormat(String server) { + return server.replaceAll("/", "--"); + } + + protected String zkFormatToServer(String zkFormattedServer) { + return zkFormattedServer.replaceAll("--", "/"); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0591d067/distributedlog-service/src/main/thrift/metadata.thrift ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/thrift/metadata.thrift b/distributedlog-service/src/main/thrift/metadata.thrift new file mode 100644 index 0000000..8f7b6ec --- /dev/null +++ b/distributedlog-service/src/main/thrift/metadata.thrift @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +namespace java com.twitter.distributedlog.service.placement.thrift + +struct StreamLoad { + 1: optional string stream + 2: optional i32 load +} + +struct ServerLoad { + 1: optional string server + 2: optional i64 load + 3: optional list<StreamLoad> streams +}