http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java deleted file mode 100644 index 5b04a05..0000000 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java +++ /dev/null @@ -1,468 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.benchmark; - -import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkNotNull; - -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.benchmark.utils.ShiftableRateLimiter; -import com.twitter.finagle.stats.OstrichStatsReceiver; -import com.twitter.finagle.stats.StatsReceiver; -import java.io.File; -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.TimeUnit; -import org.apache.bookkeeper.stats.NullStatsProvider; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.stats.StatsProvider; -import org.apache.bookkeeper.util.ReflectionUtils; -import org.apache.commons.cli.BasicParser; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Options; -import org.apache.commons.lang.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The launcher for benchmarks. - */ -public class Benchmarker { - - private static final Logger logger = LoggerFactory.getLogger(Benchmarker.class); - - static final String USAGE = "Benchmarker [-u <uri>] [-c <conf>] [-s serverset] [-m (read|write|dlwrite)]"; - - final String[] args; - final Options options = new Options(); - - int rate = 100; - int maxRate = 1000; - int changeRate = 100; - int changeRateSeconds = 1800; - int concurrency = 10; - String streamPrefix = "dlog-loadtest"; - int shardId = -1; - int numStreams = 10; - List<String> serversetPaths = new ArrayList<String>(); - List<String> finagleNames = new ArrayList<String>(); - int msgSize = 256; - String mode = null; - int durationMins = 60; - URI dlUri = null; - int batchSize = 0; - int readersPerStream = 1; - Integer maxStreamId = null; - int truncationInterval = 3600; - Integer startStreamId = null; - Integer endStreamId = null; - int hostConnectionCoreSize = 10; - int hostConnectionLimit = 10; - boolean thriftmux = false; - boolean handshakeWithClientInfo = false; - boolean readFromHead = false; - int sendBufferSize = 1024 * 1024; - int recvBufferSize = 1024 * 1024; - boolean enableBatching = false; - int batchBufferSize = 256 * 1024; - int batchFlushIntervalMicros = 2000; - String routingServiceFinagleNameString; - - final DistributedLogConfiguration conf = new DistributedLogConfiguration(); - final StatsReceiver statsReceiver = new OstrichStatsReceiver(); - StatsProvider statsProvider = null; - - Benchmarker(String[] args) { - this.args = args; - // prepare options - options.addOption("s", "serverset", true, "Proxy Server Set (separated by ',')"); - options.addOption("fn", "finagle-name", true, "Write proxy finagle name (separated by ',')"); - options.addOption("c", "conf", true, "DistributedLog Configuration File"); - options.addOption("u", "uri", true, "DistributedLog URI"); - options.addOption("i", "shard", true, "Shard Id"); - options.addOption("p", "provider", true, "DistributedLog Stats Provider"); - options.addOption("d", "duration", true, "Duration (minutes)"); - options.addOption("sp", "streamprefix", true, "Stream Prefix"); - options.addOption("sc", "streamcount", true, "Number of Streams"); - options.addOption("ms", "messagesize", true, "Message Size (bytes)"); - options.addOption("bs", "batchsize", true, "Batch Size"); - options.addOption("r", "rate", true, "Rate limit (requests/second)"); - options.addOption("mr", "max-rate", true, "Maximum Rate limit (requests/second)"); - options.addOption("cr", "change-rate", true, "Rate to increase each change period (requests/second)"); - options.addOption("ci", "change-interval", true, "Rate to increase period, seconds"); - options.addOption("t", "concurrency", true, "Concurrency (number of threads)"); - options.addOption("m", "mode", true, "Benchmark mode (read/write)"); - options.addOption("rps", "readers-per-stream", true, "Number readers per stream"); - options.addOption("msid", "max-stream-id", true, "Max Stream ID"); - options.addOption("ti", "truncation-interval", true, "Truncation interval in seconds"); - options.addOption("ssid", "start-stream-id", true, "Start Stream ID"); - options.addOption("esid", "end-stream-id", true, "Start Stream ID"); - options.addOption("hccs", "host-connection-core-size", true, "Finagle hostConnectionCoreSize"); - options.addOption("hcl", "host-connection-limit", true, "Finagle hostConnectionLimit"); - options.addOption("mx", "thriftmux", false, "Enable thriftmux (write mode only)"); - options.addOption("hsci", "handshake-with-client-info", false, "Enable handshaking with client info"); - options.addOption("rfh", "read-from-head", false, "Read from head of the stream"); - options.addOption("sb", "send-buffer", true, "Channel send buffer size, in bytes"); - options.addOption("rb", "recv-buffer", true, "Channel recv buffer size, in bytes"); - options.addOption("bt", "enable-batch", false, "Enable batching on writers"); - options.addOption("bbs", "batch-buffer-size", true, "The batch buffer size in bytes"); - options.addOption("bfi", "batch-flush-interval", true, "The batch buffer flush interval in micros"); - options.addOption("rs", "routing-service", true, "The routing service finagle name for server-side routing"); - options.addOption("h", "help", false, "Print usage."); - } - - void printUsage() { - HelpFormatter helpFormatter = new HelpFormatter(); - helpFormatter.printHelp(USAGE, options); - } - - void run() throws Exception { - logger.info("Running benchmark."); - - BasicParser parser = new BasicParser(); - CommandLine cmdline = parser.parse(options, args); - if (cmdline.hasOption("h")) { - printUsage(); - System.exit(0); - } - if (cmdline.hasOption("s")) { - String serversetPathStr = cmdline.getOptionValue("s"); - serversetPaths = Arrays.asList(StringUtils.split(serversetPathStr, ',')); - } - if (cmdline.hasOption("fn")) { - String finagleNameStr = cmdline.getOptionValue("fn"); - finagleNames = Arrays.asList(StringUtils.split(finagleNameStr, ',')); - } - if (cmdline.hasOption("i")) { - shardId = Integer.parseInt(cmdline.getOptionValue("i")); - } - if (cmdline.hasOption("d")) { - durationMins = Integer.parseInt(cmdline.getOptionValue("d")); - } - if (cmdline.hasOption("sp")) { - streamPrefix = cmdline.getOptionValue("sp"); - } - if (cmdline.hasOption("sc")) { - numStreams = Integer.parseInt(cmdline.getOptionValue("sc")); - } - if (cmdline.hasOption("ms")) { - msgSize = Integer.parseInt(cmdline.getOptionValue("ms")); - } - if (cmdline.hasOption("r")) { - rate = Integer.parseInt(cmdline.getOptionValue("r")); - } - if (cmdline.hasOption("mr")) { - maxRate = Integer.parseInt(cmdline.getOptionValue("mr")); - } - if (cmdline.hasOption("cr")) { - changeRate = Integer.parseInt(cmdline.getOptionValue("cr")); - } - if (cmdline.hasOption("ci")) { - changeRateSeconds = Integer.parseInt(cmdline.getOptionValue("ci")); - } - if (cmdline.hasOption("t")) { - concurrency = Integer.parseInt(cmdline.getOptionValue("t")); - } - if (cmdline.hasOption("m")) { - mode = cmdline.getOptionValue("m"); - } - if (cmdline.hasOption("u")) { - dlUri = URI.create(cmdline.getOptionValue("u")); - } - if (cmdline.hasOption("bs")) { - batchSize = Integer.parseInt(cmdline.getOptionValue("bs")); - checkArgument("write" != mode, "batchSize supported only for mode=write"); - } - if (cmdline.hasOption("c")) { - String configFile = cmdline.getOptionValue("c"); - conf.loadConf(new File(configFile).toURI().toURL()); - } - if (cmdline.hasOption("rps")) { - readersPerStream = Integer.parseInt(cmdline.getOptionValue("rps")); - } - if (cmdline.hasOption("msid")) { - maxStreamId = Integer.parseInt(cmdline.getOptionValue("msid")); - } - if (cmdline.hasOption("ti")) { - truncationInterval = Integer.parseInt(cmdline.getOptionValue("ti")); - } - if (cmdline.hasOption("ssid")) { - startStreamId = Integer.parseInt(cmdline.getOptionValue("ssid")); - } - if (cmdline.hasOption("esid")) { - endStreamId = Integer.parseInt(cmdline.getOptionValue("esid")); - } - if (cmdline.hasOption("hccs")) { - hostConnectionCoreSize = Integer.parseInt(cmdline.getOptionValue("hccs")); - } - if (cmdline.hasOption("hcl")) { - hostConnectionLimit = Integer.parseInt(cmdline.getOptionValue("hcl")); - } - if (cmdline.hasOption("sb")) { - sendBufferSize = Integer.parseInt(cmdline.getOptionValue("sb")); - } - if (cmdline.hasOption("rb")) { - recvBufferSize = Integer.parseInt(cmdline.getOptionValue("rb")); - } - if (cmdline.hasOption("rs")) { - routingServiceFinagleNameString = cmdline.getOptionValue("rs"); - } - thriftmux = cmdline.hasOption("mx"); - handshakeWithClientInfo = cmdline.hasOption("hsci"); - readFromHead = cmdline.hasOption("rfh"); - enableBatching = cmdline.hasOption("bt"); - if (cmdline.hasOption("bbs")) { - batchBufferSize = Integer.parseInt(cmdline.getOptionValue("bbs")); - } - if (cmdline.hasOption("bfi")) { - batchFlushIntervalMicros = Integer.parseInt(cmdline.getOptionValue("bfi")); - } - - checkArgument(shardId >= 0, "shardId must be >= 0"); - checkArgument(numStreams > 0, "numStreams must be > 0"); - checkArgument(durationMins > 0, "durationMins must be > 0"); - checkArgument(streamPrefix != null, "streamPrefix must be defined"); - checkArgument(hostConnectionCoreSize > 0, "host connection core size must be > 0"); - checkArgument(hostConnectionLimit > 0, "host connection limit must be > 0"); - - if (cmdline.hasOption("p")) { - statsProvider = ReflectionUtils.newInstance(cmdline.getOptionValue("p"), StatsProvider.class); - } else { - statsProvider = new NullStatsProvider(); - } - - logger.info("Starting stats provider : {}.", statsProvider.getClass()); - statsProvider.start(conf); - - Worker w = null; - if (mode.startsWith("read")) { - w = runReader(); - } else if (mode.startsWith("write")) { - w = runWriter(); - } else if (mode.startsWith("dlwrite")) { - w = runDLWriter(); - } else if (mode.startsWith("dlread")) { - w = runDLReader(); - } - - if (w == null) { - throw new IOException("Unknown mode " + mode + " to run the benchmark."); - } - - Thread workerThread = new Thread(w, mode + "-benchmark-thread"); - workerThread.start(); - - TimeUnit.MINUTES.sleep(durationMins); - - logger.info("{} minutes passed, exiting...", durationMins); - w.close(); - - if (null != statsProvider) { - statsProvider.stop(); - } - - Runtime.getRuntime().exit(0); - } - - Worker runWriter() { - checkArgument(!finagleNames.isEmpty() || !serversetPaths.isEmpty() || null != dlUri, - "either serverset paths, finagle-names or uri required"); - checkArgument(msgSize > 0, "messagesize must be greater than 0"); - checkArgument(rate > 0, "rate must be greater than 0"); - checkArgument(maxRate >= rate, "max rate must be greater than rate"); - checkArgument(changeRate >= 0, "change rate must be positive"); - checkArgument(changeRateSeconds >= 0, "change rate must be positive"); - checkArgument(concurrency > 0, "concurrency must be greater than 0"); - - ShiftableRateLimiter rateLimiter = - new ShiftableRateLimiter(rate, maxRate, changeRate, changeRateSeconds, TimeUnit.SECONDS); - return createWriteWorker( - streamPrefix, - dlUri, - null == startStreamId ? shardId * numStreams : startStreamId, - null == endStreamId ? (shardId + 1) * numStreams : endStreamId, - rateLimiter, - concurrency, - msgSize, - batchSize, - hostConnectionCoreSize, - hostConnectionLimit, - serversetPaths, - finagleNames, - statsReceiver.scope("write_client"), - statsProvider.getStatsLogger("write"), - thriftmux, - handshakeWithClientInfo, - sendBufferSize, - recvBufferSize, - enableBatching, - batchBufferSize, - batchFlushIntervalMicros, - routingServiceFinagleNameString); - } - - protected WriterWorker createWriteWorker( - String streamPrefix, - URI uri, - int startStreamId, - int endStreamId, - ShiftableRateLimiter rateLimiter, - int writeConcurrency, - int messageSizeBytes, - int batchSize, - int hostConnectionCoreSize, - int hostConnectionLimit, - List<String> serverSetPaths, - List<String> finagleNames, - StatsReceiver statsReceiver, - StatsLogger statsLogger, - boolean thriftmux, - boolean handshakeWithClientInfo, - int sendBufferSize, - int recvBufferSize, - boolean enableBatching, - int batchBufferSize, - int batchFlushIntervalMicros, - String routingServiceFinagleNameString) { - return new WriterWorker( - streamPrefix, - uri, - startStreamId, - endStreamId, - rateLimiter, - writeConcurrency, - messageSizeBytes, - batchSize, - hostConnectionCoreSize, - hostConnectionLimit, - serverSetPaths, - finagleNames, - statsReceiver, - statsLogger, - thriftmux, - handshakeWithClientInfo, - sendBufferSize, - recvBufferSize, - enableBatching, - batchBufferSize, - batchFlushIntervalMicros, - routingServiceFinagleNameString); - } - - Worker runDLWriter() throws IOException { - checkNotNull(dlUri, "dlUri must be defined"); - checkArgument(rate > 0, "rate must be greater than 0"); - checkArgument(maxRate >= rate, "max rate must be greater than rate"); - checkArgument(changeRate >= 0, "change rate must be positive"); - checkArgument(changeRateSeconds >= 0, "change rate must be positive"); - checkArgument(concurrency > 0, "concurrency must be greater than 0"); - - ShiftableRateLimiter rateLimiter = - new ShiftableRateLimiter(rate, maxRate, changeRate, changeRateSeconds, TimeUnit.SECONDS); - - return new DLWriterWorker(conf, - dlUri, - streamPrefix, - shardId * numStreams, - (shardId + 1) * numStreams, - rateLimiter, - concurrency, - msgSize, - statsProvider.getStatsLogger("dlwrite")); - } - - Worker runReader() throws IOException { - checkArgument(!finagleNames.isEmpty() || !serversetPaths.isEmpty() || null != dlUri, - "either serverset paths, finagle-names or dlUri required"); - checkArgument(concurrency > 0, "concurrency must be greater than 0"); - checkArgument(truncationInterval > 0, "truncation interval should be greater than 0"); - return runReaderInternal(serversetPaths, finagleNames, truncationInterval); - } - - Worker runDLReader() throws IOException { - return runReaderInternal(new ArrayList<String>(), new ArrayList<String>(), 0); - } - - private Worker runReaderInternal(List<String> serversetPaths, - List<String> finagleNames, - int truncationInterval) throws IOException { - checkNotNull(dlUri); - - int ssid = null == startStreamId ? shardId * numStreams : startStreamId; - int esid = null == endStreamId ? (shardId + readersPerStream) * numStreams : endStreamId; - if (null != maxStreamId) { - esid = Math.min(esid, maxStreamId); - } - - return createReaderWorker( - conf, - dlUri, - streamPrefix, - ssid, - esid, - concurrency, - serversetPaths, - finagleNames, - truncationInterval, - readFromHead, - statsReceiver, - statsProvider.getStatsLogger("dlreader")); - } - - protected ReaderWorker createReaderWorker( - DistributedLogConfiguration conf, - URI uri, - String streamPrefix, - int startStreamId, - int endStreamId, - int readThreadPoolSize, - List<String> serverSetPaths, - List<String> finagleNames, - int truncationIntervalInSeconds, - boolean readFromHead, /* read from the earliest data of log */ - StatsReceiver statsReceiver, - StatsLogger statsLogger) throws IOException { - return new ReaderWorker( - conf, - uri, - streamPrefix, - startStreamId, - endStreamId, - readThreadPoolSize, - serverSetPaths, - finagleNames, - truncationIntervalInSeconds, - readFromHead, - statsReceiver, - statsLogger); - } - - public static void main(String[] args) { - Benchmarker benchmarker = new Benchmarker(args); - try { - benchmarker.run(); - } catch (Exception e) { - logger.info("Benchmark quit due to : ", e); - } - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/DLWriterWorker.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/DLWriterWorker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/DLWriterWorker.java deleted file mode 100644 index 152cd32..0000000 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/DLWriterWorker.java +++ /dev/null @@ -1,245 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.benchmark; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.twitter.distributedlog.AsyncLogWriter; -import com.twitter.distributedlog.DLSN; -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.DistributedLogManager; -import com.twitter.distributedlog.LogRecord; -import com.twitter.distributedlog.benchmark.utils.ShiftableRateLimiter; -import com.twitter.distributedlog.namespace.DistributedLogNamespace; -import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.distributedlog.util.SchedulerUtils; -import com.twitter.util.FutureEventListener; -import java.io.IOException; -import java.net.URI; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The benchmark for core library writer. - */ -public class DLWriterWorker implements Worker { - - private static final Logger LOG = LoggerFactory.getLogger(DLWriterWorker.class); - - static final int BACKOFF_MS = 200; - - final String streamPrefix; - final int startStreamId; - final int endStreamId; - final int writeConcurrency; - final int messageSizeBytes; - final ExecutorService executorService; - final ScheduledExecutorService rescueService; - final ShiftableRateLimiter rateLimiter; - final Random random; - final DistributedLogNamespace namespace; - final List<DistributedLogManager> dlms; - final List<AsyncLogWriter> streamWriters; - final int numStreams; - - volatile boolean running = true; - - final StatsLogger statsLogger; - final OpStatsLogger requestStat; - - public DLWriterWorker(DistributedLogConfiguration conf, - URI uri, - String streamPrefix, - int startStreamId, - int endStreamId, - ShiftableRateLimiter rateLimiter, - int writeConcurrency, - int messageSizeBytes, - StatsLogger statsLogger) throws IOException { - checkArgument(startStreamId <= endStreamId); - this.streamPrefix = streamPrefix; - this.startStreamId = startStreamId; - this.endStreamId = endStreamId; - this.rateLimiter = rateLimiter; - this.writeConcurrency = writeConcurrency; - this.messageSizeBytes = messageSizeBytes; - this.statsLogger = statsLogger; - this.requestStat = this.statsLogger.getOpStatsLogger("requests"); - this.executorService = Executors.newCachedThreadPool(); - this.rescueService = Executors.newSingleThreadScheduledExecutor(); - this.random = new Random(System.currentTimeMillis()); - - this.namespace = DistributedLogNamespaceBuilder.newBuilder() - .conf(conf) - .uri(uri) - .statsLogger(statsLogger.scope("dl")) - .build(); - this.numStreams = endStreamId - startStreamId; - dlms = new ArrayList<DistributedLogManager>(numStreams); - streamWriters = new ArrayList<AsyncLogWriter>(numStreams); - final ConcurrentMap<String, AsyncLogWriter> writers = new ConcurrentHashMap<String, AsyncLogWriter>(); - final CountDownLatch latch = new CountDownLatch(this.numStreams); - for (int i = startStreamId; i < endStreamId; i++) { - final String streamName = String.format("%s_%d", streamPrefix, i); - final DistributedLogManager dlm = namespace.openLog(streamName); - executorService.submit(new Runnable() { - @Override - public void run() { - try { - AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned(); - if (null != writers.putIfAbsent(streamName, writer)) { - FutureUtils.result(writer.asyncClose()); - } - latch.countDown(); - } catch (IOException e) { - LOG.error("Failed to intialize writer for stream : {}", streamName, e); - } - - } - }); - dlms.add(dlm); - } - try { - latch.await(); - } catch (InterruptedException e) { - throw new IOException("Interrupted on initializing writers for streams.", e); - } - for (int i = startStreamId; i < endStreamId; i++) { - final String streamName = String.format("%s_%d", streamPrefix, i); - AsyncLogWriter writer = writers.get(streamName); - if (null == writer) { - throw new IOException("Writer for " + streamName + " never initialized."); - } - streamWriters.add(writer); - } - LOG.info("Writing to {} streams.", numStreams); - } - - void rescueWriter(int idx, AsyncLogWriter writer) { - if (streamWriters.get(idx) == writer) { - try { - FutureUtils.result(writer.asyncClose()); - } catch (IOException e) { - LOG.error("Failed to close writer for stream {}.", idx); - } - AsyncLogWriter newWriter = null; - try { - newWriter = dlms.get(idx).startAsyncLogSegmentNonPartitioned(); - } catch (IOException e) { - LOG.error("Failed to create new writer for stream {}, backoff for {} ms.", - idx, BACKOFF_MS); - scheduleRescue(idx, writer, BACKOFF_MS); - } - streamWriters.set(idx, newWriter); - } else { - LOG.warn("AsyncLogWriter for stream {} was already rescued.", idx); - } - } - - void scheduleRescue(final int idx, final AsyncLogWriter writer, int delayMs) { - Runnable r = new Runnable() { - @Override - public void run() { - rescueWriter(idx, writer); - } - }; - if (delayMs > 0) { - rescueService.schedule(r, delayMs, TimeUnit.MILLISECONDS); - } else { - rescueService.submit(r); - } - } - - @Override - public void close() throws IOException { - this.running = false; - SchedulerUtils.shutdownScheduler(this.executorService, 2, TimeUnit.MINUTES); - SchedulerUtils.shutdownScheduler(this.rescueService, 2, TimeUnit.MINUTES); - for (AsyncLogWriter writer : streamWriters) { - FutureUtils.result(writer.asyncClose()); - } - for (DistributedLogManager dlm : dlms) { - dlm.close(); - } - namespace.close(); - } - - @Override - public void run() { - LOG.info("Starting dlwriter (concurrency = {}, prefix = {}, numStreams = {})", - new Object[] { writeConcurrency, streamPrefix, numStreams }); - for (int i = 0; i < writeConcurrency; i++) { - executorService.submit(new Writer(i)); - } - } - - class Writer implements Runnable { - - final int idx; - - Writer(int idx) { - this.idx = idx; - } - - @Override - public void run() { - LOG.info("Started writer {}.", idx); - while (running) { - final int streamIdx = random.nextInt(numStreams); - final AsyncLogWriter writer = streamWriters.get(streamIdx); - rateLimiter.getLimiter().acquire(); - final long requestMillis = System.currentTimeMillis(); - final byte[] data; - try { - data = Utils.generateMessage(requestMillis, messageSizeBytes); - } catch (TException e) { - LOG.error("Error on generating message : ", e); - break; - } - writer.write(new LogRecord(requestMillis, data)).addEventListener(new FutureEventListener<DLSN>() { - @Override - public void onSuccess(DLSN value) { - requestStat.registerSuccessfulEvent(System.currentTimeMillis() - requestMillis); - } - - @Override - public void onFailure(Throwable cause) { - requestStat.registerFailedEvent(System.currentTimeMillis() - requestMillis); - LOG.error("Failed to publish, rescue it : ", cause); - scheduleRescue(streamIdx, writer, 0); - } - }); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java deleted file mode 100644 index adbdeda..0000000 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java +++ /dev/null @@ -1,468 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.benchmark; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.google.common.base.Stopwatch; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.twitter.common.zookeeper.ServerSet; -import com.twitter.distributedlog.AsyncLogReader; -import com.twitter.distributedlog.DLSN; -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.DistributedLogManager; -import com.twitter.distributedlog.LogRecordSet; -import com.twitter.distributedlog.LogRecordWithDLSN; -import com.twitter.distributedlog.benchmark.thrift.Message; -import com.twitter.distributedlog.client.serverset.DLZkServerSet; -import com.twitter.distributedlog.exceptions.DLInterruptedException; -import com.twitter.distributedlog.namespace.DistributedLogNamespace; -import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder; -import com.twitter.distributedlog.service.DistributedLogClient; -import com.twitter.distributedlog.service.DistributedLogClientBuilder; -import com.twitter.distributedlog.util.FutureUtils; -import com.twitter.distributedlog.util.SchedulerUtils; -import com.twitter.finagle.builder.ClientBuilder; -import com.twitter.finagle.stats.StatsReceiver; -import com.twitter.finagle.thrift.ClientId$; -import com.twitter.util.Duration$; -import com.twitter.util.Function; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import com.twitter.util.Promise; -import java.io.IOException; -import java.net.URI; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.Gauge; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The benchmark for core library reader. - */ -public class ReaderWorker implements Worker { - - private static final Logger LOG = LoggerFactory.getLogger(ReaderWorker.class); - - static final int BACKOFF_MS = 200; - - final String streamPrefix; - final int startStreamId; - final int endStreamId; - final ScheduledExecutorService executorService; - final ExecutorService callbackExecutor; - final DistributedLogNamespace namespace; - final DistributedLogManager[] dlms; - final AsyncLogReader[] logReaders; - final StreamReader[] streamReaders; - final int numStreams; - final boolean readFromHead; - - final int truncationIntervalInSeconds; - // DL Client Related Variables - final DLZkServerSet[] serverSets; - final List<String> finagleNames; - final DistributedLogClient dlc; - - volatile boolean running = true; - - final StatsReceiver statsReceiver; - final StatsLogger statsLogger; - final OpStatsLogger e2eStat; - final OpStatsLogger deliveryStat; - final OpStatsLogger negativeE2EStat; - final OpStatsLogger negativeDeliveryStat; - final OpStatsLogger truncationStat; - final Counter invalidRecordsCounter; - final Counter outOfOrderSequenceIdCounter; - - class StreamReader implements FutureEventListener<List<LogRecordWithDLSN>>, Runnable, Gauge<Number> { - - final int streamIdx; - final String streamName; - DLSN prevDLSN = null; - long prevSequenceId = Long.MIN_VALUE; - private static final String gaugeLabel = "sequence_id"; - - StreamReader(int idx, StatsLogger statsLogger) { - this.streamIdx = idx; - int streamId = startStreamId + streamIdx; - streamName = String.format("%s_%d", streamPrefix, streamId); - statsLogger.scope(streamName).registerGauge(gaugeLabel, this); - } - - @Override - public void onSuccess(final List<LogRecordWithDLSN> records) { - for (final LogRecordWithDLSN record : records) { - if (record.isRecordSet()) { - try { - processRecordSet(record); - } catch (IOException e) { - onFailure(e); - } - } else { - processRecord(record); - } - } - readLoop(); - } - - public void processRecordSet(final LogRecordWithDLSN record) throws IOException { - LogRecordSet.Reader reader = LogRecordSet.of(record); - LogRecordWithDLSN nextRecord = reader.nextRecord(); - while (null != nextRecord) { - processRecord(nextRecord); - nextRecord = reader.nextRecord(); - } - } - - public void processRecord(final LogRecordWithDLSN record) { - Message msg; - try { - msg = Utils.parseMessage(record.getPayload()); - } catch (TException e) { - invalidRecordsCounter.inc(); - LOG.warn("Failed to parse record {} for stream {} : size = {} , ", - new Object[] { record, streamIdx, record.getPayload().length, e }); - return; - } - long curTimeMillis = System.currentTimeMillis(); - long e2eLatency = curTimeMillis - msg.getPublishTime(); - long deliveryLatency = curTimeMillis - record.getTransactionId(); - if (e2eLatency >= 0) { - e2eStat.registerSuccessfulEvent(e2eLatency); - } else { - negativeE2EStat.registerSuccessfulEvent(-e2eLatency); - } - if (deliveryLatency >= 0) { - deliveryStat.registerSuccessfulEvent(deliveryLatency); - } else { - negativeDeliveryStat.registerSuccessfulEvent(-deliveryLatency); - } - - prevDLSN = record.getDlsn(); - } - - @Override - public void onFailure(Throwable cause) { - scheduleReinitStream(streamIdx).map(new Function<Void, Void>() { - @Override - public Void apply(Void value) { - prevDLSN = null; - prevSequenceId = Long.MIN_VALUE; - readLoop(); - return null; - } - }); - } - - void readLoop() { - if (!running) { - return; - } - logReaders[streamIdx].readBulk(10).addEventListener(this); - } - - @Override - public void run() { - final DLSN dlsnToTruncate = prevDLSN; - if (null == dlsnToTruncate) { - return; - } - final Stopwatch stopwatch = Stopwatch.createStarted(); - dlc.truncate(streamName, dlsnToTruncate).addEventListener( - new FutureEventListener<Boolean>() { - @Override - public void onSuccess(Boolean value) { - truncationStat.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS)); - } - - @Override - public void onFailure(Throwable cause) { - truncationStat.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS)); - LOG.error("Failed to truncate stream {} to {} : ", - new Object[]{streamName, dlsnToTruncate, cause}); - } - }); - } - - @Override - public Number getDefaultValue() { - return Long.MIN_VALUE; - } - - @Override - public synchronized Number getSample() { - return prevSequenceId; - } - - void unregisterGauge() { - statsLogger.scope(streamName).unregisterGauge(gaugeLabel, this); - } - } - - public ReaderWorker(DistributedLogConfiguration conf, - URI uri, - String streamPrefix, - int startStreamId, - int endStreamId, - int readThreadPoolSize, - List<String> serverSetPaths, - List<String> finagleNames, - int truncationIntervalInSeconds, - boolean readFromHead, /* read from the earliest data of log */ - StatsReceiver statsReceiver, - StatsLogger statsLogger) throws IOException { - checkArgument(startStreamId <= endStreamId); - this.streamPrefix = streamPrefix; - this.startStreamId = startStreamId; - this.endStreamId = endStreamId; - this.truncationIntervalInSeconds = truncationIntervalInSeconds; - this.readFromHead = readFromHead; - this.statsReceiver = statsReceiver; - this.statsLogger = statsLogger; - this.e2eStat = this.statsLogger.getOpStatsLogger("e2e"); - this.negativeE2EStat = this.statsLogger.getOpStatsLogger("e2eNegative"); - this.deliveryStat = this.statsLogger.getOpStatsLogger("delivery"); - this.negativeDeliveryStat = this.statsLogger.getOpStatsLogger("deliveryNegative"); - this.truncationStat = this.statsLogger.getOpStatsLogger("truncation"); - this.invalidRecordsCounter = this.statsLogger.getCounter("invalid_records"); - this.outOfOrderSequenceIdCounter = this.statsLogger.getCounter("out_of_order_seq_id"); - this.executorService = Executors.newScheduledThreadPool( - readThreadPoolSize, new ThreadFactoryBuilder().setNameFormat("benchmark.reader-%d").build()); - this.callbackExecutor = Executors.newFixedThreadPool( - Runtime.getRuntime().availableProcessors(), - new ThreadFactoryBuilder().setNameFormat("benchmark.reader-callback-%d").build()); - this.finagleNames = finagleNames; - this.serverSets = createServerSets(serverSetPaths); - - conf.setDeserializeRecordSetOnReads(false); - - if (truncationIntervalInSeconds > 0 && (!finagleNames.isEmpty() || !serverSetPaths.isEmpty())) { - // Construct client for truncation - DistributedLogClientBuilder builder = DistributedLogClientBuilder.newBuilder() - .clientId(ClientId$.MODULE$.apply("dlog_loadtest_reader")) - .clientBuilder(ClientBuilder.get() - .hostConnectionLimit(10) - .hostConnectionCoresize(10) - .tcpConnectTimeout(Duration$.MODULE$.fromSeconds(1)) - .requestTimeout(Duration$.MODULE$.fromSeconds(2))) - .redirectBackoffStartMs(100) - .redirectBackoffMaxMs(500) - .requestTimeoutMs(2000) - .statsReceiver(statsReceiver) - .thriftmux(true) - .name("reader"); - - if (serverSetPaths.isEmpty()) { - // Prepare finagle names - String local = finagleNames.get(0); - String[] remotes = new String[finagleNames.size() - 1]; - finagleNames.subList(1, finagleNames.size()).toArray(remotes); - - builder = builder.finagleNameStrs(local, remotes); - LOG.info("Initialized distributedlog client for truncation @ {}.", finagleNames); - } else if (serverSets.length != 0){ - ServerSet local = this.serverSets[0].getServerSet(); - ServerSet[] remotes = new ServerSet[this.serverSets.length - 1]; - for (int i = 1; i < serverSets.length; i++) { - remotes[i - 1] = serverSets[i].getServerSet(); - } - - builder = builder.serverSets(local, remotes); - LOG.info("Initialized distributedlog client for truncation @ {}.", serverSetPaths); - } else { - builder = builder.uri(uri); - LOG.info("Initialized distributedlog client for namespace {}", uri); - } - dlc = builder.build(); - } else { - dlc = null; - } - - // construct the factory - this.namespace = DistributedLogNamespaceBuilder.newBuilder() - .conf(conf) - .uri(uri) - .statsLogger(statsLogger.scope("dl")) - .build(); - this.numStreams = endStreamId - startStreamId; - this.dlms = new DistributedLogManager[numStreams]; - this.logReaders = new AsyncLogReader[numStreams]; - final CountDownLatch latch = new CountDownLatch(numStreams); - for (int i = 0; i < numStreams; i++) { - final int idx = i; - executorService.submit(new Runnable() { - @Override - public void run() { - reinitStream(idx).map(new Function<Void, Void>() { - @Override - public Void apply(Void value) { - LOG.info("Initialized stream reader {}.", idx); - latch.countDown(); - return null; - } - }); - } - }); - } - try { - latch.await(); - } catch (InterruptedException e) { - throw new DLInterruptedException("Failed to intialize benchmark readers : ", e); - } - this.streamReaders = new StreamReader[numStreams]; - for (int i = 0; i < numStreams; i++) { - streamReaders[i] = new StreamReader(i, statsLogger.scope("perstream")); - if (truncationIntervalInSeconds > 0) { - executorService.scheduleWithFixedDelay(streamReaders[i], - truncationIntervalInSeconds, truncationIntervalInSeconds, TimeUnit.SECONDS); - } - } - LOG.info("Initialized benchmark reader on {} streams {} : [{} - {})", - new Object[] { numStreams, streamPrefix, startStreamId, endStreamId }); - } - - protected DLZkServerSet[] createServerSets(List<String> serverSetPaths) { - DLZkServerSet[] serverSets = new DLZkServerSet[serverSetPaths.size()]; - for (int i = 0; i < serverSets.length; i++) { - String serverSetPath = serverSetPaths.get(i); - serverSets[i] = DLZkServerSet.of(URI.create(serverSetPath), 60000); - } - return serverSets; - } - - private Future<Void> reinitStream(int idx) { - Promise<Void> promise = new Promise<Void>(); - reinitStream(idx, promise); - return promise; - } - - private void reinitStream(int idx, Promise<Void> promise) { - int streamId = startStreamId + idx; - String streamName = String.format("%s_%d", streamPrefix, streamId); - - if (logReaders[idx] != null) { - try { - FutureUtils.result(logReaders[idx].asyncClose()); - } catch (IOException e) { - LOG.warn("Failed on closing stream reader {} : ", streamName, e); - } - logReaders[idx] = null; - } - if (dlms[idx] != null) { - try { - dlms[idx].close(); - } catch (IOException e) { - LOG.warn("Failed on closing dlm {} : ", streamName, e); - } - dlms[idx] = null; - } - - try { - dlms[idx] = namespace.openLog(streamName); - } catch (IOException ioe) { - LOG.error("Failed on creating dlm {} : ", streamName, ioe); - scheduleReinitStream(idx, promise); - return; - } - DLSN lastDLSN; - if (readFromHead) { - lastDLSN = DLSN.InitialDLSN; - } else { - try { - lastDLSN = dlms[idx].getLastDLSN(); - } catch (IOException ioe) { - LOG.error("Failed on getting last dlsn from stream {} : ", streamName, ioe); - scheduleReinitStream(idx, promise); - return; - } - } - try { - logReaders[idx] = dlms[idx].getAsyncLogReader(lastDLSN); - } catch (IOException ioe) { - LOG.error("Failed on opening reader for stream {} starting from {} : ", - new Object[] { streamName, lastDLSN, ioe }); - scheduleReinitStream(idx, promise); - return; - } - LOG.info("Opened reader for stream {}, starting from {}.", streamName, lastDLSN); - promise.setValue(null); - } - - Future<Void> scheduleReinitStream(int idx) { - Promise<Void> promise = new Promise<Void>(); - scheduleReinitStream(idx, promise); - return promise; - } - - void scheduleReinitStream(final int idx, final Promise<Void> promise) { - executorService.schedule(new Runnable() { - @Override - public void run() { - reinitStream(idx, promise); - } - }, BACKOFF_MS, TimeUnit.MILLISECONDS); - } - - @Override - public void close() throws IOException { - this.running = false; - for (AsyncLogReader reader : logReaders) { - if (null != reader) { - FutureUtils.result(reader.asyncClose()); - } - } - for (DistributedLogManager dlm : dlms) { - if (null != dlm) { - dlm.close(); - } - } - namespace.close(); - SchedulerUtils.shutdownScheduler(executorService, 2, TimeUnit.MINUTES); - SchedulerUtils.shutdownScheduler(callbackExecutor, 2, TimeUnit.MINUTES); - if (this.dlc != null) { - this.dlc.close(); - } - for (DLZkServerSet serverSet: serverSets) { - serverSet.close(); - } - // Unregister gauges to prevent GC spirals - for (StreamReader sr : streamReaders) { - sr.unregisterGauge(); - } - } - - @Override - public void run() { - LOG.info("Starting reader (prefix = {}, numStreams = {}).", - streamPrefix, numStreams); - for (StreamReader sr : streamReaders) { - sr.readLoop(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Utils.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Utils.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Utils.java deleted file mode 100644 index f5c32db..0000000 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Utils.java +++ /dev/null @@ -1,57 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.benchmark; - -import com.twitter.distributedlog.benchmark.thrift.Message; -import java.nio.ByteBuffer; -import java.util.Random; -import org.apache.thrift.TException; -import org.apache.thrift.TSerializer; -import org.apache.thrift.protocol.TBinaryProtocol; -import org.apache.thrift.transport.TMemoryInputTransport; - -/** - * Utils for generating and parsing messages. - */ -public class Utils { - - static final Random RAND = new Random(System.currentTimeMillis()); - static final ThreadLocal<TSerializer> MSG_SERIALIZER = - new ThreadLocal<TSerializer>() { - @Override - public TSerializer initialValue() { - return new TSerializer(new TBinaryProtocol.Factory()); - } - }; - - public static byte[] generateMessage(long requestMillis, int payLoadSize) throws TException { - byte[] payload = new byte[payLoadSize]; - RAND.nextBytes(payload); - Message msg = new Message(requestMillis, ByteBuffer.wrap(payload)); - return MSG_SERIALIZER.get().serialize(msg); - } - - public static Message parseMessage(byte[] data) throws TException { - Message msg = new Message(); - TMemoryInputTransport transport = new TMemoryInputTransport(data); - TBinaryProtocol protocol = new TBinaryProtocol(transport); - msg.read(protocol); - return msg; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Worker.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Worker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Worker.java deleted file mode 100644 index 6c60034..0000000 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Worker.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.benchmark; - -import java.io.Closeable; - -/** - * Worker to run benchmark. - */ -public interface Worker extends Closeable, Runnable { -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java deleted file mode 100644 index dc5a6e2..0000000 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java +++ /dev/null @@ -1,387 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.benchmark; - -import static com.google.common.base.Preconditions.checkArgument; - -import com.twitter.common.zookeeper.ServerSet; -import com.twitter.distributedlog.DLSN; -import com.twitter.distributedlog.benchmark.utils.ShiftableRateLimiter; -import com.twitter.distributedlog.client.DistributedLogMultiStreamWriter; -import com.twitter.distributedlog.client.serverset.DLZkServerSet; -import com.twitter.distributedlog.exceptions.DLException; -import com.twitter.distributedlog.io.CompressionCodec; -import com.twitter.distributedlog.service.DistributedLogClient; -import com.twitter.distributedlog.service.DistributedLogClientBuilder; -import com.twitter.distributedlog.util.SchedulerUtils; -import com.twitter.finagle.builder.ClientBuilder; -import com.twitter.finagle.stats.StatsReceiver; -import com.twitter.finagle.thrift.ClientId; -import com.twitter.finagle.thrift.ClientId$; -import com.twitter.util.Duration$; -import com.twitter.util.Future; -import com.twitter.util.FutureEventListener; -import java.io.IOException; -import java.net.URI; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import java.util.Random; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.thrift.TException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Benchmark for distributedlog proxy client. - */ -public class WriterWorker implements Worker { - - static final Logger LOG = LoggerFactory.getLogger(WriterWorker.class); - - final String streamPrefix; - final int startStreamId; - final int endStreamId; - final int writeConcurrency; - final int messageSizeBytes; - final int hostConnectionCoreSize; - final int hostConnectionLimit; - final ExecutorService executorService; - final ShiftableRateLimiter rateLimiter; - final URI dlUri; - final DLZkServerSet[] serverSets; - final List<String> finagleNames; - final Random random; - final List<String> streamNames; - final int numStreams; - final int batchSize; - final boolean thriftmux; - final boolean handshakeWithClientInfo; - final int sendBufferSize; - final int recvBufferSize; - final boolean enableBatching; - final int batchBufferSize; - final int batchFlushIntervalMicros; - private final String routingServiceFinagleName; - - volatile boolean running = true; - - final StatsReceiver statsReceiver; - final StatsLogger statsLogger; - final OpStatsLogger requestStat; - final StatsLogger exceptionsLogger; - final StatsLogger dlErrorCodeLogger; - - // callback thread - final ExecutorService executor; - - public WriterWorker(String streamPrefix, - URI uri, - int startStreamId, - int endStreamId, - ShiftableRateLimiter rateLimiter, - int writeConcurrency, - int messageSizeBytes, - int batchSize, - int hostConnectionCoreSize, - int hostConnectionLimit, - List<String> serverSetPaths, - List<String> finagleNames, - StatsReceiver statsReceiver, - StatsLogger statsLogger, - boolean thriftmux, - boolean handshakeWithClientInfo, - int sendBufferSize, - int recvBufferSize, - boolean enableBatching, - int batchBufferSize, - int batchFlushIntervalMicros, - String routingServiceFinagleName) { - checkArgument(startStreamId <= endStreamId); - checkArgument(!finagleNames.isEmpty() || !serverSetPaths.isEmpty()); - this.streamPrefix = streamPrefix; - this.dlUri = uri; - this.startStreamId = startStreamId; - this.endStreamId = endStreamId; - this.rateLimiter = rateLimiter; - this.writeConcurrency = writeConcurrency; - this.messageSizeBytes = messageSizeBytes; - this.statsReceiver = statsReceiver; - this.statsLogger = statsLogger; - this.requestStat = this.statsLogger.getOpStatsLogger("requests"); - this.exceptionsLogger = statsLogger.scope("exceptions"); - this.dlErrorCodeLogger = statsLogger.scope("dl_error_code"); - this.executorService = Executors.newCachedThreadPool(); - this.random = new Random(System.currentTimeMillis()); - this.batchSize = batchSize; - this.hostConnectionCoreSize = hostConnectionCoreSize; - this.hostConnectionLimit = hostConnectionLimit; - this.thriftmux = thriftmux; - this.handshakeWithClientInfo = handshakeWithClientInfo; - this.sendBufferSize = sendBufferSize; - this.recvBufferSize = recvBufferSize; - this.enableBatching = enableBatching; - this.batchBufferSize = batchBufferSize; - this.batchFlushIntervalMicros = batchFlushIntervalMicros; - this.finagleNames = finagleNames; - this.serverSets = createServerSets(serverSetPaths); - this.executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); - this.routingServiceFinagleName = routingServiceFinagleName; - - // Streams - streamNames = new ArrayList<String>(endStreamId - startStreamId); - for (int i = startStreamId; i < endStreamId; i++) { - streamNames.add(String.format("%s_%d", streamPrefix, i)); - } - numStreams = streamNames.size(); - LOG.info("Writing to {} streams : {}", numStreams, streamNames); - } - - protected DLZkServerSet[] createServerSets(List<String> serverSetPaths) { - DLZkServerSet[] serverSets = new DLZkServerSet[serverSetPaths.size()]; - for (int i = 0; i < serverSets.length; i++) { - String serverSetPath = serverSetPaths.get(i); - serverSets[i] = DLZkServerSet.of(URI.create(serverSetPath), 60000); - } - return serverSets; - } - - @Override - public void close() throws IOException { - this.running = false; - SchedulerUtils.shutdownScheduler(this.executorService, 2, TimeUnit.MINUTES); - for (DLZkServerSet serverSet: serverSets) { - serverSet.close(); - } - } - - private DistributedLogClient buildDlogClient() { - ClientBuilder clientBuilder = ClientBuilder.get() - .hostConnectionLimit(hostConnectionLimit) - .hostConnectionCoresize(hostConnectionCoreSize) - .tcpConnectTimeout(Duration$.MODULE$.fromMilliseconds(200)) - .connectTimeout(Duration$.MODULE$.fromMilliseconds(200)) - .requestTimeout(Duration$.MODULE$.fromSeconds(10)) - .sendBufferSize(sendBufferSize) - .recvBufferSize(recvBufferSize); - - ClientId clientId = ClientId$.MODULE$.apply("dlog_loadtest_writer"); - - DistributedLogClientBuilder builder = DistributedLogClientBuilder.newBuilder() - .clientId(clientId) - .clientBuilder(clientBuilder) - .thriftmux(thriftmux) - .redirectBackoffStartMs(100) - .redirectBackoffMaxMs(500) - .requestTimeoutMs(10000) - .statsReceiver(statsReceiver) - .streamNameRegex("^" + streamPrefix + "_[0-9]+$") - .handshakeWithClientInfo(handshakeWithClientInfo) - .periodicHandshakeIntervalMs(TimeUnit.SECONDS.toMillis(30)) - .periodicOwnershipSyncIntervalMs(TimeUnit.MINUTES.toMillis(5)) - .periodicDumpOwnershipCache(true) - .handshakeTracing(true) - .serverRoutingServiceFinagleNameStr(routingServiceFinagleName) - .name("writer"); - - if (!finagleNames.isEmpty()) { - String local = finagleNames.get(0); - String[] remotes = new String[finagleNames.size() - 1]; - finagleNames.subList(1, finagleNames.size()).toArray(remotes); - - builder = builder.finagleNameStrs(local, remotes); - } else if (serverSets.length != 0){ - ServerSet local = serverSets[0].getServerSet(); - ServerSet[] remotes = new ServerSet[serverSets.length - 1]; - for (int i = 1; i < serverSets.length; i++) { - remotes[i - 1] = serverSets[i].getServerSet(); - } - builder = builder.serverSets(local, remotes); - } else { - builder = builder.uri(dlUri); - } - - return builder.build(); - } - - ByteBuffer buildBuffer(long requestMillis, int messageSizeBytes) { - ByteBuffer data; - try { - data = ByteBuffer.wrap(Utils.generateMessage(requestMillis, messageSizeBytes)); - return data; - } catch (TException e) { - LOG.error("Error generating message : ", e); - return null; - } - } - - List<ByteBuffer> buildBufferList(int batchSize, long requestMillis, int messageSizeBytes) { - ArrayList<ByteBuffer> bufferList = new ArrayList<ByteBuffer>(batchSize); - for (int i = 0; i < batchSize; i++) { - ByteBuffer buf = buildBuffer(requestMillis, messageSizeBytes); - if (null == buf) { - return null; - } - bufferList.add(buf); - } - return bufferList; - } - - class TimedRequestHandler implements FutureEventListener<DLSN>, Runnable { - final String streamName; - final long requestMillis; - DLSN dlsn = null; - Throwable cause = null; - - TimedRequestHandler(String streamName, - long requestMillis) { - this.streamName = streamName; - this.requestMillis = requestMillis; - } - @Override - public void onSuccess(DLSN value) { - dlsn = value; - executor.submit(this); - } - @Override - public void onFailure(Throwable cause) { - this.cause = cause; - executor.submit(this); - } - - @Override - public void run() { - if (null != dlsn) { - requestStat.registerSuccessfulEvent(System.currentTimeMillis() - requestMillis); - } else { - LOG.error("Failed to publish to {} : ", streamName, cause); - requestStat.registerFailedEvent(System.currentTimeMillis() - requestMillis); - exceptionsLogger.getCounter(cause.getClass().getName()).inc(); - if (cause instanceof DLException) { - DLException dle = (DLException) cause; - dlErrorCodeLogger.getCounter(dle.getCode().toString()).inc(); - } - } - } - } - - class Writer implements Runnable { - - final int idx; - final DistributedLogClient dlc; - DistributedLogMultiStreamWriter writer = null; - final ShiftableRateLimiter limiter; - - Writer(int idx) { - this.idx = idx; - this.dlc = buildDlogClient(); - if (enableBatching) { - writer = DistributedLogMultiStreamWriter.newBuilder() - .client(this.dlc) - .streams(streamNames) - .compressionCodec(CompressionCodec.Type.NONE) - .flushIntervalMicros(batchFlushIntervalMicros) - .bufferSize(batchBufferSize) - .firstSpeculativeTimeoutMs(9000) - .maxSpeculativeTimeoutMs(9000) - .requestTimeoutMs(10000) - .speculativeBackoffMultiplier(2) - .build(); - } - this.limiter = rateLimiter.duplicate(); - } - - @Override - public void run() { - LOG.info("Started writer {}.", idx); - while (running) { - this.limiter.getLimiter().acquire(); - final String streamName = streamNames.get(random.nextInt(numStreams)); - final long requestMillis = System.currentTimeMillis(); - final ByteBuffer data = buildBuffer(requestMillis, messageSizeBytes); - if (null == data) { - break; - } - if (null != writer) { - writer.write(data).addEventListener( - new TimedRequestHandler(streamName, requestMillis)); - } else { - dlc.write(streamName, data).addEventListener( - new TimedRequestHandler(streamName, requestMillis)); - } - } - if (null != writer) { - writer.close(); - } - dlc.close(); - } - } - - class BulkWriter implements Runnable { - - final int idx; - final DistributedLogClient dlc; - - BulkWriter(int idx) { - this.idx = idx; - this.dlc = buildDlogClient(); - } - - @Override - public void run() { - LOG.info("Started writer {}.", idx); - while (running) { - rateLimiter.getLimiter().acquire(batchSize); - String streamName = streamNames.get(random.nextInt(numStreams)); - final long requestMillis = System.currentTimeMillis(); - final List<ByteBuffer> data = buildBufferList(batchSize, requestMillis, messageSizeBytes); - if (null == data) { - break; - } - List<Future<DLSN>> results = dlc.writeBulk(streamName, data); - for (Future<DLSN> result : results) { - result.addEventListener(new TimedRequestHandler(streamName, requestMillis)); - } - } - dlc.close(); - } - } - - @Override - public void run() { - LOG.info("Starting writer (concurrency = {}, prefix = {}, batchSize = {})", - new Object[] { writeConcurrency, streamPrefix, batchSize }); - try { - for (int i = 0; i < writeConcurrency; i++) { - Runnable writer = null; - if (batchSize > 0) { - writer = new BulkWriter(i); - } else { - writer = new Writer(i); - } - executorService.submit(writer); - } - } catch (Throwable t) { - LOG.error("Unhandled exception caught", t); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/package-info.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/package-info.java deleted file mode 100644 index 052a661..0000000 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Benchmarks for distributedlog. - */ -package com.twitter.distributedlog.benchmark; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AbstractReaderBenchmark.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AbstractReaderBenchmark.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AbstractReaderBenchmark.java deleted file mode 100644 index 4d436ee..0000000 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AbstractReaderBenchmark.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.benchmark.stream; - -import com.twitter.distributedlog.DistributedLogConstants; -import org.apache.commons.cli.CommandLine; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -abstract class AbstractReaderBenchmark extends StreamBenchmark { - - private static final Logger logger = LoggerFactory.getLogger(SyncReaderBenchmark.class); - - protected ReadMode readMode = ReadMode.LATEST; - protected long fromTxId = DistributedLogConstants.INVALID_TXID; - protected long rewindMs = 0L; - protected int batchSize = 1; - - protected AbstractReaderBenchmark() { - options.addOption("t", "tx-id", true, - "Transaction ID to start read from when reading in mode 'position'"); - options.addOption("r", "rewind", true, - "Time to rewind back to read from when reading in mode 'rewind' (in milliseconds)"); - options.addOption("m", "mode", true, - "Read Mode : [oldest, latest, rewind, position]"); - options.addOption("b", "batch-size", true, "Read batch size"); - } - - @Override - protected void parseCommandLine(CommandLine cmdline) { - if (cmdline.hasOption("m")) { - String mode = cmdline.getOptionValue("m"); - try { - readMode = ReadMode.valueOf(mode.toUpperCase()); - } catch (IllegalArgumentException iae) { - logger.error("Invalid read mode {}.", mode); - printUsage(); - System.exit(0); - } - } else { - printUsage(); - System.exit(0); - } - if (cmdline.hasOption("t")) { - fromTxId = Long.parseLong(cmdline.getOptionValue("t")); - } - if (cmdline.hasOption("r")) { - rewindMs = Long.parseLong(cmdline.getOptionValue("r")); - } - if (cmdline.hasOption("b")) { - batchSize = Integer.parseInt(cmdline.getOptionValue("b")); - } - logger.info("Start reading from transaction id {}, rewind {} ms.", fromTxId, rewindMs); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AsyncReaderBenchmark.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AsyncReaderBenchmark.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AsyncReaderBenchmark.java deleted file mode 100644 index 86acdb6..0000000 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/AsyncReaderBenchmark.java +++ /dev/null @@ -1,158 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.benchmark.stream; - -import com.google.common.base.Stopwatch; -import com.twitter.distributedlog.AsyncLogReader; -import com.twitter.distributedlog.DLSN; -import com.twitter.distributedlog.DistributedLogManager; -import com.twitter.distributedlog.LogRecordWithDLSN; -import com.twitter.distributedlog.namespace.DistributedLogNamespace; -import com.twitter.distributedlog.util.FutureUtils; -import java.io.IOException; -import java.util.List; -import java.util.concurrent.TimeUnit; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Benchmark on {@link com.twitter.distributedlog.AsyncLogReader} reading from a stream. - */ -public class AsyncReaderBenchmark extends AbstractReaderBenchmark { - - private static final Logger logger = LoggerFactory.getLogger(AsyncReaderBenchmark.class); - - @Override - protected void benchmark(DistributedLogNamespace namespace, String logName, StatsLogger statsLogger) { - DistributedLogManager dlm = null; - while (null == dlm) { - try { - dlm = namespace.openLog(streamName); - } catch (IOException ioe) { - logger.warn("Failed to create dlm for stream {} : ", streamName, ioe); - } - if (null == dlm) { - try { - TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds()); - } catch (InterruptedException e) { - logger.warn("Interrupted from sleep while creating dlm for stream {} : ", - streamName, e); - } - } - } - logger.info("Created dlm for stream {}.", streamName); - - // Stats - OpStatsLogger openReaderStats = statsLogger.getOpStatsLogger("open_reader"); - OpStatsLogger blockingReadStats = statsLogger.getOpStatsLogger("blocking_read"); - Counter readCounter = statsLogger.getCounter("reads"); - - AsyncLogReader reader = null; - DLSN lastDLSN = null; - Long lastTxId = null; - while (null == reader) { - // initialize the last txid - if (null == lastTxId) { - switch (readMode) { - case OLDEST: - lastTxId = 0L; - lastDLSN = DLSN.InitialDLSN; - break; - case LATEST: - lastTxId = Long.MAX_VALUE; - try { - lastDLSN = dlm.getLastDLSN(); - } catch (IOException ioe) { - continue; - } - break; - case REWIND: - lastTxId = System.currentTimeMillis() - rewindMs; - lastDLSN = null; - break; - case POSITION: - lastTxId = fromTxId; - lastDLSN = null; - break; - default: - logger.warn("Unsupported mode {}", readMode); - printUsage(); - System.exit(0); - break; - } - logger.info("Reading from transaction id = {}, dlsn = {}", lastTxId, lastDLSN); - } - // Open the reader - Stopwatch stopwatch = Stopwatch.createStarted(); - try { - if (null == lastDLSN) { - reader = FutureUtils.result(dlm.openAsyncLogReader(lastTxId)); - } else { - reader = FutureUtils.result(dlm.openAsyncLogReader(lastDLSN)); - } - long elapsedMs = stopwatch.elapsed(TimeUnit.MICROSECONDS); - openReaderStats.registerSuccessfulEvent(elapsedMs); - logger.info("It took {} ms to position the reader to transaction id = {}, dlsn = {}", - lastTxId, lastDLSN); - } catch (IOException ioe) { - openReaderStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); - logger.warn("Failed to create reader for stream {} reading from tx id = {}, dlsn = {}.", - new Object[] { streamName, lastTxId, lastDLSN }); - } - if (null == reader) { - try { - TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds()); - } catch (InterruptedException e) { - logger.warn("Interrupted from sleep after reader was reassigned null for stream {} : ", - streamName, e); - } - continue; - } - List<LogRecordWithDLSN> records; - stopwatch = Stopwatch.createUnstarted(); - while (true) { - try { - stopwatch.start(); - records = FutureUtils.result(reader.readBulk(batchSize)); - long elapsedMicros = stopwatch.stop().elapsed(TimeUnit.MICROSECONDS); - blockingReadStats.registerSuccessfulEvent(elapsedMicros); - if (!records.isEmpty()) { - readCounter.add(records.size()); - LogRecordWithDLSN lastRecord = records.get(records.size() - 1); - lastTxId = lastRecord.getTransactionId(); - lastDLSN = lastRecord.getDlsn(); - } - stopwatch.reset(); - } catch (IOException e) { - logger.warn("Encountered reading record from stream {} : ", streamName, e); - reader = null; - break; - } - } - try { - TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds()); - } catch (InterruptedException e) { - logger.warn("Interrupted from sleep while creating reader for stream {} : ", - streamName, e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerBatchReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerBatchReader.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerBatchReader.java deleted file mode 100644 index 6a11469..0000000 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerBatchReader.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.benchmark.stream; - -import java.util.Enumeration; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.LedgerEntry; -import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Read ledgers in batches. - */ -public class LedgerBatchReader implements Runnable { - - private static final Logger logger = LoggerFactory.getLogger(LedgerBatchReader.class); - - private final LedgerHandle lh; - private final ReadEntryListener readEntryListener; - private final int batchSize; - - public LedgerBatchReader(LedgerHandle lh, - ReadEntryListener readEntryListener, - int batchSize) { - this.lh = lh; - this.batchSize = batchSize; - this.readEntryListener = readEntryListener; - } - - @Override - public void run() { - long lac = lh.getLastAddConfirmed(); - - long entryId = 0L; - - while (entryId <= lac) { - long startEntryId = entryId; - long endEntryId = Math.min(startEntryId + batchSize - 1, lac); - - Enumeration<LedgerEntry> entries = null; - while (null == entries) { - try { - entries = lh.readEntries(startEntryId, endEntryId); - } catch (BKException bke) { - logger.error("Encountered exceptions on reading [ {} - {} ] ", - new Object[] { startEntryId, endEntryId, bke }); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - break; - } - } - if (null == entries) { - break; - } - - while (entries.hasMoreElements()) { - LedgerEntry entry = entries.nextElement(); - readEntryListener.onEntryComplete(BKException.Code.OK, lh, entry, null); - } - - entryId = endEntryId + 1; - } - - } -}