This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 2c05f82755625c805ce5587ae71a502dab7b6d35 Author: Marcus Eriksson <[email protected]> AuthorDate: Fri Apr 4 08:19:17 2025 +0200 Add nodetool command to dump the contents of the system_views.{cluster_metadata_log, cluster_metadata_directory} tables Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-20525 --- CHANGES.txt | 1 + .../db/virtual/ClusterMetadataDirectoryTable.java | 71 ++++++++++--- .../db/virtual/ClusterMetadataLogTable.java | 34 ++++-- .../org/apache/cassandra/tcm/CMSOperations.java | 27 +++++ .../apache/cassandra/tcm/CMSOperationsMBean.java | 2 + src/java/org/apache/cassandra/tools/NodeTool.java | 4 +- .../apache/cassandra/tools/nodetool/CMSAdmin.java | 50 +++++++++ .../test/log/ClusterMetadataDumpTest.java | 116 +++++++++++++++++++++ 8 files changed, 280 insertions(+), 25 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index ba71e542b4..c4676baf07 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Add nodetool command to dump the contents of the system_views.{cluster_metadata_log, cluster_metadata_directory} tables (CASSANDRA-20525) * Fix TreeMap race in CollectionVirtualTableAdapter causing us to lose rows in the virtual table (CASSANDRA-20524) * Improve metadata log catch up with inter-DC mutation forwarding (CASSANDRA-20523) * Support topology-safe changes to Datacenter & Rack for live nodes (CASSANDRA-20528) diff --git a/src/java/org/apache/cassandra/db/virtual/ClusterMetadataDirectoryTable.java b/src/java/org/apache/cassandra/db/virtual/ClusterMetadataDirectoryTable.java index 0d026ce65d..e7fba1519b 100644 --- a/src/java/org/apache/cassandra/db/virtual/ClusterMetadataDirectoryTable.java +++ b/src/java/org/apache/cassandra/db/virtual/ClusterMetadataDirectoryTable.java @@ -17,16 +17,26 @@ */ package org.apache.cassandra.db.virtual; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; + +import 
com.google.common.collect.ImmutableMap; import org.apache.cassandra.db.marshal.InetAddressType; import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.ListType; import org.apache.cassandra.db.marshal.LongType; +import org.apache.cassandra.db.marshal.MapType; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.marshal.UUIDType; import org.apache.cassandra.dht.LocalPartitioner; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.MultiStepOperation; import org.apache.cassandra.tcm.membership.Directory; import org.apache.cassandra.tcm.membership.Location; import org.apache.cassandra.tcm.membership.NodeAddresses; @@ -35,7 +45,7 @@ import org.apache.cassandra.tcm.membership.NodeState; import org.apache.cassandra.tcm.membership.NodeVersion; -final class ClusterMetadataDirectoryTable extends AbstractVirtualTable +public final class ClusterMetadataDirectoryTable extends AbstractVirtualTable { private static final String NODE_ID = "node_id"; private static final String HOST_ID = "host_id"; @@ -50,6 +60,8 @@ final class ClusterMetadataDirectoryTable extends AbstractVirtualTable private static final String LOCAL_PORT = "local_port"; private static final String NATIVE_ADDRESS = "native_address"; private static final String NATIVE_PORT = "native_port"; + private static final String TOKENS = "tokens"; + private static final String MULTI_STEP_OPERATION = "multi_step_operation"; ClusterMetadataDirectoryTable(String keyspace) @@ -71,15 +83,31 @@ final class ClusterMetadataDirectoryTable extends AbstractVirtualTable .addRegularColumn(LOCAL_PORT, Int32Type.instance) .addRegularColumn(NATIVE_ADDRESS, InetAddressType.instance) .addRegularColumn(NATIVE_PORT, Int32Type.instance) + .addRegularColumn(TOKENS, ListType.getInstance(UTF8Type.instance, false)) + .addRegularColumn(MULTI_STEP_OPERATION, 
MapType.getInstance(UTF8Type.instance, UTF8Type.instance, false)) .build()); } @Override public DataSet data() + { + SimpleDataSet result = new SimpleDataSet(metadata()); + + for (Map.Entry<Long, Map<String, Object>> entry : directory(true).entrySet()) + { + result = result.row(entry.getKey().intValue()); + for (Map.Entry<String, Object> row : entry.getValue().entrySet()) + result = result.column(row.getKey(), row.getValue()); + } + return result; + } + + public static Map<Long, Map<String, Object>> directory(boolean tokens) { ClusterMetadata metadata = ClusterMetadata.current(); Directory directory = metadata.directory; - SimpleDataSet result = new SimpleDataSet(metadata()); + Map<Long, Map<String, Object>> result = new LinkedHashMap<>(); + for (Map.Entry<NodeId, NodeState> entry : directory.states.entrySet()) { NodeId nodeId = entry.getKey(); @@ -87,20 +115,33 @@ final class ClusterMetadataDirectoryTable extends AbstractVirtualTable NodeAddresses address = directory.getNodeAddresses(nodeId); Location location = directory.location(nodeId); NodeVersion version = directory.version(nodeId); - result.row(nodeId.id()) - .column(HOST_ID, nodeId.toUUID()) - .column(STATE, nodeState.toString()) - .column(CASSANDRA_VERSION, version != null ? version.cassandraVersion.toString() : null) - .column(SERIALIZATION_VERSION, version != null ? version.serializationVersion : null) - .column(RACK, location != null ? location.rack : null) - .column(DC, location != null ? location.datacenter : null) - .column(BROADCAST_ADDRESS, address != null ? address.broadcastAddress.getAddress() : null) - .column(BROADCAST_PORT, address != null ? address.broadcastAddress.getPort() : null) - .column(LOCAL_ADDRESS, address != null ? address.localAddress.getAddress() : null) - .column(LOCAL_PORT, address != null ? address.localAddress.getPort() : null) - .column(NATIVE_ADDRESS, address != null ? address.nativeAddress.getAddress() : null) - .column(NATIVE_PORT, address != null ? 
address.nativeAddress.getPort() : null); + Map<String, Object> row = new HashMap<>(); + row.put(HOST_ID, nodeId.toUUID()); + row.put(STATE, nodeState.toString()); + row.put(CASSANDRA_VERSION, version != null ? version.cassandraVersion.toString() : null); + row.put(SERIALIZATION_VERSION, version != null ? version.serializationVersion : null); + row.put(RACK, location != null ? location.rack : null); + row.put(DC, location != null ? location.datacenter : null); + row.put(BROADCAST_ADDRESS, address != null ? address.broadcastAddress.getAddress() : null); + row.put(BROADCAST_PORT, address != null ? address.broadcastAddress.getPort() : null); + row.put(LOCAL_ADDRESS, address != null ? address.localAddress.getAddress() : null); + row.put(LOCAL_PORT, address != null ? address.localAddress.getPort() : null); + row.put(NATIVE_ADDRESS, address != null ? address.nativeAddress.getAddress() : null); + row.put(NATIVE_PORT, address != null ? address.nativeAddress.getPort() : null); + if (tokens) + row.put(TOKENS, tokensToString(metadata.tokenMap.tokens(nodeId))); + MultiStepOperation<?> mso = metadata.inProgressSequences.get(nodeId); + if (mso != null) + row.put(MULTI_STEP_OPERATION, ImmutableMap.of("kind", mso.kind().name(), + "status", mso.status(), + "nextStep", mso.nextStep().name())); + result.put((long)nodeId.id(), row); } return result; } + + private static List<String> tokensToString(List<Token> tokens) + { + return tokens.stream().map(Object::toString).collect(Collectors.toList()); + } } diff --git a/src/java/org/apache/cassandra/db/virtual/ClusterMetadataLogTable.java b/src/java/org/apache/cassandra/db/virtual/ClusterMetadataLogTable.java index 152b4769a7..cd755115ec 100644 --- a/src/java/org/apache/cassandra/db/virtual/ClusterMetadataLogTable.java +++ b/src/java/org/apache/cassandra/db/virtual/ClusterMetadataLogTable.java @@ -19,6 +19,9 @@ package org.apache.cassandra.db.virtual; import java.io.IOException; import java.util.Date; +import java.util.HashMap; +import 
java.util.LinkedHashMap; +import java.util.Map; import org.apache.cassandra.cql3.UntypedResultSet; import org.apache.cassandra.db.ConsistencyLevel; @@ -27,6 +30,7 @@ import org.apache.cassandra.db.marshal.TimestampType; import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.locator.MetaStrategy; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.Transformation; import static java.lang.String.format; @@ -34,7 +38,7 @@ import static org.apache.cassandra.cql3.QueryProcessor.execute; import static org.apache.cassandra.schema.DistributedMetadataLogKeyspace.TABLE_NAME; import static org.apache.cassandra.schema.SchemaConstants.METADATA_KEYSPACE_NAME; -final class ClusterMetadataLogTable extends AbstractVirtualTable +public final class ClusterMetadataLogTable extends AbstractVirtualTable { private static final String EPOCH = "epoch"; private static final String KIND = "kind"; @@ -58,22 +62,34 @@ final class ClusterMetadataLogTable extends AbstractVirtualTable @Override public DataSet data() + { + SimpleDataSet result = new SimpleDataSet(metadata()); + for (Map.Entry<Long, Map<String, Object>> entry : log(Epoch.FIRST.getEpoch(), Long.MAX_VALUE).entrySet()) + { + SimpleDataSet data = result.row(entry.getKey()); + for (Map.Entry<String, Object> rowEntry : entry.getValue().entrySet()) + data = data.column(rowEntry.getKey(), rowEntry.getValue()); + } + return result; + } + + public static Map<Long, Map<String, Object>> log(long startEpoch, long endEpoch) { try { - SimpleDataSet result = new SimpleDataSet(metadata()); + Map<Long, Map<String, Object>> result = new LinkedHashMap<>(); UntypedResultSet res = execute(format("SELECT epoch, kind, transformation, entry_id, writetime(kind) as wt " + - "FROM %s.%s", METADATA_KEYSPACE_NAME, TABLE_NAME), ConsistencyLevel.QUORUM); + "FROM %s.%s WHERE token(epoch) >= token(?) 
AND token(epoch) <= token(?)", METADATA_KEYSPACE_NAME, TABLE_NAME), ConsistencyLevel.QUORUM, endEpoch, startEpoch); for (UntypedResultSet.Row r : res) { Transformation.Kind kind = Transformation.Kind.fromId(r.getInt("kind")); Transformation transformation = kind.fromVersionedBytes(r.getBlob("transformation")); - - result.row(r.getLong("epoch")) - .column(KIND, kind.toString()) - .column(TRANSFORMATION, transformation.toString()) - .column(ENTRY_ID, r.getLong("entry_id")) - .column(ENTRY_TIME, new Date(r.getLong("wt") / 1000)); + Map<String, Object> row = new HashMap<>(); + row.put(KIND, kind.toString()); + row.put(TRANSFORMATION, transformation.toString()); + row.put(ENTRY_ID, r.getLong("entry_id")); + row.put(ENTRY_TIME, new Date(r.getLong("wt") / 1000)); + result.put(r.getLong("epoch"), row); } return result; } diff --git a/src/java/org/apache/cassandra/tcm/CMSOperations.java b/src/java/org/apache/cassandra/tcm/CMSOperations.java index b37da9dd94..5b21acd142 100644 --- a/src/java/org/apache/cassandra/tcm/CMSOperations.java +++ b/src/java/org/apache/cassandra/tcm/CMSOperations.java @@ -31,6 +31,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.virtual.ClusterMetadataDirectoryTable; +import org.apache.cassandra.db.virtual.ClusterMetadataLogTable; import org.apache.cassandra.schema.ReplicationParams; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.membership.NodeState; @@ -261,4 +263,29 @@ public class CMSOperations implements CMSOperationsMBean cms.commit(new Unregister(nodeId, EnumSet.of(NodeState.LEFT), ClusterMetadataService.instance().placementProvider())); } } + + public Map<Long, Map<String, String>> dumpDirectory(boolean tokens) + { + Map<Long, Map<String, Object>> directory = ClusterMetadataDirectoryTable.directory(tokens); + return convertToStringValues(directory); + } + + public Map<Long, Map<String, String>> dumpLog(long 
startEpoch, long endEpoch) + { + Map<Long, Map<String, Object>> log = ClusterMetadataLogTable.log(startEpoch, endEpoch); + return convertToStringValues(log); + } + + private Map<Long, Map<String, String>> convertToStringValues(Map<Long, Map<String, Object>> log) + { + Map<Long, Map<String, String>> res = new LinkedHashMap<>(); + for (Map.Entry<Long, Map<String, Object>> outerEntry : log.entrySet()) + { + Map<String, String> rowRes = new HashMap<>(); + for (Map.Entry<String, Object> row : outerEntry.getValue().entrySet()) + rowRes.put(row.getKey(), row.getValue().toString()); + res.put(outerEntry.getKey(), rowRes); + } + return res; + } } diff --git a/src/java/org/apache/cassandra/tcm/CMSOperationsMBean.java b/src/java/org/apache/cassandra/tcm/CMSOperationsMBean.java index 1e2d9e1473..7ff0c0191b 100644 --- a/src/java/org/apache/cassandra/tcm/CMSOperationsMBean.java +++ b/src/java/org/apache/cassandra/tcm/CMSOperationsMBean.java @@ -46,4 +46,6 @@ public interface CMSOperationsMBean public boolean cancelInProgressSequences(String sequenceOwner, String expectedSequenceKind); public void unregisterLeftNodes(List<String> nodeIds); + public Map<Long, Map<String, String>> dumpDirectory(boolean includeTokens); + public Map<Long, Map<String, String>> dumpLog(long startEpoch, long endEpoch); } diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java index 1cc12f3882..5b149acd9a 100644 --- a/src/java/org/apache/cassandra/tools/NodeTool.java +++ b/src/java/org/apache/cassandra/tools/NodeTool.java @@ -270,7 +270,9 @@ public class NodeTool .withCommand(CMSAdmin.ReconfigureCMS.class) .withCommand(CMSAdmin.Snapshot.class) .withCommand(CMSAdmin.Unregister.class) - .withCommand(CMSAdmin.AbortInitialization.class); + .withCommand(CMSAdmin.AbortInitialization.class) + .withCommand(CMSAdmin.DumpDirectory.class) + .withCommand(CMSAdmin.DumpLog.class); Cli<NodeToolCmdRunnable> parser = builder.build(); diff --git 
a/src/java/org/apache/cassandra/tools/nodetool/CMSAdmin.java b/src/java/org/apache/cassandra/tools/nodetool/CMSAdmin.java index 02cc045545..7f54fdd9be 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/CMSAdmin.java +++ b/src/java/org/apache/cassandra/tools/nodetool/CMSAdmin.java @@ -18,14 +18,19 @@ package org.apache.cassandra.tools.nodetool; +import java.io.PrintStream; import java.util.ArrayList; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import com.google.common.collect.ImmutableList; + import io.airlift.airline.Arguments; import io.airlift.airline.Command; import io.airlift.airline.Option; +import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tools.NodeProbe; import org.apache.cassandra.tools.NodeTool; @@ -207,4 +212,49 @@ public abstract class CMSAdmin extends NodeTool.NodeToolCmd probe.getCMSOperationsProxy().abortInitialization(initiator); } } + + @Command(name = "dumpdirectory", description = "Dump the directory from the current ClusterMetadata") + public static class DumpDirectory extends NodeTool.NodeToolCmd + { + @Option(name = "--tokens", title = "Include tokens", description = "Include tokens in output") + public boolean tokens = false; + @Override + protected void execute(NodeProbe probe) + { + output(probe.output().out, "NodeId", probe.getCMSOperationsProxy().dumpDirectory(tokens)); + } + } + + @Command(name = "dumplog", description = "Dump the metadata log") + public static class DumpLog extends NodeTool.NodeToolCmd + { + @Option(name = "--start", title = "Start epoch") + long startEpoch = Epoch.FIRST.getEpoch(); + @Option(name = "--end", title = "End epoch") + long endEpoch = Long.MAX_VALUE; + @Override + protected void execute(NodeProbe probe) + { + output(probe.output().out, "Epoch", probe.getCMSOperationsProxy().dumpLog(startEpoch, endEpoch)); + } + } + + private static void output(PrintStream out, String title, Map<Long, Map<String, String>> map) + { + if 
(map.isEmpty()) + return; + int keywidth = keywidth(map); + for (Long key : ImmutableList.sortedCopyOf(map.keySet())) + { + out.println(title + ": " + key); + for (Map.Entry<String, String> nodeEntry : map.get(key).entrySet()) + out.printf(" %-" + keywidth + "s%s%n", nodeEntry.getKey(), nodeEntry.getValue()); + } + } + + private static int keywidth(Map<?, Map<String, String>> map) + { + assert !map.isEmpty(); + return map.entrySet().iterator().next().getValue().keySet().stream().max(Comparator.comparingInt(String::length)).get().length() + 1; + } } diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataDumpTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataDumpTest.java new file mode 100644 index 0000000000..b0f66ff885 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataDumpTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.distributed.test.log; + +import java.io.IOException; + +import org.junit.Test; + +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.NodeToolResult; +import org.apache.cassandra.distributed.test.TestBaseImpl; +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tcm.transformations.CustomTransformation; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class ClusterMetadataDumpTest extends TestBaseImpl +{ + @Test + public void dumpLogTest() throws IOException + { + try (Cluster cluster = init(builder().withNodes(3) + .start())) + { + cluster.get(1).runOnInstance(() -> { + for (int i = 0; i < 10; i++) + ClusterMetadataService.instance().commit(new CustomTransformation(CustomTransformation.PokeInt.NAME, new CustomTransformation.PokeInt(i))); + }); + + NodeToolResult res = cluster.get(1).nodetoolResult("cms", "dumplog"); + res.asserts().success(); + int unsafeJoinSeen = 0; + int registerSeen = 0; + int epochsSeen = 0; + for (String l : res.getStdout().split("\n")) + { + if (l.contains("kind")) + { + if (l.contains("REGISTER")) + registerSeen++; + else if (l.contains("UNSAFE_JOIN")) + unsafeJoinSeen++; + } + if (l.startsWith("Epoch:")) + epochsSeen++; + } + assertEquals(3, unsafeJoinSeen); + assertEquals(3, registerSeen); + assertTrue(epochsSeen > 15); + + res = cluster.get(1).nodetoolResult("cms", "dumplog", "--start", "10", "--end", "15"); + epochsSeen = 0; + for (String l : res.getStdout().split("\n")) + { + if (l.startsWith("Epoch: ")) + { + epochsSeen++; + long epoch = Long.parseLong(l.split(": ")[1]); + assertTrue(epoch >= 10 && epoch <= 15); + } + } + assertEquals(6, epochsSeen); + } + } + + @Test + public void dumpDirectoryTest() throws IOException + { + try (Cluster cluster = init(builder().withNodes(3) + .start())) + { + NodeToolResult res = 
cluster.get(1).nodetoolResult("cms", "dumpdirectory"); + res.asserts().success(); + int nodesFound = 0; + for (String l : res.getStdout().split("\n")) + { + if (l.startsWith("NodeId")) + nodesFound++; + assertFalse(l.contains("tokens")); + } + assertEquals(3, nodesFound); + res = cluster.get(1).nodetoolResult("cms", "dumpdirectory", "--tokens"); + res.asserts().success(); + nodesFound = 0; + int tokensFound = 0; + for (String l : res.getStdout().split("\n")) + { + if (l.startsWith("NodeId")) + nodesFound++; + + if (l.contains("tokens")) + tokensFound++; + } + assertEquals(3, nodesFound); + assertEquals(3, tokensFound); + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
