Repository: incubator-distributedlog Updated Branches: refs/heads/merge/DL-98 [created] 28a8b8ff9
when publishing per partition stats, also publish per stream stats RB_ID=820062 Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/904b8986 Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/904b8986 Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/904b8986 Branch: refs/heads/merge/DL-98 Commit: 904b8986b46fc908cfd1a0ee05b35dce77d12f5c Parents: 3fceccc Author: Jordan Bull <jb...@twitter.com> Authored: Fri May 13 11:27:19 2016 -0700 Committer: Sijie Guo <sij...@twitter.com> Committed: Mon Dec 12 16:16:37 2016 -0800 ---------------------------------------------------------------------- .../service/DistributedLogServiceImpl.java | 11 +++++++---- .../service/stream/BulkWriteOp.java | 8 ++++++-- .../service/stream/StreamImpl.java | 4 ++-- .../service/stream/StreamOpStats.java | 20 ++++++++++++-------- .../distributedlog/service/stream/WriteOp.java | 8 ++++++-- .../service/TestDistributedLogServer.java | 8 ++++---- .../service/TestDistributedLogService.java | 1 + .../service/stream/TestStreamOp.java | 3 ++- 8 files changed, 40 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/904b8986/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java index de475ea..751e972 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java @@ -132,6 +132,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI // Stats private final StatsLogger statsLogger; private final StatsLogger perStreamStatsLogger; + private final StreamPartitionConverter streamPartitionConverter; private final StreamOpStats streamOpStats; private final Counter bulkWritePendingStat; private final Counter writePendingStat; @@ -158,6 +159,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI this.perStreamStatsLogger = perStreamStatsLogger; this.dlsnVersion = serverConf.getDlsnVersion(); this.serverRegionId = serverConf.getRegionId(); + this.streamPartitionConverter = converter; int serverPort = serverConf.getServerPort(); int shard = serverConf.getServerShardId(); int numThreads = serverConf.getServerThreads(); @@ -396,8 +398,8 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI public Future<BulkWriteResponse> writeBulkWithContext(final String stream, List<ByteBuffer> data, WriteContext ctx) { bulkWritePendingStat.inc(); receivedRecordCounter.add(data.size()); - BulkWriteOp op = new BulkWriteOp(stream, data, statsLogger, perStreamStatsLogger, getChecksum(ctx), - featureChecksumDisabled, accessControlManager); + BulkWriteOp op = new BulkWriteOp(stream, data, statsLogger, perStreamStatsLogger, streamPartitionConverter, + getChecksum(ctx), featureChecksumDisabled, accessControlManager); executeStreamOp(op); return op.result().ensure(new Function0<BoxedUnit>() { public BoxedUnit apply() { @@ -675,8 +677,9 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI ByteBuffer data, Long checksum, boolean isRecordSet) { - return new WriteOp(stream, data, statsLogger, perStreamStatsLogger, serverConfig, dlsnVersion, - checksum, isRecordSet, featureChecksumDisabled, accessControlManager); + return new WriteOp(stream, data, statsLogger, perStreamStatsLogger, streamPartitionConverter, + serverConfig, dlsnVersion, checksum, isRecordSet, featureChecksumDisabled, + accessControlManager); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/904b8986/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java index 96a37cd..c009bb9 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java @@ -33,6 +33,8 @@ import com.twitter.distributedlog.exceptions.DLException; import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException; import com.twitter.distributedlog.exceptions.RequestDeniedException; import com.twitter.distributedlog.service.ResponseUtils; +import com.twitter.distributedlog.service.streamset.Partition; +import com.twitter.distributedlog.service.streamset.StreamPartitionConverter; import com.twitter.distributedlog.thrift.service.BulkWriteResponse; import com.twitter.distributedlog.thrift.service.ResponseHeader; import com.twitter.distributedlog.thrift.service.StatusCode; @@ -88,6 +90,7 @@ public class BulkWriteOp extends AbstractStreamOp<BulkWriteResponse> implements List<ByteBuffer> buffers, StatsLogger statsLogger, StatsLogger perStreamStatsLogger, + StreamPartitionConverter streamPartitionConverter, Long checksum, Feature checksumDisabledFeature, AccessControlManager accessControlManager) { @@ -100,6 +103,7 @@ public class BulkWriteOp extends AbstractStreamOp<BulkWriteResponse> implements } this.payloadSize = total; + final Partition partition = streamPartitionConverter.convert(stream); // Write record stats StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger); this.deniedBulkWriteCounter = streamOpStats.requestDeniedCounter("bulkWrite"); @@ -107,8 +111,8 @@ public class BulkWriteOp extends AbstractStreamOp<BulkWriteResponse> implements this.failureRecordCounter = streamOpStats.recordsCounter("failure"); this.redirectRecordCounter = streamOpStats.recordsCounter("redirect"); this.bulkWriteBytes = streamOpStats.scopedRequestCounter("bulkWrite", "bytes"); - this.latencyStat = streamOpStats.streamRequestLatencyStat(stream, "bulkWrite"); - this.bytes = streamOpStats.streamRequestCounter(stream, "bulkWrite", "bytes"); + this.latencyStat = streamOpStats.streamRequestLatencyStat(partition, "bulkWrite"); + this.bytes = streamOpStats.streamRequestCounter(partition, "bulkWrite", "bytes"); this.accessControlManager = accessControlManager; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/904b8986/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java index 04f793f..45630fe 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java @@ -194,12 +194,12 @@ public class StreamImpl implements Stream { this.dynConf = streamConf; StatsLogger limiterStatsLogger = BroadCastStatsLogger.two( streamOpStats.baseScope("stream_limiter"), - streamOpStats.streamRequestScope(name, "limiter")); + streamOpStats.streamRequestScope(partition, "limiter")); this.limiter = new StreamRequestLimiter(name, dynConf, limiterStatsLogger, featureRateLimitDisabled); this.requestTimer = requestTimer; // Stats - this.streamLogger = streamOpStats.streamRequestStatsLogger(name); + this.streamLogger = streamOpStats.streamRequestStatsLogger(partition); this.limiterStatLogger = streamOpStats.baseScope("request_limiter"); this.streamExceptionStatLogger = streamLogger.scope("exceptions"); this.serviceTimeout = streamOpStats.baseCounter("serviceTimeout"); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/904b8986/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java index adf89a3..2a44d88 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java @@ -17,6 +17,8 @@ */ package com.twitter.distributedlog.service.stream; +import com.twitter.distributedlog.stats.BroadCastStatsLogger; +import com.twitter.distributedlog.service.streamset.Partition; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.OpStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; @@ -81,19 +83,21 @@ public class StreamOpStats { return recordsStatsLogger.getCounter(counterName); } - public StatsLogger streamRequestStatsLogger(String streamName) { - return streamStatsLogger.scope(streamName); + public StatsLogger streamRequestStatsLogger(Partition partition) { + return BroadCastStatsLogger.masterslave( + streamStatsLogger.scope(partition.getStream()).scope("partition").scope(Integer.toString(partition.getId())), + streamStatsLogger.scope(partition.getStream()).scope("aggregate")); } - public StatsLogger streamRequestScope(String streamName, String scopeName) { - return streamRequestStatsLogger(streamName).scope(scopeName); + public StatsLogger streamRequestScope(Partition partition, String scopeName) { + return streamRequestStatsLogger(partition).scope(scopeName); } - public OpStatsLogger streamRequestLatencyStat(String streamName, String opName) { - return streamRequestStatsLogger(streamName).getOpStatsLogger(opName); + public OpStatsLogger streamRequestLatencyStat(Partition partition, String opName) { + return streamRequestStatsLogger(partition).getOpStatsLogger(opName); } - public Counter streamRequestCounter(String streamName, String opName, String counterName) { - return streamRequestScope(streamName, opName).getCounter(counterName); + public Counter streamRequestCounter(Partition partition, String opName, String counterName) { + return streamRequestScope(partition, opName).getCounter(counterName); } } http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/904b8986/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java index c74f2cd..e9f2f4e 100644 --- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java +++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java @@ -26,6 +26,8 @@ import com.twitter.distributedlog.service.config.ServerConfiguration; import com.twitter.distributedlog.exceptions.DLException; import com.twitter.distributedlog.exceptions.RequestDeniedException; import com.twitter.distributedlog.service.ResponseUtils; +import com.twitter.distributedlog.service.streamset.Partition; +import com.twitter.distributedlog.service.streamset.StreamPartitionConverter; import com.twitter.distributedlog.thrift.service.WriteResponse; import com.twitter.distributedlog.thrift.service.ResponseHeader; import com.twitter.distributedlog.thrift.service.StatusCode; @@ -67,6 +69,7 @@ public class WriteOp extends AbstractWriteOp implements WriteOpWithPayload { ByteBuffer data, StatsLogger statsLogger, StatsLogger perStreamStatsLogger, + StreamPartitionConverter streamPartitionConverter, ServerConfiguration conf, byte dlsnVersion, Long checksum, @@ -78,14 +81,15 @@ public class WriteOp extends AbstractWriteOp implements WriteOpWithPayload { data.get(payload); this.isRecordSet = isRecordSet; + final Partition partition = streamPartitionConverter.convert(stream); StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger); this.successRecordCounter = streamOpStats.recordsCounter("success"); this.failureRecordCounter = streamOpStats.recordsCounter("failure"); this.redirectRecordCounter = streamOpStats.recordsCounter("redirect"); this.deniedWriteCounter = streamOpStats.requestDeniedCounter("write"); this.writeBytes = streamOpStats.scopedRequestCounter("write", "bytes"); - this.latencyStat = streamOpStats.streamRequestLatencyStat(stream, "write"); - this.bytes = streamOpStats.streamRequestCounter(stream, "write", "bytes"); + this.latencyStat = streamOpStats.streamRequestLatencyStat(partition, "write"); + this.bytes = streamOpStats.streamRequestCounter(partition, "write", "bytes"); this.dlsnVersion = dlsnVersion; this.accessControlManager = accessControlManager; http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/904b8986/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServer.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServer.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServer.java index 94e8755..63723ef 100644 --- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServer.java +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServer.java @@ -106,9 +106,9 @@ public class TestDistributedLogServer extends DistributedLogServerTestCase { int numRead = 0; LogRecord r = reader.readNext(false); while (null != r) { - int i = Integer.parseInt(new String(r.getPayload())); - assertEquals(numRead + 1, i); ++numRead; + int i = Integer.parseInt(new String(r.getPayload())); + assertEquals(numRead, i); r = reader.readNext(false); } assertEquals(10, numRead); @@ -121,7 +121,7 @@ public class TestDistributedLogServer extends DistributedLogServerTestCase { */ @Test(timeout = 60000) public void testChecksumFlag() throws Exception { - String name = "dlserver-basic-write"; + String name = "testChecksumFlag"; LocalRoutingService routingService = LocalRoutingService.newBuilder().build(); routingService.addHost(name, dlServer.getAddress()); DistributedLogClientBuilder dlClientBuilder = DistributedLogClientBuilder.newBuilder() @@ -134,7 +134,7 @@ public class TestDistributedLogServer extends DistributedLogServerTestCase { .connectionTimeout(Duration.fromSeconds(1)) .requestTimeout(Duration.fromSeconds(60))) .checksum(false); - DistributedLogClient dlClient = (DistributedLogClientImpl) dlClientBuilder.build(); + DistributedLogClient dlClient = dlClientBuilder.build(); Await.result(dlClient.write(name, ByteBuffer.wrap(("1").getBytes()))); dlClient.close(); http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/904b8986/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java index ed456b9..17fae4a 100644 --- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java @@ -600,6 +600,7 @@ public class TestDistributedLogService extends TestDistributedLogBase { ByteBuffer.wrap("test".getBytes()), new NullStatsLogger(), new NullStatsLogger(), + new IdentityStreamPartitionConverter(), new ServerConfiguration(), (byte)0, checksum, http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/904b8986/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamOp.java ---------------------------------------------------------------------- diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamOp.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamOp.java index 93d900f..41b4c69 100644 --- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamOp.java +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamOp.java @@ -24,7 +24,7 @@ import com.twitter.distributedlog.acl.DefaultAccessControlManager; import com.twitter.distributedlog.exceptions.InternalServerException; import com.twitter.distributedlog.service.ResponseUtils; import com.twitter.distributedlog.service.config.ServerConfiguration; -import com.twitter.distributedlog.service.stream.WriteOp; +import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter; import com.twitter.distributedlog.thrift.service.StatusCode; import com.twitter.distributedlog.thrift.service.WriteResponse; import com.twitter.distributedlog.util.Sequencer; @@ -67,6 +67,7 @@ public class TestStreamOp { ByteBuffer.wrap("test".getBytes()), new NullStatsLogger(), new NullStatsLogger(), + new IdentityStreamPartitionConverter(), new ServerConfiguration(), (byte)0, null,