http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/Worker.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/Worker.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/Worker.java new file mode 100644 index 0000000..a948092 --- /dev/null +++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/Worker.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.benchmark; + +import java.io.Closeable; + +/** + * Worker to run a benchmark; it declares no methods of its own and only combines {@link Runnable} (implementations such as WriterWorker start their load in run()) with {@link Closeable} (close() stops the load and releases resources). + */ +public interface Worker extends Closeable, Runnable { +}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/WriterWorker.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/WriterWorker.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/WriterWorker.java new file mode 100644 index 0000000..9e96765 --- /dev/null +++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/WriterWorker.java @@ -0,0 +1,387 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.distributedlog.benchmark;

import static com.google.common.base.Preconditions.checkArgument;

import com.twitter.common.zookeeper.ServerSet;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.benchmark.utils.ShiftableRateLimiter;
import org.apache.distributedlog.client.DistributedLogMultiStreamWriter;
import org.apache.distributedlog.client.serverset.DLZkServerSet;
import org.apache.distributedlog.exceptions.DLException;
import org.apache.distributedlog.io.CompressionCodec;
import org.apache.distributedlog.service.DistributedLogClient;
import org.apache.distributedlog.service.DistributedLogClientBuilder;
import org.apache.distributedlog.util.SchedulerUtils;
import com.twitter.finagle.builder.ClientBuilder;
import com.twitter.finagle.stats.StatsReceiver;
import com.twitter.finagle.thrift.ClientId;
import com.twitter.finagle.thrift.ClientId$;
import com.twitter.util.Duration$;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Benchmark for distributedlog proxy client.
 *
 * <p>{@link #run()} submits {@code writeConcurrency} writer tasks to an internal thread pool. Each task
 * builds its own {@link DistributedLogClient} and keeps writing to randomly chosen streams named
 * {@code <streamPrefix>_<id>} until {@link #close()} flips the {@code running} flag. Write results are
 * recorded on a separate callback pool.
 */
public class WriterWorker implements Worker {

    static final Logger LOG = LoggerFactory.getLogger(WriterWorker.class);

    final String streamPrefix;
    final int startStreamId;
    final int endStreamId;
    final int writeConcurrency;
    final int messageSizeBytes;
    final int hostConnectionCoreSize;
    final int hostConnectionLimit;
    final ExecutorService executorService;
    final ShiftableRateLimiter rateLimiter;
    final URI dlUri;
    final DLZkServerSet[] serverSets;
    final List<String> finagleNames;
    final Random random;
    final List<String> streamNames;
    final int numStreams;
    final int batchSize;
    final boolean thriftmux;
    final boolean handshakeWithClientInfo;
    final int sendBufferSize;
    final int recvBufferSize;
    final boolean enableBatching;
    final int batchBufferSize;
    final int batchFlushIntervalMicros;
    private final String routingServiceFinagleName;

    volatile boolean running = true;

    final StatsReceiver statsReceiver;
    final StatsLogger statsLogger;
    final OpStatsLogger requestStat;
    final StatsLogger exceptionsLogger;
    final StatsLogger dlErrorCodeLogger;

    // callback thread
    final ExecutorService executor;

    /**
     * Creates a writer worker.
     *
     * @param streamPrefix prefix of the stream names; streams are named {@code <prefix>_<id>}
     * @param uri distributedlog uri, used only when neither finagle names nor server sets are given
     * @param startStreamId first stream id (inclusive)
     * @param endStreamId last stream id (exclusive); must be greater than {@code startStreamId}
     * @param rateLimiter rate limiter throttling the writes
     * @param writeConcurrency number of writer tasks started by {@link #run()}
     * @param messageSizeBytes size passed to the message generator
     * @param batchSize when positive, bulk writers sending {@code batchSize} records per request are used
     * @param hostConnectionCoreSize finagle host connection core size
     * @param hostConnectionLimit finagle host connection limit
     * @param serverSetPaths server set uris (first is local, the rest remote)
     * @param finagleNames finagle names (first is local, the rest remote); take precedence over server sets
     * @param statsReceiver finagle stats receiver handed to the client
     * @param statsLogger stats logger for request and error stats
     * @param thriftmux whether the client uses thriftmux
     * @param handshakeWithClientInfo whether the client handshakes with client info
     * @param sendBufferSize socket send buffer size
     * @param recvBufferSize socket receive buffer size
     * @param enableBatching whether to write through a {@link DistributedLogMultiStreamWriter}
     * @param batchBufferSize buffer size of the multi stream writer
     * @param batchFlushIntervalMicros flush interval of the multi stream writer
     * @param routingServiceFinagleName finagle name of the server-side routing service
     * @throws IllegalArgumentException if the stream id range is empty, or if both
     *         {@code finagleNames} and {@code serverSetPaths} are empty
     */
    public WriterWorker(String streamPrefix,
                        URI uri,
                        int startStreamId,
                        int endStreamId,
                        ShiftableRateLimiter rateLimiter,
                        int writeConcurrency,
                        int messageSizeBytes,
                        int batchSize,
                        int hostConnectionCoreSize,
                        int hostConnectionLimit,
                        List<String> serverSetPaths,
                        List<String> finagleNames,
                        StatsReceiver statsReceiver,
                        StatsLogger statsLogger,
                        boolean thriftmux,
                        boolean handshakeWithClientInfo,
                        int sendBufferSize,
                        int recvBufferSize,
                        boolean enableBatching,
                        int batchBufferSize,
                        int batchFlushIntervalMicros,
                        String routingServiceFinagleName) {
        // endStreamId is exclusive: an empty range would leave the writers without any stream to pick
        // and make random.nextInt(0) throw inside every writer thread, so fail fast here instead.
        checkArgument(startStreamId < endStreamId,
                "Stream id range [" + startStreamId + ", " + endStreamId + ") is empty");
        checkArgument(!finagleNames.isEmpty() || !serverSetPaths.isEmpty());
        this.streamPrefix = streamPrefix;
        this.dlUri = uri;
        this.startStreamId = startStreamId;
        this.endStreamId = endStreamId;
        this.rateLimiter = rateLimiter;
        this.writeConcurrency = writeConcurrency;
        this.messageSizeBytes = messageSizeBytes;
        this.statsReceiver = statsReceiver;
        this.statsLogger = statsLogger;
        this.requestStat = this.statsLogger.getOpStatsLogger("requests");
        this.exceptionsLogger = statsLogger.scope("exceptions");
        this.dlErrorCodeLogger = statsLogger.scope("dl_error_code");
        this.executorService = Executors.newCachedThreadPool();
        this.random = new Random(System.currentTimeMillis());
        this.batchSize = batchSize;
        this.hostConnectionCoreSize = hostConnectionCoreSize;
        this.hostConnectionLimit = hostConnectionLimit;
        this.thriftmux = thriftmux;
        this.handshakeWithClientInfo = handshakeWithClientInfo;
        this.sendBufferSize = sendBufferSize;
        this.recvBufferSize = recvBufferSize;
        this.enableBatching = enableBatching;
        this.batchBufferSize = batchBufferSize;
        this.batchFlushIntervalMicros = batchFlushIntervalMicros;
        this.finagleNames = finagleNames;
        this.serverSets = createServerSets(serverSetPaths);
        this.executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        this.routingServiceFinagleName = routingServiceFinagleName;

        // Streams
        streamNames = new ArrayList<String>(endStreamId - startStreamId);
        for (int i = startStreamId; i < endStreamId; i++) {
            streamNames.add(String.format("%s_%d", streamPrefix, i));
        }
        numStreams = streamNames.size();
        LOG.info("Writing to {} streams : {}", numStreams, streamNames);
    }

    /**
     * Builds one {@link DLZkServerSet} per path, each created with a 60000 timeout argument.
     *
     * @param serverSetPaths server set uris
     * @return the server sets, in the same order as the paths
     */
    protected DLZkServerSet[] createServerSets(List<String> serverSetPaths) {
        DLZkServerSet[] serverSets = new DLZkServerSet[serverSetPaths.size()];
        for (int i = 0; i < serverSets.length; i++) {
            String serverSetPath = serverSetPaths.get(i);
            serverSets[i] = DLZkServerSet.of(URI.create(serverSetPath), 60000);
        }
        return serverSets;
    }

    /**
     * Stops the writers, then shuts down both thread pools and closes the server sets.
     */
    @Override
    public void close() throws IOException {
        this.running = false;
        SchedulerUtils.shutdownScheduler(this.executorService, 2, TimeUnit.MINUTES);
        // The callback pool is created in the constructor and owned by this worker; without this
        // its non-daemon threads would outlive close().
        SchedulerUtils.shutdownScheduler(this.executor, 2, TimeUnit.MINUTES);
        for (DLZkServerSet serverSet : serverSets) {
            serverSet.close();
        }
    }

    /**
     * Builds a proxy client routed by finagle names if any are configured, otherwise by the
     * server sets, otherwise by the distributedlog uri.
     */
    private DistributedLogClient buildDlogClient() {
        ClientBuilder clientBuilder = ClientBuilder.get()
            .hostConnectionLimit(hostConnectionLimit)
            .hostConnectionCoresize(hostConnectionCoreSize)
            .tcpConnectTimeout(Duration$.MODULE$.fromMilliseconds(200))
            .connectTimeout(Duration$.MODULE$.fromMilliseconds(200))
            .requestTimeout(Duration$.MODULE$.fromSeconds(10))
            .sendBufferSize(sendBufferSize)
            .recvBufferSize(recvBufferSize);

        ClientId clientId = ClientId$.MODULE$.apply("dlog_loadtest_writer");

        DistributedLogClientBuilder builder = DistributedLogClientBuilder.newBuilder()
            .clientId(clientId)
            .clientBuilder(clientBuilder)
            .thriftmux(thriftmux)
            .redirectBackoffStartMs(100)
            .redirectBackoffMaxMs(500)
            .requestTimeoutMs(10000)
            .statsReceiver(statsReceiver)
            .streamNameRegex("^" + streamPrefix + "_[0-9]+$")
            .handshakeWithClientInfo(handshakeWithClientInfo)
            .periodicHandshakeIntervalMs(TimeUnit.SECONDS.toMillis(30))
            .periodicOwnershipSyncIntervalMs(TimeUnit.MINUTES.toMillis(5))
            .periodicDumpOwnershipCache(true)
            .handshakeTracing(true)
            .serverRoutingServiceFinagleNameStr(routingServiceFinagleName)
            .name("writer");

        if (!finagleNames.isEmpty()) {
            String local = finagleNames.get(0);
            String[] remotes = new String[finagleNames.size() - 1];
            finagleNames.subList(1, finagleNames.size()).toArray(remotes);

            builder = builder.finagleNameStrs(local, remotes);
        } else if (serverSets.length != 0) {
            ServerSet local = serverSets[0].getServerSet();
            ServerSet[] remotes = new ServerSet[serverSets.length - 1];
            for (int i = 1; i < serverSets.length; i++) {
                remotes[i - 1] = serverSets[i].getServerSet();
            }
            builder = builder.serverSets(local, remotes);
        } else {
            builder = builder.uri(dlUri);
        }

        return builder.build();
    }

    /**
     * Generates one message payload.
     *
     * @return the wrapped payload, or {@code null} if the message could not be generated
     */
    ByteBuffer buildBuffer(long requestMillis, int messageSizeBytes) {
        try {
            return ByteBuffer.wrap(Utils.generateMessage(requestMillis, messageSizeBytes));
        } catch (TException e) {
            LOG.error("Error generating message : ", e);
            return null;
        }
    }

    /**
     * Generates {@code batchSize} message payloads.
     *
     * @return the payloads, or {@code null} if any of them could not be generated
     */
    List<ByteBuffer> buildBufferList(int batchSize, long requestMillis, int messageSizeBytes) {
        ArrayList<ByteBuffer> bufferList = new ArrayList<ByteBuffer>(batchSize);
        for (int i = 0; i < batchSize; i++) {
            ByteBuffer buf = buildBuffer(requestMillis, messageSizeBytes);
            if (null == buf) {
                return null;
            }
            bufferList.add(buf);
        }
        return bufferList;
    }

    /**
     * Write callback: remembers the outcome and records latency and error stats on the callback pool.
     */
    class TimedRequestHandler implements FutureEventListener<DLSN>, Runnable {
        final String streamName;
        final long requestMillis;
        DLSN dlsn = null;
        Throwable cause = null;

        TimedRequestHandler(String streamName,
                            long requestMillis) {
            this.streamName = streamName;
            this.requestMillis = requestMillis;
        }

        @Override
        public void onSuccess(DLSN value) {
            dlsn = value;
            recordOutcome();
        }

        @Override
        public void onFailure(Throwable cause) {
            this.cause = cause;
            recordOutcome();
        }

        // Hands the stats update to the callback pool; a completion that arrives after close() has
        // shut the pool down is recorded inline instead of failing with a rejection.
        private void recordOutcome() {
            try {
                executor.submit(this);
            } catch (RejectedExecutionException ree) {
                run();
            }
        }

        @Override
        public void run() {
            if (null != dlsn) {
                requestStat.registerSuccessfulEvent(System.currentTimeMillis() - requestMillis);
            } else {
                LOG.error("Failed to publish to {} : ", streamName, cause);
                requestStat.registerFailedEvent(System.currentTimeMillis() - requestMillis);
                exceptionsLogger.getCounter(cause.getClass().getName()).inc();
                if (cause instanceof DLException) {
                    DLException dle = (DLException) cause;
                    dlErrorCodeLogger.getCounter(dle.getCode().toString()).inc();
                }
            }
        }
    }

    /**
     * Writes one record per request, optionally through a batching multi stream writer.
     */
    class Writer implements Runnable {

        final int idx;
        final DistributedLogClient dlc;
        DistributedLogMultiStreamWriter writer = null;
        final ShiftableRateLimiter limiter;

        Writer(int idx) {
            this.idx = idx;
            this.dlc = buildDlogClient();
            if (enableBatching) {
                writer = DistributedLogMultiStreamWriter.newBuilder()
                        .client(this.dlc)
                        .streams(streamNames)
                        .compressionCodec(CompressionCodec.Type.NONE)
                        .flushIntervalMicros(batchFlushIntervalMicros)
                        .bufferSize(batchBufferSize)
                        .firstSpeculativeTimeoutMs(9000)
                        .maxSpeculativeTimeoutMs(9000)
                        .requestTimeoutMs(10000)
                        .speculativeBackoffMultiplier(2)
                        .build();
            }
            this.limiter = rateLimiter.duplicate();
        }

        @Override
        public void run() {
            LOG.info("Started writer {}.", idx);
            // try/finally so the client is released even when the loop dies with an exception
            try {
                while (running) {
                    this.limiter.getLimiter().acquire();
                    final String streamName = streamNames.get(random.nextInt(numStreams));
                    final long requestMillis = System.currentTimeMillis();
                    final ByteBuffer data = buildBuffer(requestMillis, messageSizeBytes);
                    if (null == data) {
                        break;
                    }
                    if (null != writer) {
                        writer.write(data).addEventListener(
                                new TimedRequestHandler(streamName, requestMillis));
                    } else {
                        dlc.write(streamName, data).addEventListener(
                                new TimedRequestHandler(streamName, requestMillis));
                    }
                }
            } finally {
                try {
                    if (null != writer) {
                        writer.close();
                    }
                } finally {
                    dlc.close();
                }
            }
        }
    }

    /**
     * Writes {@code batchSize} records per request to a single randomly chosen stream.
     */
    class BulkWriter implements Runnable {

        final int idx;
        final DistributedLogClient dlc;

        BulkWriter(int idx) {
            this.idx = idx;
            this.dlc = buildDlogClient();
        }

        @Override
        public void run() {
            LOG.info("Started writer {}.", idx);
            // try/finally so the client is released even when the loop dies with an exception
            try {
                while (running) {
                    rateLimiter.getLimiter().acquire(batchSize);
                    String streamName = streamNames.get(random.nextInt(numStreams));
                    final long requestMillis = System.currentTimeMillis();
                    final List<ByteBuffer> data = buildBufferList(batchSize, requestMillis, messageSizeBytes);
                    if (null == data) {
                        break;
                    }
                    List<Future<DLSN>> results = dlc.writeBulk(streamName, data);
                    for (Future<DLSN> result : results) {
                        result.addEventListener(new TimedRequestHandler(streamName, requestMillis));
                    }
                }
            } finally {
                dlc.close();
            }
        }
    }

    /**
     * Starts {@code writeConcurrency} writer tasks: bulk writers when {@code batchSize > 0},
     * single-record writers otherwise. Returns once the tasks are submitted.
     */
    @Override
    public void run() {
        LOG.info("Starting writer (concurrency = {}, prefix = {}, batchSize = {})",
                 new Object[] { writeConcurrency, streamPrefix, batchSize });
        try {
            for (int i = 0; i < writeConcurrency; i++) {
                Runnable writer = null;
                if (batchSize > 0) {
                    writer = new BulkWriter(i);
                } else {
                    writer = new Writer(i);
                }
                executorService.submit(writer);
            }
        } catch (Throwable t) {
            LOG.error("Unhandled exception caught", t);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/package-info.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/package-info.java new file mode 100644 index 0000000..7e87644 --- /dev/null +++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Benchmarks for distributedlog. 
+ */ +package org.apache.distributedlog.benchmark; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AbstractReaderBenchmark.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AbstractReaderBenchmark.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AbstractReaderBenchmark.java new file mode 100644 index 0000000..a1f1f9f --- /dev/null +++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AbstractReaderBenchmark.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.distributedlog.benchmark.stream;

import org.apache.distributedlog.DistributedLogConstants;
import java.util.Locale;
import org.apache.commons.cli.CommandLine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class of the reader benchmarks: registers the shared read options
 * ({@code -t}, {@code -r}, {@code -m}, {@code -b}) and parses them into fields.
 */
abstract class AbstractReaderBenchmark extends StreamBenchmark {

    // Bound to this class so that log lines are attributed to the class that emits them.
    private static final Logger logger = LoggerFactory.getLogger(AbstractReaderBenchmark.class);

    protected ReadMode readMode = ReadMode.LATEST;
    protected long fromTxId = DistributedLogConstants.INVALID_TXID;
    protected long rewindMs = 0L;
    protected int batchSize = 1;

    protected AbstractReaderBenchmark() {
        options.addOption("t", "tx-id", true,
            "Transaction ID to start read from when reading in mode 'position'");
        options.addOption("r", "rewind", true,
            "Time to rewind back to read from when reading in mode 'rewind' (in milliseconds)");
        options.addOption("m", "mode", true,
            "Read Mode : [oldest, latest, rewind, position]");
        options.addOption("b", "batch-size", true, "Read batch size");
    }

    /**
     * Reads the reader options from the command line.
     *
     * <p>The mode option is mandatory: when it is missing or not a {@link ReadMode} name
     * (case-insensitive) the usage is printed and the JVM exits.
     *
     * @param cmdline parsed command line
     * @throws NumberFormatException if the tx id, rewind or batch size value is not a number
     */
    @Override
    protected void parseCommandLine(CommandLine cmdline) {
        if (cmdline.hasOption("m")) {
            String mode = cmdline.getOptionValue("m");
            try {
                // Locale.ROOT: with the default locale "position" upper-cases to "POSİTİON" under a
                // Turkish locale and would be rejected as an invalid mode.
                readMode = ReadMode.valueOf(mode.toUpperCase(Locale.ROOT));
            } catch (IllegalArgumentException iae) {
                logger.error("Invalid read mode {}.", mode);
                printUsage();
                System.exit(0);
            }
        } else {
            printUsage();
            System.exit(0);
        }
        if (cmdline.hasOption("t")) {
            fromTxId = Long.parseLong(cmdline.getOptionValue("t"));
        }
        if (cmdline.hasOption("r")) {
            rewindMs = Long.parseLong(cmdline.getOptionValue("r"));
        }
        if (cmdline.hasOption("b")) {
            batchSize = Integer.parseInt(cmdline.getOptionValue("b"));
        }
        logger.info("Start reading from transaction id {}, rewind {} ms.", fromTxId, rewindMs);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.java
---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.java new file mode 100644 index 0000000..4930b8a --- /dev/null +++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.distributedlog.benchmark.stream;

import com.google.common.base.Stopwatch;
import org.apache.distributedlog.AsyncLogReader;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.DistributedLogManager;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.namespace.DistributedLogNamespace;
import org.apache.distributedlog.util.FutureUtils;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Benchmark on {@link org.apache.distributedlog.AsyncLogReader} reading from a stream.
 */
public class AsyncReaderBenchmark extends AbstractReaderBenchmark {

    private static final Logger logger = LoggerFactory.getLogger(AsyncReaderBenchmark.class);

    /**
     * Opens the stream, positions an async reader according to the read mode and then reads
     * batches of {@code batchSize} records in a loop. Whenever opening or reading fails the reader
     * is re-opened from the last position that was read. This method does not return normally.
     *
     * @param namespace namespace used to open the log
     * @param logName not used by this benchmark; the {@code streamName} field is read instead
     * @param statsLogger receives the {@code open_reader} and {@code blocking_read} latencies
     *        (both in microseconds) and the {@code reads} counter
     */
    @Override
    protected void benchmark(DistributedLogNamespace namespace, String logName, StatsLogger statsLogger) {
        DistributedLogManager dlm = null;
        while (null == dlm) {
            try {
                dlm = namespace.openLog(streamName);
            } catch (IOException ioe) {
                logger.warn("Failed to create dlm for stream {} : ", streamName, ioe);
            }
            if (null == dlm) {
                try {
                    TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted from sleep while creating dlm for stream {} : ",
                        streamName, e);
                }
            }
        }
        logger.info("Created dlm for stream {}.", streamName);

        // Stats
        OpStatsLogger openReaderStats = statsLogger.getOpStatsLogger("open_reader");
        OpStatsLogger blockingReadStats = statsLogger.getOpStatsLogger("blocking_read");
        Counter readCounter = statsLogger.getCounter("reads");

        AsyncLogReader reader = null;
        DLSN lastDLSN = null;
        Long lastTxId = null;
        while (null == reader) {
            // initialize the last txid
            if (null == lastTxId) {
                switch (readMode) {
                    case OLDEST:
                        lastTxId = 0L;
                        lastDLSN = DLSN.InitialDLSN;
                        break;
                    case LATEST:
                        try {
                            lastDLSN = dlm.getLastDLSN();
                        } catch (IOException ioe) {
                            // lastTxId stays null so the position is initialized again on the next
                            // pass; back off like the other retry paths instead of spinning.
                            logger.warn("Failed to get last dlsn for stream {} : ", streamName, ioe);
                            try {
                                TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
                            } catch (InterruptedException e) {
                                logger.warn("Interrupted from sleep while getting last dlsn for stream {} : ",
                                    streamName, e);
                            }
                            continue;
                        }
                        lastTxId = Long.MAX_VALUE;
                        break;
                    case REWIND:
                        lastTxId = System.currentTimeMillis() - rewindMs;
                        lastDLSN = null;
                        break;
                    case POSITION:
                        lastTxId = fromTxId;
                        lastDLSN = null;
                        break;
                    default:
                        logger.warn("Unsupported mode {}", readMode);
                        printUsage();
                        System.exit(0);
                        break;
                }
                logger.info("Reading from transaction id = {}, dlsn = {}", lastTxId, lastDLSN);
            }
            // Open the reader
            Stopwatch stopwatch = Stopwatch.createStarted();
            try {
                if (null == lastDLSN) {
                    reader = FutureUtils.result(dlm.openAsyncLogReader(lastTxId));
                } else {
                    reader = FutureUtils.result(dlm.openAsyncLogReader(lastDLSN));
                }
                // The stat is recorded in microseconds; the log line reports milliseconds.
                long elapsedMicros = stopwatch.elapsed(TimeUnit.MICROSECONDS);
                openReaderStats.registerSuccessfulEvent(elapsedMicros);
                logger.info("It took {} ms to position the reader to transaction id = {}, dlsn = {}",
                    new Object[] { TimeUnit.MICROSECONDS.toMillis(elapsedMicros), lastTxId, lastDLSN });
            } catch (IOException ioe) {
                openReaderStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
                // trailing throwable is logged as the exception, not as a format argument
                logger.warn("Failed to create reader for stream {} reading from tx id = {}, dlsn = {}.",
                    new Object[] { streamName, lastTxId, lastDLSN, ioe });
            }
            if (null == reader) {
                try {
                    TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted from sleep after reader was reassigned null for stream {} : ",
                        streamName, e);
                }
                continue;
            }
            List<LogRecordWithDLSN> records;
            stopwatch = Stopwatch.createUnstarted();
            while (true) {
                try {
                    stopwatch.start();
                    records = FutureUtils.result(reader.readBulk(batchSize));
                    long elapsedMicros = stopwatch.stop().elapsed(TimeUnit.MICROSECONDS);
                    blockingReadStats.registerSuccessfulEvent(elapsedMicros);
                    if (!records.isEmpty()) {
                        readCounter.add(records.size());
                        // remember where to resume from if the reader has to be re-opened
                        LogRecordWithDLSN lastRecord = records.get(records.size() - 1);
                        lastTxId = lastRecord.getTransactionId();
                        lastDLSN = lastRecord.getDlsn();
                    }
                    stopwatch.reset();
                } catch (IOException e) {
                    logger.warn("Encountered reading record from stream {} : ", streamName, e);
                    // NOTE(review): the failed reader is dropped without being closed - confirm
                    // whether it should be closed before re-opening.
                    reader = null;
                    break;
                }
            }
            try {
                TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
            } catch (InterruptedException e) {
                logger.warn("Interrupted from sleep while creating reader for stream {} : ",
                    streamName, e);
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerBatchReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerBatchReader.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerBatchReader.java new file mode 100644 index 0000000..b115192 --- /dev/null +++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerBatchReader.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.distributedlog.benchmark.stream;

import java.util.Enumeration;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Read ledgers in batches.
 *
 * <p>Entries {@code 0 .. lastAddConfirmed} are fetched with blocking range reads of at most
 * {@code batchSize} entries, and every entry is handed to the listener.
 */
public class LedgerBatchReader implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(LedgerBatchReader.class);

    private final LedgerHandle lh;
    private final ReadEntryListener readEntryListener;
    private final int batchSize;

    /**
     * @param lh ledger to read
     * @param readEntryListener notified once per entry that was read
     * @param batchSize maximum number of entries per range read; must be positive
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     */
    public LedgerBatchReader(LedgerHandle lh,
                             ReadEntryListener readEntryListener,
                             int batchSize) {
        // A non-positive batch size would make every range end before it starts; the failing
        // read would then be retried forever by run().
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive : " + batchSize);
        }
        this.lh = lh;
        this.batchSize = batchSize;
        this.readEntryListener = readEntryListener;
    }

    /**
     * Reads the ledger from entry 0 up to the last add confirmed. A failed range read is logged
     * and retried without limit; an interrupt restores the interrupt flag and stops the reader.
     */
    @Override
    public void run() {
        final long lac = lh.getLastAddConfirmed();

        long nextEntryId = 0L;
        while (nextEntryId <= lac) {
            final long startEntryId = nextEntryId;
            final long endEntryId = Math.min(startEntryId + batchSize - 1, lac);

            Enumeration<LedgerEntry> entries = null;
            while (null == entries) {
                try {
                    entries = lh.readEntries(startEntryId, endEntryId);
                } catch (BKException bke) {
                    logger.error("Encountered exceptions on reading [ {} - {} ] ",
                            new Object[] { startEntryId, endEntryId, bke });
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            if (null == entries) {
                // interrupted while reading: give up on the remaining entries
                return;
            }

            while (entries.hasMoreElements()) {
                readEntryListener.onEntryComplete(BKException.Code.OK, lh, entries.nextElement(), null);
            }

            nextEntryId = endEntryId + 1;
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerReadBenchmark.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerReadBenchmark.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerReadBenchmark.java new file mode 100644 index 0000000..489e5af --- /dev/null +++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerReadBenchmark.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.distributedlog.benchmark.stream;

import static com.google.common.base.Charsets.UTF_8;

import com.google.common.base.Stopwatch;
import org.apache.distributedlog.BookKeeperClientBuilder;
import org.apache.distributedlog.DistributedLogManager;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.ZooKeeperClientBuilder;
import org.apache.distributedlog.impl.metadata.BKDLConfig;
import org.apache.distributedlog.namespace.DistributedLogNamespace;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.StatsLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Benchmark ledger reading.
 *
 * <p>Resolves the log segments of a stream and reads the ledger behind each segment directly
 * from bookkeeper, counting every entry that is read.
 */
public class LedgerReadBenchmark extends AbstractReaderBenchmark {

    // Bound to this class so that log lines are attributed to the class that emits them.
    private static final Logger logger = LoggerFactory.getLogger(LedgerReadBenchmark.class);

    /**
     * Reads all log segments of the stream, one after another.
     *
     * <p>Configuration keys read from {@code conf}: {@code ledger_read_concurrency} (default 1000)
     * and {@code ledger_stream_read} (default true; selects {@link LedgerStreamReader}, otherwise
     * {@link LedgerBatchReader}).
     *
     * @param namespace namespace used to open the log
     * @param logName not used by this benchmark; the {@code streamName} field is read instead
     * @param statsLogger receives the {@code reads} counter
     */
    @Override
    protected void benchmark(DistributedLogNamespace namespace, String logName, StatsLogger statsLogger) {
        DistributedLogManager dlm = null;
        while (null == dlm) {
            try {
                dlm = namespace.openLog(streamName);
            } catch (IOException ioe) {
                logger.warn("Failed to create dlm for stream {} : ", streamName, ioe);
            }
            if (null == dlm) {
                try {
                    TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted from sleep while creating dlm for stream {} : ",
                        streamName, e);
                }
            }
        }
        logger.info("Created dlm for stream {}.", streamName);

        List<LogSegmentMetadata> segments = null;
        while (null == segments) {
            try {
                segments = dlm.getLogSegments();
            } catch (IOException ioe) {
                logger.warn("Failed to get log segments for stream {} : ", streamName, ioe);
            }
            if (null == segments) {
                try {
                    TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted from sleep while geting log segments for stream {} : ",
                        streamName, e);
                }
            }
        }

        final Counter readCounter = statsLogger.getCounter("reads");

        logger.info("Reading from log segments : {}", segments);

        ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder()
                .uri(uri)
                .name("benchmark-zkc")
                .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds())
                .zkAclId(null)
                .build();
        BKDLConfig bkdlConfig;
        try {
            bkdlConfig = BKDLConfig.resolveDLConfig(zkc, uri);
        } catch (IOException e) {
            // report why the benchmark ends without reading anything
            logger.error("Failed to resolve dl config from {} : ", uri, e);
            return;
        }

        BookKeeper bk;
        try {
            bk = BookKeeperClientBuilder.newBuilder()
                    .name("benchmark-bkc")
                    .dlConfig(conf)
                    .zkServers(bkdlConfig.getBkZkServersForReader())
                    .ledgersPath(bkdlConfig.getBkLedgersPath())
                    .build()
                    .get();
        } catch (IOException e) {
            logger.error("Failed to build bookkeeper client for stream {} : ", streamName, e);
            return;
        }

        final int readConcurrency = conf.getInt("ledger_read_concurrency", 1000);
        boolean streamRead = conf.getBoolean("ledger_stream_read", true);
        // one listener serves both reader flavours: it only counts the entries
        final BookkeeperInternalCallbacks.ReadEntryListener countingListener =
                new BookkeeperInternalCallbacks.ReadEntryListener() {
                    @Override
                    public void onEntryComplete(int rc, LedgerHandle lh, LedgerEntry entry, Object ctx) {
                        readCounter.inc();
                    }
                };
        // NOTE(review): zkc, bk and the opened ledger handles are never closed here - confirm
        // whether the benchmark harness relies on process exit for cleanup.
        try {
            for (LogSegmentMetadata segment : segments) {
                Stopwatch stopwatch = Stopwatch.createStarted();
                long lid = segment.getLogSegmentId();
                LedgerHandle lh = bk.openLedgerNoRecovery(
                        lid, BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8));
                logger.info("It took {} ms to open log segment {}",
                        stopwatch.elapsed(TimeUnit.MILLISECONDS), segment);
                stopwatch.reset().start();
                Runnable reader;
                if (streamRead) {
                    reader = new LedgerStreamReader(lh, countingListener, readConcurrency);
                } else {
                    // ledger_stream_read=false selects the batch reader; the configured read
                    // concurrency is used as its batch size
                    reader = new LedgerBatchReader(lh, countingListener, readConcurrency);
                }
                reader.run();
                logger.info("It took {} ms to complete reading {} entries from log segment {}",
                        new Object[] { stopwatch.elapsed(TimeUnit.MILLISECONDS), (lh.getLastAddConfirmed() + 1), segment });
            }
        } catch (Exception e) {
            logger.error("Error on reading bk ", e);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerStreamReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerStreamReader.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerStreamReader.java new file mode 100644 index 0000000..11c3482 --- /dev/null +++ 
b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/LedgerStreamReader.java @@ -0,0 +1,131 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.benchmark.stream; + +import java.util.Enumeration; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.bookkeeper.client.AsyncCallback; +import org.apache.bookkeeper.client.BKException; +import org.apache.bookkeeper.client.LedgerEntry; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Reading ledger in a streaming way. 
+ */ +public class LedgerStreamReader implements Runnable { + + private static final Logger logger = LoggerFactory.getLogger(LedgerStreamReader.class); + + class PendingReadRequest implements AsyncCallback.ReadCallback { + + final long entryId; + boolean isDone = false; + int rc; + LedgerEntry entry = null; + + PendingReadRequest(long entryId) { + this.entryId = entryId; + } + + void read() { + lh.asyncReadEntries(entryId, entryId, this, null); + } + + void complete(ReadEntryListener listener) { + listener.onEntryComplete(rc, lh, entry, null); + } + + @Override + public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> enumeration, Object ctx) { + this.rc = rc; + if (BKException.Code.OK == rc && enumeration.hasMoreElements()) { + entry = enumeration.nextElement(); + } else { + entry = null; + } + isDone = true; + // construct a new read request + long nextEntry = nextReadEntry.getAndIncrement(); + if (nextEntry <= lac) { + PendingReadRequest nextRead = + new PendingReadRequest(nextEntry); + pendingReads.add(nextRead); + nextRead.read(); + } + triggerCallbacks(); + } + } + + private final LedgerHandle lh; + private final long lac; + private final ReadEntryListener readEntryListener; + private final int concurrency; + private final AtomicLong nextReadEntry = new AtomicLong(0); + private final CountDownLatch done = new CountDownLatch(1); + private final ConcurrentLinkedQueue<PendingReadRequest> pendingReads = + new ConcurrentLinkedQueue<PendingReadRequest>(); + + public LedgerStreamReader(LedgerHandle lh, + ReadEntryListener readEntryListener, + int concurrency) { + this.lh = lh; + this.lac = lh.getLastAddConfirmed(); + this.readEntryListener = readEntryListener; + this.concurrency = concurrency; + for (int i = 0; i < concurrency; i++) { + long entryId = nextReadEntry.getAndIncrement(); + if (entryId > lac) { + break; + } + PendingReadRequest request = new PendingReadRequest(entryId); + pendingReads.add(request); + request.read(); + } + if 
(pendingReads.isEmpty()) { + done.countDown(); + } + } + + synchronized void triggerCallbacks() { + PendingReadRequest request; + while ((request = pendingReads.peek()) != null) { + if (!request.isDone) { + break; + } + pendingReads.remove(); + request.complete(readEntryListener); + } + if (pendingReads.isEmpty()) { + done.countDown(); + } + } + + @Override + public void run() { + try { + done.await(); + } catch (InterruptedException e) { + logger.info("Interrupted on stream reading ledger {} : ", lh.getId(), e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/ReadMode.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/ReadMode.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/ReadMode.java new file mode 100644 index 0000000..ea5ed36 --- /dev/null +++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/ReadMode.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Starting position used by the streaming read benchmarks.
 *
 * <p>The per-constant notes describe how {@code SyncReaderBenchmark} picks its first
 * transaction id for each mode.
 */
public enum ReadMode {
    /** Start from transaction id 0. */
    OLDEST,
    /** Start from the stream's last transaction id. */
    LATEST,
    /** Start from the current time in milliseconds minus the configured rewind. */
    REWIND,
    /** Start from an explicitly supplied transaction id. */
    POSITION
}
+ */ +package org.apache.distributedlog.benchmark.stream; + +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.namespace.DistributedLogNamespace; +import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; +import java.io.File; +import java.net.URI; +import org.apache.bookkeeper.stats.NullStatsProvider; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.stats.StatsProvider; +import org.apache.bookkeeper.util.ReflectionUtils; +import org.apache.commons.cli.BasicParser; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Benchmark Streams. + */ +public abstract class StreamBenchmark { + + private static final Logger logger = LoggerFactory.getLogger(StreamBenchmark.class); + + private static final String USAGE = "StreamBenchmark <benchmark-class> [options]"; + + protected final Options options = new Options(); + protected URI uri; + protected DistributedLogConfiguration conf; + protected StatsProvider statsProvider; + protected String streamName; + + protected StreamBenchmark() { + options.addOption("c", "conf", true, "Configuration File"); + options.addOption("u", "uri", true, "DistributedLog URI"); + options.addOption("p", "stats-provider", true, "Stats Provider"); + options.addOption("s", "stream", true, "Stream Name"); + options.addOption("h", "help", false, "Print usage."); + } + + protected Options getOptions() { + return options; + } + + protected void printUsage() { + HelpFormatter hf = new HelpFormatter(); + hf.printHelp(USAGE, options); + } + + protected void parseCommandLine(String[] args) + throws Exception { + BasicParser parser = new BasicParser(); + CommandLine cmdline = parser.parse(options, args); + if (cmdline.hasOption("h")) { + printUsage(); + System.exit(0); + } + if (cmdline.hasOption("u")) { + this.uri = 
URI.create(cmdline.getOptionValue("u")); + } else { + printUsage(); + System.exit(0); + } + this.conf = new DistributedLogConfiguration(); + if (cmdline.hasOption("c")) { + String configFile = cmdline.getOptionValue("c"); + this.conf.loadConf(new File(configFile).toURI().toURL()); + } + if (cmdline.hasOption("p")) { + statsProvider = ReflectionUtils.newInstance(cmdline.getOptionValue("p"), StatsProvider.class); + } else { + statsProvider = new NullStatsProvider(); + } + if (cmdline.hasOption("s")) { + this.streamName = cmdline.getOptionValue("s"); + } else { + printUsage(); + System.exit(0); + } + parseCommandLine(cmdline); + } + + protected abstract void parseCommandLine(CommandLine cmdline); + + protected void run(String[] args) throws Exception { + logger.info("Parsing arguments for benchmark : {}", args); + // parse command line + parseCommandLine(args); + statsProvider.start(conf); + // run the benchmark + StatsLogger statsLogger = statsProvider.getStatsLogger("dl"); + DistributedLogNamespace namespace = + DistributedLogNamespaceBuilder.newBuilder() + .conf(conf) + .uri(uri) + .statsLogger(statsLogger) + .build(); + try { + benchmark(namespace, streamName, statsProvider.getStatsLogger("benchmark")); + } finally { + namespace.close(); + statsProvider.stop(); + } + } + + protected abstract void benchmark(DistributedLogNamespace namespace, + String logName, + StatsLogger statsLogger); + + public static void main(String[] args) throws Exception { + if (args.length <= 0) { + System.err.println(USAGE); + return; + } + String benchmarkClassName = args[0]; + StreamBenchmark benchmark = ReflectionUtils.newInstance( + benchmarkClassName, StreamBenchmark.class); + benchmark.run(args); + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.java ---------------------------------------------------------------------- diff --git 
a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.java new file mode 100644 index 0000000..4abb317 --- /dev/null +++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.benchmark.stream; + +import com.google.common.base.Stopwatch; +import org.apache.distributedlog.DistributedLogManager; +import org.apache.distributedlog.LogReader; +import org.apache.distributedlog.LogRecord; +import org.apache.distributedlog.namespace.DistributedLogNamespace; +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Benchmark on {@link org.apache.distributedlog.LogReader} reading from a stream. 
+ */ +public class SyncReaderBenchmark extends AbstractReaderBenchmark { + + private static final Logger logger = LoggerFactory.getLogger(SyncReaderBenchmark.class); + + public SyncReaderBenchmark() {} + + @Override + protected void benchmark(DistributedLogNamespace namespace, String streamName, StatsLogger statsLogger) { + DistributedLogManager dlm = null; + while (null == dlm) { + try { + dlm = namespace.openLog(streamName); + } catch (IOException ioe) { + logger.warn("Failed to create dlm for stream {} : ", streamName, ioe); + } + if (null == dlm) { + try { + TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds()); + } catch (InterruptedException e) { + logger.warn("Interrupted from sleep while creating dlm for stream {} : ", + streamName, e); + } + } + } + OpStatsLogger openReaderStats = statsLogger.getOpStatsLogger("open_reader"); + OpStatsLogger nonBlockingReadStats = statsLogger.getOpStatsLogger("non_blocking_read"); + OpStatsLogger blockingReadStats = statsLogger.getOpStatsLogger("blocking_read"); + Counter nullReadCounter = statsLogger.getCounter("null_read"); + + logger.info("Created dlm for stream {}.", streamName); + LogReader reader = null; + Long lastTxId = null; + while (null == reader) { + // initialize the last txid + if (null == lastTxId) { + switch (readMode) { + case OLDEST: + lastTxId = 0L; + break; + case LATEST: + try { + lastTxId = dlm.getLastTxId(); + } catch (IOException ioe) { + continue; + } + break; + case REWIND: + lastTxId = System.currentTimeMillis() - rewindMs; + break; + case POSITION: + lastTxId = fromTxId; + break; + default: + logger.warn("Unsupported mode {}", readMode); + printUsage(); + System.exit(0); + break; + } + logger.info("Reading from transaction id {}", lastTxId); + } + // Open the reader + Stopwatch stopwatch = Stopwatch.createStarted(); + try { + reader = dlm.getInputStream(lastTxId); + long elapsedMs = stopwatch.elapsed(TimeUnit.MICROSECONDS); + openReaderStats.registerSuccessfulEvent(elapsedMs); + 
logger.info("It took {} ms to position the reader to transaction id {}", lastTxId); + } catch (IOException ioe) { + openReaderStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); + logger.warn("Failed to create reader for stream {} reading from {}.", streamName, lastTxId); + } + if (null == reader) { + try { + TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds()); + } catch (InterruptedException e) { + logger.warn("Interrupted from sleep after reader was reassigned null for stream {} : ", + streamName, e); + } + continue; + } + + // read loop + + LogRecord record; + boolean nonBlocking = false; + stopwatch = Stopwatch.createUnstarted(); + long numCatchupReads = 0L; + long numCatchupBytes = 0L; + Stopwatch catchupStopwatch = Stopwatch.createStarted(); + while (true) { + try { + stopwatch.start(); + record = reader.readNext(nonBlocking); + if (null != record) { + long elapsedMicros = stopwatch.stop().elapsed(TimeUnit.MICROSECONDS); + if (nonBlocking) { + nonBlockingReadStats.registerSuccessfulEvent(elapsedMicros); + } else { + numCatchupBytes += record.getPayload().length; + ++numCatchupReads; + blockingReadStats.registerSuccessfulEvent(elapsedMicros); + } + lastTxId = record.getTransactionId(); + } else { + nullReadCounter.inc(); + } + if (null == record && !nonBlocking) { + nonBlocking = true; + catchupStopwatch.stop(); + logger.info("Catchup {} records (total {} bytes) in {} milliseconds", + new Object[] { numCatchupReads, numCatchupBytes, + stopwatch.elapsed(TimeUnit.MILLISECONDS) }); + } + stopwatch.reset(); + } catch (IOException e) { + logger.warn("Encountered reading record from stream {} : ", streamName, e); + reader = null; + break; + } + } + try { + TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds()); + } catch (InterruptedException e) { + logger.warn("Interrupted from sleep while creating reader for stream {} : ", + streamName, e); + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/package-info.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/package-info.java new file mode 100644 index 0000000..b95a40f --- /dev/null +++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Stream level benchmarks. 
+ */ +package org.apache.distributedlog.benchmark.stream; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/utils/ShiftableRateLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/utils/ShiftableRateLimiter.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/utils/ShiftableRateLimiter.java new file mode 100644 index 0000000..03c561c --- /dev/null +++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/utils/ShiftableRateLimiter.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.benchmark.utils; + +import com.google.common.util.concurrent.RateLimiter; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * A wrapper over rate limiter. 
+ */ +public class ShiftableRateLimiter implements Runnable { + + private final RateLimiter rateLimiter; + private final ScheduledExecutorService executor; + private final double initialRate, maxRate, changeRate; + private final long changeInterval; + private final TimeUnit changeIntervalUnit; + private double nextRate; + + public ShiftableRateLimiter(double initialRate, + double maxRate, + double changeRate, + long changeInterval, + TimeUnit changeIntervalUnit) { + this.initialRate = initialRate; + this.maxRate = maxRate; + this.changeRate = changeRate; + this.nextRate = initialRate; + this.changeInterval = changeInterval; + this.changeIntervalUnit = changeIntervalUnit; + this.rateLimiter = RateLimiter.create(initialRate); + this.executor = Executors.newSingleThreadScheduledExecutor(); + this.executor.scheduleAtFixedRate(this, changeInterval, changeInterval, changeIntervalUnit); + } + + public ShiftableRateLimiter duplicate() { + return new ShiftableRateLimiter( + initialRate, + maxRate, + changeRate, + changeInterval, + changeIntervalUnit); + } + + @Override + public void run() { + this.nextRate = Math.min(nextRate + changeRate, maxRate); + this.rateLimiter.setRate(nextRate); + } + + public RateLimiter getLimiter() { + return this.rateLimiter; + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/utils/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/utils/package-info.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/utils/package-info.java new file mode 100644 index 0000000..c650bab --- /dev/null +++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/utils/package-info.java @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Utils for benchmarking. + */ +package org.apache.distributedlog.benchmark.utils; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/resources/findbugsExclude.xml ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/resources/findbugsExclude.xml b/distributedlog-benchmark/src/main/resources/findbugsExclude.xml index b7a1ecb..0ab2b6b 100644 --- a/distributedlog-benchmark/src/main/resources/findbugsExclude.xml +++ b/distributedlog-benchmark/src/main/resources/findbugsExclude.xml @@ -18,6 +18,6 @@ <FindBugsFilter> <Match> <!-- generated code, we can't be held responsible for findbugs in it //--> - <Class name="~com\.twitter\.distributedlog\.benchmark\.thrift.*" /> + <Class name="~org\.apache\.distributedlog\.benchmark\.thrift.*" /> </Match> </FindBugsFilter> http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/thrift/loadtest.thrift ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/thrift/loadtest.thrift b/distributedlog-benchmark/src/main/thrift/loadtest.thrift index 6d98cec..48c5d5a 
100644 --- a/distributedlog-benchmark/src/main/thrift/loadtest.thrift +++ b/distributedlog-benchmark/src/main/thrift/loadtest.thrift @@ -16,7 +16,7 @@ * limitations under the License. */ -namespace java com.twitter.distributedlog.benchmark.thrift +namespace java org.apache.distributedlog.benchmark.thrift struct Message { 1: i64 publishTime; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/pom.xml ---------------------------------------------------------------------- diff --git a/distributedlog-client/pom.xml b/distributedlog-client/pom.xml index aad5093..f09caf1 100644 --- a/distributedlog-client/pom.xml +++ b/distributedlog-client/pom.xml @@ -137,7 +137,7 @@ <properties> <property> <name>listener</name> - <value>com.twitter.distributedlog.TimedOutTestsListener</value> + <value>org.apache.distributedlog.TimedOutTestsListener</value> </property> </properties> </configuration> http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ClientConfig.java ---------------------------------------------------------------------- diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ClientConfig.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ClientConfig.java deleted file mode 100644 index de74f5a..0000000 --- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/ClientConfig.java +++ /dev/null @@ -1,187 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.client; - -import static com.google.common.base.Preconditions.checkNotNull; - -import java.util.concurrent.TimeUnit; - -/** - * Client Config. - */ -public class ClientConfig { - int redirectBackoffStartMs = 25; - int redirectBackoffMaxMs = 100; - int maxRedirects = -1; - int requestTimeoutMs = -1; - boolean thriftmux = false; - boolean streamFailfast = false; - String streamNameRegex = ".*"; - boolean handshakeWithClientInfo = true; - long periodicHandshakeIntervalMs = TimeUnit.MINUTES.toMillis(5); - long periodicOwnershipSyncIntervalMs = TimeUnit.MINUTES.toMillis(5); - boolean periodicDumpOwnershipCacheEnabled = false; - long periodicDumpOwnershipCacheIntervalMs = TimeUnit.MINUTES.toMillis(10); - boolean enableHandshakeTracing = false; - boolean enableChecksum = true; - - public ClientConfig setMaxRedirects(int maxRedirects) { - this.maxRedirects = maxRedirects; - return this; - } - - public int getMaxRedirects() { - return this.maxRedirects; - } - - public ClientConfig setRequestTimeoutMs(int timeoutInMillis) { - this.requestTimeoutMs = timeoutInMillis; - return this; - } - - public int getRequestTimeoutMs() { - return this.requestTimeoutMs; - } - - public ClientConfig setRedirectBackoffStartMs(int ms) { - this.redirectBackoffStartMs = ms; - return this; - } - - public int getRedirectBackoffStartMs() { - return this.redirectBackoffStartMs; - } - - public ClientConfig setRedirectBackoffMaxMs(int ms) { - this.redirectBackoffMaxMs = ms; - return this; - } - - public int getRedirectBackoffMaxMs() { - return 
this.redirectBackoffMaxMs; - } - - public ClientConfig setThriftMux(boolean enabled) { - this.thriftmux = enabled; - return this; - } - - public boolean getThriftMux() { - return this.thriftmux; - } - - public ClientConfig setStreamFailfast(boolean enabled) { - this.streamFailfast = enabled; - return this; - } - - public boolean getStreamFailfast() { - return this.streamFailfast; - } - - public ClientConfig setStreamNameRegex(String nameRegex) { - checkNotNull(nameRegex); - this.streamNameRegex = nameRegex; - return this; - } - - public String getStreamNameRegex() { - return this.streamNameRegex; - } - - public ClientConfig setHandshakeWithClientInfo(boolean enabled) { - this.handshakeWithClientInfo = enabled; - return this; - } - - public boolean getHandshakeWithClientInfo() { - return this.handshakeWithClientInfo; - } - - public ClientConfig setPeriodicHandshakeIntervalMs(long intervalMs) { - this.periodicHandshakeIntervalMs = intervalMs; - return this; - } - - public long getPeriodicHandshakeIntervalMs() { - return this.periodicHandshakeIntervalMs; - } - - public ClientConfig setPeriodicOwnershipSyncIntervalMs(long intervalMs) { - this.periodicOwnershipSyncIntervalMs = intervalMs; - return this; - } - - public long getPeriodicOwnershipSyncIntervalMs() { - return this.periodicOwnershipSyncIntervalMs; - } - - public ClientConfig setPeriodicDumpOwnershipCacheEnabled(boolean enabled) { - this.periodicDumpOwnershipCacheEnabled = enabled; - return this; - } - - public boolean isPeriodicDumpOwnershipCacheEnabled() { - return this.periodicDumpOwnershipCacheEnabled; - } - - public ClientConfig setPeriodicDumpOwnershipCacheIntervalMs(long intervalMs) { - this.periodicDumpOwnershipCacheIntervalMs = intervalMs; - return this; - } - - public long getPeriodicDumpOwnershipCacheIntervalMs() { - return this.periodicDumpOwnershipCacheIntervalMs; - } - - public ClientConfig setHandshakeTracingEnabled(boolean enabled) { - this.enableHandshakeTracing = enabled; - return this; - } - 
- public boolean isHandshakeTracingEnabled() { - return this.enableHandshakeTracing; - } - - public ClientConfig setChecksumEnabled(boolean enabled) { - this.enableChecksum = enabled; - return this; - } - - public boolean isChecksumEnabled() { - return this.enableChecksum; - } - - public static ClientConfig newConfig(ClientConfig config) { - ClientConfig newConfig = new ClientConfig(); - newConfig.setMaxRedirects(config.getMaxRedirects()) - .setRequestTimeoutMs(config.getRequestTimeoutMs()) - .setRedirectBackoffStartMs(config.getRedirectBackoffStartMs()) - .setRedirectBackoffMaxMs(config.getRedirectBackoffMaxMs()) - .setThriftMux(config.getThriftMux()) - .setStreamFailfast(config.getStreamFailfast()) - .setStreamNameRegex(config.getStreamNameRegex()) - .setHandshakeWithClientInfo(config.getHandshakeWithClientInfo()) - .setPeriodicHandshakeIntervalMs(config.getPeriodicHandshakeIntervalMs()) - .setPeriodicDumpOwnershipCacheEnabled(config.isPeriodicDumpOwnershipCacheEnabled()) - .setPeriodicDumpOwnershipCacheIntervalMs(config.getPeriodicDumpOwnershipCacheIntervalMs()) - .setHandshakeTracingEnabled(config.isHandshakeTracingEnabled()) - .setChecksumEnabled(config.isChecksumEnabled()); - return newConfig; - } -}