http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/MonitorService.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/MonitorService.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/MonitorService.java deleted file mode 100644 index b1e2879..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/MonitorService.java +++ /dev/null @@ -1,469 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.service; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.common.base.Optional; -import com.google.common.base.Stopwatch; -import com.google.common.collect.Sets; -import com.google.common.hash.HashFunction; -import com.google.common.hash.Hashing; -import com.twitter.common.zookeeper.ServerSet; -import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.DistributedLogConstants; -import org.apache.distributedlog.DistributedLogManager; -import org.apache.distributedlog.LogSegmentMetadata; -import org.apache.distributedlog.callback.LogSegmentListener; -import org.apache.distributedlog.callback.NamespaceListener; -import org.apache.distributedlog.client.monitor.MonitorServiceClient; -import org.apache.distributedlog.client.serverset.DLZkServerSet; -import org.apache.distributedlog.namespace.DistributedLogNamespace; -import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; -import com.twitter.finagle.builder.ClientBuilder; -import com.twitter.finagle.stats.Stat; -import com.twitter.finagle.stats.StatsReceiver; -import com.twitter.finagle.thrift.ClientId$; -import com.twitter.util.Duration; -import com.twitter.util.FutureEventListener; -import java.io.File; -import java.io.IOException; -import java.net.MalformedURLException; -import java.net.URI; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import org.apache.bookkeeper.stats.Gauge; -import org.apache.bookkeeper.stats.StatsProvider; -import org.apache.commons.configuration.ConfigurationException; -import 
org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Monitor Service. - */ -public class MonitorService implements NamespaceListener { - - private static final Logger logger = LoggerFactory.getLogger(MonitorService.class); - - private DistributedLogNamespace dlNamespace = null; - private MonitorServiceClient dlClient = null; - private DLZkServerSet[] zkServerSets = null; - private final ScheduledExecutorService executorService = - Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors()); - private final CountDownLatch keepAliveLatch = new CountDownLatch(1); - private final Map<String, StreamChecker> knownStreams = new HashMap<String, StreamChecker>(); - - // Settings - private int regionId = DistributedLogConstants.LOCAL_REGION_ID; - private int interval = 100; - private String streamRegex = null; - private boolean watchNamespaceChanges = false; - private boolean handshakeWithClientInfo = false; - private int heartbeatEveryChecks = 0; - private int instanceId = -1; - private int totalInstances = -1; - private boolean isThriftMux = false; - - // Options - private final Optional<String> uriArg; - private final Optional<String> confFileArg; - private final Optional<String> serverSetArg; - private final Optional<Integer> intervalArg; - private final Optional<Integer> regionIdArg; - private final Optional<String> streamRegexArg; - private final Optional<Integer> instanceIdArg; - private final Optional<Integer> totalInstancesArg; - private final Optional<Integer> heartbeatEveryChecksArg; - private final Optional<Boolean> handshakeWithClientInfoArg; - private final Optional<Boolean> watchNamespaceChangesArg; - private final Optional<Boolean> isThriftMuxArg; - - // Stats - private final StatsProvider statsProvider; - private final StatsReceiver statsReceiver; - private final StatsReceiver monitorReceiver; - private final Stat successStat; - private final Stat failureStat; - private final 
Gauge<Number> numOfStreamsGauge; - // Hash Function - private final HashFunction hashFunction = Hashing.md5(); - - class StreamChecker implements Runnable, FutureEventListener<Void>, LogSegmentListener { - private final String name; - private volatile boolean closed = false; - private volatile boolean checking = false; - private final Stopwatch stopwatch = Stopwatch.createUnstarted(); - private DistributedLogManager dlm = null; - private int numChecks = 0; - - StreamChecker(String name) { - this.name = name; - } - - @Override - public void run() { - if (null == dlm) { - try { - dlm = dlNamespace.openLog(name); - dlm.registerListener(this); - } catch (IOException e) { - if (null != dlm) { - try { - dlm.close(); - } catch (IOException e1) { - logger.error("Failed to close dlm for {} : ", name, e1); - } - dlm = null; - } - executorService.schedule(this, interval, TimeUnit.MILLISECONDS); - } - } else { - stopwatch.reset().start(); - boolean sendHeartBeat; - if (heartbeatEveryChecks > 0) { - synchronized (this) { - ++numChecks; - if (numChecks >= Integer.MAX_VALUE) { - numChecks = 0; - } - sendHeartBeat = (numChecks % heartbeatEveryChecks) == 0; - } - } else { - sendHeartBeat = false; - } - if (sendHeartBeat) { - dlClient.heartbeat(name).addEventListener(this); - } else { - dlClient.check(name).addEventListener(this); - } - } - } - - @Override - public void onSegmentsUpdated(List<LogSegmentMetadata> segments) { - if (segments.size() > 0 && segments.get(0).getRegionId() == regionId) { - if (!checking) { - logger.info("Start checking stream {}.", name); - checking = true; - run(); - } - } else { - if (checking) { - logger.info("Stop checking stream {}.", name); - } - } - } - - @Override - public void onLogStreamDeleted() { - logger.info("Stream {} is deleted", name); - } - - @Override - public void onSuccess(Void value) { - successStat.add(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); - scheduleCheck(); - } - - @Override - public void onFailure(Throwable cause) { - 
failureStat.add(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); - scheduleCheck(); - } - - private void scheduleCheck() { - if (closed) { - return; - } - if (!checking) { - return; - } - try { - executorService.schedule(this, interval, TimeUnit.MILLISECONDS); - } catch (RejectedExecutionException ree) { - logger.error("Failed to schedule checking stream {} in {} ms : ", - new Object[] { name, interval, ree }); - } - } - - private void close() { - closed = true; - if (null != dlm) { - try { - dlm.close(); - } catch (IOException e) { - logger.error("Failed to close dlm for {} : ", name, e); - } - } - } - } - - MonitorService(Optional<String> uriArg, - Optional<String> confFileArg, - Optional<String> serverSetArg, - Optional<Integer> intervalArg, - Optional<Integer> regionIdArg, - Optional<String> streamRegexArg, - Optional<Integer> instanceIdArg, - Optional<Integer> totalInstancesArg, - Optional<Integer> heartbeatEveryChecksArg, - Optional<Boolean> handshakeWithClientInfoArg, - Optional<Boolean> watchNamespaceChangesArg, - Optional<Boolean> isThriftMuxArg, - StatsReceiver statsReceiver, - StatsProvider statsProvider) { - // options - this.uriArg = uriArg; - this.confFileArg = confFileArg; - this.serverSetArg = serverSetArg; - this.intervalArg = intervalArg; - this.regionIdArg = regionIdArg; - this.streamRegexArg = streamRegexArg; - this.instanceIdArg = instanceIdArg; - this.totalInstancesArg = totalInstancesArg; - this.heartbeatEveryChecksArg = heartbeatEveryChecksArg; - this.handshakeWithClientInfoArg = handshakeWithClientInfoArg; - this.watchNamespaceChangesArg = watchNamespaceChangesArg; - this.isThriftMuxArg = isThriftMuxArg; - - // Stats - this.statsReceiver = statsReceiver; - this.monitorReceiver = statsReceiver.scope("monitor"); - this.successStat = monitorReceiver.stat0("success"); - this.failureStat = monitorReceiver.stat0("failure"); - this.statsProvider = statsProvider; - this.numOfStreamsGauge = new Gauge<Number>() { - @Override - public Number 
getDefaultValue() { - return 0; - } - - @Override - public Number getSample() { - return knownStreams.size(); - } - }; - } - - public void runServer() throws IllegalArgumentException, IOException { - checkArgument(uriArg.isPresent(), - "No distributedlog uri provided."); - checkArgument(serverSetArg.isPresent(), - "No proxy server set provided."); - if (intervalArg.isPresent()) { - interval = intervalArg.get(); - } - if (regionIdArg.isPresent()) { - regionId = regionIdArg.get(); - } - if (streamRegexArg.isPresent()) { - streamRegex = streamRegexArg.get(); - } - if (instanceIdArg.isPresent()) { - instanceId = instanceIdArg.get(); - } - if (totalInstancesArg.isPresent()) { - totalInstances = totalInstancesArg.get(); - } - if (heartbeatEveryChecksArg.isPresent()) { - heartbeatEveryChecks = heartbeatEveryChecksArg.get(); - } - if (instanceId < 0 || totalInstances <= 0 || instanceId >= totalInstances) { - throw new IllegalArgumentException("Invalid instance id or total instances number."); - } - handshakeWithClientInfo = handshakeWithClientInfoArg.isPresent(); - watchNamespaceChanges = watchNamespaceChangesArg.isPresent(); - isThriftMux = isThriftMuxArg.isPresent(); - URI uri = URI.create(uriArg.get()); - DistributedLogConfiguration dlConf = new DistributedLogConfiguration(); - if (confFileArg.isPresent()) { - String configFile = confFileArg.get(); - try { - dlConf.loadConf(new File(configFile).toURI().toURL()); - } catch (ConfigurationException e) { - throw new IOException("Failed to load distributedlog configuration from " + configFile + "."); - } catch (MalformedURLException e) { - throw new IOException("Failed to load distributedlog configuration from malformed " - + configFile + "."); - } - } - logger.info("Starting stats provider : {}.", statsProvider.getClass()); - statsProvider.start(dlConf); - String[] serverSetPaths = StringUtils.split(serverSetArg.get(), ","); - if (serverSetPaths.length == 0) { - throw new IllegalArgumentException("Invalid serverset paths 
provided : " + serverSetArg.get()); - } - - ServerSet[] serverSets = createServerSets(serverSetPaths); - ServerSet local = serverSets[0]; - ServerSet[] remotes = new ServerSet[serverSets.length - 1]; - System.arraycopy(serverSets, 1, remotes, 0, remotes.length); - - ClientBuilder finagleClientBuilder = ClientBuilder.get() - .connectTimeout(Duration.fromSeconds(1)) - .tcpConnectTimeout(Duration.fromSeconds(1)) - .requestTimeout(Duration.fromSeconds(2)) - .keepAlive(true) - .failFast(false); - - if (!isThriftMux) { - finagleClientBuilder = finagleClientBuilder - .hostConnectionLimit(2) - .hostConnectionCoresize(2); - } - - dlClient = DistributedLogClientBuilder.newBuilder() - .name("monitor") - .thriftmux(isThriftMux) - .clientId(ClientId$.MODULE$.apply("monitor")) - .redirectBackoffMaxMs(50) - .redirectBackoffStartMs(100) - .requestTimeoutMs(2000) - .maxRedirects(2) - .serverSets(local, remotes) - .streamNameRegex(streamRegex) - .handshakeWithClientInfo(handshakeWithClientInfo) - .clientBuilder(finagleClientBuilder) - .statsReceiver(monitorReceiver.scope("client")) - .buildMonitorClient(); - runMonitor(dlConf, uri); - } - - ServerSet[] createServerSets(String[] serverSetPaths) { - ServerSet[] serverSets = new ServerSet[serverSetPaths.length]; - zkServerSets = new DLZkServerSet[serverSetPaths.length]; - for (int i = 0; i < serverSetPaths.length; i++) { - String serverSetPath = serverSetPaths[i]; - zkServerSets[i] = parseServerSet(serverSetPath); - serverSets[i] = zkServerSets[i].getServerSet(); - } - return serverSets; - } - - protected DLZkServerSet parseServerSet(String serverSetPath) { - return DLZkServerSet.of(URI.create(serverSetPath), 60000); - } - - @Override - public void onStreamsChanged(Iterator<String> streams) { - Set<String> newSet = new HashSet<String>(); - while (streams.hasNext()) { - String s = streams.next(); - if (null == streamRegex || s.matches(streamRegex)) { - if (Math.abs(hashFunction.hashUnencodedChars(s).asInt()) % totalInstances == 
instanceId) { - newSet.add(s); - } - } - } - List<StreamChecker> tasksToCancel = new ArrayList<StreamChecker>(); - synchronized (knownStreams) { - Set<String> knownStreamSet = new HashSet<String>(knownStreams.keySet()); - Set<String> removedStreams = Sets.difference(knownStreamSet, newSet).immutableCopy(); - Set<String> addedStreams = Sets.difference(newSet, knownStreamSet).immutableCopy(); - for (String s : removedStreams) { - StreamChecker task = knownStreams.remove(s); - if (null != task) { - logger.info("Removed stream {}", s); - tasksToCancel.add(task); - } - } - for (String s : addedStreams) { - if (!knownStreams.containsKey(s)) { - logger.info("Added stream {}", s); - StreamChecker sc = new StreamChecker(s); - knownStreams.put(s, sc); - sc.run(); - } - } - } - for (StreamChecker sc : tasksToCancel) { - sc.close(); - } - } - - void runMonitor(DistributedLogConfiguration conf, URI dlUri) throws IOException { - // stats - statsProvider.getStatsLogger("monitor").registerGauge("num_streams", numOfStreamsGauge); - logger.info("Construct dl namespace @ {}", dlUri); - dlNamespace = DistributedLogNamespaceBuilder.newBuilder() - .conf(conf) - .uri(dlUri) - .build(); - if (watchNamespaceChanges) { - dlNamespace.registerNamespaceListener(this); - } else { - onStreamsChanged(dlNamespace.getLogs()); - } - } - - /** - * Close the server. 
- */ - public void close() { - logger.info("Closing monitor service."); - if (null != dlClient) { - dlClient.close(); - } - if (null != zkServerSets) { - for (DLZkServerSet zkServerSet : zkServerSets) { - zkServerSet.close(); - } - } - if (null != dlNamespace) { - dlNamespace.close(); - } - executorService.shutdown(); - try { - if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) { - executorService.shutdownNow(); - } - } catch (InterruptedException e) { - logger.error("Interrupted on waiting shutting down monitor executor service : ", e); - } - if (null != statsProvider) { - // clean up the gauges - unregisterGauge(); - statsProvider.stop(); - } - keepAliveLatch.countDown(); - logger.info("Closed monitor service."); - } - - public void join() throws InterruptedException { - keepAliveLatch.await(); - } - - /** - * clean up the gauge before we close to help GC. - */ - private void unregisterGauge(){ - statsProvider.getStatsLogger("monitor").unregisterGauge("num_streams", numOfStreamsGauge); - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/MonitorServiceApp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/MonitorServiceApp.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/MonitorServiceApp.java deleted file mode 100644 index 1f45b13..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/MonitorServiceApp.java +++ /dev/null @@ -1,133 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.service; - -import static org.apache.distributedlog.util.CommandLineUtils.getOptionalBooleanArg; -import static org.apache.distributedlog.util.CommandLineUtils.getOptionalIntegerArg; -import static org.apache.distributedlog.util.CommandLineUtils.getOptionalStringArg; - -import com.twitter.finagle.stats.NullStatsReceiver; -import com.twitter.finagle.stats.StatsReceiver; -import java.io.IOException; -import org.apache.bookkeeper.stats.NullStatsProvider; -import org.apache.bookkeeper.stats.StatsProvider; -import org.apache.bookkeeper.util.ReflectionUtils; -import org.apache.commons.cli.BasicParser; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -/** - * The launcher to run monitor service. - */ -public class MonitorServiceApp { - - private static final Logger logger = LoggerFactory.getLogger(MonitorServiceApp.class); - - static final String USAGE = "MonitorService [-u <uri>] [-c <conf>] [-s serverset]"; - - final String[] args; - final Options options = new Options(); - - private MonitorServiceApp(String[] args) { - this.args = args; - // prepare options - options.addOption("u", "uri", true, "DistributedLog URI"); - options.addOption("c", "conf", true, "DistributedLog Configuration File"); - options.addOption("s", "serverset", true, "Proxy Server Set"); - options.addOption("i", "interval", true, "Check interval"); - options.addOption("d", "region", true, "Region ID"); - options.addOption("p", "provider", true, "DistributedLog Stats Provider"); - options.addOption("f", "filter", true, "Filter streams by regex"); - options.addOption("w", "watch", false, "Watch stream changes under a given namespace"); - options.addOption("n", "instance_id", true, "Instance ID"); - options.addOption("t", "total_instances", true, "Total instances"); - 
options.addOption("hck", "heartbeat-num-checks", true, "Send a heartbeat after num checks"); - options.addOption("hsci", "handshake-with-client-info", false, "Enable handshaking with client info"); - } - - void printUsage() { - HelpFormatter helpFormatter = new HelpFormatter(); - helpFormatter.printHelp(USAGE, options); - } - - private void run() { - try { - logger.info("Running monitor service."); - BasicParser parser = new BasicParser(); - CommandLine cmdline = parser.parse(options, args); - runCmd(cmdline); - } catch (ParseException pe) { - printUsage(); - Runtime.getRuntime().exit(-1); - } catch (IOException ie) { - logger.error("Failed to start monitor service : ", ie); - Runtime.getRuntime().exit(-1); - } - } - - void runCmd(CommandLine cmdline) throws IOException { - StatsProvider statsProvider = new NullStatsProvider(); - if (cmdline.hasOption("p")) { - String providerClass = cmdline.getOptionValue("p"); - statsProvider = ReflectionUtils.newInstance(providerClass, StatsProvider.class); - } - StatsReceiver statsReceiver = NullStatsReceiver.get(); - - final MonitorService monitorService = new MonitorService( - getOptionalStringArg(cmdline, "u"), - getOptionalStringArg(cmdline, "c"), - getOptionalStringArg(cmdline, "s"), - getOptionalIntegerArg(cmdline, "i"), - getOptionalIntegerArg(cmdline, "d"), - getOptionalStringArg(cmdline, "f"), - getOptionalIntegerArg(cmdline, "n"), - getOptionalIntegerArg(cmdline, "t"), - getOptionalIntegerArg(cmdline, "hck"), - getOptionalBooleanArg(cmdline, "hsci"), - getOptionalBooleanArg(cmdline, "w"), - getOptionalBooleanArg(cmdline, "mx"), - statsReceiver, - statsProvider); - - monitorService.runServer(); - - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - logger.info("Closing monitor service."); - monitorService.close(); - logger.info("Closed monitor service."); - } - }); - try { - monitorService.join(); - } catch (InterruptedException ie) { - logger.warn("Interrupted when waiting monitor 
service to be finished : ", ie); - } - } - - public static void main(String[] args) { - final MonitorServiceApp launcher = new MonitorServiceApp(args); - launcher.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/ResponseUtils.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/ResponseUtils.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/ResponseUtils.java deleted file mode 100644 index a2691d3..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/ResponseUtils.java +++ /dev/null @@ -1,86 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.service; - -import org.apache.distributedlog.exceptions.DLException; -import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException; -import org.apache.distributedlog.thrift.service.BulkWriteResponse; -import org.apache.distributedlog.thrift.service.ResponseHeader; -import org.apache.distributedlog.thrift.service.StatusCode; -import org.apache.distributedlog.thrift.service.WriteResponse; - -/** - * Utility methods for building write proxy service responses. - */ -public class ResponseUtils { - public static ResponseHeader deniedHeader() { - return new ResponseHeader(StatusCode.REQUEST_DENIED); - } - - public static ResponseHeader streamUnavailableHeader() { - return new ResponseHeader(StatusCode.STREAM_UNAVAILABLE); - } - - public static ResponseHeader successHeader() { - return new ResponseHeader(StatusCode.SUCCESS); - } - - public static ResponseHeader ownerToHeader(String owner) { - return new ResponseHeader(StatusCode.FOUND).setLocation(owner); - } - - public static ResponseHeader exceptionToHeader(Throwable t) { - ResponseHeader response = new ResponseHeader(); - if (t instanceof DLException) { - DLException dle = (DLException) t; - if (dle instanceof OwnershipAcquireFailedException) { - response.setLocation(((OwnershipAcquireFailedException) dle).getCurrentOwner()); - } - response.setCode(dle.getCode()); - response.setErrMsg(dle.getMessage()); - } else { - response.setCode(StatusCode.INTERNAL_SERVER_ERROR); - response.setErrMsg("Internal server error : " + t.getMessage()); - } - return response; - } - - public static WriteResponse write(ResponseHeader responseHeader) { - return new WriteResponse(responseHeader); - } - - public static WriteResponse writeSuccess() { - return new WriteResponse(successHeader()); - } - - public static WriteResponse writeDenied() { - return new WriteResponse(deniedHeader()); - } - - public static BulkWriteResponse bulkWrite(ResponseHeader responseHeader) { - return new 
BulkWriteResponse(responseHeader); - } - - public static BulkWriteResponse bulkWriteSuccess() { - return new BulkWriteResponse(successHeader()); - } - - public static BulkWriteResponse bulkWriteDenied() { - return new BulkWriteResponse(deniedHeader()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/ServerFeatureKeys.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/ServerFeatureKeys.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/ServerFeatureKeys.java deleted file mode 100644 index 436145d..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/ServerFeatureKeys.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service; - -/** - * List of feature keys used by distributedlog server. 
/**
 * List of feature keys used by distributedlog server.
 *
 * <p>NOTE(review): the meaning of each key is inferred from its name only; the code that reads
 * these keys is not part of this file — confirm against the feature provider before relying on it.
 */
public enum ServerFeatureKeys {

    // Presumably: the region stops accepting new streams.
    REGION_STOP_ACCEPT_NEW_STREAM,
    // Presumably: service-level rate limiting is switched off.
    SERVICE_RATE_LIMIT_DISABLED,
    // Presumably: request checksum validation is switched off.
    SERVICE_CHECKSUM_DISABLED,
    // Presumably: the global limiter is switched off.
    SERVICE_GLOBAL_LIMITER_DISABLED
}
- */ -package org.apache.distributedlog.service; - -import com.google.common.base.Stopwatch; -import com.twitter.finagle.Service; -import com.twitter.finagle.SimpleFilter; -import com.twitter.util.Future; -import java.util.concurrent.TimeUnit; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; - -/** - * Track distributedlog server finagle-service stats. - */ -class StatsFilter<Req, Rep> extends SimpleFilter<Req, Rep> { - - private final StatsLogger stats; - private final Counter outstandingAsync; - private final OpStatsLogger serviceExec; - - @Override - public Future<Rep> apply(Req req, Service<Req, Rep> service) { - Future<Rep> result = null; - outstandingAsync.inc(); - final Stopwatch stopwatch = Stopwatch.createStarted(); - try { - result = service.apply(req); - serviceExec.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); - } finally { - outstandingAsync.dec(); - if (null == result) { - serviceExec.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); - } - } - return result; - } - - public StatsFilter(StatsLogger stats) { - this.stats = stats; - this.outstandingAsync = stats.getCounter("outstandingAsync"); - this.serviceExec = stats.getOpStatsLogger("serviceExec"); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/Announcer.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/Announcer.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/Announcer.java deleted file mode 100644 index ee64fc7..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/Announcer.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache 
/**
 * Announce service information.
 *
 * <p>Contract for components that publish and withdraw the service info and can be closed when
 * no longer needed.
 */
public interface Announcer {

    /**
     * Announce service info.
     *
     * @throws IOException if announcing fails
     */
    void announce() throws IOException;

    /**
     * Unannounce the service info.
     *
     * @throws IOException if unannouncing fails
     */
    void unannounce() throws IOException;

    /**
     * Close the announcer.
     */
    void close();
}
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.announcer; - -import java.io.IOException; - -/** - * A no-op implementation of {@link Announcer}. - */ -public class NOPAnnouncer implements Announcer { - @Override - public void announce() throws IOException { - // nop - } - - @Override - public void unannounce() throws IOException { - // nop - } - - @Override - public void close() { - // nop - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/ServerSetAnnouncer.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/ServerSetAnnouncer.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/ServerSetAnnouncer.java deleted file mode 100644 index df4a8e2..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/ServerSetAnnouncer.java +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.announcer; - -import com.twitter.common.zookeeper.Group; -import com.twitter.common.zookeeper.ServerSet; -import org.apache.distributedlog.client.serverset.DLZkServerSet; -import java.io.IOException; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.URI; -import java.net.UnknownHostException; -import java.util.HashMap; -import java.util.Map; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * ServerSet based announcer. - */ -public class ServerSetAnnouncer implements Announcer { - - private static final Logger logger = LoggerFactory.getLogger(ServerSetAnnouncer.class); - - final String localAddr; - final InetSocketAddress serviceEndpoint; - final Map<String, InetSocketAddress> additionalEndpoints; - final int shardId; - - // ServerSet - DLZkServerSet zkServerSet; - - // Service Status - ServerSet.EndpointStatus serviceStatus = null; - - /** - * Announce server infos. 
- * - * @param servicePort - * service port - * @param statsPort - * stats port - * @param shardId - * shard id - */ - public ServerSetAnnouncer(URI uri, - int servicePort, - int statsPort, - int shardId) throws UnknownHostException { - this.shardId = shardId; - this.localAddr = InetAddress.getLocalHost().getHostAddress(); - // service endpoint - this.serviceEndpoint = new InetSocketAddress(localAddr, servicePort); - // stats endpoint - InetSocketAddress statsEndpoint = new InetSocketAddress(localAddr, statsPort); - this.additionalEndpoints = new HashMap<String, InetSocketAddress>(); - this.additionalEndpoints.put("aurora", statsEndpoint); - this.additionalEndpoints.put("stats", statsEndpoint); - this.additionalEndpoints.put("service", serviceEndpoint); - this.additionalEndpoints.put("thrift", serviceEndpoint); - - // Create zookeeper and server set - this.zkServerSet = DLZkServerSet.of(uri, 60000); - } - - @Override - public synchronized void announce() throws IOException { - try { - serviceStatus = - zkServerSet.getServerSet().join(serviceEndpoint, additionalEndpoints, shardId); - } catch (Group.JoinException e) { - throw new IOException("Failed to announce service : ", e); - } catch (InterruptedException e) { - logger.warn("Interrupted on announcing service : ", e); - Thread.currentThread().interrupt(); - } - } - - @Override - public synchronized void unannounce() throws IOException { - if (null == serviceStatus) { - logger.warn("No service to unannounce."); - return; - } - try { - serviceStatus.leave(); - } catch (ServerSet.UpdateException e) { - throw new IOException("Failed to unannounce service : ", e); - } - } - - @Override - public void close() { - zkServerSet.close(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/package-info.java ---------------------------------------------------------------------- diff --git 
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/package-info.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/package-info.java deleted file mode 100644 index 6559bb3..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/announcer/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Announcers to announce servers to server set. 
- */ -package org.apache.distributedlog.service.announcer; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/Balancer.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/Balancer.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/Balancer.java deleted file mode 100644 index cdffaa3..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/Balancer.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.balancer; - -import com.google.common.base.Optional; -import com.google.common.util.concurrent.RateLimiter; - -/** - * Balancer Interface. - * - * <p>A balancer is used for balance the streams across the proxy cluster. - */ -public interface Balancer { - - /** - * Rebalance all the streams from <i>source</i> to others. - * - * @param source - * source target name. - * @param rebalanceConcurrency - * the concurrency to move streams for re-balance. 
- * @param rebalanceRateLimiter - * the rate limiting to move streams for re-balance. - */ - void balanceAll(String source, - int rebalanceConcurrency, - Optional<RateLimiter> rebalanceRateLimiter); - - /** - * Balance the streams across all targets. - * - * @param rebalanceWaterMark - * rebalance water mark. if number of streams of a given target is less than - * the water mark, no streams will be re-balanced from this target. - * @param rebalanceTolerancePercentage - * tolerance percentage for the balancer. if number of streams of a given target is - * less than average + average * <i>tolerancePercentage</i> / 100.0, no streams will - * be re-balanced from that target. - * @param rebalanceConcurrency - * the concurrency to move streams for re-balance. - * @param rebalanceRateLimiter - * the rate limiting to move streams for re-balance. - */ - void balance(int rebalanceWaterMark, - double rebalanceTolerancePercentage, - int rebalanceConcurrency, - Optional<RateLimiter> rebalanceRateLimiter); - - /** - * Close the balancer. - */ - void close(); -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/BalancerTool.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/BalancerTool.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/BalancerTool.java deleted file mode 100644 index 964c1cc..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/BalancerTool.java +++ /dev/null @@ -1,327 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.balancer; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.common.base.Optional; -import com.google.common.util.concurrent.RateLimiter; -import com.twitter.common.zookeeper.ServerSet; -import org.apache.distributedlog.client.monitor.MonitorServiceClient; -import org.apache.distributedlog.client.serverset.DLZkServerSet; -import org.apache.distributedlog.impl.BKNamespaceDriver; -import org.apache.distributedlog.service.ClientUtils; -import org.apache.distributedlog.service.DLSocketAddress; -import org.apache.distributedlog.service.DistributedLogClient; -import org.apache.distributedlog.service.DistributedLogClientBuilder; -import org.apache.distributedlog.tools.Tool; -import com.twitter.finagle.builder.ClientBuilder; -import com.twitter.finagle.thrift.ClientId$; -import com.twitter.util.Await; -import com.twitter.util.Duration; -import java.net.InetSocketAddress; -import java.net.URI; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Tool to rebalance cluster. 
- */ -public class BalancerTool extends Tool { - - private static final Logger logger = LoggerFactory.getLogger(BalancerTool.class); - - static DistributedLogClientBuilder createDistributedLogClientBuilder(ServerSet serverSet) { - return DistributedLogClientBuilder.newBuilder() - .name("rebalancer_tool") - .clientId(ClientId$.MODULE$.apply("rebalancer_tool")) - .maxRedirects(2) - .serverSet(serverSet) - .clientBuilder(ClientBuilder.get() - .connectionTimeout(Duration.fromSeconds(2)) - .tcpConnectTimeout(Duration.fromSeconds(2)) - .requestTimeout(Duration.fromSeconds(10)) - .hostConnectionLimit(1) - .hostConnectionCoresize(1) - .keepAlive(true) - .failFast(false)); - } - - /** - * Base Command to run balancer. - */ - protected abstract static class BalancerCommand extends OptsCommand { - - protected Options options = new Options(); - protected int rebalanceWaterMark = 0; - protected double rebalanceTolerancePercentage = 0.0f; - protected int rebalanceConcurrency = 1; - protected Double rate = null; - protected Optional<RateLimiter> rateLimiter; - - BalancerCommand(String name, String description) { - super(name, description); - options.addOption("rwm", "rebalance-water-mark", true, "Rebalance water mark per proxy"); - options.addOption("rtp", "rebalance-tolerance-percentage", true, - "Rebalance tolerance percentage per proxy"); - options.addOption("rc", "rebalance-concurrency", true, "Concurrency to rebalance stream distribution"); - options.addOption("r", "rate", true, "Rebalance rate"); - } - - Optional<RateLimiter> getRateLimiter() { - return rateLimiter; - } - - @Override - protected Options getOptions() { - return options; - } - - protected void parseCommandLine(CommandLine cmdline) throws ParseException { - if (cmdline.hasOption("rwm")) { - this.rebalanceWaterMark = Integer.parseInt(cmdline.getOptionValue("rwm")); - } - if (cmdline.hasOption("rtp")) { - this.rebalanceTolerancePercentage = Double.parseDouble(cmdline.getOptionValue("rtp")); - } - if 
(cmdline.hasOption("rc")) { - this.rebalanceConcurrency = Integer.parseInt(cmdline.getOptionValue("rc")); - } - if (cmdline.hasOption("r")) { - this.rate = Double.parseDouble(cmdline.getOptionValue("r")); - } - checkArgument(rebalanceWaterMark >= 0, - "Rebalance Water Mark should be a non-negative number"); - checkArgument(rebalanceTolerancePercentage >= 0.0f, - "Rebalance Tolerance Percentage should be a non-negative number"); - checkArgument(rebalanceConcurrency > 0, - "Rebalance Concurrency should be a positive number"); - if (null == rate || rate <= 0.0f) { - rateLimiter = Optional.absent(); - } else { - rateLimiter = Optional.of(RateLimiter.create(rate)); - } - } - - @Override - protected int runCmd(CommandLine cmdline) throws Exception { - try { - parseCommandLine(cmdline); - } catch (ParseException pe) { - println("ERROR: fail to parse commandline : '" + pe.getMessage() + "'"); - printUsage(); - return -1; - } - return executeCommand(cmdline); - } - - protected abstract int executeCommand(CommandLine cmdline) throws Exception; - } - - /** - * Command to balance streams within a cluster. 
- */ - protected static class ClusterBalancerCommand extends BalancerCommand { - - protected URI uri; - protected String source = null; - - protected ClusterBalancerCommand() { - super("clusterbalancer", "Balance streams inside a cluster"); - options.addOption("u", "uri", true, "DistributedLog URI"); - options.addOption("sp", "source-proxy", true, "Source proxy to balance"); - } - - @Override - protected String getUsage() { - return "clusterbalancer [options]"; - } - - protected void parseCommandLine(CommandLine cmdline) throws ParseException { - super.parseCommandLine(cmdline); - if (!cmdline.hasOption("u")) { - throw new ParseException("No proxy serverset provided."); - } - uri = URI.create(cmdline.getOptionValue("u")); - if (cmdline.hasOption("sp")) { - String sourceProxyStr = cmdline.getOptionValue("sp"); - try { - DLSocketAddress.parseSocketAddress(sourceProxyStr); - } catch (IllegalArgumentException iae) { - throw new ParseException("Invalid source proxy " + sourceProxyStr + " : " + iae.getMessage()); - } - this.source = sourceProxyStr; - } - } - - @Override - protected int executeCommand(CommandLine cmdline) throws Exception { - DLZkServerSet serverSet = DLZkServerSet.of(uri, 60000); - logger.info("Created serverset for {}", uri); - try { - DistributedLogClientBuilder clientBuilder = - createDistributedLogClientBuilder(serverSet.getServerSet()); - ClusterBalancer balancer = new ClusterBalancer(clientBuilder); - try { - return runBalancer(clientBuilder, balancer); - } finally { - balancer.close(); - } - } finally { - serverSet.close(); - } - } - - protected int runBalancer(DistributedLogClientBuilder clientBuilder, - ClusterBalancer balancer) - throws Exception { - if (null == source) { - balancer.balance( - rebalanceWaterMark, - rebalanceTolerancePercentage, - rebalanceConcurrency, - getRateLimiter()); - } else { - balanceFromSource(clientBuilder, balancer, source, getRateLimiter()); - } - return 0; - } - - protected void 
balanceFromSource(DistributedLogClientBuilder clientBuilder, - ClusterBalancer balancer, - String source, - Optional<RateLimiter> rateLimiter) - throws Exception { - InetSocketAddress sourceAddr = DLSocketAddress.parseSocketAddress(source); - DistributedLogClientBuilder sourceClientBuilder = - DistributedLogClientBuilder.newBuilder(clientBuilder) - .host(sourceAddr); - - Pair<DistributedLogClient, MonitorServiceClient> clientPair = - ClientUtils.buildClient(sourceClientBuilder); - try { - Await.result(clientPair.getRight().setAcceptNewStream(false)); - logger.info("Disable accepting new stream on proxy {}.", source); - balancer.balanceAll(source, rebalanceConcurrency, rateLimiter); - } finally { - clientPair.getLeft().close(); - } - } - } - - /** - * Command to balance streams between regions. - */ - protected static class RegionBalancerCommand extends BalancerCommand { - - protected URI region1; - protected URI region2; - protected String source = null; - - protected RegionBalancerCommand() { - super("regionbalancer", "Balance streams between regions"); - options.addOption("rs", "regions", true, "DistributedLog Region URI: uri1[,uri2]"); - options.addOption("s", "source", true, "DistributedLog Source Region to balance"); - } - - @Override - protected String getUsage() { - return "regionbalancer [options]"; - } - - @Override - protected void parseCommandLine(CommandLine cmdline) throws ParseException { - super.parseCommandLine(cmdline); - if (!cmdline.hasOption("rs")) { - throw new ParseException("No regions provided."); - } - String regionsStr = cmdline.getOptionValue("rs"); - String[] regions = regionsStr.split(","); - if (regions.length != 2) { - throw new ParseException("Invalid regions provided. 
Expected : serverset1[,serverset2]"); - } - region1 = URI.create(regions[0]); - region2 = URI.create(regions[1]); - if (cmdline.hasOption("s")) { - source = cmdline.getOptionValue("s"); - } - } - - @Override - protected int executeCommand(CommandLine cmdline) throws Exception { - DLZkServerSet serverSet1 = DLZkServerSet.of(region1, 60000); - logger.info("Created serverset for {}", region1); - DLZkServerSet serverSet2 = DLZkServerSet.of(region2, 60000); - logger.info("Created serverset for {}", region2); - try { - DistributedLogClientBuilder builder1 = - createDistributedLogClientBuilder(serverSet1.getServerSet()); - Pair<DistributedLogClient, MonitorServiceClient> pair1 = - ClientUtils.buildClient(builder1); - DistributedLogClientBuilder builder2 = - createDistributedLogClientBuilder(serverSet2.getServerSet()); - Pair<DistributedLogClient, MonitorServiceClient> pair2 = - ClientUtils.buildClient(builder2); - try { - SimpleBalancer balancer = new SimpleBalancer( - BKNamespaceDriver.getZKServersFromDLUri(region1), pair1.getLeft(), pair1.getRight(), - BKNamespaceDriver.getZKServersFromDLUri(region2), pair2.getLeft(), pair2.getRight()); - try { - return runBalancer(balancer); - } finally { - balancer.close(); - } - } finally { - pair1.getLeft().close(); - pair2.getLeft().close(); - } - } finally { - serverSet1.close(); - serverSet2.close(); - } - } - - protected int runBalancer(SimpleBalancer balancer) throws Exception { - if (null == source) { - balancer.balance( - rebalanceWaterMark, - rebalanceTolerancePercentage, - rebalanceConcurrency, - getRateLimiter()); - } else { - balancer.balanceAll(source, rebalanceConcurrency, getRateLimiter()); - } - return 0; - } - } - - public BalancerTool() { - super(); - addCommand(new ClusterBalancerCommand()); - addCommand(new RegionBalancerCommand()); - } - - @Override - protected String getName() { - return "balancer"; - } -} 
/**
 * Helper functions for balancers.
 */
public class BalancerUtils {

    /**
     * Work out how many streams should be moved away from <i>nodeToRebalance</i>, given the
     * load distribution <i>loadDistribution</i>.
     *
     * <p>The average load is the sum of all loads divided by the number of entries in the map.
     * Entries with a null key or a null value add nothing to the sum but still count as
     * nodes.
     *
     * @param nodeToRebalance
     *          node to rebalance
     * @param loadDistribution
     *          map from node to its number of streams
     * @param rebalanceWaterMark
     *          if <i>nodeToRebalance</i> is unknown, or its number of streams is at or below
     *          this water mark, no streams are re-balanced.
     * @param tolerancePercentage
     *          tolerance percentage for the balancer. if the number of streams of
     *          <i>nodeToRebalance</i> does not exceed the larger of 1 and
     *          ceil(average + average * <i>tolerancePercentage</i> / 100.0), no streams are
     *          re-balanced.
     * @param <K>
     *          type of the node identifier
     * @return number of streams to rebalance: the node's load minus ceil(average), never
     *         negative
     */
    public static <K> int calculateNumStreamsToRebalance(K nodeToRebalance,
                                                         Map<K, Integer> loadDistribution,
                                                         int rebalanceWaterMark,
                                                         double tolerancePercentage) {
        final Integer nodeLoad = loadDistribution.get(nodeToRebalance);
        if (nodeLoad == null || nodeLoad <= rebalanceWaterMark) {
            return 0;
        }

        long loadSum = 0L;
        for (Map.Entry<K, Integer> nodeEntry : loadDistribution.entrySet()) {
            if (nodeEntry.getKey() != null && nodeEntry.getValue() != null) {
                loadSum += nodeEntry.getValue();
            }
        }

        final double mean = ((double) loadSum) / loadDistribution.size();
        final double toleratedLoad = mean + mean * tolerancePercentage / 100.0f;
        final long loadCeiling = Math.max(1L, (long) Math.ceil(toleratedLoad));

        return nodeLoad > loadCeiling
                ? Math.max(0, nodeLoad - (int) Math.ceil(mean))
                : 0;
    }
}
a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/ClusterBalancer.java +++ /dev/null @@ -1,378 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.balancer; - -import com.google.common.base.Optional; -import com.google.common.util.concurrent.RateLimiter; -import org.apache.distributedlog.client.monitor.MonitorServiceClient; -import org.apache.distributedlog.service.ClientUtils; -import org.apache.distributedlog.service.DLSocketAddress; -import org.apache.distributedlog.service.DistributedLogClient; -import org.apache.distributedlog.service.DistributedLogClientBuilder; -import com.twitter.util.Await; -import com.twitter.util.Function; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import java.io.Serializable; -import java.net.SocketAddress; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A balancer 
balances ownerships with a cluster of targets. - */ -public class ClusterBalancer implements Balancer { - - private static final Logger logger = LoggerFactory.getLogger(ClusterBalancer.class); - - /** - * Represent a single host. Ordered by number of streams in desc order. - */ - static class Host { - - final SocketAddress address; - final Set<String> streams; - final DistributedLogClientBuilder clientBuilder; - DistributedLogClient client = null; - MonitorServiceClient monitor = null; - - Host(SocketAddress address, Set<String> streams, - DistributedLogClientBuilder clientBuilder) { - this.address = address; - this.streams = streams; - this.clientBuilder = clientBuilder; - } - - private void initializeClientsIfNeeded() { - if (null == client) { - Pair<DistributedLogClient, MonitorServiceClient> clientPair = - createDistributedLogClient(address, clientBuilder); - client = clientPair.getLeft(); - monitor = clientPair.getRight(); - } - } - - synchronized DistributedLogClient getClient() { - initializeClientsIfNeeded(); - return client; - } - - synchronized MonitorServiceClient getMonitor() { - initializeClientsIfNeeded(); - return monitor; - } - - synchronized void close() { - if (null != client) { - client.close(); - } - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("Host(").append(address).append(")"); - return sb.toString(); - } - } - - static class HostComparator implements Comparator<Host>, Serializable { - private static final long serialVersionUID = 7984973796525102538L; - - @Override - public int compare(Host h1, Host h2) { - return h2.streams.size() - h1.streams.size(); - } - } - - protected final DistributedLogClientBuilder clientBuilder; - protected final DistributedLogClient client; - protected final MonitorServiceClient monitor; - - public ClusterBalancer(DistributedLogClientBuilder clientBuilder) { - this(clientBuilder, ClientUtils.buildClient(clientBuilder)); - } - - 
ClusterBalancer(DistributedLogClientBuilder clientBuilder, - Pair<DistributedLogClient, MonitorServiceClient> clientPair) { - this.clientBuilder = clientBuilder; - this.client = clientPair.getLeft(); - this.monitor = clientPair.getRight(); - } - - /** - * Build a new distributedlog client to a single host <i>host</i>. - * - * @param host - * host to access - * @return distributedlog clients - */ - static Pair<DistributedLogClient, MonitorServiceClient> createDistributedLogClient( - SocketAddress host, DistributedLogClientBuilder clientBuilder) { - DistributedLogClientBuilder newBuilder = - DistributedLogClientBuilder.newBuilder(clientBuilder).host(host); - return ClientUtils.buildClient(newBuilder); - } - - @Override - public void balanceAll(String source, - int rebalanceConcurrency, /* unused */ - Optional<RateLimiter> rebalanceRateLimiter) { - balance(0, 0.0f, rebalanceConcurrency, Optional.of(source), rebalanceRateLimiter); - } - - @Override - public void balance(int rebalanceWaterMark, - double rebalanceTolerancePercentage, - int rebalanceConcurrency, /* unused */ - Optional<RateLimiter> rebalanceRateLimiter) { - Optional<String> source = Optional.absent(); - balance(rebalanceWaterMark, rebalanceTolerancePercentage, rebalanceConcurrency, source, rebalanceRateLimiter); - } - - public void balance(int rebalanceWaterMark, - double rebalanceTolerancePercentage, - int rebalanceConcurrency, - Optional<String> source, - Optional<RateLimiter> rebalanceRateLimiter) { - Map<SocketAddress, Set<String>> distribution = monitor.getStreamOwnershipDistribution(); - if (distribution.size() <= 1) { - return; - } - SocketAddress sourceAddr = null; - if (source.isPresent()) { - sourceAddr = DLSocketAddress.parseSocketAddress(source.get()); - logger.info("Balancer source is {}", sourceAddr); - if (!distribution.containsKey(sourceAddr)) { - return; - } - } - // Get the list of hosts ordered by number of streams in DESC order - List<Host> hosts = new 
ArrayList<Host>(distribution.size()); - for (Map.Entry<SocketAddress, Set<String>> entry : distribution.entrySet()) { - Host host = new Host(entry.getKey(), entry.getValue(), clientBuilder); - hosts.add(host); - } - Collections.sort(hosts, new HostComparator()); - try { - - // find the host to move streams from. - int hostIdxMoveFrom = -1; - if (null != sourceAddr) { - for (Host host : hosts) { - ++hostIdxMoveFrom; - if (sourceAddr.equals(host.address)) { - break; - } - } - } - - // compute the average load. - int totalStream = 0; - for (Host host : hosts) { - totalStream += host.streams.size(); - } - double averageLoad; - if (hostIdxMoveFrom >= 0) { - averageLoad = ((double) totalStream / (hosts.size() - 1)); - } else { - averageLoad = ((double) totalStream / hosts.size()); - } - - int moveFromLowWaterMark; - int moveToHighWaterMark = - Math.max(1, (int) (averageLoad + averageLoad * rebalanceTolerancePercentage / 100.0f)); - - if (hostIdxMoveFrom >= 0) { - moveFromLowWaterMark = Math.max(0, rebalanceWaterMark); - moveStreams( - hosts, - new AtomicInteger(hostIdxMoveFrom), moveFromLowWaterMark, - new AtomicInteger(hosts.size() - 1), moveToHighWaterMark, - rebalanceRateLimiter); - moveRemainingStreamsFromSource(hosts.get(hostIdxMoveFrom), hosts, rebalanceRateLimiter); - } else { - moveFromLowWaterMark = Math.max((int) Math.ceil(averageLoad), rebalanceWaterMark); - AtomicInteger moveFrom = new AtomicInteger(0); - AtomicInteger moveTo = new AtomicInteger(hosts.size() - 1); - while (moveFrom.get() < moveTo.get()) { - moveStreams(hosts, moveFrom, moveFromLowWaterMark, - moveTo, moveToHighWaterMark, rebalanceRateLimiter); - moveFrom.incrementAndGet(); - } - } - } finally { - for (Host host : hosts) { - host.close(); - } - } - } - - void moveStreams(List<Host> hosts, - AtomicInteger hostIdxMoveFrom, - int moveFromLowWaterMark, - AtomicInteger hostIdxMoveTo, - int moveToHighWaterMark, - Optional<RateLimiter> rateLimiter) { - if (hostIdxMoveFrom.get() < 0 || 
hostIdxMoveFrom.get() >= hosts.size() - || hostIdxMoveTo.get() < 0 || hostIdxMoveTo.get() >= hosts.size() - || hostIdxMoveFrom.get() >= hostIdxMoveTo.get()) { - return; - } - - if (logger.isDebugEnabled()) { - logger.debug("Moving streams : hosts = {}, from = {}, to = {} :" - + " from_low_water_mark = {}, to_high_water_mark = {}", - new Object[] { - hosts, - hostIdxMoveFrom.get(), - hostIdxMoveTo.get(), - moveFromLowWaterMark, - moveToHighWaterMark }); - } - - Host hostMoveFrom = hosts.get(hostIdxMoveFrom.get()); - int numStreamsOnFromHost = hostMoveFrom.streams.size(); - if (numStreamsOnFromHost <= moveFromLowWaterMark) { - // do nothing - return; - } - - int numStreamsToMove = numStreamsOnFromHost - moveFromLowWaterMark; - LinkedList<String> streamsToMove = new LinkedList<String>(hostMoveFrom.streams); - Collections.shuffle(streamsToMove); - - if (logger.isDebugEnabled()) { - logger.debug("Try to move {} streams from host {} : streams = {}", - new Object[] { numStreamsToMove, hostMoveFrom.address, streamsToMove }); - } - - while (numStreamsToMove-- > 0 && !streamsToMove.isEmpty()) { - if (rateLimiter.isPresent()) { - rateLimiter.get().acquire(); - } - - // pick a host to move - Host hostMoveTo = hosts.get(hostIdxMoveTo.get()); - while (hostMoveTo.streams.size() >= moveToHighWaterMark) { - int hostIdx = hostIdxMoveTo.decrementAndGet(); - logger.info("move to host : {}, from {}", hostIdx, hostIdxMoveFrom.get()); - if (hostIdx <= hostIdxMoveFrom.get()) { - return; - } else { - hostMoveTo = hosts.get(hostIdx); - if (logger.isDebugEnabled()) { - logger.debug("Target host to move moved to host {} @ {}", - hostIdx, hostMoveTo); - } - } - } - - // pick a stream - String stream = streamsToMove.remove(); - - // move the stream - if (moveStream(stream, hostMoveFrom, hostMoveTo)) { - hostMoveFrom.streams.remove(stream); - hostMoveTo.streams.add(stream); - } - } - - } - - void moveRemainingStreamsFromSource(Host source, - List<Host> hosts, - Optional<RateLimiter> rateLimiter) 
{ - LinkedList<String> streamsToMove = new LinkedList<String>(source.streams); - Collections.shuffle(streamsToMove); - - if (logger.isDebugEnabled()) { - logger.debug("Try to move remaining streams from {} : {}", source, streamsToMove); - } - - int hostIdx = hosts.size() - 1; - - while (!streamsToMove.isEmpty()) { - if (rateLimiter.isPresent()) { - rateLimiter.get().acquire(); - } - - Host target = hosts.get(hostIdx); - if (!target.address.equals(source.address)) { - String stream = streamsToMove.remove(); - // move the stream - if (moveStream(stream, source, target)) { - source.streams.remove(stream); - target.streams.add(stream); - } - } - --hostIdx; - if (hostIdx < 0) { - hostIdx = hosts.size() - 1; - } - } - } - - private boolean moveStream(String stream, Host from, Host to) { - try { - doMoveStream(stream, from, to); - return true; - } catch (Exception e) { - return false; - } - } - - private void doMoveStream(final String stream, final Host from, final Host to) throws Exception { - logger.info("Moving stream {} from {} to {}.", - new Object[] { stream, from.address, to.address }); - Await.result(from.getClient().release(stream).flatMap(new Function<Void, Future<Void>>() { - @Override - public Future<Void> apply(Void result) { - logger.info("Released stream {} from {}.", stream, from.address); - return to.getMonitor().check(stream).addEventListener(new FutureEventListener<Void>() { - - @Override - public void onSuccess(Void value) { - logger.info("Moved stream {} from {} to {}.", - new Object[] { stream, from.address, to.address }); - } - - @Override - public void onFailure(Throwable cause) { - logger.info("Failed to move stream {} from {} to {} : ", - new Object[] { stream, from.address, to.address, cause }); - } - }); - } - })); - } - - @Override - public void close() { - client.close(); - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/CountBasedStreamChooser.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/CountBasedStreamChooser.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/CountBasedStreamChooser.java deleted file mode 100644 index 6a43179..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/CountBasedStreamChooser.java +++ /dev/null @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.service.balancer; - -import static com.google.common.base.Preconditions.checkArgument; - -import java.io.Serializable; -import java.net.SocketAddress; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.commons.lang3.tuple.Pair; - -/** - * A stream chooser based on number of streams. 
*/
/**
 * Stream chooser that picks streams from hosts according to how many streams each host owns.
 *
 * <p>Hosts are kept in a list sorted by stream count (largest first). Streams are taken in turn
 * from the hosts in front of the current pivot; when those hosts are no longer above the pivot's
 * count, the pivot advances to the next host with a smaller count.
 */
class CountBasedStreamChooser implements StreamChooser, Serializable,
        Comparator<Pair<SocketAddress, LinkedList<String>>> {

    private static final long serialVersionUID = 4664153397369979203L;

    // per-host stream lists, shuffled individually and sorted by size in descending order
    final List<Pair<SocketAddress, LinkedList<String>>> streamsDistribution;

    // index of the pivot host: streams are removed from the hosts in front of it until their
    // stream counts come down to the pivot's stream count.
    int pivot;
    // stream count of the pivot host (0 once the pivot has moved past the last host)
    int pivotCount;

    // index of the host the next stream is taken from.
    int next;

    /**
     * Builds the chooser from a host-to-streams distribution.
     *
     * @param streams distribution to choose from; must contain at least one host
     */
    CountBasedStreamChooser(Map<SocketAddress, Set<String>> streams) {
        checkArgument(!streams.isEmpty(), "Only support no-empty streams distribution");
        streamsDistribution = new ArrayList<Pair<SocketAddress, LinkedList<String>>>(streams.size());
        for (Map.Entry<SocketAddress, Set<String>> hostEntry : streams.entrySet()) {
            LinkedList<String> shuffled = new LinkedList<String>(hostEntry.getValue());
            Collections.shuffle(shuffled);
            streamsDistribution.add(Pair.of(hostEntry.getKey(), shuffled));
        }
        // order the hosts by stream count, largest first
        Collections.sort(streamsDistribution, this);
        pivot = 0;
        pivotCount = streamsDistribution.get(0).getValue().size();
        findNextPivot();
        next = 0;
    }

    /**
     * Advances the pivot to the next host whose stream count is lower than the current pivot
     * count; when there is none, the pivot is parked past the last host with a count of 0.
     */
    private void findNextPivot() {
        final int previousCount = pivotCount;
        final int hostCount = streamsDistribution.size();
        for (pivot = pivot + 1; pivot < hostCount; pivot++) {
            pivotCount = streamsDistribution.get(pivot).getValue().size();
            if (pivotCount < previousCount) {
                return;
            }
        }
        pivot = hostCount;
        pivotCount = 0;
    }

    /**
     * Chooses the next stream.
     *
     * @return the chosen stream, or {@code null} when nothing is left to choose
     */
    @Override
    public synchronized String choose() {
        if (next == pivot) {
            // the round over the hosts in front of the pivot is complete
            final boolean lastHostAbovePivot =
                    streamsDistribution.get(next - 1).getRight().size() > pivotCount;
            if (!lastHostAbovePivot) {
                if (pivotCount == 0) {
                    // the streams are empty now
                    return null;
                }
                findNextPivot();
            }
            next = 0;
        }

        final LinkedList<String> candidates = streamsDistribution.get(next).getRight();
        if (candidates.isEmpty()) {
            return null;
        }

        next++;
        return candidates.remove();
    }

    /**
     * Orders hosts by stream count, larger counts first.
     */
    @Override
    public int compare(Pair<SocketAddress, LinkedList<String>> o1,
                       Pair<SocketAddress, LinkedList<String>> o2) {
        final int firstSize = o1.getValue().size();
        final int secondSize = o2.getValue().size();
        return secondSize - firstSize;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/LimitedStreamChooser.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/LimitedStreamChooser.java b/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/LimitedStreamChooser.java deleted file mode 100644 index 4aefc5e..0000000 --- a/distributedlog-service/src/main/java/org/apache/distributedlog/service/balancer/LimitedStreamChooser.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.distributedlog.service.balancer; - -/** - * A stream chooser that can only choose limited number of streams. - */ -public class LimitedStreamChooser implements StreamChooser { - - /** - * Create a limited stream chooser by {@code limit}. - * - * @param underlying the underlying stream chooser. - * @param limit the limit of number of streams to choose. - * @return the limited stream chooser. - */ - public static LimitedStreamChooser of(StreamChooser underlying, int limit) { - return new LimitedStreamChooser(underlying, limit); - } - - final StreamChooser underlying; - int limit; - - LimitedStreamChooser(StreamChooser underlying, int limit) { - this.underlying = underlying; - this.limit = limit; - } - - @Override - public synchronized String choose() { - if (limit <= 0) { - return null; - } - String s = underlying.choose(); - if (s == null) { - limit = 0; - return null; - } - --limit; - return s; - } -}