This is an automated email from the ASF dual-hosted git repository.

blerer pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new c4e4e93 Fix Streaming Repair metrics c4e4e93 is described below commit c4e4e9388b229a0db92c16029fd647909cec5737 Author: Benjamin Lerer <b.le...@gmail.com> AuthorDate: Wed Mar 24 17:23:51 2021 +0100 Fix Streaming Repair metrics patch by Benjamin Lerer; reviewed by Ekaterina Dimitrova for CASSANDRA-16190 --- CHANGES.txt | 1 + .../apache/cassandra/streaming/StreamSession.java | 17 +- .../test/metrics/StreamingMetricsTest.java | 402 +++++++++++++++++---- 3 files changed, 349 insertions(+), 71 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 99c8c97..590a4a4 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0-rc1 + * Fix Streaming Repair metrics (CASSANDRA-16190) * Scheduled (delayed) schema pull tasks should not run after MIGRATION stage shutdown during decommission (CASSANDRA-16495) * When behind a firewall trunk is not buildable, need to allow overriding URLs (CASSANDRA-16563) * Make sure sstables with moved starts are removed correctly in LeveledGenerations (CASSANDRA-16552) diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java index a3ffef1..3a32834 100644 --- a/src/java/org/apache/cassandra/streaming/StreamSession.java +++ b/src/java/org/apache/cassandra/streaming/StreamSession.java @@ -615,21 +615,11 @@ public class StreamSession implements IEndpointStateChangeSubscriber state(State.PREPARING); PrepareSynMessage prepare = new PrepareSynMessage(); prepare.requests.addAll(requests); - long totalBytesToStream = 0; - long totalSSTablesStreamed = 0; for (StreamTransferTask task : transfers.values()) { - totalBytesToStream += task.getTotalSize(); - totalSSTablesStreamed += task.getTotalNumberOfFiles(); prepare.summaries.add(task.getSummary()); } - if(StreamOperation.REPAIR == getStreamOperation()) - { - StreamingMetrics.totalOutgoingRepairBytes.inc(totalBytesToStream); - 
StreamingMetrics.totalOutgoingRepairSSTables.inc(totalSSTablesStreamed); - } - messageSender.sendMessage(prepare); } @@ -767,6 +757,13 @@ public class StreamSession implements IEndpointStateChangeSubscriber long headerSize = message.stream.getEstimatedSize(); StreamingMetrics.totalOutgoingBytes.inc(headerSize); metrics.outgoingBytes.inc(headerSize); + + if(StreamOperation.REPAIR == getStreamOperation()) + { + StreamingMetrics.totalOutgoingRepairBytes.inc(headerSize); + StreamingMetrics.totalOutgoingRepairSSTables.inc(message.stream.getNumFiles()); + } + // schedule timeout for receiving ACK StreamTransferTask task = transfers.get(message.header.tableId); if (task != null) diff --git a/test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java b/test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java index 22449e8..a4e4adc 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/metrics/StreamingMetricsTest.java @@ -18,105 +18,385 @@ package org.apache.cassandra.distributed.test.metrics; -import java.io.IOException; import java.net.InetSocketAddress; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.TimeUnit; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; import org.apache.cassandra.distributed.test.TestBaseImpl; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.metrics.StreamingMetrics; -import static org.apache.cassandra.distributed.api.Feature.NETWORK; import static org.assertj.core.api.Assertions.assertThat; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; public class StreamingMetricsTest extends TestBaseImpl { - private static Cluster cluster; - @BeforeClass - public 
static void setupCluster() throws IOException - { - cluster = Cluster.build().withNodes(2) - .withDataDirCount(1) - .withConfig(config -> config.with(NETWORK) - .set("stream_entire_sstables", false)) - .start(); - cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 2 };"); - } - - private static InetAddressAndPort getNodeAddress(int num) + private static InetAddressAndPort getNodeAddress(Cluster cluster, int num) { InetSocketAddress broadcastAddress = cluster.get(num).broadcastAddress(); return InetAddressAndPort.getByAddressOverrideDefaults(broadcastAddress.getAddress(), broadcastAddress.getPort()); } - @AfterClass - public static void teardownCluster() + @Test + public void testMetricsWithRepairAndStreamingFromTwoNodes() throws Exception { - if (cluster != null) - cluster.close(); + testMetricsWithStreamingFromTwoNodes(true); } + @Test + public void testMetricsWithRebuildAndStreamingFromTwoNodes() throws Exception + { + testMetricsWithStreamingFromTwoNodes(false); + } + + public void testMetricsWithStreamingFromTwoNodes(boolean useRepair) throws Exception + { + try(Cluster cluster = init(Cluster.build(3) + .withDataDirCount(1) + .withConfig(config -> config.with(NETWORK) + .set("stream_entire_sstables", false) + .set("hinted_handoff_enabled", false)) + .start(), 2)) + { + cluster.schemaChange(String.format("CREATE TABLE %s.cf (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compaction = {'class': '%s', 'enabled': 'false'}", KEYSPACE, "LeveledCompactionStrategy")); + + cluster.get(3).shutdown().get(10, TimeUnit.SECONDS); + + final int rowsPerFile = 500; + final int files = 5; + for (int k = 0; k < files; k++) + { + for (int i = k * rowsPerFile; i < k * rowsPerFile + rowsPerFile; ++i) + { + cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.cf (k, c1, c2) VALUES (?, 'value1', 'value2');"), + ConsistencyLevel.ONE, + Integer.toString(i)); + } + cluster.get(1).nodetool("flush"); + 
cluster.get(2).nodetool("flush"); + } + + cluster.get(3).startup(); + + // Checks that the table is empty on node 3 + Object[][] results = cluster.get(3).executeInternal(String.format("SELECT k, c1, c2 FROM %s.cf;", KEYSPACE)); + assertThat(results.length).isEqualTo(0); + + checkThatNoStreamingOccuredBetweenTheThreeNodes(cluster); + + // Trigger streaming from node 3 + if (useRepair) + cluster.get(3).nodetool("repair", "--full"); + else + cluster.get(3).nodetool("rebuild", "--keyspace", KEYSPACE); + + + // Check streaming metrics on node 1 + checkThatNoStreamingOccured(cluster, 1, 2); + long bytesFrom1 = checkDataSent(cluster, 1, 3); + checkDataReceived(cluster, 1, 3, 0, 0); + + if (useRepair) + checkTotalDataSent(cluster, 1, bytesFrom1, bytesFrom1, files); + else + checkTotalDataSent(cluster, 1, bytesFrom1, 0, 0); + + checkTotalDataReceived(cluster, 1, 0); + + // Check streaming metrics on node 2 + checkThatNoStreamingOccured(cluster, 2, 1); + long bytesFrom2 = checkDataSent(cluster, 2, 3); + checkDataReceived(cluster, 1, 2, 0, 0); + + if (useRepair) + checkTotalDataSent(cluster, 2, bytesFrom2, bytesFrom2, files); + else + checkTotalDataSent(cluster, 2, bytesFrom2, 0, 0); + + checkTotalDataReceived(cluster, 2, 0); + + // Check streaming metrics on node 3 + checkDataReceived(cluster, 3, 1, bytesFrom1, files); + checkDataReceived(cluster, 3, 2, bytesFrom2, files); + checkTotalDataSent(cluster, 3, 0, 0, 0); + checkTotalDataReceived(cluster, 3, bytesFrom1 + bytesFrom2); + } + } @Test - public void testStreamMetrics() + public void testMetricsWithRebuildAndStreamingToTwoNodes() throws Exception { - cluster.schemaChange(String.format("CREATE TABLE %s.cf (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compaction = {'class': '%s', 'enabled': 'false'}", KEYSPACE, "LeveledCompactionStrategy")); + testMetricsWithStreamingToTwoNodes(false); + } + + @Test + public void testMetricsWithRepairAndStreamingToTwoNodes() throws Exception + { + 
testMetricsWithStreamingToTwoNodes(true); + } - final int rowsPerFile = 500; - final int files = 5; - for (int k = 0; k < files; k++) + public void testMetricsWithStreamingToTwoNodes(boolean useRepair) throws Exception + { + try(Cluster cluster = init(Cluster.build(3) + .withDataDirCount(1) + .withConfig(config -> config.with(NETWORK) + .set("stream_entire_sstables", false) + .set("hinted_handoff_enabled", false)) + .start(), 2)) { - for (int i = k * rowsPerFile; i < k * rowsPerFile + rowsPerFile; ++i) - cluster.get(1).executeInternal(withKeyspace("INSERT INTO %s.cf (k, c1, c2) VALUES (?, 'value1', 'value2');"), Integer.toString(i)); - cluster.get(1).nodetool("flush"); + cluster.schemaChange(String.format("CREATE TABLE %s.cf (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compaction = {'class': '%s', 'enabled': 'false'}", KEYSPACE, "LeveledCompactionStrategy")); + + final int rowsPerFile = 500; + final int files = 5; + + cluster.get(3).shutdown().get(10, TimeUnit.SECONDS); + cluster.get(1).nodetool("disableautocompaction", KEYSPACE); + cluster.get(2).nodetool("disableautocompaction", KEYSPACE); + + int sstablesInitiallyOnNode2 = 0; + int sstablesInitiallyOnNode3 = 0; + + for (int k = 0; k < 3; k++) + { + for (int i = k * rowsPerFile; i < k * rowsPerFile + rowsPerFile; ++i) + { + cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.cf (k, c1, c2) VALUES (?, 'value1', 'value2');"), + ConsistencyLevel.ONE, + Integer.toString(i)); + } + cluster.get(1).nodetool("flush"); + cluster.get(2).nodetool("flush"); + sstablesInitiallyOnNode2++; + } + + cluster.get(3).startup(); + cluster.get(3).nodetool("disableautocompaction", KEYSPACE); + + cluster.get(2).shutdown().get(10, TimeUnit.SECONDS); + + for (int k = 3; k < files; k++) + { + for (int i = k * rowsPerFile; i < k * rowsPerFile + rowsPerFile; ++i) + { + cluster.coordinator(1).execute(withKeyspace("INSERT INTO %s.cf (k, c1, c2) VALUES (?, 'value1', 'value2');"), + ConsistencyLevel.ONE, + Integer.toString(i)); + } + 
cluster.get(1).nodetool("flush"); + cluster.get(3).nodetool("flush"); + sstablesInitiallyOnNode3++; + } + + cluster.get(2).startup(); + cluster.get(2).nodetool("disableautocompaction", KEYSPACE); + + checkThatNoStreamingOccuredBetweenTheThreeNodes(cluster); + + // Trigger streaming from node 3 and node 2 + + long bytesFrom2To1; + int sstablesFrom2To1; + + long bytesFrom3To1; + int sstablesFrom3To1; + + int sstablesFrom3To2; + int sstablesFrom2To3; + + if (useRepair) + { + cluster.get(3).nodetool("repair", "--full"); + cluster.get(2).nodetool("repair", "--full"); + + bytesFrom2To1 = checkDataSent(cluster, 2, 1); + sstablesFrom2To1 = sstablesInitiallyOnNode2; + + bytesFrom3To1 = checkDataSent(cluster, 3, 1) ; + sstablesFrom3To1 = sstablesInitiallyOnNode3; + + sstablesFrom2To3 = sstablesInitiallyOnNode2; + sstablesFrom3To2 = sstablesInitiallyOnNode3; + } + else + { + cluster.get(3).nodetool("rebuild", "--keyspace", KEYSPACE); + cluster.get(2).nodetool("rebuild", "--keyspace", KEYSPACE); + + bytesFrom2To1 = 0; + sstablesFrom2To1 = 0; + + bytesFrom3To1 = 0; + sstablesFrom3To1 = 0; + + sstablesFrom2To3 = sstablesInitiallyOnNode2; + sstablesFrom3To2 = sstablesInitiallyOnNode3 + sstablesInitiallyOnNode2; + } + + // Check streaming metrics on node 1 + long bytesFrom1To2 = checkDataSent(cluster, 1, 2); + long bytesFrom1To3 = checkDataSent(cluster, 1, 3); + + long totalBytesSentFrom1 = bytesFrom1To2 + bytesFrom1To3; + + if (useRepair) + checkTotalDataSent(cluster, 1, totalBytesSentFrom1, totalBytesSentFrom1, 10); + else + checkTotalDataSent(cluster, 1, totalBytesSentFrom1, 0, 0); + + checkDataReceived(cluster, 1, 2, bytesFrom2To1, sstablesFrom2To1); + checkDataReceived(cluster, 1, 3, bytesFrom3To1, sstablesFrom3To1); + checkTotalDataReceived(cluster, 1, bytesFrom2To1 + bytesFrom3To1); + + // Check streaming metrics on node 2 and 3 + long bytesFrom2To3 = checkDataSent(cluster, 2, 3); + long bytesFrom3To2 = checkDataSent(cluster, 3, 2); + + long totalBytesReceivedBy2 = 
bytesFrom1To2 + bytesFrom3To2; + + checkDataReceived(cluster, 2, 1, bytesFrom1To2, files); + checkDataReceived(cluster, 2, 3, bytesFrom3To2, sstablesFrom3To2); + + if (useRepair) + checkTotalDataSent(cluster, 2, bytesFrom2To3 + bytesFrom2To1, bytesFrom2To3 + bytesFrom2To1, sstablesFrom2To3 + sstablesFrom2To1); + else + checkTotalDataSent(cluster, 2, bytesFrom2To3, 0, 0); + + checkTotalDataReceived(cluster, 2, totalBytesReceivedBy2); + + long totalBytesReceivedBy3 = bytesFrom1To3 + bytesFrom2To3; + + checkDataReceived(cluster, 3, 1, bytesFrom1To3, files); + checkDataReceived(cluster, 3, 2, bytesFrom2To3, sstablesFrom2To3); + + if (useRepair) + checkTotalDataSent(cluster, 3, bytesFrom3To2 + bytesFrom3To1, bytesFrom3To2 + bytesFrom3To1, sstablesFrom3To2 + sstablesFrom3To1); + else + checkTotalDataSent(cluster, 3, bytesFrom3To2, 0, 0); + + checkTotalDataReceived(cluster, 3, totalBytesReceivedBy3); } + } - cluster.get(2).executeInternal("TRUNCATE system.available_ranges;"); - Object[][] results = cluster.get(2).executeInternal(String.format("SELECT k, c1, c2 FROM %s.cf;", KEYSPACE)); - assertThat(results.length).isEqualTo(0); + private void checkThatNoStreamingOccuredBetweenTheThreeNodes(Cluster cluster) + { + checkThatNoStreamingOccured(cluster, 1, 2); + checkThatNoStreamingOccured(cluster, 1, 3); + checkTotalDataSent(cluster, 1, 0, 0, 0); + checkTotalDataReceived(cluster, 1, 0); - InetAddressAndPort node1Address = getNodeAddress(1); - InetAddressAndPort node2Address = getNodeAddress(2); + checkThatNoStreamingOccured(cluster, 2, 1); + checkThatNoStreamingOccured(cluster, 2, 3); + checkTotalDataSent(cluster, 2, 0, 0, 0); + checkTotalDataReceived(cluster, 2, 0); - // Trigger streaming from node 2 - cluster.get(2).nodetool("rebuild", "--keyspace", KEYSPACE); + checkThatNoStreamingOccured(cluster, 3, 1); + checkThatNoStreamingOccured(cluster, 3, 2); + checkTotalDataSent(cluster, 3, 0, 0, 0); + checkTotalDataReceived(cluster, 3, 0); + } + + private void 
checkThatNoStreamingOccured(Cluster cluster, int node, int peer) + { + InetAddressAndPort address = getNodeAddress(cluster, peer); + cluster.get(node).runOnInstance(() -> { + + StreamingMetrics metrics = StreamingMetrics.get(address); - // Assert metrics in node 2 - long transmittedBytes = cluster.get(2).callOnInstance(() -> { - StreamingMetrics metrics = StreamingMetrics.get(node1Address); assertThat(metrics.incomingBytes.getCount()) - .isGreaterThan(0) - .describedAs("There should be bytes streamed from the peer."); + .describedAs("No SSTable should have been streamed so far from node" + node + " to node" + peer) + .isEqualTo(0); + assertThat(metrics.outgoingBytes.getCount()) - .isEqualTo(0) - .describedAs("There should not be sstables streamed to the peer."); + .describedAs("No SSTable should have been streamed so far from node" + node + " to node" + peer) + .isEqualTo(0); + assertThat(metrics.incomingProcessTime.getCount()) - .isEqualTo(files) - .describedAs("There should be " + files + " files streamed from the peer."); - assertThat(metrics.incomingProcessTime.getSnapshot().getMedian()) - .isGreaterThan(0) - .describedAs("The median processing time should be non-0"); - return metrics.incomingBytes.getCount(); + .describedAs("No SSTable should have been streamed so far from node" + node + " to node" + peer) + .isEqualTo(0); }); + } - // Assert metrics in node 1 - cluster.get(1).runOnInstance(() -> { - StreamingMetrics metrics = StreamingMetrics.get(node2Address); - assertThat(metrics.incomingBytes.getCount()) - .isEqualTo(0).describedAs("There should not be sstables streamed from the peer."); - assertThat(metrics.outgoingBytes.getCount()) - .isEqualTo(transmittedBytes) - .describedAs("The outgoingBytes count in node1 should be equals to incomingBytes count in node2"); - assertThat(metrics.incomingProcessTime.getCount()) - .isEqualTo(0) - .describedAs("There should be no files streamed from the peer."); + private long checkDataSent(Cluster cluster, int node, int 
peer) + { + InetAddressAndPort address = getNodeAddress(cluster, peer); + return cluster.get(node).callOnInstance(() -> { + + StreamingMetrics metrics = StreamingMetrics.get(address); + + long outgoingBytes = metrics.outgoingBytes.getCount(); + assertThat(outgoingBytes) + .describedAs("There should be data streamed from node" + node + " to node" + peer) + .isGreaterThan(0); + + return outgoingBytes; }); } + + private void checkDataReceived(Cluster cluster, int node, int peer, long receivedBytes, int files) + { + InetAddressAndPort address = getNodeAddress(cluster, peer); + cluster.get(node).runOnInstance(() -> { + + StreamingMetrics metrics = StreamingMetrics.get(address); + + long actual = metrics.incomingBytes.getCount(); + assertThat(actual) + .describedAs("The amount of data received by node" + node + " from node" + peer + " is not the expected one. [expected: " + receivedBytes + ", actual: " + actual + "]") + .isEqualTo(receivedBytes); + + actual = metrics.incomingProcessTime.getCount(); + // The incomingProcessTime timer is updated for each incoming file. By consequence incomingProcessTime.getCount() should be equals to the number of files received by the node. + assertThat(actual) + .describedAs("The amount of files received by node" + node + " from node" + peer + " is not the expected one. 
[expected: " + files + ", actual: " + actual + "]") + .isEqualTo(files); + + if (metrics.incomingProcessTime.getCount() != 0) + { + assertThat(metrics.incomingProcessTime.getSnapshot().getMedian()) + .describedAs("The median processing time for data streamed from node"+ peer + " to node" + node + " should be non-0") + .isGreaterThan(0); + } + }); + } + + private void checkTotalDataSent(Cluster cluster, + int node, + long outgoingBytes, + long outgoingRepairBytes, + long outgoingRepairSSTables) + { + cluster.get(node).runOnInstance(() -> { + + long actual = StreamingMetrics.totalOutgoingBytes.getCount(); + assertThat(actual) + .describedAs("The total amount of data sent by the node" + node + " is not the expected one. [expected: " + outgoingBytes + ", actual: " + actual + "]") + .isEqualTo(outgoingBytes); + + actual = StreamingMetrics.totalOutgoingRepairBytes.getCount(); + assertThat(actual) + .describedAs("The total amount of data sent by the node" + node + " for repair is not the expected one. [expected: " + outgoingRepairBytes + ", actual: " + actual + "]") + .isEqualTo(outgoingRepairBytes); + + actual = StreamingMetrics.totalOutgoingRepairSSTables.getCount(); + assertThat(actual) + .describedAs("The total amount of SSTables sent by the node" + node + " for repair is not the expected one. [expected: " + outgoingRepairSSTables + ", actual: " + actual + "]") + .isEqualTo(outgoingRepairSSTables); + }); + } + + private void checkTotalDataReceived(Cluster cluster, int node, long incomingBytes) + { + cluster.get(node).runOnInstance(() -> { + + long actual = StreamingMetrics.totalIncomingBytes.getCount(); + assertThat(actual) + .describedAs("The total amount of data received by the node" + node + " is not the expected one. 
[expected: " + incomingBytes + ", actual: " + actual + "]") + .isEqualTo(incomingBytes); + }); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org