This is an automated email from the ASF dual-hosted git repository. rustyrazorblade pushed a commit to branch client-metrics in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 1fea9a507079cde5c9e87f8c70c8f33cb2c7486c
Author: Jon Haddad j...@jonhaddad.com <j...@jonhaddad.com>
AuthorDate: Tue Apr 7 09:57:31 2020 -0700

    Add client request size metrics

    Patch by Jon Haddad for CASSANDRA-15704
---
 .../metrics/ClientRequestSizeMetrics.java          |  36 +++++++
 .../transport/ClientRequestSizeMetricsHandler.java |  58 +++++++++++
 .../org/apache/cassandra/transport/Server.java     |   5 +
 .../ClientRequestSizeMetricsHandlerTest.java       | 112 +++++++++++++++++++++
 4 files changed, 211 insertions(+)

diff --git a/src/java/org/apache/cassandra/metrics/ClientRequestSizeMetrics.java b/src/java/org/apache/cassandra/metrics/ClientRequestSizeMetrics.java
new file mode 100644
index 0000000..cf26e55
--- /dev/null
+++ b/src/java/org/apache/cassandra/metrics/ClientRequestSizeMetrics.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.metrics;
+
+
+import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Histogram;
+
+/**
+ * Counters (running totals) and histograms (per-update sizes) for the bytes read by and written from the Cassandra server.
+ */
+public class ClientRequestSizeMetrics
+{
+    private static final String TYPE = "ClientRequestSize";
+    public static final Counter totalBytesRead = Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "IncomingBytes", null));
+    public static final Counter totalBytesWritten = Metrics.counter(DefaultNameFactory.createMetricName(TYPE, "OutgoingBytes", null));
+    public static final Histogram bytesReadPerQueryHistogram = Metrics.histogram(DefaultNameFactory.createMetricName(TYPE, "BytesReadPerQuery", null), true);
+    public static final Histogram bytesWrittenPerQueryHistogram = Metrics.histogram(DefaultNameFactory.createMetricName(TYPE, "BytesWrittenPerQuery", null), true);
+}
diff --git a/src/java/org/apache/cassandra/transport/ClientRequestSizeMetricsHandler.java b/src/java/org/apache/cassandra/transport/ClientRequestSizeMetricsHandler.java
new file mode 100644
index 0000000..33fe034
--- /dev/null
+++ b/src/java/org/apache/cassandra/transport/ClientRequestSizeMetricsHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageCodec;
+import org.apache.cassandra.metrics.ClientRequestSizeMetrics;
+
+/**
+ * Records the number of bytes read from and written to the network by updating {@link ClientRequestSizeMetrics}.
+ * A MessageToMessageCodec normally transforms messages in the Netty pipeline; this one passes each ByteBuf on unchanged.
+ * Because the same ByteBuf must stick around past this handler, decode() and encode() both call retain() on it.
+ */
+@ChannelHandler.Sharable
+public class ClientRequestSizeMetricsHandler extends MessageToMessageCodec<ByteBuf, ByteBuf>
+{
+    @Override
+    public void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> results)
+    {
+        final long messageSize = buf.writerIndex() - buf.readerIndex();
+        ClientRequestSizeMetrics.totalBytesRead.inc(messageSize);
+        ClientRequestSizeMetrics.bytesReadPerQueryHistogram.update(messageSize);
+        // retain the buffer so that it outlives this handler; without this, Netty may free it
+        buf.retain();
+        results.add(buf);
+    }
+
+    @Override
+    public void encode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> results)
+    {
+        final long messageSize = buf.writerIndex() - buf.readerIndex();
+        ClientRequestSizeMetrics.totalBytesWritten.inc(messageSize);
+        ClientRequestSizeMetrics.bytesWrittenPerQueryHistogram.update(messageSize);
+        // retained for the same reason as in decode(): the ByteBuf must outlive this handler
+        buf.retain();
+        results.add(buf);
+    }
+}
diff --git a/src/java/org/apache/cassandra/transport/Server.java b/src/java/org/apache/cassandra/transport/Server.java
index 43b024f..64a110b 100644
--- a/src/java/org/apache/cassandra/transport/Server.java
+++ b/src/java/org/apache/cassandra/transport/Server.java
@@ -416,6 +416,7 @@ public class Server implements CassandraDaemon.Server
         private static final Frame.Encoder frameEncoder = new Frame.Encoder();
         private static final Message.ExceptionHandler exceptionHandler = new Message.ExceptionHandler();
         private static final ConnectionLimitHandler connectionLimitHandler = new ConnectionLimitHandler();
+        private static final ClientRequestSizeMetricsHandler clientRequestSizeMetricsHandler = new ClientRequestSizeMetricsHandler();
 
         private final Server server;
 
@@ -452,6 +453,10 @@ public class Server implements CassandraDaemon.Server
 
             //pipeline.addLast("debug", new LoggingHandler());
 
