This is an automated email from the ASF dual-hosted git repository. jmckenzie pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push: new 47cac5c49b Provide JMX endpoint to allow transient logging of blocking read repairs 47cac5c49b is described below commit 47cac5c49b93d205fa9b3a57ce55470887c5be45 Author: Josh McKenzie <jmcken...@apache.org> AuthorDate: Tue Mar 22 11:35:36 2022 -0400 Provide JMX endpoint to allow transient logging of blocking read repairs Patch by Josh McKenzie; reviewed by David Capwell for CASSANDRA-17471 Co-authored-by: Aleksey Yeschenko <alek...@apache.org> Co-authored-by: Josh McKenzie <jmcken...@apache.org> --- CHANGES.txt | 1 + .../org/apache/cassandra/service/StorageProxy.java | 18 +++++++++++++++++- .../apache/cassandra/service/StorageProxyMBean.java | 3 +++ .../cassandra/service/reads/AbstractReadExecutor.java | 14 +++++++++++++- .../org/apache/cassandra/service/StorageProxyTest.java | 18 +++++++++++++++++- 5 files changed, 51 insertions(+), 3 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 377e91b75c..196d2471e0 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.1 + * Provide JMX endpoint to allow transient logging of blocking read repairs (CASSANDRA-17471) * Add guardrail for GROUP BY queries (CASSANDRA-17509) * make pylib PEP and pylint compliant (CASSANDRA-17546) * Add support for vnodes in jvm-dtest (CASSANDRA-17332) diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 15e9e2d467..85c2699cb7 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Collectors; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.cache.CacheLoader; import com.google.common.collect.Iterables; @@ -194,6 +195,8 @@ public class StorageProxy implements 
StorageProxyMBean private static final PartitionDenylist partitionDenylist = new PartitionDenylist(); + private volatile long logBlockingReadRepairAttemptsUntilNanos = Long.MIN_VALUE; + private StorageProxy() { } @@ -2048,9 +2051,10 @@ public class StorageProxy implements StorageProxyMBean // wait for enough responses to meet the consistency level. If there's a digest mismatch, begin the read // repair process by sending full data reads to all replicas we received responses from. + boolean logBlockingRepairAttempts = instance.isLoggingReadRepairs(); for (int i=0; i<cmdCount; i++) { - reads[i].awaitResponses(); + reads[i].awaitResponses(logBlockingRepairAttempts); } // read repair - if it looks like we may not receive enough full data responses to meet CL, send @@ -3019,6 +3023,18 @@ public class StorageProxy implements StorageProxyMBean return !partitionDenylist.isKeyPermitted(keyspace, table, bytes); } + @Override + public void logBlockingReadRepairAttemptsForNSeconds(int seconds) + { + logBlockingReadRepairAttemptsUntilNanos = nanoTime() + TimeUnit.SECONDS.toNanos(seconds); + } + + @Override + public boolean isLoggingReadRepairs() + { + return nanoTime() <= StorageProxy.instance.logBlockingReadRepairAttemptsUntilNanos; + } + @Override public void setPaxosVariant(String variant) { diff --git a/src/java/org/apache/cassandra/service/StorageProxyMBean.java b/src/java/org/apache/cassandra/service/StorageProxyMBean.java index 416a31284a..546143d515 100644 --- a/src/java/org/apache/cassandra/service/StorageProxyMBean.java +++ b/src/java/org/apache/cassandra/service/StorageProxyMBean.java @@ -91,6 +91,9 @@ public interface StorageProxyMBean public String getIdealConsistencyLevel(); public String setIdealConsistencyLevel(String cl); + public void logBlockingReadRepairAttemptsForNSeconds(int seconds); + public boolean isLoggingReadRepairs(); + /** * Tracking and reporting of variances in the repaired data set across replicas at read time */ diff --git 
a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java index 5f73bc696d..b5a759c3dc 100644 --- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java @@ -366,10 +366,15 @@ public abstract class AbstractReadExecutor this.result = DuplicateRowChecker.duringRead(result, this.replicaPlan.get().readCandidates().endpointList()); } + public void awaitResponses() throws ReadTimeoutException + { + awaitResponses(false); + } + /** * Wait for the CL to be satisfied by responses */ - public void awaitResponses() throws ReadTimeoutException + public void awaitResponses(boolean logBlockingReadRepairAttempt) throws ReadTimeoutException { try { @@ -397,6 +402,13 @@ public abstract class AbstractReadExecutor { Tracing.trace("Digest mismatch: Mismatch for key {}", getKey()); readRepair.startRepair(digestResolver, this::setResult); + if (logBlockingReadRepairAttempt) + { + logger.info("Blocking Read Repair triggered for query [{}] at CL.{} with endpoints {}", + command.toCQLString(), + replicaPlan().consistencyLevel(), + replicaPlan().contacts()); + } } } diff --git a/test/unit/org/apache/cassandra/service/StorageProxyTest.java b/test/unit/org/apache/cassandra/service/StorageProxyTest.java index 1338cd675f..41742f0c08 100644 --- a/test/unit/org/apache/cassandra/service/StorageProxyTest.java +++ b/test/unit/org/apache/cassandra/service/StorageProxyTest.java @@ -18,11 +18,13 @@ package org.apache.cassandra.service; +import java.net.UnknownHostException; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import com.google.common.util.concurrent.Uninterruptibles; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; @@ -101,7 +103,21 @@ public class StorageProxyTest }); } - private void 
shouldHintTest(Consumer<Replica> test) throws Exception + + /** + * Ensure that the timer backing the JMX endpoint to transiently enable logging of blocking read repairs both enables + * and disables the way we'd expect. + */ + @Test + public void testTransientLoggingTimer() + { + StorageProxy.instance.logBlockingReadRepairAttemptsForNSeconds(2); + Assert.assertTrue(StorageProxy.instance.isLoggingReadRepairs()); + Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS); + Assert.assertFalse(StorageProxy.instance.isLoggingReadRepairs()); + } + + private void shouldHintTest(Consumer<Replica> test) throws UnknownHostException { InetAddressAndPort testEp = InetAddressAndPort.getByName("192.168.1.1"); Replica replica = full(testEp); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org