Repository: cassandra
Updated Branches:
  refs/heads/trunk 7925f91dc -> 07b0aca6d


Remove post-13910 dead code

patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith for
CASSANDRA-14645


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/07b0aca6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/07b0aca6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/07b0aca6

Branch: refs/heads/trunk
Commit: 07b0aca6d3b423f9af405b2d64cd285c597023c4
Parents: 7925f91
Author: Aleksey Yeshchenko <alek...@apple.com>
Authored: Thu Aug 16 01:44:08 2018 +0100
Committer: Aleksey Yeshchenko <alek...@apple.com>
Committed: Thu Aug 16 12:28:13 2018 +0100

----------------------------------------------------------------------
 .../apache/cassandra/service/StorageProxy.java  |  2 +-
 .../service/reads/AbstractReadExecutor.java     | 47 ++++++++++----------
 .../cassandra/service/reads/ReadCallback.java   | 11 ++---
 .../reads/ShortReadPartitionsProtection.java    |  2 +-
 .../reads/repair/BlockingReadRepair.java        |  2 +-
 5 files changed, 30 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/07b0aca6/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java 
b/src/java/org/apache/cassandra/service/StorageProxy.java
index 7e9b0f9..81e6dae 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -2171,7 +2171,7 @@ public class StorageProxy implements StorageProxyMBean
             int blockFor = consistency.blockFor(keyspace);
             int minResponses = Math.min(toQuery.filteredEndpoints.size(), 
blockFor);
             List<InetAddressAndPort> minimalEndpoints = 
toQuery.filteredEndpoints.subList(0, minResponses);
-            ReadCallback handler = new ReadCallback(resolver, consistency, 
rangeCommand, minimalEndpoints, queryStartNanoTime, readRepair);
+            ReadCallback handler = new ReadCallback(resolver, consistency, 
rangeCommand, minimalEndpoints, queryStartNanoTime);
 
             handler.assureSufficientLiveNodes();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/07b0aca6/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java 
b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index 6c8a45a..e531e0c 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -76,7 +76,7 @@ public abstract class AbstractReadExecutor
         this.targetReplicas = targetReplicas;
         this.readRepair = ReadRepair.create(command, targetReplicas, 
queryStartNanoTime, consistency);
         this.digestResolver = new DigestResolver(keyspace, command, 
consistency, readRepair, targetReplicas.size());
-        this.handler = new ReadCallback(digestResolver, consistency, command, 
targetReplicas, queryStartNanoTime, readRepair);
+        this.handler = new ReadCallback(digestResolver, consistency, command, 
targetReplicas, queryStartNanoTime);
         this.cfs = cfs;
         this.traceState = Tracing.instance.get();
         this.queryStartNanoTime = queryStartNanoTime;
@@ -165,11 +165,12 @@ public abstract class AbstractReadExecutor
     public static AbstractReadExecutor 
