This is an automated email from the ASF dual-hosted git repository.
ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 59b45eb8bf Add max depth to blocked txn vtable
59b45eb8bf is described below
commit 59b45eb8bf753bc75fa37ab327329ef7e88301f2
Author: Alex Petrov <[email protected]>
AuthorDate: Wed Jul 30 15:31:59 2025 +0200
Add max depth to blocked txn vtable
Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-20839
---
.../cassandra/db/virtual/AbstractVirtualTable.java | 2 +-
.../cassandra/db/virtual/AccordDebugKeyspace.java | 48 ++++++++++++++++++----
.../db/virtual/AccordDebugKeyspaceTest.java | 4 ++
3 files changed, 46 insertions(+), 8 deletions(-)
diff --git a/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
b/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
index df2e4bc7cc..a32ea67ab6 100644
--- a/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
+++ b/src/java/org/apache/cassandra/db/virtual/AbstractVirtualTable.java
@@ -76,7 +76,7 @@ public abstract class AbstractVirtualTable implements
VirtualTable
}
@Override
- public final UnfilteredPartitionIterator select(DecoratedKey partitionKey,
ClusteringIndexFilter clusteringIndexFilter, ColumnFilter columnFilter,
RowFilter rowFilter)
+ public UnfilteredPartitionIterator select(DecoratedKey partitionKey,
ClusteringIndexFilter clusteringIndexFilter, ColumnFilter columnFilter,
RowFilter rowFilter)
{
Partition partition = data(partitionKey).getPartition(partitionKey);
diff --git a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
index efe683af71..f7c89754e3 100644
--- a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java
@@ -41,6 +41,14 @@ import javax.annotation.Nullable;
import com.google.common.collect.BoundType;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
+import org.apache.cassandra.cql3.Operator;
+import org.apache.cassandra.db.EmptyIterators;
+import org.apache.cassandra.db.filter.ClusteringIndexFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -138,6 +146,7 @@ import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static
org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
import static org.apache.cassandra.schema.SchemaConstants.VIRTUAL_ACCORD_DEBUG;
+import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
public class AccordDebugKeyspace extends VirtualKeyspace
@@ -1417,8 +1426,33 @@ public class AccordDebugKeyspace extends VirtualKeyspace
}
@Override
- public DataSet data(DecoratedKey partitionKey)
+ public UnfilteredPartitionIterator select(DecoratedKey partitionKey,
ClusteringIndexFilter clusteringIndexFilter, ColumnFilter columnFilter,
RowFilter rowFilter)
{
+ Partition partition = data(partitionKey,
rowFilter).getPartition(partitionKey);
+
+ if (null == partition)
+ return EmptyIterators.unfilteredPartition(metadata);
+
+ long now = currentTimeMillis();
+ UnfilteredRowIterator rowIterator =
partition.toRowIterator(metadata(), clusteringIndexFilter, columnFilter, now);
+ return new SingletonUnfilteredPartitionIterator(rowIterator);
+ }
+
+ public DataSet data(DecoratedKey partitionKey, RowFilter rowFilter)
+ {
+ int maxDepth = Integer.MAX_VALUE;
+ if (rowFilter != null && rowFilter.getExpressions().size() > 0)
+ {
+ Invariants.require(rowFilter.getExpressions().size() == 1,
"Only depth filter is supported");
+ RowFilter.Expression expression =
rowFilter.getExpressions().get(0);
+
Invariants.require(expression.column().name.toString().equals("depth"), "Only
depth filter is supported, but got: %s", expression.column().name);
+ Invariants.require(expression.operator() == Operator.LT ||
expression.operator() == Operator.LTE, "Only < and <= queries are supported");
+ if (expression.operator() == Operator.LT)
+ maxDepth = expression.getIndexValue().getInt(0);
+ else
+ maxDepth = expression.getIndexValue().getInt(0) + 1;
+ }
+
TxnId id =
TxnId.parse(UTF8Type.instance.compose(partitionKey.getKey()));
List<CommandStoreTxnBlockedGraph> shards =
AccordService.instance().debugTxnBlockedGraph(id);
@@ -1427,7 +1461,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace
for (CommandStoreTxnBlockedGraph shard : shards)
{
Set<TxnId> processed = new HashSet<>();
- process(ds, commandStores, shard, processed, id, 0, id,
Reason.Self, null);
+ process(ds, commandStores, shard, processed, id, 0, maxDepth,
id, Reason.Self, null);
// everything was processed right?
if (!shard.txns.isEmpty() &&
!shard.txns.keySet().containsAll(processed))
Invariants.expect(false, "Skipped txns: " +
Sets.difference(shard.txns.keySet(), processed));
@@ -1436,7 +1470,7 @@ public class AccordDebugKeyspace extends VirtualKeyspace
return ds;
}
- private void process(SimpleDataSet ds, CommandStores commandStores,
CommandStoreTxnBlockedGraph shard, Set<TxnId> processed, TxnId userTxn, int
depth, TxnId txnId, Reason reason, Runnable onDone)
+ private void process(SimpleDataSet ds, CommandStores commandStores,
CommandStoreTxnBlockedGraph shard, Set<TxnId> processed, TxnId userTxn, int
depth, int maxDepth, TxnId txnId, Reason reason, Runnable onDone)
{
if (!processed.add(txnId))
throw new IllegalStateException("Double processed " + txnId);
@@ -1462,15 +1496,15 @@ public class AccordDebugKeyspace extends VirtualKeyspace
{
for (TxnId blockedBy : txn.blockedBy)
{
- if (!processed.contains(blockedBy))
- process(ds, commandStores, shard, processed, userTxn,
depth + 1, blockedBy, Reason.Txn, null);
+ if (!processed.contains(blockedBy) && depth < maxDepth)
+ process(ds, commandStores, shard, processed, userTxn,
depth + 1, maxDepth, blockedBy, Reason.Txn, null);
}
for (TokenKey blockedBy : txn.blockedByKey)
{
TxnId blocking = shard.keys.get(blockedBy);
- if (!processed.contains(blocking))
- process(ds, commandStores, shard, processed, userTxn,
depth + 1, blocking, Reason.Key, () -> ds.column("key", printToken(blockedBy)));
+ if (!processed.contains(blocking) && depth < maxDepth)
+ process(ds, commandStores, shard, processed, userTxn,
depth + 1, maxDepth, blocking, Reason.Key, () -> ds.column("key",
printToken(blockedBy)));
}
}
}
diff --git
a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
index 1d500f497a..948087e2f9 100644
--- a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java
@@ -305,6 +305,10 @@ public class AccordDebugKeyspaceTest extends CQLTester
assertRows(execute(QUERY_TXN_BLOCKED_BY, second.toString()),
row(second.toString(), KEYSPACE, tableName, anyInt(),
0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", anyNonNull(), null,
SaveStatus.Stable.name()),
row(second.toString(), KEYSPACE, tableName, anyInt(),
1, first.toString(), "Key", anyNonNull(), anyNonNull(),
SaveStatus.ReadyToExecute.name()));
+
+ assertRows(execute(QUERY_TXN_BLOCKED_BY + " AND depth < 1",
second.toString()),
+ row(second.toString(), KEYSPACE, tableName, anyInt(),
0, ByteBufferUtil.EMPTY_BYTE_BUFFER, "Self", anyNonNull(), null,
SaveStatus.Stable.name()));
+
}
finally
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org