+            // Handler that records the size of client requests and responses
+            // it must come after the connection limit handler so that handler can drop connections first
+            pipeline.addLast("requestMetricsHandler", clientRequestSizeMetricsHandler);
+
             pipeline.addLast("frameDecoder", new Frame.Decoder(server.connectionFactory));
             pipeline.addLast("frameEncoder", frameEncoder);
 
diff --git a/test/unit/org/apache/cassandra/transport/ClientRequestSizeMetricsHandlerTest.java b/test/unit/org/apache/cassandra/transport/ClientRequestSizeMetricsHandlerTest.java
new file mode 100644
index 0000000..4d01763
--- /dev/null
+++
b/test/unit/org/apache/cassandra/transport/ClientRequestSizeMetricsHandlerTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.transport;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.metrics.ClientRequestSizeMetrics;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Ensures we properly account for bytes read from and written to clients
+ */
+public class ClientRequestSizeMetricsHandlerTest extends CQLTester
+{
+    private ClientRequestSizeMetricsHandler handler;
+    private ByteBufAllocator alloc;
+    private ByteBuf buf;
+    private List<Object> result;
+    private long totalBytesReadStart;
+    private long totalBytesWrittenStart;
+
+    private long totalBytesReadHistoCount;
+    private long totalBytesWrittenHistoCount;
+
+    @Before
+    public void setUp()
+    {
+        handler = new ClientRequestSizeMetricsHandler();
+        alloc = PooledByteBufAllocator.DEFAULT;
+        buf = alloc.buffer(1024);
+        result = new LinkedList<>();
+        buf.writeInt(1);
+
+        totalBytesReadStart = ClientRequestSizeMetrics.totalBytesRead.getCount();
+        totalBytesWrittenStart = ClientRequestSizeMetrics.totalBytesWritten.getCount();
+
+        totalBytesReadHistoCount = ClientRequestSizeMetrics.bytesReadPerQueryHistogram.getCount();
+        totalBytesWrittenHistoCount = ClientRequestSizeMetrics.bytesWrittenPerQueryHistogram.getCount();
+    }
+
+    @Test
+    public void testReadAndWriteMetricsAreRecordedDuringNativeRequests() throws Throwable
+    {
+        executeNet("SELECT * from system.peers");
+        // compare each metric with its own setUp() baseline: byte totals for counters, sample counts for histograms
+        assertThat(ClientRequestSizeMetrics.totalBytesRead.getCount()).isGreaterThan(totalBytesReadStart);
+        assertThat(ClientRequestSizeMetrics.totalBytesWritten.getCount()).isGreaterThan(totalBytesWrittenStart);
+        assertThat(ClientRequestSizeMetrics.bytesReadPerQueryHistogram.getCount()).isGreaterThan(totalBytesReadHistoCount);
+        assertThat(ClientRequestSizeMetrics.bytesWrittenPerQueryHistogram.getCount()).isGreaterThan(totalBytesWrittenHistoCount);
+    }
+
+    /**
+     * Ensures decode() updates only the read metrics and increments the buffer's reference count
+     */
+    @Test
+    public void testBytesRead()
+    {
+        int beforeRefCount = buf.refCnt();
+        handler.decode(null, buf, result);
+
+        assertThat(ClientRequestSizeMetrics.totalBytesRead.getCount()).isEqualTo(totalBytesReadStart + Integer.BYTES);
+        assertThat(ClientRequestSizeMetrics.bytesReadPerQueryHistogram.getCount()).isEqualTo(totalBytesReadHistoCount + 1);
+
+        // make sure we didn't touch the write metrics
+        assertThat(ClientRequestSizeMetrics.totalBytesWritten.getCount()).isEqualTo(totalBytesWrittenStart);
+        assertThat(ClientRequestSizeMetrics.bytesWrittenPerQueryHistogram.getCount()).isEqualTo(totalBytesWrittenHistoCount);
+
+        // we should have incremented the reference count (netty ByteBuf requirement)
+        assertThat(buf.refCnt()).isEqualTo(beforeRefCount + 1);
+    }
+
+    @Test
+    public void testBytesWritten()
+    {
+        int beforeRefCount = buf.refCnt();
+        handler.encode(null, buf, result);
+
+        assertThat(ClientRequestSizeMetrics.totalBytesWritten.getCount()).isEqualTo(totalBytesWrittenStart + Integer.BYTES);
+        assertThat(ClientRequestSizeMetrics.bytesWrittenPerQueryHistogram.getCount()).isEqualTo(totalBytesWrittenHistoCount + 1);
+
+        // make sure we didn't touch the read metrics
+        assertThat(ClientRequestSizeMetrics.totalBytesRead.getCount()).isEqualTo(totalBytesReadStart);
+        assertThat(ClientRequestSizeMetrics.bytesReadPerQueryHistogram.getCount()).isEqualTo(totalBytesReadHistoCount);
+
+        assertThat(buf.refCnt()).isEqualTo(beforeRefCount + 1);
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org