Repository: cassandra
Updated Branches:
  refs/heads/trunk 8d32d9100 -> d38694afe
Add coordinator write metric per CF

patch by Sumanth Pasupuleti; reviewed by Jay Zhuang and jasobrown for CASSANDRA-14232

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d38694af
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d38694af
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d38694af

Branch: refs/heads/trunk
Commit: d38694afe209215f8fc562ca384b82766147eadf
Parents: 8d32d91
Author: Sumanth Pasupuleti <sumanth.pasupuleti...@gmail.com>
Authored: Tue Mar 6 15:57:01 2018 -0800
Committer: Jason Brown <jasedbr...@gmail.com>
Committed: Wed Mar 28 12:55:49 2018 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   1 +
 doc/source/operating/metrics.rst                |   1 +
 .../apache/cassandra/metrics/TableMetrics.java  |   3 +
 .../apache/cassandra/service/StorageProxy.java  |  27 ++-
 .../cassandra/metrics/TableMetricsTest.java     | 176 +++++++++++++++++++
 6 files changed, 208 insertions(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d38694af/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index daafa0a..5763720 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Add coordinator write metric per CF (CASSANDRA-14232)
  * Fix scheduling of speculative retry threshold recalculation (CASSANDRA-14338)
  * Add support for hybrid MIN(), MAX() speculative retry policies (CASSANDRA-14293)
  * Correct and clarify SSLFactory.getSslContext method and call sites (CASSANDRA-14314)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d38694af/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index bcac4ea..f8e3ca6 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -68,6 +68,7 @@ New features
 See nodetool and fqltool help text 
for more information. - SSTableDump now supports the -l option to output each partition as it's own json object See CASSANDRA-13848 for more detail + - Metric for coordinator writes per table has been added. See CASSANDRA-14232 Upgrading --------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/d38694af/doc/source/operating/metrics.rst ---------------------------------------------------------------------- diff --git a/doc/source/operating/metrics.rst b/doc/source/operating/metrics.rst index 345fc3e..325395c 100644 --- a/doc/source/operating/metrics.rst +++ b/doc/source/operating/metrics.rst @@ -95,6 +95,7 @@ ReadLatency Latency Local read latency for th RangeLatency Latency Local range scan latency for this table. WriteLatency Latency Local write latency for this table. CoordinatorReadLatency Timer Coordinator read latency for this table. +CoordinatorWriteLatency Timer Coordinator write latency for this table. CoordinatorScanLatency Timer Coordinator range scan latency for this table. PendingFlushes Counter Estimated number of flush tasks pending for this table. BytesFlushed Counter Total number of bytes flushed since server [re]start. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/d38694af/src/java/org/apache/cassandra/metrics/TableMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java index 7ce2f16..d8cb18e 100644 --- a/src/java/org/apache/cassandra/metrics/TableMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java @@ -184,6 +184,7 @@ public class TableMetrics public final Timer coordinatorReadLatency; public final Timer coordinatorScanLatency; + public final Timer coordinatorWriteLatency; /** Time spent waiting for free memtable space, either on- or off-heap */ public final Histogram waitingOnFreeMemtableSpace; @@ -791,6 +792,7 @@ public class TableMetrics colUpdateTimeDeltaHistogram = createTableHistogram("ColUpdateTimeDeltaHistogram", cfs.keyspace.metric.colUpdateTimeDeltaHistogram, false); coordinatorReadLatency = Metrics.timer(factory.createMetricName("CoordinatorReadLatency")); coordinatorScanLatency = Metrics.timer(factory.createMetricName("CoordinatorScanLatency")); + coordinatorWriteLatency = Metrics.timer(factory.createMetricName("CoordinatorWriteLatency")); waitingOnFreeMemtableSpace = Metrics.histogram(factory.createMetricName("WaitingOnFreeMemtableSpace"), false); // We do not want to capture view mutation specific metrics for a view @@ -880,6 +882,7 @@ public class TableMetrics Metrics.remove(factory.createMetricName("KeyCacheHitRate"), aliasFactory.createMetricName("KeyCacheHitRate")); Metrics.remove(factory.createMetricName("CoordinatorReadLatency"), aliasFactory.createMetricName("CoordinatorReadLatency")); Metrics.remove(factory.createMetricName("CoordinatorScanLatency"), aliasFactory.createMetricName("CoordinatorScanLatency")); + Metrics.remove(factory.createMetricName("CoordinatorWriteLatency"), aliasFactory.createMetricName("CoordinatorWriteLatency")); 
Metrics.remove(factory.createMetricName("WaitingOnFreeMemtableSpace"), aliasFactory.createMetricName("WaitingOnFreeMemtableSpace")); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/d38694af/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index a47c4c6..bacc3a8 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -757,6 +757,7 @@ public class StorageProxy implements StorageProxyMBean long latency = System.nanoTime() - startTime; writeMetrics.addNano(latency); writeMetricsMap.get(consistency_level).addNano(latency); + updateCoordinatorWriteLatencyTableMetric(mutations, latency); } } @@ -1030,7 +1031,7 @@ public class StorageProxy implements StorageProxyMBean long latency = System.nanoTime() - startTime; writeMetrics.addNano(latency); writeMetricsMap.get(consistency_level).addNano(latency); - + updateCoordinatorWriteLatencyTableMetric(mutations, latency); } } @@ -1039,6 +1040,30 @@ public class StorageProxy implements StorageProxyMBean return replica.equals(FBUtilities.getBroadcastAddressAndPort()); } + private static void updateCoordinatorWriteLatencyTableMetric(Collection<? extends IMutation> mutations, long latency) + { + if (null == mutations) + { + return; + } + + try + { + //TODO: Avoid giving same latency number for each CF in each mutation in a given set of mutations + //We could potentially pass a callback into performWrite. And add callback provision for mutateCounter or mutateAtomically (sendToHintedEndPoints) + //However, Trade off between write metric per CF accuracy vs performance hit due to callbacks. Similar issue exists with CoordinatorReadLatency metric. 
+ mutations.forEach(mutation -> { + mutation.getTableIds().forEach(tableId -> { + Keyspace.open(mutation.getKeyspaceName()).getColumnFamilyStore(tableId).metric.coordinatorWriteLatency.update(latency, TimeUnit.NANOSECONDS); + }); + }); + } + catch (Exception ex) + { + logger.warn("Exception occurred updating coordinatorWriteLatency metric", ex); + } + } + private static void syncWriteToBatchlog(Collection<Mutation> mutations, Collection<InetAddressAndPort> endpoints, UUID uuid, long queryStartNanoTime) throws WriteTimeoutException, WriteFailureException { http://git-wip-us.apache.org/repos/asf/cassandra/blob/d38694af/test/unit/org/apache/cassandra/metrics/TableMetricsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/metrics/TableMetricsTest.java b/test/unit/org/apache/cassandra/metrics/TableMetricsTest.java new file mode 100644 index 0000000..a3ae956 --- /dev/null +++ b/test/unit/org/apache/cassandra/metrics/TableMetricsTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.metrics; + +import java.io.IOException; + +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; + +import com.datastax.driver.core.BatchStatement; +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import org.apache.cassandra.OrderedJUnit4ClassRunner; +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.service.EmbeddedCassandraService; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertTrue; + +@RunWith(OrderedJUnit4ClassRunner.class) +public class TableMetricsTest extends SchemaLoader +{ + + private static Session session; + + private static final String KEYSPACE = "junit"; + private static final String TABLE = "tablemetricstest"; + private static final String COUNTER_TABLE = "tablemetricscountertest"; + + @BeforeClass() + public static void setup() throws ConfigurationException, IOException + { + Schema.instance.clear(); + + EmbeddedCassandraService cassandra = new EmbeddedCassandraService(); + cassandra.start(); + + Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").withPort(DatabaseDescriptor.getNativeTransportPort()).build(); + session = cluster.connect(); + + session.execute(String.format("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };", KEYSPACE)); + session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (id_c counter, id int, val text, PRIMARY KEY(id, val));", KEYSPACE, COUNTER_TABLE)); + } + + private ColumnFamilyStore recreateTable() + { + session.execute(String.format("DROP TABLE IF EXISTS %s.%s", KEYSPACE, TABLE)); + 
session.execute(String.format("CREATE TABLE IF NOT EXISTS %s.%s (id int, val1 text, val2 text, PRIMARY KEY(id, val1));", KEYSPACE, TABLE)); + return ColumnFamilyStore.getIfExists(KEYSPACE, TABLE); + } + + private void executeBatch(boolean isLogged, int distinctPartitions, int statementsPerPartition) + { + BatchStatement.Type batchType; + PreparedStatement ps = session.prepare(String.format("INSERT INTO %s.%s (id, val1, val2) VALUES (?, ?, ?);", KEYSPACE, TABLE)); + + if (isLogged) + { + batchType = BatchStatement.Type.LOGGED; + } + else + { + batchType = BatchStatement.Type.UNLOGGED; + } + + BatchStatement batch = new BatchStatement(batchType); + + for (int i=0; i<distinctPartitions; i++) + { + for (int j=0; j<statementsPerPartition; j++) + { + batch.add(ps.bind(i, j + "a", "b")); + } + } + + session.execute(batch); + } + + @Test + public void testRegularStatementsExecuted() + { + ColumnFamilyStore cfs = recreateTable(); + assertEquals(0, cfs.metric.coordinatorWriteLatency.getCount()); + assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() == 0); + + for (int i = 0; i < 10; i++) + { + session.execute(String.format("INSERT INTO %s.%s (id, val1, val2) VALUES (%d, '%s', '%s')", KEYSPACE, TABLE, i, "val" + i, "val" + i)); + } + + assertEquals(10, cfs.metric.coordinatorWriteLatency.getCount()); + assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() > 0); + } + + @Test + public void testPreparedStatementsExecuted() + { + ColumnFamilyStore cfs = recreateTable(); + PreparedStatement metricsStatement = session.prepare(String.format("INSERT INTO %s.%s (id, val1, val2) VALUES (?, ?, ?)", KEYSPACE, TABLE)); + + assertEquals(0, cfs.metric.coordinatorWriteLatency.getCount()); + assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() == 0); + + for (int i = 0; i < 10; i++) + { + session.execute(metricsStatement.bind(i, "val" + i, "val" + i)); + } + + assertEquals(10, cfs.metric.coordinatorWriteLatency.getCount()); + 
assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() > 0); + } + + @Test + public void testLoggedPartitionsPerBatch() + { + ColumnFamilyStore cfs = recreateTable(); + assertEquals(0, cfs.metric.coordinatorWriteLatency.getCount()); + assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() == 0); + + executeBatch(true, 10, 2); + assertEquals(10, cfs.metric.coordinatorWriteLatency.getCount()); + + executeBatch(true, 20, 2); + assertEquals(30, cfs.metric.coordinatorWriteLatency.getCount()); + assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() > 0); + } + + @Test + public void testUnloggedPartitionsPerBatch() + { + ColumnFamilyStore cfs = recreateTable(); + assertEquals(0, cfs.metric.coordinatorWriteLatency.getCount()); + assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() == 0); + + executeBatch(false, 5, 3); + assertEquals(5, cfs.metric.coordinatorWriteLatency.getCount()); + + executeBatch(false, 25, 2); + assertEquals(30, cfs.metric.coordinatorWriteLatency.getCount()); + assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() > 0); + + } + + @Test + public void testCounterStatement() + { + ColumnFamilyStore cfs = ColumnFamilyStore.getIfExists(KEYSPACE, COUNTER_TABLE); + assertEquals(0, cfs.metric.coordinatorWriteLatency.getCount()); + assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() == 0); + session.execute(String.format("UPDATE %s.%s SET id_c = id_c + 1 WHERE id = 1 AND val = 'val1'", KEYSPACE, COUNTER_TABLE)); + assertEquals(1, cfs.metric.coordinatorWriteLatency.getCount()); + assertTrue(cfs.metric.coordinatorWriteLatency.getMeanRate() > 0); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org