http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerReadBenchmark.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerReadBenchmark.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerReadBenchmark.java deleted file mode 100644 index 072c3ef..0000000 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerReadBenchmark.java +++ /dev/null @@ -1,151 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.benchmark.stream; - -import static com.google.common.base.Charsets.UTF_8; - -import com.google.common.base.Stopwatch; -import com.twitter.distributedlog.BookKeeperClientBuilder; -import com.twitter.distributedlog.DistributedLogManager; -import com.twitter.distributedlog.LogSegmentMetadata; -import com.twitter.distributedlog.ZooKeeperClient; -import com.twitter.distributedlog.ZooKeeperClientBuilder; -import com.twitter.distributedlog.impl.metadata.BKDLConfig; -import com.twitter.distributedlog.namespace.DistributedLogNamespace; -import java.io.IOException; -import java.util.List; -import java.util.concurrent.TimeUnit; -import org.apache.bookkeeper.client.BookKeeper; -import org.apache.bookkeeper.client.LedgerEntry; -import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.StatsLogger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Benchmark ledger reading. 
- */ -public class LedgerReadBenchmark extends AbstractReaderBenchmark { - - private static final Logger logger = LoggerFactory.getLogger(AsyncReaderBenchmark.class); - - @Override - protected void benchmark(DistributedLogNamespace namespace, String logName, StatsLogger statsLogger) { - DistributedLogManager dlm = null; - while (null == dlm) { - try { - dlm = namespace.openLog(streamName); - } catch (IOException ioe) { - logger.warn("Failed to create dlm for stream {} : ", streamName, ioe); - } - if (null == dlm) { - try { - TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds()); - } catch (InterruptedException e) { - logger.warn("Interrupted from sleep while creating dlm for stream {} : ", - streamName, e); - } - } - } - logger.info("Created dlm for stream {}.", streamName); - - List<LogSegmentMetadata> segments = null; - while (null == segments) { - try { - segments = dlm.getLogSegments(); - } catch (IOException ioe) { - logger.warn("Failed to get log segments for stream {} : ", streamName, ioe); - } - if (null == segments) { - try { - TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds()); - } catch (InterruptedException e) { - logger.warn("Interrupted from sleep while geting log segments for stream {} : ", - streamName, e); - } - } - } - - final Counter readCounter = statsLogger.getCounter("reads"); - - logger.info("Reading from log segments : {}", segments); - - ZooKeeperClient zkc = ZooKeeperClientBuilder.newBuilder() - .uri(uri) - .name("benchmark-zkc") - .sessionTimeoutMs(conf.getZKSessionTimeoutMilliseconds()) - .zkAclId(null) - .build(); - BKDLConfig bkdlConfig; - try { - bkdlConfig = BKDLConfig.resolveDLConfig(zkc, uri); - } catch (IOException e) { - return; - } - - BookKeeper bk; - try { - bk = BookKeeperClientBuilder.newBuilder() - .name("benchmark-bkc") - .dlConfig(conf) - .zkServers(bkdlConfig.getBkZkServersForReader()) - .ledgersPath(bkdlConfig.getBkLedgersPath()) - .build() - .get(); - } catch (IOException e) { - return; - 
} - - final int readConcurrency = conf.getInt("ledger_read_concurrency", 1000); - boolean streamRead = conf.getBoolean("ledger_stream_read", true); - try { - for (LogSegmentMetadata segment : segments) { - Stopwatch stopwatch = Stopwatch.createStarted(); - long lid = segment.getLogSegmentId(); - LedgerHandle lh = bk.openLedgerNoRecovery( - lid, BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8)); - logger.info("It took {} ms to open log segment {}", - new Object[] { stopwatch.elapsed(TimeUnit.MILLISECONDS), (lh.getLastAddConfirmed() + 1), segment }); - stopwatch.reset().start(); - Runnable reader; - if (streamRead) { - reader = new LedgerStreamReader(lh, new BookkeeperInternalCallbacks.ReadEntryListener() { - @Override - public void onEntryComplete(int rc, LedgerHandle lh, LedgerEntry entry, Object ctx) { - readCounter.inc(); - } - }, readConcurrency); - } else { - reader = new LedgerStreamReader(lh, new BookkeeperInternalCallbacks.ReadEntryListener() { - @Override - public void onEntryComplete(int rc, LedgerHandle lh, LedgerEntry entry, Object ctx) { - readCounter.inc(); - } - }, readConcurrency); - } - reader.run(); - logger.info("It took {} ms to complete reading {} entries from log segment {}", - new Object[] { stopwatch.elapsed(TimeUnit.MILLISECONDS), (lh.getLastAddConfirmed() + 1), segment }); - } - } catch (Exception e) { - logger.error("Error on reading bk ", e); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerStreamReader.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerStreamReader.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerStreamReader.java deleted file mode 100644 index e542af7..0000000 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/LedgerStreamReader.java +++ /dev/null @@ -1,131 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.twitter.distributedlog.benchmark.stream; - -import java.util.Enumeration; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.bookkeeper.client.AsyncCallback; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.LedgerEntry; -import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Reading ledger in a streaming way. - */ -public class LedgerStreamReader implements Runnable { - - private static final Logger logger = LoggerFactory.getLogger(LedgerStreamReader.class); - - class PendingReadRequest implements AsyncCallback.ReadCallback { - - final long entryId; - boolean isDone = false; - int rc; - LedgerEntry entry = null; - - PendingReadRequest(long entryId) { - this.entryId = entryId; - } - - void read() { - lh.asyncReadEntries(entryId, entryId, this, null); - } - - void complete(ReadEntryListener listener) { - listener.onEntryComplete(rc, lh, entry, null); - } - - @Override - public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> enumeration, Object ctx) { - this.rc = rc; - if (BKException.Code.OK == rc && enumeration.hasMoreElements()) { - entry = enumeration.nextElement(); - } else { - entry = null; - } - isDone = true; - // construct a new read request - long nextEntry = nextReadEntry.getAndIncrement(); - if (nextEntry <= lac) { - PendingReadRequest nextRead = - new PendingReadRequest(nextEntry); - pendingReads.add(nextRead); - nextRead.read(); - } - triggerCallbacks(); - } - } - - private final LedgerHandle lh; - private final long lac; - private final ReadEntryListener readEntryListener; - private final int concurrency; - private final AtomicLong nextReadEntry = new AtomicLong(0); - private final CountDownLatch done = new 
CountDownLatch(1); - private final ConcurrentLinkedQueue<PendingReadRequest> pendingReads = - new ConcurrentLinkedQueue<PendingReadRequest>(); - - public LedgerStreamReader(LedgerHandle lh, - ReadEntryListener readEntryListener, - int concurrency) { - this.lh = lh; - this.lac = lh.getLastAddConfirmed(); - this.readEntryListener = readEntryListener; - this.concurrency = concurrency; - for (int i = 0; i < concurrency; i++) { - long entryId = nextReadEntry.getAndIncrement(); - if (entryId > lac) { - break; - } - PendingReadRequest request = new PendingReadRequest(entryId); - pendingReads.add(request); - request.read(); - } - if (pendingReads.isEmpty()) { - done.countDown(); - } - } - - synchronized void triggerCallbacks() { - PendingReadRequest request; - while ((request = pendingReads.peek()) != null) { - if (!request.isDone) { - break; - } - pendingReads.remove(); - request.complete(readEntryListener); - } - if (pendingReads.isEmpty()) { - done.countDown(); - } - } - - @Override - public void run() { - try { - done.await(); - } catch (InterruptedException e) { - logger.info("Interrupted on stream reading ledger {} : ", lh.getId(), e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/ReadMode.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/ReadMode.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/ReadMode.java deleted file mode 100644 index 280c9db..0000000 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/ReadMode.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.benchmark.stream; - -/** - * The read mode for streaming read benchmark. - */ -public enum ReadMode { - OLDEST, - LATEST, - REWIND, - POSITION -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/StreamBenchmark.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/StreamBenchmark.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/StreamBenchmark.java deleted file mode 100644 index 1eff65a..0000000 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/StreamBenchmark.java +++ /dev/null @@ -1,138 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.benchmark.stream; - -import com.twitter.distributedlog.DistributedLogConfiguration; -import com.twitter.distributedlog.namespace.DistributedLogNamespace; -import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder; -import java.io.File; -import java.net.URI; -import org.apache.bookkeeper.stats.NullStatsProvider; -import org.apache.bookkeeper.stats.StatsLogger; -import org.apache.bookkeeper.stats.StatsProvider; -import org.apache.bookkeeper.util.ReflectionUtils; -import org.apache.commons.cli.BasicParser; -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Options; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Benchmark Streams. 
- */ -public abstract class StreamBenchmark { - - private static final Logger logger = LoggerFactory.getLogger(StreamBenchmark.class); - - private static final String USAGE = "StreamBenchmark <benchmark-class> [options]"; - - protected final Options options = new Options(); - protected URI uri; - protected DistributedLogConfiguration conf; - protected StatsProvider statsProvider; - protected String streamName; - - protected StreamBenchmark() { - options.addOption("c", "conf", true, "Configuration File"); - options.addOption("u", "uri", true, "DistributedLog URI"); - options.addOption("p", "stats-provider", true, "Stats Provider"); - options.addOption("s", "stream", true, "Stream Name"); - options.addOption("h", "help", false, "Print usage."); - } - - protected Options getOptions() { - return options; - } - - protected void printUsage() { - HelpFormatter hf = new HelpFormatter(); - hf.printHelp(USAGE, options); - } - - protected void parseCommandLine(String[] args) - throws Exception { - BasicParser parser = new BasicParser(); - CommandLine cmdline = parser.parse(options, args); - if (cmdline.hasOption("h")) { - printUsage(); - System.exit(0); - } - if (cmdline.hasOption("u")) { - this.uri = URI.create(cmdline.getOptionValue("u")); - } else { - printUsage(); - System.exit(0); - } - this.conf = new DistributedLogConfiguration(); - if (cmdline.hasOption("c")) { - String configFile = cmdline.getOptionValue("c"); - this.conf.loadConf(new File(configFile).toURI().toURL()); - } - if (cmdline.hasOption("p")) { - statsProvider = ReflectionUtils.newInstance(cmdline.getOptionValue("p"), StatsProvider.class); - } else { - statsProvider = new NullStatsProvider(); - } - if (cmdline.hasOption("s")) { - this.streamName = cmdline.getOptionValue("s"); - } else { - printUsage(); - System.exit(0); - } - parseCommandLine(cmdline); - } - - protected abstract void parseCommandLine(CommandLine cmdline); - - protected void run(String[] args) throws Exception { - logger.info("Parsing 
arguments for benchmark : {}", args); - // parse command line - parseCommandLine(args); - statsProvider.start(conf); - // run the benchmark - StatsLogger statsLogger = statsProvider.getStatsLogger("dl"); - DistributedLogNamespace namespace = - DistributedLogNamespaceBuilder.newBuilder() - .conf(conf) - .uri(uri) - .statsLogger(statsLogger) - .build(); - try { - benchmark(namespace, streamName, statsProvider.getStatsLogger("benchmark")); - } finally { - namespace.close(); - statsProvider.stop(); - } - } - - protected abstract void benchmark(DistributedLogNamespace namespace, - String logName, - StatsLogger statsLogger); - - public static void main(String[] args) throws Exception { - if (args.length <= 0) { - System.err.println(USAGE); - return; - } - String benchmarkClassName = args[0]; - StreamBenchmark benchmark = ReflectionUtils.newInstance( - benchmarkClassName, StreamBenchmark.class); - benchmark.run(args); - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/SyncReaderBenchmark.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/SyncReaderBenchmark.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/SyncReaderBenchmark.java deleted file mode 100644 index 122c8ef..0000000 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/SyncReaderBenchmark.java +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.benchmark.stream; - -import com.google.common.base.Stopwatch; -import com.twitter.distributedlog.DistributedLogManager; -import com.twitter.distributedlog.LogReader; -import com.twitter.distributedlog.LogRecord; -import com.twitter.distributedlog.namespace.DistributedLogNamespace; -import java.io.IOException; -import java.util.concurrent.TimeUnit; -import org.apache.bookkeeper.stats.Counter; -import org.apache.bookkeeper.stats.OpStatsLogger; -import org.apache.bookkeeper.stats.StatsLogger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Benchmark on {@link com.twitter.distributedlog.LogReader} reading from a stream. 
- */ -public class SyncReaderBenchmark extends AbstractReaderBenchmark { - - private static final Logger logger = LoggerFactory.getLogger(SyncReaderBenchmark.class); - - public SyncReaderBenchmark() {} - - @Override - protected void benchmark(DistributedLogNamespace namespace, String streamName, StatsLogger statsLogger) { - DistributedLogManager dlm = null; - while (null == dlm) { - try { - dlm = namespace.openLog(streamName); - } catch (IOException ioe) { - logger.warn("Failed to create dlm for stream {} : ", streamName, ioe); - } - if (null == dlm) { - try { - TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds()); - } catch (InterruptedException e) { - logger.warn("Interrupted from sleep while creating dlm for stream {} : ", - streamName, e); - } - } - } - OpStatsLogger openReaderStats = statsLogger.getOpStatsLogger("open_reader"); - OpStatsLogger nonBlockingReadStats = statsLogger.getOpStatsLogger("non_blocking_read"); - OpStatsLogger blockingReadStats = statsLogger.getOpStatsLogger("blocking_read"); - Counter nullReadCounter = statsLogger.getCounter("null_read"); - - logger.info("Created dlm for stream {}.", streamName); - LogReader reader = null; - Long lastTxId = null; - while (null == reader) { - // initialize the last txid - if (null == lastTxId) { - switch (readMode) { - case OLDEST: - lastTxId = 0L; - break; - case LATEST: - try { - lastTxId = dlm.getLastTxId(); - } catch (IOException ioe) { - continue; - } - break; - case REWIND: - lastTxId = System.currentTimeMillis() - rewindMs; - break; - case POSITION: - lastTxId = fromTxId; - break; - default: - logger.warn("Unsupported mode {}", readMode); - printUsage(); - System.exit(0); - break; - } - logger.info("Reading from transaction id {}", lastTxId); - } - // Open the reader - Stopwatch stopwatch = Stopwatch.createStarted(); - try { - reader = dlm.getInputStream(lastTxId); - long elapsedMs = stopwatch.elapsed(TimeUnit.MICROSECONDS); - openReaderStats.registerSuccessfulEvent(elapsedMs); - 
logger.info("It took {} ms to position the reader to transaction id {}", lastTxId); - } catch (IOException ioe) { - openReaderStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); - logger.warn("Failed to create reader for stream {} reading from {}.", streamName, lastTxId); - } - if (null == reader) { - try { - TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds()); - } catch (InterruptedException e) { - logger.warn("Interrupted from sleep after reader was reassigned null for stream {} : ", - streamName, e); - } - continue; - } - - // read loop - - LogRecord record; - boolean nonBlocking = false; - stopwatch = Stopwatch.createUnstarted(); - long numCatchupReads = 0L; - long numCatchupBytes = 0L; - Stopwatch catchupStopwatch = Stopwatch.createStarted(); - while (true) { - try { - stopwatch.start(); - record = reader.readNext(nonBlocking); - if (null != record) { - long elapsedMicros = stopwatch.stop().elapsed(TimeUnit.MICROSECONDS); - if (nonBlocking) { - nonBlockingReadStats.registerSuccessfulEvent(elapsedMicros); - } else { - numCatchupBytes += record.getPayload().length; - ++numCatchupReads; - blockingReadStats.registerSuccessfulEvent(elapsedMicros); - } - lastTxId = record.getTransactionId(); - } else { - nullReadCounter.inc(); - } - if (null == record && !nonBlocking) { - nonBlocking = true; - catchupStopwatch.stop(); - logger.info("Catchup {} records (total {} bytes) in {} milliseconds", - new Object[] { numCatchupReads, numCatchupBytes, - stopwatch.elapsed(TimeUnit.MILLISECONDS) }); - } - stopwatch.reset(); - } catch (IOException e) { - logger.warn("Encountered reading record from stream {} : ", streamName, e); - reader = null; - break; - } - } - try { - TimeUnit.MILLISECONDS.sleep(conf.getZKSessionTimeoutMilliseconds()); - } catch (InterruptedException e) { - logger.warn("Interrupted from sleep while creating reader for stream {} : ", - streamName, e); - } - } - } -} 
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/package-info.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/package-info.java deleted file mode 100644 index d8e198c..0000000 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/stream/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Stream level benchmarks. 
- */ -package com.twitter.distributedlog.benchmark.stream; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/utils/ShiftableRateLimiter.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/utils/ShiftableRateLimiter.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/utils/ShiftableRateLimiter.java deleted file mode 100644 index def0346..0000000 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/utils/ShiftableRateLimiter.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.twitter.distributedlog.benchmark.utils; - -import com.google.common.util.concurrent.RateLimiter; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -/** - * A wrapper over rate limiter. 
- */ -public class ShiftableRateLimiter implements Runnable { - - private final RateLimiter rateLimiter; - private final ScheduledExecutorService executor; - private final double initialRate, maxRate, changeRate; - private final long changeInterval; - private final TimeUnit changeIntervalUnit; - private double nextRate; - - public ShiftableRateLimiter(double initialRate, - double maxRate, - double changeRate, - long changeInterval, - TimeUnit changeIntervalUnit) { - this.initialRate = initialRate; - this.maxRate = maxRate; - this.changeRate = changeRate; - this.nextRate = initialRate; - this.changeInterval = changeInterval; - this.changeIntervalUnit = changeIntervalUnit; - this.rateLimiter = RateLimiter.create(initialRate); - this.executor = Executors.newSingleThreadScheduledExecutor(); - this.executor.scheduleAtFixedRate(this, changeInterval, changeInterval, changeIntervalUnit); - } - - public ShiftableRateLimiter duplicate() { - return new ShiftableRateLimiter( - initialRate, - maxRate, - changeRate, - changeInterval, - changeIntervalUnit); - } - - @Override - public void run() { - this.nextRate = Math.min(nextRate + changeRate, maxRate); - this.rateLimiter.setRate(nextRate); - } - - public RateLimiter getLimiter() { - return this.rateLimiter; - } -} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/utils/package-info.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/utils/package-info.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/utils/package-info.java deleted file mode 100644 index 369b979..0000000 --- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/utils/package-info.java +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or 
more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * Utils for benchmarking. - */ -package com.twitter.distributedlog.benchmark.utils; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/Benchmarker.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/Benchmarker.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/Benchmarker.java new file mode 100644 index 0000000..f724102 --- /dev/null +++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/Benchmarker.java @@ -0,0 +1,468 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.benchmark; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.benchmark.utils.ShiftableRateLimiter; +import com.twitter.finagle.stats.OstrichStatsReceiver; +import com.twitter.finagle.stats.StatsReceiver; +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.stats.NullStatsProvider; +import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.bookkeeper.stats.StatsProvider; +import org.apache.bookkeeper.util.ReflectionUtils; +import org.apache.commons.cli.BasicParser; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Options; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The launcher for benchmarks. 
+ */ +public class Benchmarker { + + private static final Logger logger = LoggerFactory.getLogger(Benchmarker.class); + + static final String USAGE = "Benchmarker [-u <uri>] [-c <conf>] [-s serverset] [-m (read|write|dlwrite)]"; + + final String[] args; + final Options options = new Options(); + + int rate = 100; + int maxRate = 1000; + int changeRate = 100; + int changeRateSeconds = 1800; + int concurrency = 10; + String streamPrefix = "dlog-loadtest"; + int shardId = -1; + int numStreams = 10; + List<String> serversetPaths = new ArrayList<String>(); + List<String> finagleNames = new ArrayList<String>(); + int msgSize = 256; + String mode = null; + int durationMins = 60; + URI dlUri = null; + int batchSize = 0; + int readersPerStream = 1; + Integer maxStreamId = null; + int truncationInterval = 3600; + Integer startStreamId = null; + Integer endStreamId = null; + int hostConnectionCoreSize = 10; + int hostConnectionLimit = 10; + boolean thriftmux = false; + boolean handshakeWithClientInfo = false; + boolean readFromHead = false; + int sendBufferSize = 1024 * 1024; + int recvBufferSize = 1024 * 1024; + boolean enableBatching = false; + int batchBufferSize = 256 * 1024; + int batchFlushIntervalMicros = 2000; + String routingServiceFinagleNameString; + + final DistributedLogConfiguration conf = new DistributedLogConfiguration(); + final StatsReceiver statsReceiver = new OstrichStatsReceiver(); + StatsProvider statsProvider = null; + + Benchmarker(String[] args) { + this.args = args; + // prepare options + options.addOption("s", "serverset", true, "Proxy Server Set (separated by ',')"); + options.addOption("fn", "finagle-name", true, "Write proxy finagle name (separated by ',')"); + options.addOption("c", "conf", true, "DistributedLog Configuration File"); + options.addOption("u", "uri", true, "DistributedLog URI"); + options.addOption("i", "shard", true, "Shard Id"); + options.addOption("p", "provider", true, "DistributedLog Stats Provider"); + options.addOption("d", 
"duration", true, "Duration (minutes)"); + options.addOption("sp", "streamprefix", true, "Stream Prefix"); + options.addOption("sc", "streamcount", true, "Number of Streams"); + options.addOption("ms", "messagesize", true, "Message Size (bytes)"); + options.addOption("bs", "batchsize", true, "Batch Size"); + options.addOption("r", "rate", true, "Rate limit (requests/second)"); + options.addOption("mr", "max-rate", true, "Maximum Rate limit (requests/second)"); + options.addOption("cr", "change-rate", true, "Rate to increase each change period (requests/second)"); + options.addOption("ci", "change-interval", true, "Rate to increase period, seconds"); + options.addOption("t", "concurrency", true, "Concurrency (number of threads)"); + options.addOption("m", "mode", true, "Benchmark mode (read/write)"); + options.addOption("rps", "readers-per-stream", true, "Number readers per stream"); + options.addOption("msid", "max-stream-id", true, "Max Stream ID"); + options.addOption("ti", "truncation-interval", true, "Truncation interval in seconds"); + options.addOption("ssid", "start-stream-id", true, "Start Stream ID"); + options.addOption("esid", "end-stream-id", true, "Start Stream ID"); + options.addOption("hccs", "host-connection-core-size", true, "Finagle hostConnectionCoreSize"); + options.addOption("hcl", "host-connection-limit", true, "Finagle hostConnectionLimit"); + options.addOption("mx", "thriftmux", false, "Enable thriftmux (write mode only)"); + options.addOption("hsci", "handshake-with-client-info", false, "Enable handshaking with client info"); + options.addOption("rfh", "read-from-head", false, "Read from head of the stream"); + options.addOption("sb", "send-buffer", true, "Channel send buffer size, in bytes"); + options.addOption("rb", "recv-buffer", true, "Channel recv buffer size, in bytes"); + options.addOption("bt", "enable-batch", false, "Enable batching on writers"); + options.addOption("bbs", "batch-buffer-size", true, "The batch buffer size in 
bytes"); + options.addOption("bfi", "batch-flush-interval", true, "The batch buffer flush interval in micros"); + options.addOption("rs", "routing-service", true, "The routing service finagle name for server-side routing"); + options.addOption("h", "help", false, "Print usage."); + } + + void printUsage() { + HelpFormatter helpFormatter = new HelpFormatter(); + helpFormatter.printHelp(USAGE, options); + } + + void run() throws Exception { + logger.info("Running benchmark."); + + BasicParser parser = new BasicParser(); + CommandLine cmdline = parser.parse(options, args); + if (cmdline.hasOption("h")) { + printUsage(); + System.exit(0); + } + if (cmdline.hasOption("s")) { + String serversetPathStr = cmdline.getOptionValue("s"); + serversetPaths = Arrays.asList(StringUtils.split(serversetPathStr, ',')); + } + if (cmdline.hasOption("fn")) { + String finagleNameStr = cmdline.getOptionValue("fn"); + finagleNames = Arrays.asList(StringUtils.split(finagleNameStr, ',')); + } + if (cmdline.hasOption("i")) { + shardId = Integer.parseInt(cmdline.getOptionValue("i")); + } + if (cmdline.hasOption("d")) { + durationMins = Integer.parseInt(cmdline.getOptionValue("d")); + } + if (cmdline.hasOption("sp")) { + streamPrefix = cmdline.getOptionValue("sp"); + } + if (cmdline.hasOption("sc")) { + numStreams = Integer.parseInt(cmdline.getOptionValue("sc")); + } + if (cmdline.hasOption("ms")) { + msgSize = Integer.parseInt(cmdline.getOptionValue("ms")); + } + if (cmdline.hasOption("r")) { + rate = Integer.parseInt(cmdline.getOptionValue("r")); + } + if (cmdline.hasOption("mr")) { + maxRate = Integer.parseInt(cmdline.getOptionValue("mr")); + } + if (cmdline.hasOption("cr")) { + changeRate = Integer.parseInt(cmdline.getOptionValue("cr")); + } + if (cmdline.hasOption("ci")) { + changeRateSeconds = Integer.parseInt(cmdline.getOptionValue("ci")); + } + if (cmdline.hasOption("t")) { + concurrency = Integer.parseInt(cmdline.getOptionValue("t")); + } + if (cmdline.hasOption("m")) { + mode = 
cmdline.getOptionValue("m"); + } + if (cmdline.hasOption("u")) { + dlUri = URI.create(cmdline.getOptionValue("u")); + } + if (cmdline.hasOption("bs")) { + batchSize = Integer.parseInt(cmdline.getOptionValue("bs")); + checkArgument("write" != mode, "batchSize supported only for mode=write"); + } + if (cmdline.hasOption("c")) { + String configFile = cmdline.getOptionValue("c"); + conf.loadConf(new File(configFile).toURI().toURL()); + } + if (cmdline.hasOption("rps")) { + readersPerStream = Integer.parseInt(cmdline.getOptionValue("rps")); + } + if (cmdline.hasOption("msid")) { + maxStreamId = Integer.parseInt(cmdline.getOptionValue("msid")); + } + if (cmdline.hasOption("ti")) { + truncationInterval = Integer.parseInt(cmdline.getOptionValue("ti")); + } + if (cmdline.hasOption("ssid")) { + startStreamId = Integer.parseInt(cmdline.getOptionValue("ssid")); + } + if (cmdline.hasOption("esid")) { + endStreamId = Integer.parseInt(cmdline.getOptionValue("esid")); + } + if (cmdline.hasOption("hccs")) { + hostConnectionCoreSize = Integer.parseInt(cmdline.getOptionValue("hccs")); + } + if (cmdline.hasOption("hcl")) { + hostConnectionLimit = Integer.parseInt(cmdline.getOptionValue("hcl")); + } + if (cmdline.hasOption("sb")) { + sendBufferSize = Integer.parseInt(cmdline.getOptionValue("sb")); + } + if (cmdline.hasOption("rb")) { + recvBufferSize = Integer.parseInt(cmdline.getOptionValue("rb")); + } + if (cmdline.hasOption("rs")) { + routingServiceFinagleNameString = cmdline.getOptionValue("rs"); + } + thriftmux = cmdline.hasOption("mx"); + handshakeWithClientInfo = cmdline.hasOption("hsci"); + readFromHead = cmdline.hasOption("rfh"); + enableBatching = cmdline.hasOption("bt"); + if (cmdline.hasOption("bbs")) { + batchBufferSize = Integer.parseInt(cmdline.getOptionValue("bbs")); + } + if (cmdline.hasOption("bfi")) { + batchFlushIntervalMicros = Integer.parseInt(cmdline.getOptionValue("bfi")); + } + + checkArgument(shardId >= 0, "shardId must be >= 0"); + checkArgument(numStreams > 
0, "numStreams must be > 0"); + checkArgument(durationMins > 0, "durationMins must be > 0"); + checkArgument(streamPrefix != null, "streamPrefix must be defined"); + checkArgument(hostConnectionCoreSize > 0, "host connection core size must be > 0"); + checkArgument(hostConnectionLimit > 0, "host connection limit must be > 0"); + + if (cmdline.hasOption("p")) { + statsProvider = ReflectionUtils.newInstance(cmdline.getOptionValue("p"), StatsProvider.class); + } else { + statsProvider = new NullStatsProvider(); + } + + logger.info("Starting stats provider : {}.", statsProvider.getClass()); + statsProvider.start(conf); + + Worker w = null; + if (mode.startsWith("read")) { + w = runReader(); + } else if (mode.startsWith("write")) { + w = runWriter(); + } else if (mode.startsWith("dlwrite")) { + w = runDLWriter(); + } else if (mode.startsWith("dlread")) { + w = runDLReader(); + } + + if (w == null) { + throw new IOException("Unknown mode " + mode + " to run the benchmark."); + } + + Thread workerThread = new Thread(w, mode + "-benchmark-thread"); + workerThread.start(); + + TimeUnit.MINUTES.sleep(durationMins); + + logger.info("{} minutes passed, exiting...", durationMins); + w.close(); + + if (null != statsProvider) { + statsProvider.stop(); + } + + Runtime.getRuntime().exit(0); + } + + Worker runWriter() { + checkArgument(!finagleNames.isEmpty() || !serversetPaths.isEmpty() || null != dlUri, + "either serverset paths, finagle-names or uri required"); + checkArgument(msgSize > 0, "messagesize must be greater than 0"); + checkArgument(rate > 0, "rate must be greater than 0"); + checkArgument(maxRate >= rate, "max rate must be greater than rate"); + checkArgument(changeRate >= 0, "change rate must be positive"); + checkArgument(changeRateSeconds >= 0, "change rate must be positive"); + checkArgument(concurrency > 0, "concurrency must be greater than 0"); + + ShiftableRateLimiter rateLimiter = + new ShiftableRateLimiter(rate, maxRate, changeRate, changeRateSeconds, 
TimeUnit.SECONDS); + return createWriteWorker( + streamPrefix, + dlUri, + null == startStreamId ? shardId * numStreams : startStreamId, + null == endStreamId ? (shardId + 1) * numStreams : endStreamId, + rateLimiter, + concurrency, + msgSize, + batchSize, + hostConnectionCoreSize, + hostConnectionLimit, + serversetPaths, + finagleNames, + statsReceiver.scope("write_client"), + statsProvider.getStatsLogger("write"), + thriftmux, + handshakeWithClientInfo, + sendBufferSize, + recvBufferSize, + enableBatching, + batchBufferSize, + batchFlushIntervalMicros, + routingServiceFinagleNameString); + } + + protected WriterWorker createWriteWorker( + String streamPrefix, + URI uri, + int startStreamId, + int endStreamId, + ShiftableRateLimiter rateLimiter, + int writeConcurrency, + int messageSizeBytes, + int batchSize, + int hostConnectionCoreSize, + int hostConnectionLimit, + List<String> serverSetPaths, + List<String> finagleNames, + StatsReceiver statsReceiver, + StatsLogger statsLogger, + boolean thriftmux, + boolean handshakeWithClientInfo, + int sendBufferSize, + int recvBufferSize, + boolean enableBatching, + int batchBufferSize, + int batchFlushIntervalMicros, + String routingServiceFinagleNameString) { + return new WriterWorker( + streamPrefix, + uri, + startStreamId, + endStreamId, + rateLimiter, + writeConcurrency, + messageSizeBytes, + batchSize, + hostConnectionCoreSize, + hostConnectionLimit, + serverSetPaths, + finagleNames, + statsReceiver, + statsLogger, + thriftmux, + handshakeWithClientInfo, + sendBufferSize, + recvBufferSize, + enableBatching, + batchBufferSize, + batchFlushIntervalMicros, + routingServiceFinagleNameString); + } + + Worker runDLWriter() throws IOException { + checkNotNull(dlUri, "dlUri must be defined"); + checkArgument(rate > 0, "rate must be greater than 0"); + checkArgument(maxRate >= rate, "max rate must be greater than rate"); + checkArgument(changeRate >= 0, "change rate must be positive"); + checkArgument(changeRateSeconds >= 0, 
"change rate must be positive"); + checkArgument(concurrency > 0, "concurrency must be greater than 0"); + + ShiftableRateLimiter rateLimiter = + new ShiftableRateLimiter(rate, maxRate, changeRate, changeRateSeconds, TimeUnit.SECONDS); + + return new DLWriterWorker(conf, + dlUri, + streamPrefix, + shardId * numStreams, + (shardId + 1) * numStreams, + rateLimiter, + concurrency, + msgSize, + statsProvider.getStatsLogger("dlwrite")); + } + + Worker runReader() throws IOException { + checkArgument(!finagleNames.isEmpty() || !serversetPaths.isEmpty() || null != dlUri, + "either serverset paths, finagle-names or dlUri required"); + checkArgument(concurrency > 0, "concurrency must be greater than 0"); + checkArgument(truncationInterval > 0, "truncation interval should be greater than 0"); + return runReaderInternal(serversetPaths, finagleNames, truncationInterval); + } + + Worker runDLReader() throws IOException { + return runReaderInternal(new ArrayList<String>(), new ArrayList<String>(), 0); + } + + private Worker runReaderInternal(List<String> serversetPaths, + List<String> finagleNames, + int truncationInterval) throws IOException { + checkNotNull(dlUri); + + int ssid = null == startStreamId ? shardId * numStreams : startStreamId; + int esid = null == endStreamId ? 
(shardId + readersPerStream) * numStreams : endStreamId; + if (null != maxStreamId) { + esid = Math.min(esid, maxStreamId); + } + + return createReaderWorker( + conf, + dlUri, + streamPrefix, + ssid, + esid, + concurrency, + serversetPaths, + finagleNames, + truncationInterval, + readFromHead, + statsReceiver, + statsProvider.getStatsLogger("dlreader")); + } + + protected ReaderWorker createReaderWorker( + DistributedLogConfiguration conf, + URI uri, + String streamPrefix, + int startStreamId, + int endStreamId, + int readThreadPoolSize, + List<String> serverSetPaths, + List<String> finagleNames, + int truncationIntervalInSeconds, + boolean readFromHead, /* read from the earliest data of log */ + StatsReceiver statsReceiver, + StatsLogger statsLogger) throws IOException { + return new ReaderWorker( + conf, + uri, + streamPrefix, + startStreamId, + endStreamId, + readThreadPoolSize, + serverSetPaths, + finagleNames, + truncationIntervalInSeconds, + readFromHead, + statsReceiver, + statsLogger); + } + + public static void main(String[] args) { + Benchmarker benchmarker = new Benchmarker(args); + try { + benchmarker.run(); + } catch (Exception e) { + logger.info("Benchmark quit due to : ", e); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/DLWriterWorker.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/DLWriterWorker.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/DLWriterWorker.java new file mode 100644 index 0000000..a5e7a0a --- /dev/null +++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/DLWriterWorker.java @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.distributedlog.benchmark; + +import static com.google.common.base.Preconditions.checkArgument; + +import org.apache.distributedlog.AsyncLogWriter; +import org.apache.distributedlog.DLSN; +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.DistributedLogManager; +import org.apache.distributedlog.LogRecord; +import org.apache.distributedlog.benchmark.utils.ShiftableRateLimiter; +import org.apache.distributedlog.namespace.DistributedLogNamespace; +import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; +import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.util.SchedulerUtils; +import com.twitter.util.FutureEventListener; +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; 
+import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The benchmark for core library writer. + */ +public class DLWriterWorker implements Worker { + + private static final Logger LOG = LoggerFactory.getLogger(DLWriterWorker.class); + + static final int BACKOFF_MS = 200; + + final String streamPrefix; + final int startStreamId; + final int endStreamId; + final int writeConcurrency; + final int messageSizeBytes; + final ExecutorService executorService; + final ScheduledExecutorService rescueService; + final ShiftableRateLimiter rateLimiter; + final Random random; + final DistributedLogNamespace namespace; + final List<DistributedLogManager> dlms; + final List<AsyncLogWriter> streamWriters; + final int numStreams; + + volatile boolean running = true; + + final StatsLogger statsLogger; + final OpStatsLogger requestStat; + + public DLWriterWorker(DistributedLogConfiguration conf, + URI uri, + String streamPrefix, + int startStreamId, + int endStreamId, + ShiftableRateLimiter rateLimiter, + int writeConcurrency, + int messageSizeBytes, + StatsLogger statsLogger) throws IOException { + checkArgument(startStreamId <= endStreamId); + this.streamPrefix = streamPrefix; + this.startStreamId = startStreamId; + this.endStreamId = endStreamId; + this.rateLimiter = rateLimiter; + this.writeConcurrency = writeConcurrency; + this.messageSizeBytes = messageSizeBytes; + this.statsLogger = statsLogger; + this.requestStat = this.statsLogger.getOpStatsLogger("requests"); + this.executorService = Executors.newCachedThreadPool(); + this.rescueService = Executors.newSingleThreadScheduledExecutor(); + this.random = new Random(System.currentTimeMillis()); + + this.namespace = DistributedLogNamespaceBuilder.newBuilder() + .conf(conf) + .uri(uri) + .statsLogger(statsLogger.scope("dl")) + .build(); + this.numStreams = endStreamId - startStreamId; + dlms = new ArrayList<DistributedLogManager>(numStreams); + streamWriters = new 
ArrayList<AsyncLogWriter>(numStreams); + final ConcurrentMap<String, AsyncLogWriter> writers = new ConcurrentHashMap<String, AsyncLogWriter>(); + final CountDownLatch latch = new CountDownLatch(this.numStreams); + for (int i = startStreamId; i < endStreamId; i++) { + final String streamName = String.format("%s_%d", streamPrefix, i); + final DistributedLogManager dlm = namespace.openLog(streamName); + executorService.submit(new Runnable() { + @Override + public void run() { + try { + AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned(); + if (null != writers.putIfAbsent(streamName, writer)) { + FutureUtils.result(writer.asyncClose()); + } + latch.countDown(); + } catch (IOException e) { + LOG.error("Failed to intialize writer for stream : {}", streamName, e); + } + + } + }); + dlms.add(dlm); + } + try { + latch.await(); + } catch (InterruptedException e) { + throw new IOException("Interrupted on initializing writers for streams.", e); + } + for (int i = startStreamId; i < endStreamId; i++) { + final String streamName = String.format("%s_%d", streamPrefix, i); + AsyncLogWriter writer = writers.get(streamName); + if (null == writer) { + throw new IOException("Writer for " + streamName + " never initialized."); + } + streamWriters.add(writer); + } + LOG.info("Writing to {} streams.", numStreams); + } + + void rescueWriter(int idx, AsyncLogWriter writer) { + if (streamWriters.get(idx) == writer) { + try { + FutureUtils.result(writer.asyncClose()); + } catch (IOException e) { + LOG.error("Failed to close writer for stream {}.", idx); + } + AsyncLogWriter newWriter = null; + try { + newWriter = dlms.get(idx).startAsyncLogSegmentNonPartitioned(); + } catch (IOException e) { + LOG.error("Failed to create new writer for stream {}, backoff for {} ms.", + idx, BACKOFF_MS); + scheduleRescue(idx, writer, BACKOFF_MS); + } + streamWriters.set(idx, newWriter); + } else { + LOG.warn("AsyncLogWriter for stream {} was already rescued.", idx); + } + } + + void 
scheduleRescue(final int idx, final AsyncLogWriter writer, int delayMs) { + Runnable r = new Runnable() { + @Override + public void run() { + rescueWriter(idx, writer); + } + }; + if (delayMs > 0) { + rescueService.schedule(r, delayMs, TimeUnit.MILLISECONDS); + } else { + rescueService.submit(r); + } + } + + @Override + public void close() throws IOException { + this.running = false; + SchedulerUtils.shutdownScheduler(this.executorService, 2, TimeUnit.MINUTES); + SchedulerUtils.shutdownScheduler(this.rescueService, 2, TimeUnit.MINUTES); + for (AsyncLogWriter writer : streamWriters) { + FutureUtils.result(writer.asyncClose()); + } + for (DistributedLogManager dlm : dlms) { + dlm.close(); + } + namespace.close(); + } + + @Override + public void run() { + LOG.info("Starting dlwriter (concurrency = {}, prefix = {}, numStreams = {})", + new Object[] { writeConcurrency, streamPrefix, numStreams }); + for (int i = 0; i < writeConcurrency; i++) { + executorService.submit(new Writer(i)); + } + } + + class Writer implements Runnable { + + final int idx; + + Writer(int idx) { + this.idx = idx; + } + + @Override + public void run() { + LOG.info("Started writer {}.", idx); + while (running) { + final int streamIdx = random.nextInt(numStreams); + final AsyncLogWriter writer = streamWriters.get(streamIdx); + rateLimiter.getLimiter().acquire(); + final long requestMillis = System.currentTimeMillis(); + final byte[] data; + try { + data = Utils.generateMessage(requestMillis, messageSizeBytes); + } catch (TException e) { + LOG.error("Error on generating message : ", e); + break; + } + writer.write(new LogRecord(requestMillis, data)).addEventListener(new FutureEventListener<DLSN>() { + @Override + public void onSuccess(DLSN value) { + requestStat.registerSuccessfulEvent(System.currentTimeMillis() - requestMillis); + } + + @Override + public void onFailure(Throwable cause) { + requestStat.registerFailedEvent(System.currentTimeMillis() - requestMillis); + LOG.error("Failed to publish, 
rescue it : ", cause); + scheduleRescue(streamIdx, writer, 0); + } + }); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java new file mode 100644 index 0000000..11cba6f --- /dev/null +++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java @@ -0,0 +1,468 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.distributedlog.benchmark; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.base.Stopwatch; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.twitter.common.zookeeper.ServerSet; +import org.apache.distributedlog.AsyncLogReader; +import org.apache.distributedlog.DLSN; +import org.apache.distributedlog.DistributedLogConfiguration; +import org.apache.distributedlog.DistributedLogManager; +import org.apache.distributedlog.LogRecordSet; +import org.apache.distributedlog.LogRecordWithDLSN; +import org.apache.distributedlog.benchmark.thrift.Message; +import org.apache.distributedlog.client.serverset.DLZkServerSet; +import org.apache.distributedlog.exceptions.DLInterruptedException; +import org.apache.distributedlog.namespace.DistributedLogNamespace; +import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder; +import org.apache.distributedlog.service.DistributedLogClient; +import org.apache.distributedlog.service.DistributedLogClientBuilder; +import org.apache.distributedlog.util.FutureUtils; +import org.apache.distributedlog.util.SchedulerUtils; +import com.twitter.finagle.builder.ClientBuilder; +import com.twitter.finagle.stats.StatsReceiver; +import com.twitter.finagle.thrift.ClientId$; +import com.twitter.util.Duration$; +import com.twitter.util.Function; +import com.twitter.util.Future; +import com.twitter.util.FutureEventListener; +import com.twitter.util.Promise; +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.stats.Counter; +import org.apache.bookkeeper.stats.Gauge; +import org.apache.bookkeeper.stats.OpStatsLogger; +import org.apache.bookkeeper.stats.StatsLogger; +import 
org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The benchmark for core library reader. + */ +public class ReaderWorker implements Worker { + + private static final Logger LOG = LoggerFactory.getLogger(ReaderWorker.class); + + static final int BACKOFF_MS = 200; + + final String streamPrefix; + final int startStreamId; + final int endStreamId; + final ScheduledExecutorService executorService; + final ExecutorService callbackExecutor; + final DistributedLogNamespace namespace; + final DistributedLogManager[] dlms; + final AsyncLogReader[] logReaders; + final StreamReader[] streamReaders; + final int numStreams; + final boolean readFromHead; + + final int truncationIntervalInSeconds; + // DL Client Related Variables + final DLZkServerSet[] serverSets; + final List<String> finagleNames; + final DistributedLogClient dlc; + + volatile boolean running = true; + + final StatsReceiver statsReceiver; + final StatsLogger statsLogger; + final OpStatsLogger e2eStat; + final OpStatsLogger deliveryStat; + final OpStatsLogger negativeE2EStat; + final OpStatsLogger negativeDeliveryStat; + final OpStatsLogger truncationStat; + final Counter invalidRecordsCounter; + final Counter outOfOrderSequenceIdCounter; + + class StreamReader implements FutureEventListener<List<LogRecordWithDLSN>>, Runnable, Gauge<Number> { + + final int streamIdx; + final String streamName; + DLSN prevDLSN = null; + long prevSequenceId = Long.MIN_VALUE; + private static final String gaugeLabel = "sequence_id"; + + StreamReader(int idx, StatsLogger statsLogger) { + this.streamIdx = idx; + int streamId = startStreamId + streamIdx; + streamName = String.format("%s_%d", streamPrefix, streamId); + statsLogger.scope(streamName).registerGauge(gaugeLabel, this); + } + + @Override + public void onSuccess(final List<LogRecordWithDLSN> records) { + for (final LogRecordWithDLSN record : records) { + if (record.isRecordSet()) { + try { + processRecordSet(record); + } catch 
(IOException e) { + onFailure(e); + } + } else { + processRecord(record); + } + } + readLoop(); + } + + public void processRecordSet(final LogRecordWithDLSN record) throws IOException { + LogRecordSet.Reader reader = LogRecordSet.of(record); + LogRecordWithDLSN nextRecord = reader.nextRecord(); + while (null != nextRecord) { + processRecord(nextRecord); + nextRecord = reader.nextRecord(); + } + } + + public void processRecord(final LogRecordWithDLSN record) { + Message msg; + try { + msg = Utils.parseMessage(record.getPayload()); + } catch (TException e) { + invalidRecordsCounter.inc(); + LOG.warn("Failed to parse record {} for stream {} : size = {} , ", + new Object[] { record, streamIdx, record.getPayload().length, e }); + return; + } + long curTimeMillis = System.currentTimeMillis(); + long e2eLatency = curTimeMillis - msg.getPublishTime(); + long deliveryLatency = curTimeMillis - record.getTransactionId(); + if (e2eLatency >= 0) { + e2eStat.registerSuccessfulEvent(e2eLatency); + } else { + negativeE2EStat.registerSuccessfulEvent(-e2eLatency); + } + if (deliveryLatency >= 0) { + deliveryStat.registerSuccessfulEvent(deliveryLatency); + } else { + negativeDeliveryStat.registerSuccessfulEvent(-deliveryLatency); + } + + prevDLSN = record.getDlsn(); + } + + @Override + public void onFailure(Throwable cause) { + scheduleReinitStream(streamIdx).map(new Function<Void, Void>() { + @Override + public Void apply(Void value) { + prevDLSN = null; + prevSequenceId = Long.MIN_VALUE; + readLoop(); + return null; + } + }); + } + + void readLoop() { + if (!running) { + return; + } + logReaders[streamIdx].readBulk(10).addEventListener(this); + } + + @Override + public void run() { + final DLSN dlsnToTruncate = prevDLSN; + if (null == dlsnToTruncate) { + return; + } + final Stopwatch stopwatch = Stopwatch.createStarted(); + dlc.truncate(streamName, dlsnToTruncate).addEventListener( + new FutureEventListener<Boolean>() { + @Override + public void onSuccess(Boolean value) { + 
truncationStat.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS)); + } + + @Override + public void onFailure(Throwable cause) { + truncationStat.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS)); + LOG.error("Failed to truncate stream {} to {} : ", + new Object[]{streamName, dlsnToTruncate, cause}); + } + }); + } + + @Override + public Number getDefaultValue() { + return Long.MIN_VALUE; + } + + @Override + public synchronized Number getSample() { + return prevSequenceId; + } + + void unregisterGauge() { + statsLogger.scope(streamName).unregisterGauge(gaugeLabel, this); + } + } + + public ReaderWorker(DistributedLogConfiguration conf, + URI uri, + String streamPrefix, + int startStreamId, + int endStreamId, + int readThreadPoolSize, + List<String> serverSetPaths, + List<String> finagleNames, + int truncationIntervalInSeconds, + boolean readFromHead, /* read from the earliest data of log */ + StatsReceiver statsReceiver, + StatsLogger statsLogger) throws IOException { + checkArgument(startStreamId <= endStreamId); + this.streamPrefix = streamPrefix; + this.startStreamId = startStreamId; + this.endStreamId = endStreamId; + this.truncationIntervalInSeconds = truncationIntervalInSeconds; + this.readFromHead = readFromHead; + this.statsReceiver = statsReceiver; + this.statsLogger = statsLogger; + this.e2eStat = this.statsLogger.getOpStatsLogger("e2e"); + this.negativeE2EStat = this.statsLogger.getOpStatsLogger("e2eNegative"); + this.deliveryStat = this.statsLogger.getOpStatsLogger("delivery"); + this.negativeDeliveryStat = this.statsLogger.getOpStatsLogger("deliveryNegative"); + this.truncationStat = this.statsLogger.getOpStatsLogger("truncation"); + this.invalidRecordsCounter = this.statsLogger.getCounter("invalid_records"); + this.outOfOrderSequenceIdCounter = this.statsLogger.getCounter("out_of_order_seq_id"); + this.executorService = Executors.newScheduledThreadPool( + readThreadPoolSize, new 
ThreadFactoryBuilder().setNameFormat("benchmark.reader-%d").build()); + this.callbackExecutor = Executors.newFixedThreadPool( + Runtime.getRuntime().availableProcessors(), + new ThreadFactoryBuilder().setNameFormat("benchmark.reader-callback-%d").build()); + this.finagleNames = finagleNames; + this.serverSets = createServerSets(serverSetPaths); + + conf.setDeserializeRecordSetOnReads(false); + + if (truncationIntervalInSeconds > 0 && (!finagleNames.isEmpty() || !serverSetPaths.isEmpty())) { + // Construct client for truncation + DistributedLogClientBuilder builder = DistributedLogClientBuilder.newBuilder() + .clientId(ClientId$.MODULE$.apply("dlog_loadtest_reader")) + .clientBuilder(ClientBuilder.get() + .hostConnectionLimit(10) + .hostConnectionCoresize(10) + .tcpConnectTimeout(Duration$.MODULE$.fromSeconds(1)) + .requestTimeout(Duration$.MODULE$.fromSeconds(2))) + .redirectBackoffStartMs(100) + .redirectBackoffMaxMs(500) + .requestTimeoutMs(2000) + .statsReceiver(statsReceiver) + .thriftmux(true) + .name("reader"); + + if (serverSetPaths.isEmpty()) { + // Prepare finagle names + String local = finagleNames.get(0); + String[] remotes = new String[finagleNames.size() - 1]; + finagleNames.subList(1, finagleNames.size()).toArray(remotes); + + builder = builder.finagleNameStrs(local, remotes); + LOG.info("Initialized distributedlog client for truncation @ {}.", finagleNames); + } else if (serverSets.length != 0){ + ServerSet local = this.serverSets[0].getServerSet(); + ServerSet[] remotes = new ServerSet[this.serverSets.length - 1]; + for (int i = 1; i < serverSets.length; i++) { + remotes[i - 1] = serverSets[i].getServerSet(); + } + + builder = builder.serverSets(local, remotes); + LOG.info("Initialized distributedlog client for truncation @ {}.", serverSetPaths); + } else { + builder = builder.uri(uri); + LOG.info("Initialized distributedlog client for namespace {}", uri); + } + dlc = builder.build(); + } else { + dlc = null; + } + + // construct the factory + 
this.namespace = DistributedLogNamespaceBuilder.newBuilder() + .conf(conf) + .uri(uri) + .statsLogger(statsLogger.scope("dl")) + .build(); + this.numStreams = endStreamId - startStreamId; + this.dlms = new DistributedLogManager[numStreams]; + this.logReaders = new AsyncLogReader[numStreams]; + final CountDownLatch latch = new CountDownLatch(numStreams); + for (int i = 0; i < numStreams; i++) { + final int idx = i; + executorService.submit(new Runnable() { + @Override + public void run() { + reinitStream(idx).map(new Function<Void, Void>() { + @Override + public Void apply(Void value) { + LOG.info("Initialized stream reader {}.", idx); + latch.countDown(); + return null; + } + }); + } + }); + } + try { + latch.await(); + } catch (InterruptedException e) { + throw new DLInterruptedException("Failed to intialize benchmark readers : ", e); + } + this.streamReaders = new StreamReader[numStreams]; + for (int i = 0; i < numStreams; i++) { + streamReaders[i] = new StreamReader(i, statsLogger.scope("perstream")); + if (truncationIntervalInSeconds > 0) { + executorService.scheduleWithFixedDelay(streamReaders[i], + truncationIntervalInSeconds, truncationIntervalInSeconds, TimeUnit.SECONDS); + } + } + LOG.info("Initialized benchmark reader on {} streams {} : [{} - {})", + new Object[] { numStreams, streamPrefix, startStreamId, endStreamId }); + } + + protected DLZkServerSet[] createServerSets(List<String> serverSetPaths) { + DLZkServerSet[] serverSets = new DLZkServerSet[serverSetPaths.size()]; + for (int i = 0; i < serverSets.length; i++) { + String serverSetPath = serverSetPaths.get(i); + serverSets[i] = DLZkServerSet.of(URI.create(serverSetPath), 60000); + } + return serverSets; + } + + private Future<Void> reinitStream(int idx) { + Promise<Void> promise = new Promise<Void>(); + reinitStream(idx, promise); + return promise; + } + + private void reinitStream(int idx, Promise<Void> promise) { + int streamId = startStreamId + idx; + String streamName = String.format("%s_%d", 
streamPrefix, streamId); + + if (logReaders[idx] != null) { + try { + FutureUtils.result(logReaders[idx].asyncClose()); + } catch (IOException e) { + LOG.warn("Failed on closing stream reader {} : ", streamName, e); + } + logReaders[idx] = null; + } + if (dlms[idx] != null) { + try { + dlms[idx].close(); + } catch (IOException e) { + LOG.warn("Failed on closing dlm {} : ", streamName, e); + } + dlms[idx] = null; + } + + try { + dlms[idx] = namespace.openLog(streamName); + } catch (IOException ioe) { + LOG.error("Failed on creating dlm {} : ", streamName, ioe); + scheduleReinitStream(idx, promise); + return; + } + DLSN lastDLSN; + if (readFromHead) { + lastDLSN = DLSN.InitialDLSN; + } else { + try { + lastDLSN = dlms[idx].getLastDLSN(); + } catch (IOException ioe) { + LOG.error("Failed on getting last dlsn from stream {} : ", streamName, ioe); + scheduleReinitStream(idx, promise); + return; + } + } + try { + logReaders[idx] = dlms[idx].getAsyncLogReader(lastDLSN); + } catch (IOException ioe) { + LOG.error("Failed on opening reader for stream {} starting from {} : ", + new Object[] { streamName, lastDLSN, ioe }); + scheduleReinitStream(idx, promise); + return; + } + LOG.info("Opened reader for stream {}, starting from {}.", streamName, lastDLSN); + promise.setValue(null); + } + + Future<Void> scheduleReinitStream(int idx) { + Promise<Void> promise = new Promise<Void>(); + scheduleReinitStream(idx, promise); + return promise; + } + + void scheduleReinitStream(final int idx, final Promise<Void> promise) { + executorService.schedule(new Runnable() { + @Override + public void run() { + reinitStream(idx, promise); + } + }, BACKOFF_MS, TimeUnit.MILLISECONDS); + } + + @Override + public void close() throws IOException { + this.running = false; + for (AsyncLogReader reader : logReaders) { + if (null != reader) { + FutureUtils.result(reader.asyncClose()); + } + } + for (DistributedLogManager dlm : dlms) { + if (null != dlm) { + dlm.close(); + } + } + namespace.close(); + 
SchedulerUtils.shutdownScheduler(executorService, 2, TimeUnit.MINUTES); + SchedulerUtils.shutdownScheduler(callbackExecutor, 2, TimeUnit.MINUTES); + if (this.dlc != null) { + this.dlc.close(); + } + for (DLZkServerSet serverSet: serverSets) { + serverSet.close(); + } + // Unregister gauges to prevent GC spirals + for (StreamReader sr : streamReaders) { + sr.unregisterGauge(); + } + } + + @Override + public void run() { + LOG.info("Starting reader (prefix = {}, numStreams = {}).", + streamPrefix, numStreams); + for (StreamReader sr : streamReaders) { + sr.readLoop(); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/Utils.java ---------------------------------------------------------------------- diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/Utils.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/Utils.java new file mode 100644 index 0000000..81f99ef --- /dev/null +++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/Utils.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.distributedlog.benchmark;

import org.apache.distributedlog.benchmark.thrift.Message;
import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TMemoryInputTransport;

/**
 * Utils for generating and parsing messages.
 */
public class Utils {

    // Shared generator, kept because it is visible to the rest of the package.
    // generateMessage() no longer draws from it, to avoid cross-thread contention.
    static final Random RAND = new Random(System.currentTimeMillis());
    // One serializer per thread (presumably because TSerializer instances must not be
    // shared between threads -- confirm against the thrift documentation).
    static final ThreadLocal<TSerializer> MSG_SERIALIZER =
        new ThreadLocal<TSerializer>() {
            @Override
            public TSerializer initialValue() {
                return new TSerializer(new TBinaryProtocol.Factory());
            }
        };

    /**
     * Generates a serialized benchmark message carrying a random payload.
     *
     * @param requestMillis timestamp stored in the message (presumably its publish time)
     * @param payLoadSize number of random payload bytes
     * @return the message serialized with the thrift binary protocol
     * @throws TException if the message cannot be serialized
     */
    public static byte[] generateMessage(long requestMillis, int payLoadSize) throws TException {
        byte[] payload = new byte[payLoadSize];
        // Per-thread generator: concurrent writers do not contend on a single shared seed.
        ThreadLocalRandom.current().nextBytes(payload);
        Message msg = new Message(requestMillis, ByteBuffer.wrap(payload));
        return MSG_SERIALIZER.get().serialize(msg);
    }

    /**
     * Parses a message serialized with the thrift binary protocol.
     *
     * @param data the serialized message
     * @return the parsed message
     * @throws TException if the bytes cannot be read as a message
     */
    public static Message parseMessage(byte[] data) throws TException {
        Message msg = new Message();
        TMemoryInputTransport transport = new TMemoryInputTransport(data);
        TBinaryProtocol protocol = new TBinaryProtocol(transport);
        msg.read(protocol);
        return msg;
    }

}