This is an automated email from the ASF dual-hosted git repository. rustyrazorblade pushed a commit to branch client-metrics in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit fc9fd13bdf22d3ec91fe9e320d5c692193c8890e Author: Jon Haddad j...@jonhaddad.com <j...@jonhaddad.com> AuthorDate: Wed Apr 8 14:50:18 2020 -0700 reworked the metric into the frame encoder / decoder instead of the pipeline --- .../metrics/ClientRequestSizeMetrics.java | 4 +- .../transport/ClientRequestSizeMetricsHandler.java | 58 ---------------------- src/java/org/apache/cassandra/transport/Frame.java | 8 +++ .../org/apache/cassandra/transport/Server.java | 5 -- .../ClientRequestSizeMetricsHandlerTest.java | 55 ++------------------ 5 files changed, 14 insertions(+), 116 deletions(-) diff --git a/src/java/org/apache/cassandra/metrics/ClientRequestSizeMetrics.java b/src/java/org/apache/cassandra/metrics/ClientRequestSizeMetrics.java index c1d497b..41fb162 100644 --- a/src/java/org/apache/cassandra/metrics/ClientRequestSizeMetrics.java +++ b/src/java/org/apache/cassandra/metrics/ClientRequestSizeMetrics.java @@ -31,6 +31,6 @@ public class ClientRequestSizeMetrics private static final String TYPE = "ClientRequestSize"; public static final Counter totalBytesRead = Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "IncomingBytes", null)); public static final Counter totalBytesWritten = Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "OutgoingBytes", null)); - public static final Histogram bytesReadPerQueryHistogram = Metrics.histogram(DefaultNameFactory.createMetricName(TYPE, "BytesRecievedPerFrame", null), true); - public static final Histogram bytesWrittenPerQueryHistogram = Metrics.histogram(DefaultNameFactory.createMetricName(TYPE, "BytesTransmittedPerFrame", null), true); + public static final Histogram bytesRecievedPerFrame = Metrics.histogram(DefaultNameFactory.createMetricName(TYPE, "BytesRecievedPerFrame", null), true); + public static final Histogram bytesTransmittedPerFrame = Metrics.histogram(DefaultNameFactory.createMetricName(TYPE, "BytesTransmittedPerFrame", null), true); } diff --git a/src/java/org/apache/cassandra/transport/ClientRequestSizeMetricsHandler.java b/src/java/org/apache/cassandra/transport/ClientRequestSizeMetricsHandler.java deleted file mode 100644 index 33fe034..0000000 --- a/src/java/org/apache/cassandra/transport/ClientRequestSizeMetricsHandler.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.transport; - -import java.util.List; - -import io.netty.buffer.ByteBuf; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToMessageCodec; -import org.apache.cassandra.metrics.ClientRequestSizeMetrics; - -/** - * Records the number of bytes read off of and written to the network - * Normally we only use the MessageToMessageCodec to apply a transformation in the Netty messaging pipeline - * Here we need to ensure the ByteBuf sticks around past this Handler, so we need to remember to call retain() - */ -@ChannelHandler.Sharable -public class ClientRequestSizeMetricsHandler extends MessageToMessageCodec<ByteBuf, ByteBuf> -{ - @Override - public void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> results) - { - final long messageSize = buf.writerIndex() - buf.readerIndex(); - ClientRequestSizeMetrics.totalBytesRead.inc(messageSize); - ClientRequestSizeMetrics.bytesReadPerQueryHistogram.update(messageSize); - // the buffer needs to be retained here due to Netty's internal requirements. Without it the buffer may be freed - buf.retain(); - results.add(buf); - } - - @Override - public void encode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> results) - { - final long messageSize = buf.writerIndex() - buf.readerIndex(); - ClientRequestSizeMetrics.totalBytesWritten.inc(messageSize); - ClientRequestSizeMetrics.bytesWrittenPerQueryHistogram.update(messageSize); - // please see the comment above regarding retaining the ByteBuf + Netty - buf.retain(); - results.add(buf); - } -} diff --git a/src/java/org/apache/cassandra/transport/Frame.java b/src/java/org/apache/cassandra/transport/Frame.java index 8163d7a..4a20a50 100644 --- a/src/java/org/apache/cassandra/transport/Frame.java +++ b/src/java/org/apache/cassandra/transport/Frame.java @@ -32,6 +32,7 @@ import io.netty.handler.codec.MessageToMessageEncoder; import io.netty.util.Attribute; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.metrics.ClientRequestSizeMetrics; import org.apache.cassandra.transport.frame.FrameBodyTransformer; import org.apache.cassandra.transport.messages.ErrorMessage; @@ -222,6 +223,9 @@ public class Frame if (buffer.readableBytes() < frameLength) return null; + ClientRequestSizeMetrics.totalBytesRead.inc(frameLength); + ClientRequestSizeMetrics.bytesRecievedPerFrame.update(frameLength); + // extract body ByteBuf body = buffer.slice(idx, (int) bodyLength); body.retain(); @@ -301,6 +305,10 @@ public class Frame results.add(header); results.add(frame.body); + + int messageSize = header.writerIndex() + frame.body.writerIndex(); + ClientRequestSizeMetrics.totalBytesWritten.inc(messageSize); + ClientRequestSizeMetrics.bytesTransmittedPerFrame.update(messageSize); } } diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java index 64a110b..43b024f 100644 --- a/src/java/org/apache/cassandra/transport/Server.java +++ b/src/java/org/apache/cassandra/transport/Server.java @@ -416,7 +416,6 @@ public class Server implements CassandraDaemon.Server private static final Frame.Encoder frameEncoder = new Frame.Encoder(); private static final Message.ExceptionHandler exceptionHandler = new Message.ExceptionHandler(); private static final ConnectionLimitHandler connectionLimitHandler = new ConnectionLimitHandler(); - private static final ClientRequestSizeMetricsHandler clientRequestSizeMetricsHandler = new ClientRequestSizeMetricsHandler(); private final Server server; @@ -453,10 +452,6 @@ public class Server implements CassandraDaemon.Server //pipeline.addLast("debug", new LoggingHandler()); - // Handler to log size of client requests and responses - // we need this to come after the connection limit handler so it can drop connections first - pipeline.addLast("requestMetricsHandler", clientRequestSizeMetricsHandler); - pipeline.addLast("frameDecoder", new Frame.Decoder(server.connectionFactory)); pipeline.addLast("frameEncoder", frameEncoder); diff --git a/test/unit/org/apache/cassandra/transport/ClientRequestSizeMetricsHandlerTest.java b/test/unit/org/apache/cassandra/transport/ClientRequestSizeMetricsHandlerTest.java index 4d01763..1a8be13 100644 --- a/test/unit/org/apache/cassandra/transport/ClientRequestSizeMetricsHandlerTest.java +++ b/test/unit/org/apache/cassandra/transport/ClientRequestSizeMetricsHandlerTest.java @@ -18,7 +18,6 @@ package org.apache.cassandra.transport; -import java.util.LinkedList; import java.util.List; import org.junit.Before; @@ -26,7 +25,6 @@ import org.junit.Test; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.PooledByteBufAllocator; import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.metrics.ClientRequestSizeMetrics; @@ -37,10 +35,6 @@ import static org.assertj.core.api.Assertions.assertThat; */ public class ClientRequestSizeMetricsHandlerTest extends CQLTester { - private ClientRequestSizeMetricsHandler handler; - private ByteBufAllocator alloc; - private ByteBuf buf; - private List<Object> result; private long totalBytesReadStart; private long totalBytesWrittenStart; @@ -50,17 +44,11 @@ public class ClientRequestSizeMetricsHandlerTest extends CQLTester @Before public void setUp() { - handler = new ClientRequestSizeMetricsHandler(); - alloc = PooledByteBufAllocator.DEFAULT; - buf = alloc.buffer(1024); - result = new LinkedList<>(); - buf.writeInt(1); - totalBytesReadStart = ClientRequestSizeMetrics.totalBytesRead.getCount(); totalBytesWrittenStart = ClientRequestSizeMetrics.totalBytesWritten.getCount(); - totalBytesReadHistoCount = ClientRequestSizeMetrics.bytesReadPerQueryHistogram.getCount(); - totalBytesWrittenHistoCount = ClientRequestSizeMetrics.bytesWrittenPerQueryHistogram.getCount(); + totalBytesReadHistoCount = ClientRequestSizeMetrics.bytesRecievedPerFrame.getCount(); + totalBytesWrittenHistoCount = ClientRequestSizeMetrics.bytesTransmittedPerFrame.getCount(); } @Test @@ -70,43 +58,8 @@ public class ClientRequestSizeMetricsHandlerTest extends CQLTester assertThat(ClientRequestSizeMetrics.totalBytesRead.getCount()).isGreaterThan(totalBytesReadStart); assertThat(ClientRequestSizeMetrics.totalBytesWritten.getCount()).isGreaterThan(totalBytesWrittenStart); - assertThat(ClientRequestSizeMetrics.bytesReadPerQueryHistogram.getCount()).isGreaterThan(totalBytesReadStart); - assertThat(ClientRequestSizeMetrics.bytesWrittenPerQueryHistogram.getCount()).isGreaterThan(totalBytesWrittenStart); - } - - /** - * Ensures we work with the right metrics within the ClientRequestSizeMetricsHandler - */ - @Test - public void testBytesRead() - { - int beforeRefCount = buf.refCnt(); - handler.decode(null, buf, result); - - assertThat(ClientRequestSizeMetrics.totalBytesRead.getCount()).isEqualTo(totalBytesReadStart + Integer.BYTES); - assertThat(ClientRequestSizeMetrics.bytesReadPerQueryHistogram.getCount()).isEqualTo(totalBytesReadHistoCount + 1); - - // make sure we didn't touch the write metrics - assertThat(ClientRequestSizeMetrics.totalBytesWritten.getCount()).isEqualTo(totalBytesWrittenStart); - assertThat(ClientRequestSizeMetrics.bytesWrittenPerQueryHistogram.getCount()).isEqualTo(totalBytesWrittenHistoCount); - - // we should have incremented the reference count (netty ByteBuf requirement) - assertThat(buf.refCnt()).isEqualTo(beforeRefCount + 1); + assertThat(ClientRequestSizeMetrics.bytesRecievedPerFrame.getCount()).isGreaterThan(totalBytesReadStart); + assertThat(ClientRequestSizeMetrics.bytesTransmittedPerFrame.getCount()).isGreaterThan(totalBytesWrittenStart); } - @Test - public void testBytesWritten() - { - int beforeRefCount = buf.refCnt(); - handler.encode(null, buf, result); - - assertThat(ClientRequestSizeMetrics.totalBytesWritten.getCount()).isEqualTo(totalBytesWrittenStart + Integer.BYTES); - assertThat(ClientRequestSizeMetrics.bytesWrittenPerQueryHistogram.getCount()).isEqualTo(totalBytesWrittenHistoCount + 1); - - // make sure we didn't touch the read metrics - assertThat(ClientRequestSizeMetrics.totalBytesRead.getCount()).isEqualTo(totalBytesReadStart); - assertThat(ClientRequestSizeMetrics.bytesReadPerQueryHistogram.getCount()).isEqualTo(totalBytesReadHistoCount); - - assertThat(buf.refCnt()).isEqualTo(beforeRefCount + 1); - } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org