http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java new file mode 100644 index 0000000..b1e2879 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java @@ -0,0 +1,469 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog.service; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.base.Optional; +import com.google.common.base.Stopwatch; +import com.google.common.collect.Sets; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hashing; +import com.twitter.common.zookeeper.ServerSet; +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.DistributedLogConstants; +import org.apache.distributedlog.DistributedLogManager; +import org.apache.distributedlog.LogSegmentMetadata; +import org.apache.distributedlog.callback.LogSegmentListener; +import org.apache.distributedlog.callback.NamespaceListener; +import org.apache.distributedlog.client.monitor.MonitorServiceClient; +import org.apache.distributedlog.client.serverset.DLZkServerSet; +import org.apache.distributedlog.namespace.DistributedLogNamespace; +import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; +import com.twitter.finagle.builder.ClientBuilder; +import com.twitter.finagle.stats.Stat; +import com.twitter.finagle.stats.StatsReceiver; +import com.twitter.finagle.thrift.ClientId$; +import com.twitter.util.Duration; +import com.twitter.util.FutureEventListener; +import java.io.File; +import java.io.IOException; +import java.net.MalformedURLException; +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.stats.Gauge; +import org.apache.bookkeeper.stats.StatsProvider; +import org.apache.commons.configuration.ConfigurationException; +import 
org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +

/**
 * Monitor Service.
 *
 * <p>Discovers the streams of a DistributedLog namespace, keeps the ones assigned to this
 * instance (by hashing the stream name modulo the total number of instances) and periodically
 * sends a check -- or, every {@code heartbeatEveryChecks} checks, a heartbeat -- for each of
 * them through the monitor client.
 */
public class MonitorService implements NamespaceListener {

    private static final Logger logger = LoggerFactory.getLogger(MonitorService.class);

