This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new b30c8c9  Improve merkle tree size and time on heap
b30c8c9 is described below

commit b30c8c98a594a5682f6ea1f0b5511463b700b6e8
Author: Joseph Lynch <joe.e.ly...@gmail.com>
AuthorDate: Tue Dec 18 14:52:33 2018 -0800

    Improve merkle tree size and time on heap
    
    Patch by Joseph Lynch; Reviewed by Blake Eggleston for CASSANDRA-14096
---
 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |   4 +-
 conf/cassandra.yaml                                |  13 +
 src/java/org/apache/cassandra/config/Config.java   |   3 +
 .../cassandra/config/DatabaseDescriptor.java       |  21 ++
 .../cassandra/db/compaction/CompactionManager.java |   8 +-
 .../org/apache/cassandra/repair/LocalSyncTask.java |   8 +-
 .../apache/cassandra/repair/RemoteSyncTask.java    |   8 +-
 .../org/apache/cassandra/repair/RepairJob.java     |  67 +++--
 .../org/apache/cassandra/repair/RepairSession.java |  20 +-
 src/java/org/apache/cassandra/repair/SyncTask.java |  34 +--
 .../apache/cassandra/service/StorageService.java   |  10 +
 .../cassandra/service/StorageServiceMBean.java     |   4 +
 .../cassandra/config/DatabaseDescriptorTest.java   |  37 +++
 .../apache/cassandra/repair/LocalSyncTaskTest.java |   8 +-
 .../org/apache/cassandra/repair/RepairJobTest.java | 325 +++++++++++++++++++++
 16 files changed, 509 insertions(+), 62 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 5e8845f..b755751 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.19
+ * Improve merkle tree size and time on heap (CASSANDRA-14096)
  * Add missing commands to nodetool-completion (CASSANDRA-14916)
  * Anti-compaction temporarily corrupts sstable state for readers 
(CASSANDRA-15004)
  Merged from 2.2:
diff --git a/NEWS.txt b/NEWS.txt
index 54dc63a..704fde1 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -47,8 +47,8 @@ using the provided 'sstableupgrade' tool.
 
 Upgrading
 ---------
-    - Nothing specific to this release, but please see previous upgrading 
sections,
-      especially if you are upgrading from 2.2.
+       - repair_session_max_tree_depth setting has been added to 
cassandra.yaml to allow operators to reduce
+         merkle tree size if repair is creating too much heap pressure. See 
CASSANDRA-14096 for details.
 
 3.0.18
 ======
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 84664fe..c321a72 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -405,6 +405,19 @@ concurrent_materialized_view_writes: 32
 # https://issues.apache.org/jira/browse/CASSANDRA-11039
 memtable_allocation_type: heap_buffers
 
+# Limits the maximum Merkle tree depth to avoid consuming too much
+# memory during repairs.
+#
+# The default setting of 18 generates trees of maximum size around
+# 50 MiB / tree. If you are running out of memory during repairs consider
+# lowering this to 15 (~6 MiB / tree) or lower, but try not to lower it
+# too much past that or you will lose too much resolution and stream
+# too much redundant data during repair. Cannot be set lower than 10.
+#
+# For more details see https://issues.apache.org/jira/browse/CASSANDRA-14096.
+#
+# repair_session_max_tree_depth: 18
+
 # Total space to use for commit logs on disk.
 #
 # If space gets above this value, Cassandra will flush every dirty CF
diff --git a/src/java/org/apache/cassandra/config/Config.java 
b/src/java/org/apache/cassandra/config/Config.java
index c882e40..de158bd 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -116,6 +116,9 @@ public class Config
     public Integer memtable_offheap_space_in_mb;
     public Float memtable_cleanup_threshold = null;
 