getReadExecutor(SinglePartitionReadCommand command, ConsistencyLevel 
consistencyLevel, long queryStartNanoTime) throws UnavailableException
     {
         Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
-        List<InetAddressAndPort> allReplicas = 
StorageProxy.getLiveSortedEndpoints(keyspace, command.partitionKey());
-        List<InetAddressAndPort> targetReplicas = 
consistencyLevel.filterForQuery(keyspace, allReplicas);
+
+        List<InetAddressAndPort> allLiveReplicas = 
StorageProxy.getLiveSortedEndpoints(keyspace, command.partitionKey());
+        List<InetAddressAndPort> selectedReplicas = 
consistencyLevel.filterForQuery(keyspace, allLiveReplicas);
 
         // Throw UAE early if we don't have enough replicas.
-        consistencyLevel.assureSufficientLiveNodes(keyspace, targetReplicas);
+        consistencyLevel.assureSufficientLiveNodes(keyspace, selectedReplicas);
 
         ColumnFamilyStore cfs = 
keyspace.getColumnFamilyStore(command.metadata().id);
         SpeculativeRetryPolicy retry = cfs.metadata().params.speculativeRetry;
@@ -177,27 +178,21 @@ public abstract class AbstractReadExecutor
         // Speculative retry is disabled *OR*
         // 11980: Disable speculative retry if using EACH_QUORUM in order to 
prevent miscounting DC responses
         if (retry.equals(NeverSpeculativeRetryPolicy.INSTANCE) || 
consistencyLevel == ConsistencyLevel.EACH_QUORUM)
-            return new NeverSpeculatingReadExecutor(keyspace, cfs, command, 
consistencyLevel, targetReplicas, queryStartNanoTime, false);
+            return new NeverSpeculatingReadExecutor(keyspace, cfs, command, 
consistencyLevel, selectedReplicas, queryStartNanoTime, false);
 
         // There are simply no extra replicas to speculate.
-        // Handle this separately so it can log failed attempts to speculate 
due to lack of replicas
-        if (consistencyLevel.blockFor(keyspace) == allReplicas.size())
-            return new NeverSpeculatingReadExecutor(keyspace, cfs, command, 
consistencyLevel, targetReplicas, queryStartNanoTime, true);
-
-        if (targetReplicas.size() == allReplicas.size())
+        // Handle this separately so it can record failed attempts to 
speculate due to lack of replicas
+        if (selectedReplicas.size() == allLiveReplicas.size())
         {
-            // CL.ALL
-            // We are going to contact every node anyway, so ask for 2 full 
data requests instead of 1, for redundancy
-            // (same amount of requests in total, but we turn 1 digest request 
into a full blown data request).
-            return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, 
consistencyLevel, targetReplicas, queryStartNanoTime);
+            boolean recordFailedSpeculation = consistencyLevel != 
ConsistencyLevel.ALL;
+            return new NeverSpeculatingReadExecutor(keyspace, cfs, command, 
consistencyLevel, selectedReplicas, queryStartNanoTime, 
recordFailedSpeculation);
         }
 
-        targetReplicas.add(allReplicas.get(targetReplicas.size()));
-
+        selectedReplicas.add(allLiveReplicas.get(selectedReplicas.size()));
         if (retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE))
-            return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, 
consistencyLevel, targetReplicas, queryStartNanoTime);
+            return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, 
consistencyLevel, selectedReplicas, queryStartNanoTime);
         else // PERCENTILE or CUSTOM.
-            return new SpeculatingReadExecutor(keyspace, cfs, command, 
consistencyLevel, targetReplicas, queryStartNanoTime);
+            return new SpeculatingReadExecutor(keyspace, cfs, command, 
consistencyLevel, selectedReplicas, queryStartNanoTime);
     }
 
     /**
@@ -222,12 +217,18 @@ public abstract class AbstractReadExecutor
          * log it is as a failure if it should have happened
          * but couldn't due to lack of replicas
          */
-        private final boolean logFailedSpeculation;
-
-        public NeverSpeculatingReadExecutor(Keyspace keyspace, 
ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, 
List<InetAddressAndPort> targetReplicas, long queryStartNanoTime, boolean 
logFailedSpeculation)
+        private final boolean recordFailedSpeculation;
+
+        NeverSpeculatingReadExecutor(Keyspace keyspace,
+                                     ColumnFamilyStore cfs,
+                                     ReadCommand command,
+                                     ConsistencyLevel consistencyLevel,
+                                     List<InetAddressAndPort> targetReplicas,
+                                     long queryStartNanoTime,
+                                     boolean recordFailedSpeculation)
         {
             super(keyspace, cfs, command, consistencyLevel, targetReplicas, 
queryStartNanoTime);
-            this.logFailedSpeculation = logFailedSpeculation;
+            this.recordFailedSpeculation = recordFailedSpeculation;
         }
 
         public void executeAsync()