    private DistributedLogNamespace dlNamespace = null;
    private MonitorServiceClient dlClient = null;
    private DLZkServerSet[] zkServerSets = null;
    private final ScheduledExecutorService executorService =
            Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors());
    private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
    private final Map<String, StreamChecker> knownStreams = new HashMap<String, StreamChecker>();

    // Settings
    private int regionId = DistributedLogConstants.LOCAL_REGION_ID;
    private int interval = 100;
    private String streamRegex = null;
    private boolean watchNamespaceChanges = false;
    private boolean handshakeWithClientInfo = false;
    private int heartbeatEveryChecks = 0;
    private int instanceId = -1;
    private int totalInstances = -1;
    private boolean isThriftMux = false;

    // Options
    private final Optional<String> uriArg;
    private final Optional<String> confFileArg;
    private final Optional<String> serverSetArg;
    private final Optional<Integer> intervalArg;
    private final Optional<Integer> regionIdArg;
    private final Optional<String> streamRegexArg;
    private final Optional<Integer> instanceIdArg;
    private final Optional<Integer> totalInstancesArg;
    private final Optional<Integer> heartbeatEveryChecksArg;
    private final Optional<Boolean> handshakeWithClientInfoArg;
    private final Optional<Boolean> watchNamespaceChangesArg;
    private final Optional<Boolean> isThriftMuxArg;

    // Stats
    private final StatsProvider statsProvider;
    private final StatsReceiver statsReceiver;
    private final StatsReceiver monitorReceiver;
    private final Stat successStat;
    private final Stat failureStat;
    private final Gauge<Number> numOfStreamsGauge;
    // Hash Function
    private final HashFunction hashFunction = Hashing.md5();

    /**
     * Per-stream task: opens the log, listens for segment updates and, while the first
     * reported segment belongs to the configured region, keeps issuing check/heartbeat
     * requests every {@code interval} milliseconds.
     */
    class StreamChecker implements Runnable, FutureEventListener<Void>, LogSegmentListener {
        private final String name;
        private volatile boolean closed = false;
        private volatile boolean checking = false;
        private final Stopwatch stopwatch = Stopwatch.createUnstarted();
        private DistributedLogManager dlm = null;
        private int numChecks = 0;

        StreamChecker(String name) {
            this.name = name;
        }

        /**
         * On the first invocation opens the log and registers this checker as its segment
         * listener (retrying after {@code interval} ms on failure); on later invocations
         * sends one check or heartbeat request.
         */
        @Override
        public void run() {
            if (closed) {
                // A closed checker must neither reopen its log nor issue further requests;
                // this also ends the open-retry loop of a stream that has been removed.
                return;
            }
            if (null == dlm) {
                try {
                    dlm = dlNamespace.openLog(name);
                    dlm.registerListener(this);
                } catch (IOException e) {
                    // dlm is non-null here only when registerListener failed after a
                    // successful open; release it so the retry starts from scratch.
                    if (null != dlm) {
                        try {
                            dlm.close();
                        } catch (IOException e1) {
                            logger.error("Failed to close dlm for {} : ", name, e1);
                        }
                        dlm = null;
                    }
                    executorService.schedule(this, interval, TimeUnit.MILLISECONDS);
                }
            } else {
                stopwatch.reset().start();
                boolean sendHeartBeat;
                if (heartbeatEveryChecks > 0) {
                    synchronized (this) {
                        ++numChecks;
                        if (numChecks >= Integer.MAX_VALUE) {
                            numChecks = 0;
                        }
                        sendHeartBeat = (numChecks % heartbeatEveryChecks) == 0;
                    }
                } else {
                    sendHeartBeat = false;
                }
                if (sendHeartBeat) {
                    dlClient.heartbeat(name).addEventListener(this);
                } else {
                    dlClient.check(name).addEventListener(this);
                }
            }
        }

        /**
         * Starts checking when the first segment belongs to the configured region and stops
         * checking otherwise.
         */
        @Override
        public void onSegmentsUpdated(List<LogSegmentMetadata> segments) {
            if (segments.size() > 0 && segments.get(0).getRegionId() == regionId) {
                if (!checking) {
                    logger.info("Start checking stream {}.", name);
                    checking = true;
                    run();
                }
            } else {
                if (checking) {
                    logger.info("Stop checking stream {}.", name);
                    // Clear the flag so scheduleCheck() really stops rescheduling; before this
                    // the message was logged but the checks kept running.
                    checking = false;
                }
            }
        }

        @Override
        public void onLogStreamDeleted() {
            logger.info("Stream {} is deleted", name);
        }

        @Override
        public void onSuccess(Void value) {
            successStat.add(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
            scheduleCheck();
        }

        @Override
        public void onFailure(Throwable cause) {
            failureStat.add(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
            scheduleCheck();
        }

        /** Schedules the next check unless this checker is closed or no longer checking. */
        private void scheduleCheck() {
            if (closed) {
                return;
            }
            if (!checking) {
                return;
            }
            try {
                executorService.schedule(this, interval, TimeUnit.MILLISECONDS);
            } catch (RejectedExecutionException ree) {
                logger.error("Failed to schedule checking stream {} in {} ms : ",
                        new Object[] { name, interval, ree });
            }
        }

        /** Marks this checker closed and closes the underlying log manager, if opened. */
        private void close() {
            closed = true;
            if (null != dlm) {
                try {
                    dlm.close();
                } catch (IOException e) {
                    logger.error("Failed to close dlm for {} : ", name, e);
                }
            }
        }
    }

    MonitorService(Optional<String> uriArg,
                   Optional<String> confFileArg,
                   Optional<String> serverSetArg,
                   Optional<Integer> intervalArg,
                   Optional<Integer> regionIdArg,
                   Optional<String> streamRegexArg,
                   Optional<Integer> instanceIdArg,
                   Optional<Integer> totalInstancesArg,
                   Optional<Integer> heartbeatEveryChecksArg,
                   Optional<Boolean> handshakeWithClientInfoArg,
                   Optional<Boolean> watchNamespaceChangesArg,
                   Optional<Boolean> isThriftMuxArg,
                   StatsReceiver statsReceiver,
                   StatsProvider statsProvider) {
        // options
        this.uriArg = uriArg;
        this.confFileArg = confFileArg;
        this.serverSetArg = serverSetArg;
        this.intervalArg = intervalArg;
        this.regionIdArg = regionIdArg;
        this.streamRegexArg = streamRegexArg;
        this.instanceIdArg = instanceIdArg;
        this.totalInstancesArg = totalInstancesArg;
        this.heartbeatEveryChecksArg = heartbeatEveryChecksArg;
        this.handshakeWithClientInfoArg = handshakeWithClientInfoArg;
        this.watchNamespaceChangesArg = watchNamespaceChangesArg;
        this.isThriftMuxArg = isThriftMuxArg;

        // Stats
        this.statsReceiver = statsReceiver;
        this.monitorReceiver = statsReceiver.scope("monitor");
        this.successStat = monitorReceiver.stat0("success");
        this.failureStat = monitorReceiver.stat0("failure");
        this.statsProvider = statsProvider;
        this.numOfStreamsGauge = new Gauge<Number>() {
            @Override
            public Number getDefaultValue() {
                return 0;
            }

            @Override
            public Number getSample() {
                return knownStreams.size();
            }
        };
    }

    /**
     * Resolves the settings from the command-line options, loads the configuration, starts
     * the stats provider, builds the monitor client and starts monitoring.
     *
     * @throws IllegalArgumentException if the uri or server set is missing, the instance id /
     *         total instances pair is invalid, or no server set path is given
     * @throws IOException if the configuration file cannot be loaded
     */
    public void runServer() throws IllegalArgumentException, IOException {
        checkArgument(uriArg.isPresent(),
                "No distributedlog uri provided.");
        checkArgument(serverSetArg.isPresent(),
                "No proxy server set provided.");
        if (intervalArg.isPresent()) {
            interval = intervalArg.get();
        }
        if (regionIdArg.isPresent()) {
            regionId = regionIdArg.get();
        }
        if (streamRegexArg.isPresent()) {
            streamRegex = streamRegexArg.get();
        }
        if (instanceIdArg.isPresent()) {
            instanceId = instanceIdArg.get();
        }
        if (totalInstancesArg.isPresent()) {
            totalInstances = totalInstancesArg.get();
        }
        if (heartbeatEveryChecksArg.isPresent()) {
            heartbeatEveryChecks = heartbeatEveryChecksArg.get();
        }
        if (instanceId < 0 || totalInstances <= 0 || instanceId >= totalInstances) {
            throw new IllegalArgumentException("Invalid instance id or total instances number.");
        }
        handshakeWithClientInfo = handshakeWithClientInfoArg.isPresent();
        watchNamespaceChanges = watchNamespaceChangesArg.isPresent();
        isThriftMux = isThriftMuxArg.isPresent();
        URI uri = URI.create(uriArg.get());
        DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
        if (confFileArg.isPresent()) {
            String configFile = confFileArg.get();
            try {
                dlConf.loadConf(new File(configFile).toURI().toURL());
            } catch (ConfigurationException e) {
                // keep the cause so the real parsing problem is visible to the operator
                throw new IOException("Failed to load distributedlog configuration from " + configFile + ".", e);
            } catch (MalformedURLException e) {
                throw new IOException("Failed to load distributedlog configuration from malformed "
                        + configFile + ".", e);
            }
        }
        logger.info("Starting stats provider : {}.", statsProvider.getClass());
        statsProvider.start(dlConf);
        String[] serverSetPaths = StringUtils.split(serverSetArg.get(), ",");
        if (serverSetPaths.length == 0) {
            throw new IllegalArgumentException("Invalid serverset paths provided : " + serverSetArg.get());
        }

        // the first server set is the local one, the rest are remotes
        ServerSet[] serverSets = createServerSets(serverSetPaths);
        ServerSet local = serverSets[0];
        ServerSet[] remotes = new ServerSet[serverSets.length - 1];
        System.arraycopy(serverSets, 1, remotes, 0, remotes.length);

        ClientBuilder finagleClientBuilder = ClientBuilder.get()
            .connectTimeout(Duration.fromSeconds(1))
            .tcpConnectTimeout(Duration.fromSeconds(1))
            .requestTimeout(Duration.fromSeconds(2))
            .keepAlive(true)
            .failFast(false);

        if (!isThriftMux) {
            finagleClientBuilder = finagleClientBuilder
                .hostConnectionLimit(2)
                .hostConnectionCoresize(2);
        }

        // NOTE(review): the max redirect backoff (50) is below the start backoff (100) --
        // confirm this is intended.
        dlClient = DistributedLogClientBuilder.newBuilder()
                .name("monitor")
                .thriftmux(isThriftMux)
                .clientId(ClientId$.MODULE$.apply("monitor"))
                .redirectBackoffMaxMs(50)
                .redirectBackoffStartMs(100)
                .requestTimeoutMs(2000)
                .maxRedirects(2)
                .serverSets(local, remotes)
                .streamNameRegex(streamRegex)
                .handshakeWithClientInfo(handshakeWithClientInfo)
                .clientBuilder(finagleClientBuilder)
                .statsReceiver(monitorReceiver.scope("client"))
                .buildMonitorClient();
        runMonitor(dlConf, uri);
    }

    /**
     * Parses every server set path and remembers the resulting {@link DLZkServerSet}s so
     * they can be closed in {@link #close()}.
     */
    ServerSet[] createServerSets(String[] serverSetPaths) {
        ServerSet[] serverSets = new ServerSet[serverSetPaths.length];
        zkServerSets = new DLZkServerSet[serverSetPaths.length];
        for (int i = 0; i < serverSetPaths.length; i++) {
            String serverSetPath = serverSetPaths[i];
            zkServerSets[i] = parseServerSet(serverSetPath);
            serverSets[i] = zkServerSets[i].getServerSet();
        }
        return serverSets;
    }

    protected DLZkServerSet parseServerSet(String serverSetPath) {
        return DLZkServerSet.of(URI.create(serverSetPath), 60000);
    }

    /**
     * Reconciles the known streams with the given namespace listing: starts a checker for
     * every newly seen stream assigned to this instance and closes the checkers of streams
     * that disappeared.
     */
    @Override
    public void onStreamsChanged(Iterator<String> streams) {
        Set<String> newSet = new HashSet<String>();
        while (streams.hasNext()) {
            String s = streams.next();
            if (null == streamRegex || s.matches(streamRegex)) {
                int hash = hashFunction.hashUnencodedChars(s).asInt();
                // Take the remainder before the absolute value: Math.abs(Integer.MIN_VALUE)
                // is still negative, which left such a stream owned by no instance. For every
                // other hash value the resulting bucket is unchanged.
                if (Math.abs(hash % totalInstances) == instanceId) {
                    newSet.add(s);
                }
            }
        }
        List<StreamChecker> tasksToCancel = new ArrayList<StreamChecker>();
        synchronized (knownStreams) {
            Set<String> knownStreamSet = new HashSet<String>(knownStreams.keySet());
            Set<String> removedStreams = Sets.difference(knownStreamSet, newSet).immutableCopy();
            Set<String> addedStreams = Sets.difference(newSet, knownStreamSet).immutableCopy();
            for (String s : removedStreams) {
                StreamChecker task = knownStreams.remove(s);
                if (null != task) {
                    logger.info("Removed stream {}", s);
                    tasksToCancel.add(task);
                }
            }
            for (String s : addedStreams) {
                if (!knownStreams.containsKey(s)) {
                    logger.info("Added stream {}", s);
                    StreamChecker sc = new StreamChecker(s);
                    knownStreams.put(s, sc);
                    sc.run();
                }
            }
        }
        // close outside of the lock
        for (StreamChecker sc : tasksToCancel) {
            sc.close();
        }
    }

    /**
     * Registers the stream-count gauge, builds the namespace and either watches it for
     * changes or processes its current stream listing once.
     */
    void runMonitor(DistributedLogConfiguration conf, URI dlUri) throws IOException {
        // stats
        statsProvider.getStatsLogger("monitor").registerGauge("num_streams", numOfStreamsGauge);
        logger.info("Construct dl namespace @ {}", dlUri);
        dlNamespace = DistributedLogNamespaceBuilder.newBuilder()
                .conf(conf)
                .uri(dlUri)
                .build();
        if (watchNamespaceChanges) {
            dlNamespace.registerNamespaceListener(this);
        } else {
            onStreamsChanged(dlNamespace.getLogs());
        }
    }

    /**
     * Close the server.
     */
    public void close() {
        logger.info("Closing monitor service.");
        if (null != dlClient) {
            dlClient.close();
        }
        if (null != zkServerSets) {
            for (DLZkServerSet zkServerSet : zkServerSets) {
                zkServerSet.close();
            }
        }
        if (null != dlNamespace) {
            dlNamespace.close();
        }
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            logger.error("Interrupted on waiting shutting down monitor executor service : ", e);
            // restore the interrupt status for the caller
            Thread.currentThread().interrupt();
        }
        if (null != statsProvider) {
            // clean up the gauges
            unregisterGauge();
            statsProvider.stop();
        }
        keepAliveLatch.countDown();
        logger.info("Closed monitor service.");
    }

    /** Blocks until {@link #close()} has completed. */
    public void join() throws InterruptedException {
        keepAliveLatch.await();
    }

    /**
     * clean up the gauge before we close to help GC.
     */
    private void unregisterGauge(){
        statsProvider.getStatsLogger("monitor").unregisterGauge("num_streams", numOfStreamsGauge);
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorServiceApp.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorServiceApp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorServiceApp.java new file mode 100644 index 0000000..1f45b13 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorServiceApp.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog.service; + +import static org.apache.distributedlog.util.CommandLineUtils.getOptionalBooleanArg; +import static org.apache.distributedlog.util.CommandLineUtils.getOptionalIntegerArg; +import static org.apache.distributedlog.util.CommandLineUtils.getOptionalStringArg; + +import com.twitter.finagle.stats.NullStatsReceiver; +import com.twitter.finagle.stats.StatsReceiver; +import java.io.IOException; +import org.apache.bookkeeper.stats.NullStatsProvider; +import org.apache.bookkeeper.stats.StatsProvider; +import org.apache.bookkeeper.util.ReflectionUtils; +import org.apache.commons.cli.BasicParser; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +

/**
 * The launcher to run monitor service.
 *
 * <p>Parses the command line, builds a {@link MonitorService} from the parsed options, runs it
 * and blocks until the service has been closed.
 */
public class MonitorServiceApp {

    private static final Logger logger = LoggerFactory.getLogger(MonitorServiceApp.class);

    static final String USAGE = "MonitorService [-u <uri>] [-c <conf>] [-s serverset]";

    final String[] args;
    final Options options = new Options();

    private MonitorServiceApp(String[] args) {
        this.args = args;
        // prepare options
        options.addOption("u", "uri", true, "DistributedLog URI");
        options.addOption("c", "conf", true, "DistributedLog Configuration File");
        options.addOption("s", "serverset", true, "Proxy Server Set");
        options.addOption("i", "interval", true, "Check interval");
        options.addOption("d", "region", true, "Region ID");
        options.addOption("p", "provider", true, "DistributedLog Stats Provider");
        options.addOption("f", "filter", true, "Filter streams by regex");
        options.addOption("w", "watch", false, "Watch stream changes under a given namespace");
        options.addOption("n", "instance_id", true, "Instance ID");
        options.addOption("t", "total_instances", true, "Total instances");
        options.addOption("hck", "heartbeat-num-checks", true, "Send a heartbeat after num checks");
        options.addOption("hsci", "handshake-with-client-info", false, "Enable handshaking with client info");
        // runCmd() reads "mx"; without registering it the flag could never be supplied
        // (the parser rejects unknown options), so thriftmux could not be enabled.
        options.addOption("mx", "thriftmux", false, "Is thriftmux enabled");
    }

    void printUsage() {
        HelpFormatter helpFormatter = new HelpFormatter();
        helpFormatter.printHelp(USAGE, options);
    }

    /** Parses the arguments and runs the service; exits the JVM with -1 on failure. */
    private void run() {
        try {
            logger.info("Running monitor service.");
            BasicParser parser = new BasicParser();
            CommandLine cmdline = parser.parse(options, args);
            runCmd(cmdline);
        } catch (ParseException pe) {
            // report why parsing failed instead of only printing the usage
            logger.error("Failed to parse command line arguments : {}", pe.getMessage());
            printUsage();
            Runtime.getRuntime().exit(-1);
        } catch (IOException ie) {
            logger.error("Failed to start monitor service : ", ie);
            Runtime.getRuntime().exit(-1);
        }
    }

    /**
     * Builds the monitor service from the parsed command line, starts it, installs a shutdown
     * hook that closes it and waits for it to finish.
     *
     * @param cmdline the parsed command line
     * @throws IOException if the monitor service fails to start
     */
    void runCmd(CommandLine cmdline) throws IOException {
        StatsProvider statsProvider = new NullStatsProvider();
        if (cmdline.hasOption("p")) {
            String providerClass = cmdline.getOptionValue("p");
            statsProvider = ReflectionUtils.newInstance(providerClass, StatsProvider.class);
        }
        StatsReceiver statsReceiver = NullStatsReceiver.get();

        final MonitorService monitorService = new MonitorService(
                getOptionalStringArg(cmdline, "u"),
                getOptionalStringArg(cmdline, "c"),
                getOptionalStringArg(cmdline, "s"),
                getOptionalIntegerArg(cmdline, "i"),
                getOptionalIntegerArg(cmdline, "d"),
                getOptionalStringArg(cmdline, "f"),
                getOptionalIntegerArg(cmdline, "n"),
                getOptionalIntegerArg(cmdline, "t"),
                getOptionalIntegerArg(cmdline, "hck"),
                getOptionalBooleanArg(cmdline, "hsci"),
                getOptionalBooleanArg(cmdline, "w"),
                getOptionalBooleanArg(cmdline, "mx"),
                statsReceiver,
                statsProvider);

        monitorService.runServer();

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                logger.info("Closing monitor service.");
                monitorService.close();
                logger.info("Closed monitor service.");
            }
        });
        try {
            monitorService.join();
        } catch (InterruptedException ie) {
            logger.warn("Interrupted when waiting monitor service to be finished : ", ie);
            // restore the interrupt status for the caller
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        final MonitorServiceApp launcher = new MonitorServiceApp(args);
        launcher.run();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ResponseUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ResponseUtils.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ResponseUtils.java new file mode 100644 index 0000000..08f4b41 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ResponseUtils.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog.service; + +import org.apache.distributedlog.exceptions.DLException; +import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException; +import org.apache.distributedlog.thrift.service.BulkWriteResponse; +import org.apache.distributedlog.thrift.service.ResponseHeader; +import org.apache.distributedlog.thrift.service.StatusCode; +import org.apache.distributedlog.thrift.service.WriteResponse; + +

/**
 * Utility methods for building write proxy service responses.
 *
 * <p>The {@code *Header} factories create a {@link ResponseHeader} carrying a status code; the
 * {@code write*} and {@code bulkWrite*} factories wrap such a header into the matching
 * response object.
 */
public class ResponseUtils {

    /** Header with status {@code REQUEST_DENIED}. */
    public static ResponseHeader deniedHeader() {
        return new ResponseHeader(StatusCode.REQUEST_DENIED);
    }

    /** Header with status {@code STREAM_UNAVAILABLE}. */
    public static ResponseHeader streamUnavailableHeader() {
        return new ResponseHeader(StatusCode.STREAM_UNAVAILABLE);
    }

    /** Header with status {@code SUCCESS}. */
    public static ResponseHeader successHeader() {
        return new ResponseHeader(StatusCode.SUCCESS);
    }

    /** Header with status {@code FOUND} whose location is set to the given owner. */
    public static ResponseHeader ownerToHeader(String owner) {
        final ResponseHeader found = new ResponseHeader(StatusCode.FOUND);
        return found.setLocation(owner);
    }

    /**
     * Translates a failure into a response header.
     *
     * <p>A {@link DLException} contributes its own code and message (plus the current owner
     * as location when it is an {@link OwnershipAcquireFailedException}); any other throwable
     * is reported as {@code INTERNAL_SERVER_ERROR}.
     *
     * @param t the failure to translate
     * @return the header describing the failure
     */
    public static ResponseHeader exceptionToHeader(Throwable t) {
        final ResponseHeader header = new ResponseHeader();
        if (!(t instanceof DLException)) {
            header.setCode(StatusCode.INTERNAL_SERVER_ERROR);
            header.setErrMsg("Internal server error : " + t.getMessage());
            return header;
        }
        final DLException dlException = (DLException) t;
        if (dlException instanceof OwnershipAcquireFailedException) {
            final OwnershipAcquireFailedException ownershipFailure =
                    (OwnershipAcquireFailedException) dlException;
            header.setLocation(ownershipFailure.getCurrentOwner());
        }
        header.setCode(StatusCode.findByValue(dlException.getCode()));
        header.setErrMsg(dlException.getMessage());
        return header;
    }

    /** Write response carrying the given header. */
    public static WriteResponse write(ResponseHeader responseHeader) {
        return new WriteResponse(responseHeader);
    }

    /** Write response carrying a success header. */
    public static WriteResponse writeSuccess() {
        return write(successHeader());
    }

    /** Write response carrying a denied header. */
    public static WriteResponse writeDenied() {
        return write(deniedHeader());
    }

    /** Bulk write response carrying the given header. */
    public static BulkWriteResponse bulkWrite(ResponseHeader responseHeader) {
        return new BulkWriteResponse(responseHeader);
    }

    /** Bulk write response carrying a success header. */
    public static BulkWriteResponse bulkWriteSuccess() {
        return bulkWrite(successHeader());
    }

    /** Bulk write response carrying a denied header. */
    public static BulkWriteResponse bulkWriteDenied() {
        return bulkWrite(deniedHeader());
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ServerFeatureKeys.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ServerFeatureKeys.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ServerFeatureKeys.java new file mode 100644 index 0000000..436145d --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ServerFeatureKeys.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service; + +/** + * List of feature keys used by distributedlog server. 
+ */ +public enum ServerFeatureKeys { + + /* NOTE(review): this file only declares the four key names; what each key switches is not visible here. The names suggest toggles for accepting new streams in a region, rate limiting, checksum handling and the global limiter -- confirm against the code that reads these keys. */ + REGION_STOP_ACCEPT_NEW_STREAM, + SERVICE_RATE_LIMIT_DISABLED, + SERVICE_CHECKSUM_DISABLED, + SERVICE_GLOBAL_LIMITER_DISABLED +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/StatsFilter.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/StatsFilter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/StatsFilter.java new file mode 100644 index 0000000..ee64580 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/StatsFilter.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog.service; + +import com.google.common.base.Stopwatch; +import com.twitter.finagle.Service; +import com.twitter.finagle.SimpleFilter; +import com.twitter.util.Future; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; + +

/**
 * Track distributedlog server finagle-service stats.
 *
 * <p>Records, per request, how long the wrapped service's {@code apply} call takes to return
 * (not how long the returned future takes to complete) and counts the calls currently inside
 * {@code apply}.
 */
class StatsFilter<Req, Rep> extends SimpleFilter<Req, Rep> {

    private final StatsLogger stats;
    private final Counter outstandingAsync;
    private final OpStatsLogger serviceExec;

    /**
     * Invokes the wrapped service and records the time spent in the call.
     *
     * <p>A call that returns a future is recorded as successful; a call that throws or
     * returns {@code null} is recorded as failed.
     *
     * @param req the request
     * @param service the service to delegate to
     * @return the future returned by the service
     */
    @Override
    public Future<Rep> apply(Req req, Service<Req, Rep> service) {
        Future<Rep> result = null;
        outstandingAsync.inc();
        final Stopwatch stopwatch = Stopwatch.createStarted();
        try {
            result = service.apply(req);
        } finally {
            // Stop the stopwatch exactly once: stopping it both on the success path and again
            // in the null-result branch threw IllegalStateException when the service returned
            // null, after a success event had already been recorded for the same call.
            final long elapsedMicros = stopwatch.stop().elapsed(TimeUnit.MICROSECONDS);
            outstandingAsync.dec();
            if (null == result) {
                serviceExec.registerFailedEvent(elapsedMicros);
            } else {
                serviceExec.registerSuccessfulEvent(elapsedMicros);
            }
        }
        return result;
    }

    /**
     * @param stats the stats logger providing the {@code outstandingAsync} counter and the
     *              {@code serviceExec} op stats
     */
    public StatsFilter(StatsLogger stats) {
        this.stats = stats;
        this.outstandingAsync = stats.getCounter("outstandingAsync");
        this.serviceExec = stats.getOpStatsLogger("serviceExec");
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/Announcer.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/Announcer.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/Announcer.java new file mode 100644 index 0000000..ee64fc7 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/Announcer.java @@ -0,0 +1,41 @@ +/** + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.announcer; + +import java.io.IOException; + +/** + * Announce service information. + * + * <p>Declares three operations: {@link #announce()}, {@link #unannounce()} and {@link #close()}. + */ +public interface Announcer { + + /** + * Announce service info. + * + * @throws IOException declared by the signature; the failure conditions depend on the implementation + */ + void announce() throws IOException; + + /** + * Unannounce the service info. + * + * @throws IOException declared by the signature; the failure conditions depend on the implementation + */ + void unannounce() throws IOException; + + /** + * Close the announcer. + * + * <p>Unlike the other two operations, this one declares no checked exception. + */ + void close(); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/NOPAnnouncer.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/NOPAnnouncer.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/NOPAnnouncer.java new file mode 100644 index 0000000..5a1277a --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/NOPAnnouncer.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.announcer; + +import java.io.IOException; + +/** + * A no-op implementation of {@link Announcer}. + */ +public class NOPAnnouncer implements Announcer { + @Override + public void announce() throws IOException { + // nop + } + + @Override + public void unannounce() throws IOException { + // nop + } + + @Override + public void close() { + // nop + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/ServerSetAnnouncer.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/ServerSetAnnouncer.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/ServerSetAnnouncer.java new file mode 100644 index 0000000..df4a8e2 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/ServerSetAnnouncer.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.announcer; + +import com.twitter.common.zookeeper.Group; +import com.twitter.common.zookeeper.ServerSet; +import org.apache.distributedlog.client.serverset.DLZkServerSet; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.UnknownHostException; +import java.util.HashMap; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * ServerSet based announcer. + */ +public class ServerSetAnnouncer implements Announcer { + + private static final Logger logger = LoggerFactory.getLogger(ServerSetAnnouncer.class); + + final String localAddr; + final InetSocketAddress serviceEndpoint; + final Map<String, InetSocketAddress> additionalEndpoints; + final int shardId; + + // ServerSet + DLZkServerSet zkServerSet; + + // Service Status + ServerSet.EndpointStatus serviceStatus = null; + + /** + * Announce server infos. 
+ * + * @param servicePort + * service port + * @param statsPort + * stats port + * @param shardId + * shard id + */ + public ServerSetAnnouncer(URI uri, + int servicePort, + int statsPort, + int shardId) throws UnknownHostException { + this.shardId = shardId; + this.localAddr = InetAddress.getLocalHost().getHostAddress(); + // service endpoint + this.serviceEndpoint = new InetSocketAddress(localAddr, servicePort); + // stats endpoint + InetSocketAddress statsEndpoint = new InetSocketAddress(localAddr, statsPort); + this.additionalEndpoints = new HashMap<String, InetSocketAddress>(); + this.additionalEndpoints.put("aurora", statsEndpoint); + this.additionalEndpoints.put("stats", statsEndpoint); + this.additionalEndpoints.put("service", serviceEndpoint); + this.additionalEndpoints.put("thrift", serviceEndpoint); + + // Create zookeeper and server set + this.zkServerSet = DLZkServerSet.of(uri, 60000); + } + + @Override + public synchronized void announce() throws IOException { + try { + serviceStatus = + zkServerSet.getServerSet().join(serviceEndpoint, additionalEndpoints, shardId); + } catch (Group.JoinException e) { + throw new IOException("Failed to announce service : ", e); + } catch (InterruptedException e) { + logger.warn("Interrupted on announcing service : ", e); + Thread.currentThread().interrupt(); + } + } + + @Override + public synchronized void unannounce() throws IOException { + if (null == serviceStatus) { + logger.warn("No service to unannounce."); + return; + } + try { + serviceStatus.leave(); + } catch (ServerSet.UpdateException e) { + throw new IOException("Failed to unannounce service : ", e); + } + } + + @Override + public void close() { + zkServerSet.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/package-info.java ---------------------------------------------------------------------- diff --git 
a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/package-info.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/package-info.java new file mode 100644 index 0000000..6559bb3 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/announcer/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Announcers to announce servers to server set. 
+ */ +package org.apache.distributedlog.service.announcer; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/Balancer.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/Balancer.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/Balancer.java new file mode 100644 index 0000000..cdffaa3 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/Balancer.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.balancer; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.RateLimiter; + +/** + * Balancer Interface. + * + * <p>A balancer is used for balance the streams across the proxy cluster. + */ +public interface Balancer { + + /** + * Rebalance all the streams from <i>source</i> to others. + * + * @param source + * source target name. 
+ * @param rebalanceConcurrency + * the concurrency to move streams for re-balance. + * @param rebalanceRateLimiter + * the rate limiting to move streams for re-balance. + */ + void balanceAll(String source, + int rebalanceConcurrency, + Optional<RateLimiter> rebalanceRateLimiter); + + /** + * Balance the streams across all targets. + * + * @param rebalanceWaterMark + * rebalance water mark. if number of streams of a given target is less than + * the water mark, no streams will be re-balanced from this target. + * @param rebalanceTolerancePercentage + * tolerance percentage for the balancer. if number of streams of a given target is + * less than average + average * <i>tolerancePercentage</i> / 100.0, no streams will + * be re-balanced from that target. + * @param rebalanceConcurrency + * the concurrency to move streams for re-balance. + * @param rebalanceRateLimiter + * the rate limiting to move streams for re-balance. + */ + void balance(int rebalanceWaterMark, + double rebalanceTolerancePercentage, + int rebalanceConcurrency, + Optional<RateLimiter> rebalanceRateLimiter); + + /** + * Close the balancer. + */ + void close(); +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/BalancerTool.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/BalancerTool.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/BalancerTool.java new file mode 100644 index 0000000..964c1cc --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/BalancerTool.java @@ -0,0 +1,327 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.balancer; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.RateLimiter; +import com.twitter.common.zookeeper.ServerSet; +import org.apache.distributedlog.client.monitor.MonitorServiceClient; +import org.apache.distributedlog.client.serverset.DLZkServerSet; +import org.apache.distributedlog.impl.BKNamespaceDriver; +import org.apache.distributedlog.service.ClientUtils; +import org.apache.distributedlog.service.DLSocketAddress; +import org.apache.distributedlog.service.DistributedLogClient; +import org.apache.distributedlog.service.DistributedLogClientBuilder; +import org.apache.distributedlog.tools.Tool; +import com.twitter.finagle.builder.ClientBuilder; +import com.twitter.finagle.thrift.ClientId$; +import com.twitter.util.Await; +import com.twitter.util.Duration; +import java.net.InetSocketAddress; +import java.net.URI; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tool to rebalance cluster. 
+ */ +public class BalancerTool extends Tool { + + private static final Logger logger = LoggerFactory.getLogger(BalancerTool.class); + + static DistributedLogClientBuilder createDistributedLogClientBuilder(ServerSet serverSet) { + return DistributedLogClientBuilder.newBuilder() + .name("rebalancer_tool") + .clientId(ClientId$.MODULE$.apply("rebalancer_tool")) + .maxRedirects(2) + .serverSet(serverSet) + .clientBuilder(ClientBuilder.get() + .connectionTimeout(Duration.fromSeconds(2)) + .tcpConnectTimeout(Duration.fromSeconds(2)) + .requestTimeout(Duration.fromSeconds(10)) + .hostConnectionLimit(1) + .hostConnectionCoresize(1) + .keepAlive(true) + .failFast(false)); + } + + /** + * Base Command to run balancer. + */ + protected abstract static class BalancerCommand extends OptsCommand { + + protected Options options = new Options(); + protected int rebalanceWaterMark = 0; + protected double rebalanceTolerancePercentage = 0.0f; + protected int rebalanceConcurrency = 1; + protected Double rate = null; + protected Optional<RateLimiter> rateLimiter; + + BalancerCommand(String name, String description) { + super(name, description); + options.addOption("rwm", "rebalance-water-mark", true, "Rebalance water mark per proxy"); + options.addOption("rtp", "rebalance-tolerance-percentage", true, + "Rebalance tolerance percentage per proxy"); + options.addOption("rc", "rebalance-concurrency", true, "Concurrency to rebalance stream distribution"); + options.addOption("r", "rate", true, "Rebalance rate"); + } + + Optional<RateLimiter> getRateLimiter() { + return rateLimiter; + } + + @Override + protected Options getOptions() { + return options; + } + + protected void parseCommandLine(CommandLine cmdline) throws ParseException { + if (cmdline.hasOption("rwm")) { + this.rebalanceWaterMark = Integer.parseInt(cmdline.getOptionValue("rwm")); + } + if (cmdline.hasOption("rtp")) { + this.rebalanceTolerancePercentage = Double.parseDouble(cmdline.getOptionValue("rtp")); + } + if 
(cmdline.hasOption("rc")) { + this.rebalanceConcurrency = Integer.parseInt(cmdline.getOptionValue("rc")); + } + if (cmdline.hasOption("r")) { + this.rate = Double.parseDouble(cmdline.getOptionValue("r")); + } + checkArgument(rebalanceWaterMark >= 0, + "Rebalance Water Mark should be a non-negative number"); + checkArgument(rebalanceTolerancePercentage >= 0.0f, + "Rebalance Tolerance Percentage should be a non-negative number"); + checkArgument(rebalanceConcurrency > 0, + "Rebalance Concurrency should be a positive number"); + if (null == rate || rate <= 0.0f) { + rateLimiter = Optional.absent(); + } else { + rateLimiter = Optional.of(RateLimiter.create(rate)); + } + } + + @Override + protected int runCmd(CommandLine cmdline) throws Exception { + try { + parseCommandLine(cmdline); + } catch (ParseException pe) { + println("ERROR: fail to parse commandline : '" + pe.getMessage() + "'"); + printUsage(); + return -1; + } + return executeCommand(cmdline); + } + + protected abstract int executeCommand(CommandLine cmdline) throws Exception; + } + + /** + * Command to balance streams within a cluster. 
+ */ + protected static class ClusterBalancerCommand extends BalancerCommand { + + protected URI uri; + protected String source = null; + + protected ClusterBalancerCommand() { + super("clusterbalancer", "Balance streams inside a cluster"); + options.addOption("u", "uri", true, "DistributedLog URI"); + options.addOption("sp", "source-proxy", true, "Source proxy to balance"); + } + + @Override + protected String getUsage() { + return "clusterbalancer [options]"; + } + + protected void parseCommandLine(CommandLine cmdline) throws ParseException { + super.parseCommandLine(cmdline); + if (!cmdline.hasOption("u")) { + throw new ParseException("No proxy serverset provided."); + } + uri = URI.create(cmdline.getOptionValue("u")); + if (cmdline.hasOption("sp")) { + String sourceProxyStr = cmdline.getOptionValue("sp"); + try { + DLSocketAddress.parseSocketAddress(sourceProxyStr); + } catch (IllegalArgumentException iae) { + throw new ParseException("Invalid source proxy " + sourceProxyStr + " : " + iae.getMessage()); + } + this.source = sourceProxyStr; + } + } + + @Override + protected int executeCommand(CommandLine cmdline) throws Exception { + DLZkServerSet serverSet = DLZkServerSet.of(uri, 60000); + logger.info("Created serverset for {}", uri); + try { + DistributedLogClientBuilder clientBuilder = + createDistributedLogClientBuilder(serverSet.getServerSet()); + ClusterBalancer balancer = new ClusterBalancer(clientBuilder); + try { + return runBalancer(clientBuilder, balancer); + } finally { + balancer.close(); + } + } finally { + serverSet.close(); + } + } + + protected int runBalancer(DistributedLogClientBuilder clientBuilder, + ClusterBalancer balancer) + throws Exception { + if (null == source) { + balancer.balance( + rebalanceWaterMark, + rebalanceTolerancePercentage, + rebalanceConcurrency, + getRateLimiter()); + } else { + balanceFromSource(clientBuilder, balancer, source, getRateLimiter()); + } + return 0; + } + + protected void 
balanceFromSource(DistributedLogClientBuilder clientBuilder, + ClusterBalancer balancer, + String source, + Optional<RateLimiter> rateLimiter) + throws Exception { + InetSocketAddress sourceAddr = DLSocketAddress.parseSocketAddress(source); + DistributedLogClientBuilder sourceClientBuilder = + DistributedLogClientBuilder.newBuilder(clientBuilder) + .host(sourceAddr); + + Pair<DistributedLogClient, MonitorServiceClient> clientPair = + ClientUtils.buildClient(sourceClientBuilder); + try { + Await.result(clientPair.getRight().setAcceptNewStream(false)); + logger.info("Disable accepting new stream on proxy {}.", source); + balancer.balanceAll(source, rebalanceConcurrency, rateLimiter); + } finally { + clientPair.getLeft().close(); + } + } + } + + /** + * Command to balance streams between regions. + */ + protected static class RegionBalancerCommand extends BalancerCommand { + + protected URI region1; + protected URI region2; + protected String source = null; + + protected RegionBalancerCommand() { + super("regionbalancer", "Balance streams between regions"); + options.addOption("rs", "regions", true, "DistributedLog Region URI: uri1[,uri2]"); + options.addOption("s", "source", true, "DistributedLog Source Region to balance"); + } + + @Override + protected String getUsage() { + return "regionbalancer [options]"; + } + + @Override + protected void parseCommandLine(CommandLine cmdline) throws ParseException { + super.parseCommandLine(cmdline); + if (!cmdline.hasOption("rs")) { + throw new ParseException("No regions provided."); + } + String regionsStr = cmdline.getOptionValue("rs"); + String[] regions = regionsStr.split(","); + if (regions.length != 2) { + throw new ParseException("Invalid regions provided. 
Expected : serverset1[,serverset2]"); + } + region1 = URI.create(regions[0]); + region2 = URI.create(regions[1]); + if (cmdline.hasOption("s")) { + source = cmdline.getOptionValue("s"); + } + } + + @Override + protected int executeCommand(CommandLine cmdline) throws Exception { + DLZkServerSet serverSet1 = DLZkServerSet.of(region1, 60000); + logger.info("Created serverset for {}", region1); + DLZkServerSet serverSet2 = DLZkServerSet.of(region2, 60000); + logger.info("Created serverset for {}", region2); + try { + DistributedLogClientBuilder builder1 = + createDistributedLogClientBuilder(serverSet1.getServerSet()); + Pair<DistributedLogClient, MonitorServiceClient> pair1 = + ClientUtils.buildClient(builder1); + DistributedLogClientBuilder builder2 = + createDistributedLogClientBuilder(serverSet2.getServerSet()); + Pair<DistributedLogClient, MonitorServiceClient> pair2 = + ClientUtils.buildClient(builder2); + try { + SimpleBalancer balancer = new SimpleBalancer( + BKNamespaceDriver.getZKServersFromDLUri(region1), pair1.getLeft(), pair1.getRight(), + BKNamespaceDriver.getZKServersFromDLUri(region2), pair2.getLeft(), pair2.getRight()); + try { + return runBalancer(balancer); + } finally { + balancer.close(); + } + } finally { + pair1.getLeft().close(); + pair2.getLeft().close(); + } + } finally { + serverSet1.close(); + serverSet2.close(); + } + } + + protected int runBalancer(SimpleBalancer balancer) throws Exception { + if (null == source) { + balancer.balance( + rebalanceWaterMark, + rebalanceTolerancePercentage, + rebalanceConcurrency, + getRateLimiter()); + } else { + balancer.balanceAll(source, rebalanceConcurrency, getRateLimiter()); + } + return 0; + } + } + + public BalancerTool() { + super(); + addCommand(new ClusterBalancerCommand()); + addCommand(new RegionBalancerCommand()); + } + + @Override + protected String getName() { + return "balancer"; + } +} 
/**
 * Utils for balancer.
 */
public class BalancerUtils {

    /**
     * Util function to calculate how many streams to balance for <i>nodeToRebalance</i>,
     * based on the load distribution <i>loadDistribution</i>.
     *
     * <p>Entries of <i>loadDistribution</i> with a null key or a null value are ignored: they
     * contribute neither to the total load nor to the number of nodes the average is taken over.
     *
     * @param nodeToRebalance
     *          node to rebalance
     * @param loadDistribution
     *          load distribution map
     * @param rebalanceWaterMark
     *          if number of streams of <i>nodeToRebalance</i>
     *          is not more than <i>rebalanceWaterMark</i>, no streams will be re-balanced.
     * @param tolerancePercentage
     *          tolerance percentage for the balancer. if number of streams of <i>nodeToRebalance</i>
     *          is not more than average + average * <i>tolerancePercentage</i> / 100.0 (rounded up,
     *          and at least 1), no streams will be re-balanced.
     * @param <K>
     *          type of the node identifier
     * @return number of streams to rebalance, i.e. the load of <i>nodeToRebalance</i> above the
     *         rounded-up average load, or 0 if the node is unknown or within the limits.
     */
    public static <K> int calculateNumStreamsToRebalance(K nodeToRebalance,
                                                         Map<K, Integer> loadDistribution,
                                                         int rebalanceWaterMark,
                                                         double tolerancePercentage) {
        Integer myLoad = loadDistribution.get(nodeToRebalance);
        if (null == myLoad || myLoad <= rebalanceWaterMark) {
            return 0;
        }

        long totalLoad = 0L;
        // count only the nodes whose load is summed, so that ignored entries do not
        // drag the average down
        int numNodes = 0;

        for (Map.Entry<K, Integer> entry : loadDistribution.entrySet()) {
            if (null == entry.getKey() || null == entry.getValue()) {
                continue;
            }
            totalLoad += entry.getValue();
            ++numNodes;
        }

        // no countable node at all (only possible for a null node): the average is zero
        double averageLoad = 0 == numNodes ? 0.0 : ((double) totalLoad) / numNodes;
        long permissibleLoad =
                Math.max(1L, (long) Math.ceil(averageLoad + averageLoad * tolerancePercentage / 100.0f));

        if (myLoad <= permissibleLoad) {
            return 0;
        }

        return Math.max(0, myLoad - (int) Math.ceil(averageLoad));
    }
}
b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/ClusterBalancer.java @@ -0,0 +1,378 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.balancer; + +import com.google.common.base.Optional; +import com.google.common.util.concurrent.RateLimiter; +import org.apache.distributedlog.client.monitor.MonitorServiceClient; +import org.apache.distributedlog.service.ClientUtils; +import org.apache.distributedlog.service.DLSocketAddress; +import org.apache.distributedlog.service.DistributedLogClient; +import org.apache.distributedlog.service.DistributedLogClientBuilder; +import com.twitter.util.Await; +import com.twitter.util.Function; +import com.twitter.util.Future; +import com.twitter.util.FutureEventListener; +import java.io.Serializable; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A balancer balances 
ownerships with a cluster of targets. + */ +public class ClusterBalancer implements Balancer { + + private static final Logger logger = LoggerFactory.getLogger(ClusterBalancer.class); + + /** + * Represent a single host. Ordered by number of streams in desc order. + */ + static class Host { + + final SocketAddress address; + final Set<String> streams; + final DistributedLogClientBuilder clientBuilder; + DistributedLogClient client = null; + MonitorServiceClient monitor = null; + + Host(SocketAddress address, Set<String> streams, + DistributedLogClientBuilder clientBuilder) { + this.address = address; + this.streams = streams; + this.clientBuilder = clientBuilder; + } + + private void initializeClientsIfNeeded() { + if (null == client) { + Pair<DistributedLogClient, MonitorServiceClient> clientPair = + createDistributedLogClient(address, clientBuilder); + client = clientPair.getLeft(); + monitor = clientPair.getRight(); + } + } + + synchronized DistributedLogClient getClient() { + initializeClientsIfNeeded(); + return client; + } + + synchronized MonitorServiceClient getMonitor() { + initializeClientsIfNeeded(); + return monitor; + } + + synchronized void close() { + if (null != client) { + client.close(); + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("Host(").append(address).append(")"); + return sb.toString(); + } + } + + static class HostComparator implements Comparator<Host>, Serializable { + private static final long serialVersionUID = 7984973796525102538L; + + @Override + public int compare(Host h1, Host h2) { + return h2.streams.size() - h1.streams.size(); + } + } + + protected final DistributedLogClientBuilder clientBuilder; + protected final DistributedLogClient client; + protected final MonitorServiceClient monitor; + + public ClusterBalancer(DistributedLogClientBuilder clientBuilder) { + this(clientBuilder, ClientUtils.buildClient(clientBuilder)); + } + + 
ClusterBalancer(DistributedLogClientBuilder clientBuilder, + Pair<DistributedLogClient, MonitorServiceClient> clientPair) { + this.clientBuilder = clientBuilder; + this.client = clientPair.getLeft(); + this.monitor = clientPair.getRight(); + } + + /** + * Build a new distributedlog client to a single host <i>host</i>. + * + * @param host + * host to access + * @return distributedlog clients + */ + static Pair<DistributedLogClient, MonitorServiceClient> createDistributedLogClient( + SocketAddress host, DistributedLogClientBuilder clientBuilder) { + DistributedLogClientBuilder newBuilder = + DistributedLogClientBuilder.newBuilder(clientBuilder).host(host); + return ClientUtils.buildClient(newBuilder); + } + + @Override + public void balanceAll(String source, + int rebalanceConcurrency, /* unused */ + Optional<RateLimiter> rebalanceRateLimiter) { + balance(0, 0.0f, rebalanceConcurrency, Optional.of(source), rebalanceRateLimiter); + } + + @Override + public void balance(int rebalanceWaterMark, + double rebalanceTolerancePercentage, + int rebalanceConcurrency, /* unused */ + Optional<RateLimiter> rebalanceRateLimiter) { + Optional<String> source = Optional.absent(); + balance(rebalanceWaterMark, rebalanceTolerancePercentage, rebalanceConcurrency, source, rebalanceRateLimiter); + } + + public void balance(int rebalanceWaterMark, + double rebalanceTolerancePercentage, + int rebalanceConcurrency, + Optional<String> source, + Optional<RateLimiter> rebalanceRateLimiter) { + Map<SocketAddress, Set<String>> distribution = monitor.getStreamOwnershipDistribution(); + if (distribution.size() <= 1) { + return; + } + SocketAddress sourceAddr = null; + if (source.isPresent()) { + sourceAddr = DLSocketAddress.parseSocketAddress(source.get()); + logger.info("Balancer source is {}", sourceAddr); + if (!distribution.containsKey(sourceAddr)) { + return; + } + } + // Get the list of hosts ordered by number of streams in DESC order + List<Host> hosts = new 
ArrayList<Host>(distribution.size()); + for (Map.Entry<SocketAddress, Set<String>> entry : distribution.entrySet()) { + Host host = new Host(entry.getKey(), entry.getValue(), clientBuilder); + hosts.add(host); + } + Collections.sort(hosts, new HostComparator()); + try { + + // find the host to move streams from. + int hostIdxMoveFrom = -1; + if (null != sourceAddr) { + for (Host host : hosts) { + ++hostIdxMoveFrom; + if (sourceAddr.equals(host.address)) { + break; + } + } + } + + // compute the average load. + int totalStream = 0; + for (Host host : hosts) { + totalStream += host.streams.size(); + } + double averageLoad; + if (hostIdxMoveFrom >= 0) { + averageLoad = ((double) totalStream / (hosts.size() - 1)); + } else { + averageLoad = ((double) totalStream / hosts.size()); + } + + int moveFromLowWaterMark; + int moveToHighWaterMark = + Math.max(1, (int) (averageLoad + averageLoad * rebalanceTolerancePercentage / 100.0f)); + + if (hostIdxMoveFrom >= 0) { + moveFromLowWaterMark = Math.max(0, rebalanceWaterMark); + moveStreams( + hosts, + new AtomicInteger(hostIdxMoveFrom), moveFromLowWaterMark, + new AtomicInteger(hosts.size() - 1), moveToHighWaterMark, + rebalanceRateLimiter); + moveRemainingStreamsFromSource(hosts.get(hostIdxMoveFrom), hosts, rebalanceRateLimiter); + } else { + moveFromLowWaterMark = Math.max((int) Math.ceil(averageLoad), rebalanceWaterMark); + AtomicInteger moveFrom = new AtomicInteger(0); + AtomicInteger moveTo = new AtomicInteger(hosts.size() - 1); + while (moveFrom.get() < moveTo.get()) { + moveStreams(hosts, moveFrom, moveFromLowWaterMark, + moveTo, moveToHighWaterMark, rebalanceRateLimiter); + moveFrom.incrementAndGet(); + } + } + } finally { + for (Host host : hosts) { + host.close(); + } + } + } + + void moveStreams(List<Host> hosts, + AtomicInteger hostIdxMoveFrom, + int moveFromLowWaterMark, + AtomicInteger hostIdxMoveTo, + int moveToHighWaterMark, + Optional<RateLimiter> rateLimiter) { + if (hostIdxMoveFrom.get() < 0 || 
hostIdxMoveFrom.get() >= hosts.size() + || hostIdxMoveTo.get() < 0 || hostIdxMoveTo.get() >= hosts.size() + || hostIdxMoveFrom.get() >= hostIdxMoveTo.get()) { + return; + } + + if (logger.isDebugEnabled()) { + logger.debug("Moving streams : hosts = {}, from = {}, to = {} :" + + " from_low_water_mark = {}, to_high_water_mark = {}", + new Object[] { + hosts, + hostIdxMoveFrom.get(), + hostIdxMoveTo.get(), + moveFromLowWaterMark, + moveToHighWaterMark }); + } + + Host hostMoveFrom = hosts.get(hostIdxMoveFrom.get()); + int numStreamsOnFromHost = hostMoveFrom.streams.size(); + if (numStreamsOnFromHost <= moveFromLowWaterMark) { + // do nothing + return; + } + + int numStreamsToMove = numStreamsOnFromHost - moveFromLowWaterMark; + LinkedList<String> streamsToMove = new LinkedList<String>(hostMoveFrom.streams); + Collections.shuffle(streamsToMove); + + if (logger.isDebugEnabled()) { + logger.debug("Try to move {} streams from host {} : streams = {}", + new Object[] { numStreamsToMove, hostMoveFrom.address, streamsToMove }); + } + + while (numStreamsToMove-- > 0 && !streamsToMove.isEmpty()) { + if (rateLimiter.isPresent()) { + rateLimiter.get().acquire(); + } + + // pick a host to move + Host hostMoveTo = hosts.get(hostIdxMoveTo.get()); + while (hostMoveTo.streams.size() >= moveToHighWaterMark) { + int hostIdx = hostIdxMoveTo.decrementAndGet(); + logger.info("move to host : {}, from {}", hostIdx, hostIdxMoveFrom.get()); + if (hostIdx <= hostIdxMoveFrom.get()) { + return; + } else { + hostMoveTo = hosts.get(hostIdx); + if (logger.isDebugEnabled()) { + logger.debug("Target host to move moved to host {} @ {}", + hostIdx, hostMoveTo); + } + } + } + + // pick a stream + String stream = streamsToMove.remove(); + + // move the stream + if (moveStream(stream, hostMoveFrom, hostMoveTo)) { + hostMoveFrom.streams.remove(stream); + hostMoveTo.streams.add(stream); + } + } + + } + + void moveRemainingStreamsFromSource(Host source, + List<Host> hosts, + Optional<RateLimiter> rateLimiter) 
{ + LinkedList<String> streamsToMove = new LinkedList<String>(source.streams); + Collections.shuffle(streamsToMove); + + if (logger.isDebugEnabled()) { + logger.debug("Try to move remaining streams from {} : {}", source, streamsToMove); + } + + int hostIdx = hosts.size() - 1; + + while (!streamsToMove.isEmpty()) { + if (rateLimiter.isPresent()) { + rateLimiter.get().acquire(); + } + + Host target = hosts.get(hostIdx); + if (!target.address.equals(source.address)) { + String stream = streamsToMove.remove(); + // move the stream + if (moveStream(stream, source, target)) { + source.streams.remove(stream); + target.streams.add(stream); + } + } + --hostIdx; + if (hostIdx < 0) { + hostIdx = hosts.size() - 1; + } + } + } + + private boolean moveStream(String stream, Host from, Host to) { + try { + doMoveStream(stream, from, to); + return true; + } catch (Exception e) { + return false; + } + } + + private void doMoveStream(final String stream, final Host from, final Host to) throws Exception { + logger.info("Moving stream {} from {} to {}.", + new Object[] { stream, from.address, to.address }); + Await.result(from.getClient().release(stream).flatMap(new Function<Void, Future<Void>>() { + @Override + public Future<Void> apply(Void result) { + logger.info("Released stream {} from {}.", stream, from.address); + return to.getMonitor().check(stream).addEventListener(new FutureEventListener<Void>() { + + @Override + public void onSuccess(Void value) { + logger.info("Moved stream {} from {} to {}.", + new Object[] { stream, from.address, to.address }); + } + + @Override + public void onFailure(Throwable cause) { + logger.info("Failed to move stream {} from {} to {} : ", + new Object[] { stream, from.address, to.address, cause }); + } + }); + } + })); + } + + @Override + public void close() { + client.close(); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/CountBasedStreamChooser.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/CountBasedStreamChooser.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/CountBasedStreamChooser.java new file mode 100644 index 0000000..6a43179 --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/CountBasedStreamChooser.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.service.balancer; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.io.Serializable; +import java.net.SocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.commons.lang3.tuple.Pair; + +/** + * A stream chooser based on number of streams. 
+ */ +class CountBasedStreamChooser implements StreamChooser, Serializable, + Comparator<Pair<SocketAddress, LinkedList<String>>> { + + private static final long serialVersionUID = 4664153397369979203L; + + final List<Pair<SocketAddress, LinkedList<String>>> streamsDistribution; + + // pivot index in the list of hosts. the chooser will remove streams from the hosts before + // pivot, which will reduce their stream counts to make them equal to the stream count of the pivot. + int pivot; + int pivotCount; + + // next index in the list of hosts to choose stream from. + int next; + + CountBasedStreamChooser(Map<SocketAddress, Set<String>> streams) { + checkArgument(streams.size() > 0, "Only support no-empty streams distribution"); + streamsDistribution = new ArrayList<Pair<SocketAddress, LinkedList<String>>>(streams.size()); + for (Map.Entry<SocketAddress, Set<String>> entry : streams.entrySet()) { + LinkedList<String> randomizedStreams = new LinkedList<String>(entry.getValue()); + Collections.shuffle(randomizedStreams); + streamsDistribution.add(Pair.of(entry.getKey(), randomizedStreams)); + } + // sort the hosts by the number of streams in descending order + Collections.sort(streamsDistribution, this); + pivot = 0; + pivotCount = streamsDistribution.get(0).getValue().size(); + findNextPivot(); + next = 0; + } + + private void findNextPivot() { + int prevPivotCount = pivotCount; + while (++pivot < streamsDistribution.size()) { + pivotCount = streamsDistribution.get(pivot).getValue().size(); + if (pivotCount < prevPivotCount) { + return; + } + } + pivot = streamsDistribution.size(); + pivotCount = 0; + } + + @Override + public synchronized String choose() { + // reach the pivot + if (next == pivot) { + if (streamsDistribution.get(next - 1).getRight().size() > pivotCount) { + next = 0; + } else if (pivotCount == 0) { // the streams are empty now + return null; + } else { + findNextPivot(); + next = 0; + } + } + + // get stream count that next host to choose from + 
LinkedList<String> nextStreams = streamsDistribution.get(next).getRight(); + if (nextStreams.size() == 0) { + return null; + } + + String chosenStream = nextStreams.remove(); + ++next; + return chosenStream; + } + + @Override + public int compare(Pair<SocketAddress, LinkedList<String>> o1, + Pair<SocketAddress, LinkedList<String>> o2) { + return o2.getValue().size() - o1.getValue().size(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/LimitedStreamChooser.java ---------------------------------------------------------------------- diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/LimitedStreamChooser.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/LimitedStreamChooser.java new file mode 100644 index 0000000..4aefc5e --- /dev/null +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/balancer/LimitedStreamChooser.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog.service.balancer; + +/** + * A stream chooser that can only choose limited number of streams. + */ +public class LimitedStreamChooser implements StreamChooser { + + /** + * Create a limited stream chooser by {@code limit}. + * + * @param underlying the underlying stream chooser. + * @param limit the limit of number of streams to choose. + * @return the limited stream chooser. + */ + public static LimitedStreamChooser of(StreamChooser underlying, int limit) { + return new LimitedStreamChooser(underlying, limit); + } + + final StreamChooser underlying; + int limit; + + LimitedStreamChooser(StreamChooser underlying, int limit) { + this.underlying = underlying; + this.limit = limit; + } + + @Override + public synchronized String choose() { + if (limit <= 0) { + return null; + } + String s = underlying.choose(); + if (s == null) { + limit = 0; + return null; + } + --limit; + return s; + } +}