+    // Limit the maximum depth of repair session merkle trees
+    public volatile Integer repair_session_max_tree_depth = 18;
+
     public Integer storage_port = 7000;
     public Integer ssl_storage_port = 7001;
     public String listen_address;
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java 
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 3145df6..8f4b338 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -437,6 +437,11 @@ public class DatabaseDescriptor
         else
             logger.info("Global memtable off-heap threshold is enabled at 
{}MB", conf.memtable_offheap_space_in_mb);
 
+        if (conf.repair_session_max_tree_depth < 10)
+            throw new ConfigurationException("repair_session_max_tree_depth 
should not be < 10, but was " + conf.repair_session_max_tree_depth);
+        if (conf.repair_session_max_tree_depth > 20)
+            logger.warn("repair_session_max_tree_depth of " + 
conf.repair_session_max_tree_depth + " > 20 could lead to excessive memory 
usage");
+
         applyAddressConfig(config);
 
         if (conf.thrift_framed_transport_size_in_mb <= 0)
@@ -1949,6 +1954,22 @@ public class DatabaseDescriptor
         }
     }
 
+    public static int getRepairSessionMaxTreeDepth()
+    {
+        return conf.repair_session_max_tree_depth;
+    }
+
+    public static void setRepairSessionMaxTreeDepth(int depth)
+    {
+        if (depth < 10)
+            throw new ConfigurationException("Cannot set 
repair_session_max_tree_depth to " + depth +
+                                             " which is < 10, doing nothing");
+        else if (depth > 20)
+            logger.warn("repair_session_max_tree_depth of " + depth + " > 20 
could lead to excessive memory usage");
+
+        conf.repair_session_max_tree_depth = depth;
+    }
+
     public static boolean getOutboundBindAny()
     {
         return Config.outboundBindAny || conf.listen_on_broadcast_address;
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java 
b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 4295c7a..54233f2 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -1210,8 +1210,12 @@ public class CompactionManager implements 
CompactionManagerMBean
             long numPartitions = rangePartitionCounts.get(range);
             double rangeOwningRatio = allPartitions > 0 ? 
(double)numPartitions / allPartitions : 0;
             // determine max tree depth proportional to range size to avoid 
blowing up memory with multiple tress,
-            // capping at 20 to prevent large tree (CASSANDRA-11390)
-            int maxDepth = rangeOwningRatio > 0 ? (int) Math.floor(20 - 
Math.log(1 / rangeOwningRatio) / Math.log(2)) : 0;
+            // capping at a configurable depth (default 18) to prevent large 
trees (CASSANDRA-11390, CASSANDRA-14096)
+            int maxDepth = rangeOwningRatio > 0
+                           ? (int) Math.floor(Math.max(0.0, 
DatabaseDescriptor.getRepairSessionMaxTreeDepth() -
+                                                            Math.log(1 / 
rangeOwningRatio) / Math.log(2)))
+                           : 0;
+
             // determine tree depth from number of partitions, capping at max 
tree depth (CASSANDRA-5263)
             int depth = numPartitions > 0 ? (int) 
Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), maxDepth) : 0;
             tree.addMerkleTree((int) Math.pow(2, depth), range);
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java 
b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
index daace01..5d43868 100644
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@ -47,9 +47,9 @@ public class LocalSyncTask extends SyncTask implements 
StreamEventHandler
 
     private final long repairedAt;
 
-    public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, 
long repairedAt)
+    public LocalSyncTask(RepairJobDesc desc, InetAddress firstEndpoint, 
InetAddress secondEndpoint, List<Range<Token>> rangesToSync, long repairedAt)
     {
-        super(desc, r1, r2);
+        super(desc, firstEndpoint, secondEndpoint, rangesToSync);
         this.repairedAt = repairedAt;
     }
 