@@ -239,7 +240,7 @@ public abstract class AbstractReadExecutor
 
         public void maybeTryAdditionalReplicas()
         {
-            if (shouldSpeculateAndMaybeWait() && logFailedSpeculation)
+            if (shouldSpeculateAndMaybeWait() && recordFailedSpeculation)
             {
                 cfs.metric.speculativeInsufficientReplicas.inc();
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/07b0aca6/src/java/org/apache/cassandra/service/reads/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ReadCallback.java 
b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
index a35fc2e..537e684 100644
--- a/src/java/org/apache/cassandra/service/reads/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
@@ -35,12 +35,10 @@ import org.apache.cassandra.exceptions.ReadFailureException;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.UnavailableException;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.metrics.ReadRepairMetrics;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.service.reads.repair.ReadRepair;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
@@ -66,12 +64,10 @@ public class ReadCallback implements 
IAsyncCallbackWithFailure<ReadResponse>
 
     private final Keyspace keyspace; // TODO push this into ConsistencyLevel?
 
-    private final ReadRepair readRepair;
-
     /**
      * Constructor when response count has to be calculated and blocked for.
      */
-    public ReadCallback(ResponseResolver resolver, ConsistencyLevel 
consistencyLevel, ReadCommand command, List<InetAddressAndPort> 
filteredEndpoints, long queryStartNanoTime, ReadRepair readRepair)
+    public ReadCallback(ResponseResolver resolver, ConsistencyLevel 
consistencyLevel, ReadCommand command, List<InetAddressAndPort> 
filteredEndpoints, long queryStartNanoTime)
     {
         this(resolver,
              consistencyLevel,
@@ -79,10 +75,10 @@ public class ReadCallback implements 
IAsyncCallbackWithFailure<ReadResponse>
              command,
              Keyspace.open(command.metadata().keyspace),
              filteredEndpoints,
-             queryStartNanoTime, readRepair);
+             queryStartNanoTime);
     }
 
-    public ReadCallback(ResponseResolver resolver, ConsistencyLevel 
consistencyLevel, int blockfor, ReadCommand command, Keyspace keyspace, 
List<InetAddressAndPort> endpoints, long queryStartNanoTime, ReadRepair 
readRepair)
+    public ReadCallback(ResponseResolver resolver, ConsistencyLevel 
consistencyLevel, int blockfor, ReadCommand command, Keyspace keyspace, 
List<InetAddressAndPort> endpoints, long queryStartNanoTime)
     {
         this.command = command;
         this.keyspace = keyspace;
@@ -91,7 +87,6 @@ public class ReadCallback implements 
IAsyncCallbackWithFailure<ReadResponse>
         this.resolver = resolver;
         this.queryStartNanoTime = queryStartNanoTime;
         this.endpoints = endpoints;
-        this.readRepair = readRepair;
         this.failureReasonByEndpoint = new ConcurrentHashMap<>();
         // we don't support read repair (or rapid read protection) for range 
scans yet (CASSANDRA-6897)
         assert !(command instanceof PartitionRangeReadCommand) || blockfor >= 
endpoints.size();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/07b0aca6/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
 
b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
index fb5f48e..d4e8957 100644
--- 
a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
+++ 
b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
@@ -172,7 +172,7 @@ public class ShortReadPartitionsProtection extends 
Transformation<UnfilteredRowI
     {
         Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
         DataResolver resolver = new DataResolver(keyspace, cmd, 
ConsistencyLevel.ONE, 1, queryStartNanoTime, NoopReadRepair.instance);
-        ReadCallback handler = new ReadCallback(resolver, 
ConsistencyLevel.ONE, cmd, Collections.singletonList(source), 
queryStartNanoTime, NoopReadRepair.instance);
+        ReadCallback handler = new ReadCallback(resolver, 
ConsistencyLevel.ONE, cmd, Collections.singletonList(source), 
queryStartNanoTime);
 
         if (StorageProxy.canDoLocalRequest(source))
             StageManager.getStage(Stage.READ).maybeExecuteImmediately(new 
StorageProxy.LocalReadRunnable(cmd, handler));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/07b0aca6/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java 
b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
index f207b7d..cd4d2a7 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
@@ -228,7 +228,7 @@ public class BlockingReadRepair implements ReadRepair, 
RepairListener
         Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
         DataResolver resolver = new DataResolver(keyspace, command, 
ConsistencyLevel.ALL, allEndpoints.size(), queryStartNanoTime, this);
         ReadCallback readCallback = new ReadCallback(resolver, 
ConsistencyLevel.ALL, contactedEndpoints.size(), command,
-                                                     keyspace, allEndpoints, 
queryStartNanoTime, this);
+                                                     keyspace, allEndpoints, 
queryStartNanoTime);
 
         digestRepair = new DigestRepair(resolver, readCallback, 
resultConsumer);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to