@@ -61,7 +61,7 @@ public class LocalSyncTask extends SyncTask implements 
StreamEventHandler
     {
         InetAddress local = FBUtilities.getBroadcastAddress();
         // We can take anyone of the node as source or destination, however if 
one is localhost, we put at source to avoid a forwarding
-        InetAddress dst = r2.endpoint.equals(local) ? r1.endpoint : 
r2.endpoint;
+        InetAddress dst = secondEndpoint.equals(local) ? firstEndpoint : 
secondEndpoint;
         InetAddress preferred = SystemKeyspace.getPreferredIP(dst);
 
         String message = String.format("Performing streaming repair of %d 
ranges with %s", differences.size(), dst);
@@ -110,7 +110,7 @@ public class LocalSyncTask extends SyncTask implements 
StreamEventHandler
 
     public void onSuccess(StreamState result)
     {
-        String message = String.format("Sync complete using session %s between 
%s and %s on %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily);
+        String message = String.format("Sync complete using session %s between 
%s and %s on %s", desc.sessionId, firstEndpoint, secondEndpoint, 
desc.columnFamily);
         logger.info("[repair #{}] {}", desc.sessionId, message);
         Tracing.traceRepair(message);
         set(stat);
diff --git a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java 
b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
index ededc40..5af815a 100644
--- a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
@@ -41,15 +41,15 @@ public class RemoteSyncTask extends SyncTask
 {
     private static final Logger logger = 
LoggerFactory.getLogger(RemoteSyncTask.class);
 
-    public RemoteSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2)
+    public RemoteSyncTask(RepairJobDesc desc, InetAddress firstEndpoint, 
InetAddress secondEndpoint, List<Range<Token>> rangesToSync)
     {
-        super(desc, r1, r2);
+        super(desc, firstEndpoint, secondEndpoint, rangesToSync);
     }
 
     protected void startSync(List<Range<Token>> differences)
     {
         InetAddress local = FBUtilities.getBroadcastAddress();
-        SyncRequest request = new SyncRequest(desc, local, r1.endpoint, 
r2.endpoint, differences);
+        SyncRequest request = new SyncRequest(desc, local, firstEndpoint, 
secondEndpoint, differences);
         String message = String.format("Forwarding streaming repair of %d 
ranges to %s (to be streamed with %s)", request.ranges.size(), request.src, 
request.dst);
         logger.info("[repair #{}] {}", desc.sessionId, message);
         Tracing.traceRepair(message);
@@ -64,7 +64,7 @@ public class RemoteSyncTask extends SyncTask
         }
         else
         {
-            setException(new RepairException(desc, String.format("Sync failed 
between %s and %s", r1.endpoint, r2.endpoint)));
+            setException(new RepairException(desc, String.format("Sync failed 
between %s and %s", firstEndpoint, secondEndpoint)));
         }
     }
 }
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java 
b/src/java/org/apache/cassandra/repair/RepairJob.java
index cba176c..5443bf8 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -20,14 +20,18 @@ package org.apache.cassandra.repair;
 import java.net.InetAddress;
 import java.util.*;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTrees;
 import org.apache.cassandra.utils.Pair;
 
 /**
@@ -103,35 +107,9 @@ public class RepairJob extends 
AbstractFuture<RepairResult> implements Runnable
         // When all validations complete, submit sync tasks
         ListenableFuture<List<SyncStat>> syncResults = 
Futures.transform(validations, new AsyncFunction<List<TreeResponse>, 
List<SyncStat>>()
         {
-            public ListenableFuture<List<SyncStat>> apply(List<TreeResponse> 
trees) throws Exception
+            public ListenableFuture<List<SyncStat>> apply(List<TreeResponse> 
trees)
             {
-                InetAddress local = FBUtilities.getLocalAddress();
-
-                List<SyncTask> syncTasks = new ArrayList<>();
-                // We need to difference all trees one against another
-                for (int i = 0; i < trees.size() - 1; ++i)
-                {
-                    TreeResponse r1 = trees.get(i);
-                    for (int j = i + 1; j < trees.size(); ++j)
-                    {
-                        TreeResponse r2 = trees.get(j);
-                        SyncTask task;
-                        if (r1.endpoint.equals(local) || 
r2.endpoint.equals(local))
-                        {
-                            task = new LocalSyncTask(desc, r1, r2, repairedAt);
-                        }
-                        else
-                        {
-                            task = new RemoteSyncTask(desc, r1, r2);
-                            // RemoteSyncTask expects SyncComplete message 
sent back.
-                            // Register task to RepairSession to receive 
response.
-                            session.waitForSync(Pair.create(desc, new 
NodePair(r1.endpoint, r2.endpoint)), (RemoteSyncTask) task);
-                        }
-                        syncTasks.add(task);
-                        taskExecutor.submit(task);
-                    }
-                }
-                return Futures.allAsList(syncTasks);
+                return Futures.allAsList(createSyncTasks(trees, 
FBUtilities.getLocalAddress()));
             }
         }, taskExecutor);
 
@@ -160,6 +138,39 @@ public class RepairJob extends 
AbstractFuture<RepairResult> implements Runnable
         Futures.getUnchecked(validations);
     }
 
+    @VisibleForTesting
+    List<SyncTask> createSyncTasks(List<TreeResponse> trees, InetAddress local)
+    {
+        List<SyncTask> syncTasks = new ArrayList<>();
+        // We need to difference all trees one against another
+        for (int i = 0; i < trees.size() - 1; ++i)
+        {
+            TreeResponse r1 = trees.get(i);
+            for (int j = i + 1; j < trees.size(); ++j)
+            {
+                TreeResponse r2 = trees.get(j);
+                SyncTask task;
+
+                List<Range<Token>> differences = 
MerkleTrees.difference(r1.trees, r2.trees);
+
+                if (r1.endpoint.equals(local) || r2.endpoint.equals(local))
+                {
+                    task = new LocalSyncTask(desc, r1.endpoint, r2.endpoint, 
differences, repairedAt);
+                }
+                else
+                {
+                    task = new RemoteSyncTask(desc, r1.endpoint, r2.endpoint, 
differences);
+                    // RemoteSyncTask expects SyncComplete message sent back.
+                    // Register task to RepairSession to receive response.
+                    session.waitForSync(Pair.create(desc, new 
NodePair(r1.endpoint, r2.endpoint)), (RemoteSyncTask) task);
+                }
+                syncTasks.add(task);
+                taskExecutor.submit(task);
+            }
+        }
+        return syncTasks;
+    }
+
     /**
      * Creates {@link ValidationTask} and submit them to task executor in 
parallel.
      *
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java 
b/src/java/org/apache/cassandra/repair/RepairSession.java
index 5fe306d..ac8e0a9 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.*;
 import org.slf4j.Logger;
@@ -53,8 +54,8 @@ import org.apache.cassandra.utils.Pair;
  *      validationComplete()).
  *   </li>
  *   <li>Synchronization phase: once all trees are received, the job compares 
each tree with
- *      all the other using a so-called {@link SyncTask}. If there is 
difference between 2 trees, the
- *      concerned SyncTask will start a streaming of the difference between 
the 2 endpoint concerned.
+ *      all the others and creates a {@link SyncTask} for each diverging 
replica. If there are differences
+ *      between 2 trees, the concerned SyncTask streams the differences between 
the 2 endpoints concerned.
  *   </li>
  * </ol>
  * The job is done once all its SyncTasks are done (i.e. have either computed 
no differences
@@ -99,7 +100,7 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
     private final ConcurrentMap<Pair<RepairJobDesc, NodePair>, RemoteSyncTask> 
syncingTasks = new ConcurrentHashMap<>();
 
     // Tasks(snapshot, validate request, differencing, ...) are run on 
taskExecutor
-    public final ListeningExecutorService taskExecutor = 
MoreExecutors.listeningDecorator(DebuggableThreadPoolExecutor.createCachedThreadpoolWithMaxSize("RepairJobTask"));
+    public final ListeningExecutorService taskExecutor;
 
     private volatile boolean terminated = false;
 
@@ -134,6 +135,13 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
         this.ranges = ranges;
         this.endpoints = endpoints;
         this.repairedAt = repairedAt;
+        this.taskExecutor = MoreExecutors.listeningDecorator(createExecutor());
+    }
+
+    @VisibleForTesting
+    protected DebuggableThreadPoolExecutor createExecutor()
+    {
+        return 
DebuggableThreadPoolExecutor.createCachedThreadpoolWithMaxSize("RepairJobTask");
     }
 
     public UUID getId()
@@ -198,6 +206,12 @@ public class RepairSession extends 
AbstractFuture<RepairSessionResult> implement
         task.syncComplete(success);
     }
 
+    @VisibleForTesting
+    Map<Pair<RepairJobDesc, NodePair>, RemoteSyncTask> getSyncingTasks()
+    {
+        return Collections.unmodifiableMap(syncingTasks);
+    }
+
     private String repairedNodes()
     {
         StringBuilder sb = new StringBuilder();
diff --git a/src/java/org/apache/cassandra/repair/SyncTask.java 
b/src/java/org/apache/cassandra/repair/SyncTask.java
index 8adec6f..c96caf4 100644
--- a/src/java/org/apache/cassandra/repair/SyncTask.java
+++ b/src/java/org/apache/cassandra/repair/SyncTask.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.repair;
 
+import java.net.InetAddress;
 import java.util.List;
 
 import com.google.common.util.concurrent.AbstractFuture;
@@ -26,10 +27,9 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.utils.MerkleTrees;
 
 /**
- * SyncTask will calculate the difference of MerkleTree between two nodes
+ * SyncTask takes the difference of MerkleTrees between two nodes
  * and perform necessary operation to repair replica.
  */
 public abstract class SyncTask extends AbstractFuture<SyncStat> implements 
Runnable
@@ -37,16 +37,19 @@ public abstract class SyncTask extends 
AbstractFuture<SyncStat> implements Runna
     private static Logger logger = LoggerFactory.getLogger(SyncTask.class);
 
     protected final RepairJobDesc desc;
-    protected final TreeResponse r1;
-    protected final TreeResponse r2;
+    protected final InetAddress firstEndpoint;
+    protected final InetAddress secondEndpoint;
+
+    private final List<Range<Token>> rangesToSync;
 
     protected volatile SyncStat stat;
 
-    public SyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2)
+    public SyncTask(RepairJobDesc desc, InetAddress firstEndpoint, InetAddress 
secondEndpoint, List<Range<Token>> rangesToSync)
     {
         this.desc = desc;
-        this.r1 = r1;
-        this.r2 = r2;
+        this.firstEndpoint = firstEndpoint;
+        this.secondEndpoint = secondEndpoint;
+        this.rangesToSync = rangesToSync;
     }
 
     /**
@@ -54,25 +57,22 @@ public abstract class SyncTask extends 
AbstractFuture<SyncStat> implements Runna
      */
     public void run()
     {
-        // compare trees, and collect differences
-        List<Range<Token>> differences = MerkleTrees.difference(r1.trees, 
r2.trees);
-
-        stat = new SyncStat(new NodePair(r1.endpoint, r2.endpoint), 
differences.size());
+        stat = new SyncStat(new NodePair(firstEndpoint, secondEndpoint), 
rangesToSync.size());
 
         // choose a repair method based on the significance of the difference
-        String format = String.format("[repair #%s] Endpoints %s and %s %%s 
for %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily);
-        if (differences.isEmpty())
+        String format = String.format("[repair #%s] Endpoints %s and %s %%s 
for %s", desc.sessionId, firstEndpoint, secondEndpoint, desc.columnFamily);
+        if (rangesToSync.isEmpty())
         {
             logger.info(String.format(format, "are consistent"));
-            Tracing.traceRepair("Endpoint {} is consistent with {} for {}", 
r1.endpoint, r2.endpoint, desc.columnFamily);
+            Tracing.traceRepair("Endpoint {} is consistent with {} for {}", 
firstEndpoint, secondEndpoint, desc.columnFamily);
             set(stat);
             return;
         }
 
         // non-0 difference: perform streaming repair
-        logger.info(String.format(format, "have " + differences.size() + " 
range(s) out of sync"));
-        Tracing.traceRepair("Endpoint {} has {} range(s) out of sync with {} 
for {}", r1.endpoint, differences.size(), r2.endpoint, desc.columnFamily);
-        startSync(differences);
+        logger.info(String.format(format, "have " + rangesToSync.size() + " 
range(s) out of sync"));
+        Tracing.traceRepair("Endpoint {} has {} range(s) out of sync with {} 
for {}", firstEndpoint, rangesToSync.size(), secondEndpoint, desc.columnFamily);
+        startSync(rangesToSync);
     }
 
     public SyncStat getCurrentStat()
diff --git a/src/java/org/apache/cassandra/service/StorageService.java 
b/src/java/org/apache/cassandra/service/StorageService.java
index 1769970..b07401a 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3348,6 +3348,16 @@ public class StorageService extends 
NotificationBroadcasterSupport implements IE
         ActiveRepairService.instance.terminateSessions();
     }
 
+    public void setRepairSessionMaxTreeDepth(int depth)
+    {
+        DatabaseDescriptor.setRepairSessionMaxTreeDepth(depth);
+    }
+
+    public int getRepairSessionMaxTreeDepth()
+    {
+        return DatabaseDescriptor.getRepairSessionMaxTreeDepth();
+    }
+
     /* End of MBean interface methods */
 
     /**
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java 
b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 5692754..ddd2da0 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -364,6 +364,10 @@ public interface StorageServiceMBean extends 
NotificationEmitter
 
     public void forceTerminateAllRepairSessions();
 
+    public void setRepairSessionMaxTreeDepth(int depth);
+
+    public int getRepairSessionMaxTreeDepth();
+
     /**
      * transfer this node's data to other machines and remove it from service.
      */
diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java 
b/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
index c078e7b..4a43388 100644
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
@@ -25,6 +25,7 @@ import java.net.InetAddress;
 import java.net.NetworkInterface;
 import java.util.Enumeration;
 
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -42,6 +43,7 @@ import org.apache.cassandra.thrift.ThriftConversion;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
 
 @RunWith(OrderedJUnit4ClassRunner.class)
 public class DatabaseDescriptorTest
@@ -271,4 +273,39 @@ public class DatabaseDescriptorTest
         DatabaseDescriptor.applyAddressConfig(testConfig);
 
     }
+
+    @Test
+    public void testRepairSessionSizeToggles()
+    {
+        int previousDepth = DatabaseDescriptor.getRepairSessionMaxTreeDepth();
+        try
+        {
+            Assert.assertEquals(18, 
DatabaseDescriptor.getRepairSessionMaxTreeDepth());
+            DatabaseDescriptor.setRepairSessionMaxTreeDepth(10);
+            Assert.assertEquals(10, 
DatabaseDescriptor.getRepairSessionMaxTreeDepth());
+
+            try
+            {
+                DatabaseDescriptor.setRepairSessionMaxTreeDepth(9);
+                fail("Should have received a ConfigurationException for depth 
of 9");
+            }
+            catch (ConfigurationException ignored) { }
+            Assert.assertEquals(10, 
DatabaseDescriptor.getRepairSessionMaxTreeDepth());
+
+            try
+            {
+                DatabaseDescriptor.setRepairSessionMaxTreeDepth(-20);
+                fail("Should have received a ConfigurationException for depth 
of -20");
+            }
+            catch (ConfigurationException ignored) { }
+            Assert.assertEquals(10, 
DatabaseDescriptor.getRepairSessionMaxTreeDepth());
+
+            DatabaseDescriptor.setRepairSessionMaxTreeDepth(22);
+            Assert.assertEquals(22, 
DatabaseDescriptor.getRepairSessionMaxTreeDepth());
+        }
+        finally
+        {
+            DatabaseDescriptor.setRepairSessionMaxTreeDepth(previousDepth);
+        }
+    }
 }
diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java 
b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
index 6aacae6..b891296 100644
--- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
@@ -76,7 +76,9 @@ public class LocalSyncTaskTest extends SchemaLoader
         // note: we reuse the same endpoint which is bogus in theory but fine 
here
         TreeResponse r1 = new TreeResponse(ep1, tree1);
         TreeResponse r2 = new TreeResponse(ep2, tree2);
-        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, 
ActiveRepairService.UNREPAIRED_SSTABLE);
+        LocalSyncTask task = new LocalSyncTask(desc, r1.endpoint, r2.endpoint,
+                                               
MerkleTrees.difference(r1.trees, r2.trees),
+                                               
ActiveRepairService.UNREPAIRED_SSTABLE);
         task.run();
 
         assertEquals(0, task.get().numberOfDifferences);
@@ -111,7 +113,9 @@ public class LocalSyncTaskTest extends SchemaLoader
         // note: we reuse the same endpoint which is bogus in theory but fine 
here
         TreeResponse r1 = new TreeResponse(InetAddress.getByName("127.0.0.1"), 
tree1);
         TreeResponse r2 = new TreeResponse(InetAddress.getByName("127.0.0.2"), 
tree2);
-        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, 
ActiveRepairService.UNREPAIRED_SSTABLE);
+        LocalSyncTask task = new LocalSyncTask(desc, r1.endpoint, r2.endpoint,
+                                               
MerkleTrees.difference(r1.trees, r2.trees),
+                                               
ActiveRepairService.UNREPAIRED_SSTABLE);
         task.run();
 
         // ensure that the changed range was recorded
diff --git a/test/unit/org/apache/cassandra/repair/RepairJobTest.java 
b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
new file mode 100644
index 0000000..2f77a34
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.google.common.util.concurrent.AsyncFunction;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.net.IMessageSink;
+import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.repair.messages.RepairMessage;
+import org.apache.cassandra.repair.messages.SyncRequest;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.MerkleTree;
+import org.apache.cassandra.utils.MerkleTrees;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class RepairJobTest extends SchemaLoader
+{
+    // Upper bound, in seconds, used when waiting on job/sync futures and when polling for memory release
+    private static final long TEST_TIMEOUT_S = 10;
+    // Keep-alive set on the session executor by MeasureableRepairSession; also the sleep interval of the
+    // memory polling loop in testNoTreesRetainedAfterDifference
+    private static final long THREAD_TIMEOUT_MILLIS = 100;
+    private static final IPartitioner MURMUR3_PARTITIONER = 
Murmur3Partitioner.instance;
+    private static final String KEYSPACE = "RepairJobTest";
+    private static final String CF = "Standard1";
+    // Monitor held while the message sink appends to the captured-message list
+    private static final Object messageLock = new Object();
+
+    // Single range used for the repair session and for building the merkle trees.
+    // NOTE(review): despite the name, the right bound is getRandomToken(), so the range differs
+    // from run to run - confirm this is intended.
+    private static final List<Range<Token>> fullRange = 
Collections.singletonList(new Range<>(MURMUR3_PARTITIONER.getMinimumToken(),
+                                                                               
               MURMUR3_PARTITIONER.getRandomToken()));
+    // Endpoint addresses, resolved once in setupClass()
+    private static InetAddress addr1;
+    private static InetAddress addr2;
+    private static InetAddress addr3;
+    private static InetAddress addr4;
+    // Per-test fixtures, rebuilt in setup()
+    private RepairSession session;
+    private RepairJob job;
+    private RepairJobDesc sessionJobDesc;
+
+    /**
+     * A {@link RepairSession} whose executor is given a keep-alive time of
+     * {@code THREAD_TIMEOUT_MILLIS} milliseconds.
+     *
+     * So that threads actually get recycled and we can have accurate memory accounting while testing
+     * memory retention from CASSANDRA-14096.
+     */
+    private static class MeasureableRepairSession extends RepairSession
+    {
+        public MeasureableRepairSession(UUID parentRepairSession, UUID id, Collection<Range<Token>> ranges, String keyspace,
+                                        RepairParallelism parallelismDegree, Set<InetAddress> endpoints, long repairedAt, String... cfnames)
+        {
+            super(parentRepairSession, id, ranges, keyspace, parallelismDegree, endpoints, repairedAt, cfnames);
+        }
+
+        /**
+         * Creates the executor exactly as the superclass does, then shortens its keep-alive time.
+         *
+         * @return the superclass executor with keep-alive set to {@code THREAD_TIMEOUT_MILLIS} ms
+         */
+        @Override
+        protected DebuggableThreadPoolExecutor createExecutor()
+        {
+            DebuggableThreadPoolExecutor executor = super.createExecutor();
+            executor.setKeepAliveTime(THREAD_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+            return executor;
+        }
+    }
+
+    /**
+     * One-time fixture: prepares the server through {@link SchemaLoader}, creates keyspace
+     * {@code KEYSPACE} containing table {@code CF}, and resolves the four loopback addresses
+     * used as endpoints by the tests.
+     *
+     * @throws UnknownHostException if one of the address literals cannot be resolved
+     */
+    @BeforeClass
+    public static void setupClass() throws UnknownHostException
+    {
+        // static helpers inherited from SchemaLoader
+        prepareServer();
+        createKeyspace(KEYSPACE, KeyspaceParams.simple(1), standardCFMD(KEYSPACE, CF));
+
+        addr1 = InetAddress.getByName("127.0.0.1");
+        addr2 = InetAddress.getByName("127.0.0.2");
+        addr3 = InetAddress.getByName("127.0.0.3");
+        addr4 = InetAddress.getByName("127.0.0.4");
+    }
+
+    /**
+     * Per-test fixture: registers a fresh parent repair session for table CF over fullRange,
+     * then builds the session, job and job descriptor that the tests drive.
+     */
+    @Before
+    public void setup()
+    {
+        // addr2 and addr3 are the remote participants of the session
+        Set<InetAddress> neighbors = new HashSet<>(Arrays.asList(addr2, 
addr3));
+
+        UUID parentRepairSession = UUID.randomUUID();
+        
ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, 
FBUtilities.getBroadcastAddress(),
+                                                                 
Collections.singletonList(Keyspace.open(KEYSPACE).getColumnFamilyStore(CF)), 
fullRange, false,
+                                                                 
ActiveRepairService.UNREPAIRED_SSTABLE, false);
+
+        // MeasureableRepairSession shortens the executor keep-alive (see createExecutor there)
+        this.session = new MeasureableRepairSession(parentRepairSession, 
UUIDGen.getTimeUUID(), fullRange,
+                                                    KEYSPACE, 
RepairParallelism.SEQUENTIAL, neighbors,
+                                                    
ActiveRepairService.UNREPAIRED_SSTABLE, CF);
+
+        this.job = new RepairJob(session, CF);
+        this.sessionJobDesc = new RepairJobDesc(session.parentRepairSession, 
session.getId(),
+                                                session.keyspace, CF, 
session.getRanges());
+
+        // NOTE(review): FBUtilities.getBroadcastAddress() is already read above, before this
+        // assignment - confirm the ordering is intentional.
+        DatabaseDescriptor.setBroadcastAddress(addr1);
+    }
+
+    /**
+     * Per-test cleanup: terminates the sessions held by ActiveRepairService and clears the
+     * message sinks registered on MessagingService (such as the one added by
+     * interceptRepairMessages).
+     */
+    @After
+    public void reset()
+    {
+        ActiveRepairService.instance.terminateSessions();
+        MessagingService.instance().clearMessageSinks();
+    }
+
+    /**
+     * Runs a complete repair job where every replica reports the same merkle tree, then checks the
+     * job result, the sync task left on the session and the types of the repair messages sent.
+     */
+    @Test
+    public void testEndToEndNoDifferences() throws Exception
+    {
+        // every participant, the local node included, answers with an identically built tree
+        Map<InetAddress, MerkleTrees> treesByEndpoint = new HashMap<>();
+        for (InetAddress endpoint : Arrays.asList(FBUtilities.getBroadcastAddress(), addr2, addr3))
+            treesByEndpoint.put(endpoint, createInitialTree(false));
+
+        List<MessageOut> sentMessages = new ArrayList<>();
+        interceptRepairMessages(treesByEndpoint, sentMessages);
+
+        job.run();
+
+        RepairResult repairResult = job.get(TEST_TIMEOUT_S, TimeUnit.SECONDS);
+        assertEquals(3, repairResult.stats.size());
+
+        // Should be one RemoteSyncTask left behind (other two should be local)
+        assertExpectedDifferences(session.getSyncingTasks().values(), 0);
+
+        // RepairJob should send out SNAPSHOTS -> VALIDATIONS -> done
+        List<RepairMessage.Type> expectedTypes = new ArrayList<>();
+        expectedTypes.addAll(Collections.nCopies(3, RepairMessage.Type.SNAPSHOT));
+        expectedTypes.addAll(Collections.nCopies(3, RepairMessage.Type.VALIDATION_REQUEST));
+
+        List<RepairMessage.Type> observedTypes = new ArrayList<>();
+        for (MessageOut sent : sentMessages)
+            observedTypes.add(((RepairMessage) sent.payload).messageType);
+
+        assertEquals(expectedTypes, observedTypes);
+    }
+
+    /**
+     * Regression test for CASSANDRA-14096. We should not retain memory in the 
RepairSession once the
+     * ValidationTask -> SyncTask transform is done.
+     */
+    @Test
+    public void testNoTreesRetainedAfterDifference() throws Throwable
+    {
+        // addr2 is the only endpoint given a tree with an invalidated range
+        Map<InetAddress, MerkleTrees> mockTrees = new HashMap<>();
+        mockTrees.put(FBUtilities.getBroadcastAddress(), 
createInitialTree(false));
+        mockTrees.put(addr2, createInitialTree(true));
+        mockTrees.put(addr3, createInitialTree(false));
+
+        List<MessageOut> observedMessages = new ArrayList<>();
+        interceptRepairMessages(mockTrees, observedMessages);
+
+        List<TreeResponse> mockTreeResponses = mockTrees.entrySet().stream()
+                                                        .map(e -> new 
TreeResponse(e.getKey(), e.getValue()))
+                                                        
.collect(Collectors.toList());
+
+        // Size baseline: every size assertion below requires less than 0.8x one tree's deep size
+        long singleTreeSize = ObjectSizes.measureDeep(mockTrees.get(addr2));
+
+        // Use a different local address so we get all RemoteSyncs (as 
LocalSyncs try to reach out over the network).
+        List<SyncTask> syncTasks = job.createSyncTasks(mockTreeResponses, 
addr4);
+
+        // SyncTasks themselves should not contain significant memory
+        assertTrue(ObjectSizes.measureDeep(syncTasks) < 0.8 * singleTreeSize);
+
+        // Aggregate the sync task futures; the transform callback executes on session.taskExecutor
+        ListenableFuture<List<SyncStat>> syncResults = 
Futures.transform(Futures.immediateFuture(mockTreeResponses), new 
AsyncFunction<List<TreeResponse>, List<SyncStat>>()
+        {
+            public ListenableFuture<List<SyncStat>> apply(List<TreeResponse> 
treeResponses)
+            {
+                return Futures.allAsList(syncTasks);
+            }
+        }, session.taskExecutor);
+
+        // The session can retain memory in the contained executor until the 
threads expire, so we wait for the threads
+        // that ran the Tree -> SyncTask conversions to die and release the 
memory
+        int millisUntilFreed;
+        for (millisUntilFreed = 0; millisUntilFreed < TEST_TIMEOUT_S * 1000; 
millisUntilFreed += THREAD_TIMEOUT_MILLIS)
+        {
+            // The measured size of the syncingTasks, and result of the 
computation should be much smaller
+            if (ObjectSizes.measureDeep(session) < 0.8 * singleTreeSize)
+                break;
+            TimeUnit.MILLISECONDS.sleep(THREAD_TIMEOUT_MILLIS);
+        }
+
+        // Fails when the loop above ran to the timeout without the session ever measuring small enough
+        assertTrue(millisUntilFreed < TEST_TIMEOUT_S * 1000);
+
+        List<SyncStat> results = syncResults.get(TEST_TIMEOUT_S, 
TimeUnit.SECONDS);
+
+        assertTrue(ObjectSizes.measureDeep(results) < 0.8 * singleTreeSize);
+
+        assertEquals(3, results.size());
+        // Should be two RemoteSyncTasks with ranges and one empty one
+        assertExpectedDifferences(new 
ArrayList<>(session.getSyncingTasks().values()), 1, 1, 0);
+
+        // Every stat whose node pair includes addr2 must report exactly one difference,
+        // and exactly two such pairs are expected
+        int numDifferent = 0;
+        for (SyncStat stat : results)
+        {
+            if (stat.nodes.endpoint1.equals(addr2) || 
stat.nodes.endpoint2.equals(addr2))
+            {
+                assertEquals(1, stat.numberOfDifferences);
+                numDifferent++;
+            }
+        }
+        assertEquals(2, numDifferent);
+    }
+
+    /**
+     * Asserts that the given sync tasks report exactly the expected difference counts, regardless
+     * of the order in which the tasks are iterated.
+     *
+     * @param tasks       sync tasks whose current stat is inspected
+     * @param differences expected {@code numberOfDifferences} values, one per task, in any order
+     */
+    private void assertExpectedDifferences(Collection<RemoteSyncTask> tasks, Integer ... differences)
+    {
+        List<Integer> expectedDifferences = new ArrayList<>(Arrays.asList(differences));
+        Collections.sort(expectedDifferences);
+
+        List<Integer> observedDifferences = tasks.stream()
+                                                 .map(t -> (int) t.getCurrentStat().numberOfDifferences)
+                                                 .sorted()
+                                                 .collect(Collectors.toList());
+
+        // Compare as sorted lists, i.e. as multisets. A size check combined with containsAll()
+        // would also accept e.g. observed [0, 0, 1] when [1, 1, 0] is expected.
+        assertEquals(expectedDifferences, observedDifferences);
+    }
+
+    /**
+     * Builds the merkle trees for {@code fullRange} and makes sure every range they report as
+     * invalid has its hash initialised.
+     *
+     * @param invalidate when true, the tree is additionally invalidated at the midpoint token of
+     *                   {@code fullRange} and the range containing that token is given a fixed hash
+     * @return the prepared trees
+     */
+    private MerkleTrees createInitialTree(boolean invalidate)
+    {
+        MerkleTrees trees = new MerkleTrees(MURMUR3_PARTITIONER);
+        trees.addMerkleTrees(1 << 15, fullRange);
+        trees.init();
+        for (MerkleTree.TreeRange range : trees.invalids())
+            range.ensureHashInitialised();
+
+        if (!invalidate)
+            return trees;
+
+        // change a range in one of the trees
+        Range<Token> wholeRange = fullRange.get(0);
+        Token midpoint = MURMUR3_PARTITIONER.midpoint(wholeRange.left, wholeRange.right);
+        trees.invalidate(midpoint);
+        trees.get(midpoint).hash("non-empty hash!".getBytes());
+        return trees;
+    }
+
+    /**
+     * Registers a message sink on MessagingService that records every outgoing repair message and
+     * answers it in-process. The sink returns false for every outgoing message, and lets in only
+     * incoming messages whose verb is REQUEST_RESPONSE.
+     *
+     * @param mockTrees      trees passed to the session, per target endpoint, when a validation
+     *                       request is intercepted
+     * @param messageCapture list that receives each intercepted outgoing repair message
+     */
+    private void interceptRepairMessages(Map<InetAddress, MerkleTrees> mockTrees,
+                                         List<MessageOut> messageCapture)
+    {
+        IMessageSink repairSink = new IMessageSink()
+        {
+            public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to)
+            {
+                // anything that is not a repair message is rejected without being recorded
+                boolean isRepairMessage = message != null && message.payload instanceof RepairMessage;
+                if (!isRepairMessage)
+                    return false;
+
+                RepairMessage repairMessage = (RepairMessage) message.payload;
+
+                // So different Thread's messages don't overwrite each other.
+                synchronized (messageLock)
+                {
+                    messageCapture.add(message);
+                }
+
+                switch (repairMessage.messageType)
+                {
+                    case SNAPSHOT:
+                        // feed a payload-less REQUEST_RESPONSE from the target back into MessagingService
+                        MessageIn<?> snapshotReply = MessageIn.create(to, null,
+                                                                      Collections.emptyMap(),
+                                                                      MessagingService.Verb.REQUEST_RESPONSE,
+                                                                      MessagingService.current_version);
+                        MessagingService.instance().receive(snapshotReply, id, System.currentTimeMillis(), false);
+                        break;
+                    case VALIDATION_REQUEST:
+                        // hand the session the canned tree registered for the target endpoint
+                        session.validationComplete(sessionJobDesc, to, mockTrees.get(to));
+                        break;
+                    case SYNC_REQUEST:
+                        // complete the requested sync on the session for the (src, dst) pair
+                        SyncRequest syncRequest = (SyncRequest) repairMessage;
+                        NodePair syncedNodes = new NodePair(syncRequest.src, syncRequest.dst);
+                        session.syncComplete(sessionJobDesc, syncedNodes, true);
+                        break;
+                    default:
+                        break;
+                }
+                return false;
+            }
+
+            public boolean allowIncomingMessage(MessageIn message, int id)
+            {
+                return message.verb == MessagingService.Verb.REQUEST_RESPONSE;
+            }
+        };
+        MessagingService.instance().addMessageSink(repairSink);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to