This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 70e6f2952fd648f45aa9075909c13e5783141318
Merge: cbf4da4 1268530
Author: Blake Eggleston <bdeggles...@gmail.com>
AuthorDate: Thu Feb 14 13:21:54 2019 -0800

    Merge branch 'cassandra-3.11' into trunk

 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |   3 +
 conf/cassandra.yaml                                |  11 +
 src/java/org/apache/cassandra/config/Config.java   |   5 +
 .../cassandra/config/DatabaseDescriptor.java       |  54 ++++
 .../org/apache/cassandra/repair/RepairJob.java     |   4 +-
 .../org/apache/cassandra/repair/RepairSession.java |  17 +-
 .../apache/cassandra/repair/ValidationManager.java |  17 +-
 .../cassandra/service/ActiveRepairService.java     |  12 +
 .../service/ActiveRepairServiceMBean.java          |   3 +
 .../apache/cassandra/service/StorageService.java   |  11 +-
 .../cassandra/service/StorageServiceMBean.java     |   4 +
 .../org/apache/cassandra/utils/MerkleTree.java     |  46 +++
 .../cassandra/config/DatabaseDescriptorTest.java   |  69 +++++
 .../org/apache/cassandra/repair/RepairJobTest.java | 317 +++++++++++++++++----
 .../org/apache/cassandra/repair/ValidatorTest.java | 158 +++++++++-
 .../org/apache/cassandra/utils/MerkleTreeTest.java |  98 ++++++-
 17 files changed, 762 insertions(+), 68 deletions(-)

diff --cc CHANGES.txt
index 87f3e2b,00ca115..1394ea8
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -354,6 -11,6 +354,7 @@@
   * Make stop-server.bat wait for Cassandra to terminate (CASSANDRA-14829)
   * Correct sstable sorting for garbagecollect and levelled compaction 
(CASSANDRA-14870)
  Merged from 3.0:
++ * Improve merkle tree size and time on heap (CASSANDRA-14096)
   * Severe concurrency issues in STCS,DTCS,TWCS,TMD.Topology,TypeParser
   * Add a script to make running the cqlsh tests in cassandra repo easier 
(CASSANDRA-14951)
   * If SizeEstimatesRecorder misses a 'onDropTable' notification, the 
size_estimates table will never be cleared for that table. (CASSANDRA-14905)
diff --cc NEWS.txt
index 00c3178,d5a9128..d9950e3
--- a/NEWS.txt
+++ b/NEWS.txt
@@@ -113,112 -47,8 +113,115 @@@ New feature
  
  Upgrading
  ---------
 -      - repair_session_max_tree_depth setting has been added to 
cassandra.yaml to allow operators to reduce
 -        merkle tree size if repair is creating too much heap pressure. See 
CASSANDRA-14096 for details.
 +    - CASSANDRA-13241 lowered the default chunk_lengh_in_kb for compresesd 
tables from
 +      64kb to 16kb. For highly compressible data this can have a noticeable 
impact
 +      on space utilization. You may want to consider manually specifying this 
value.
 +    - Additional columns have been added to system_distributed.repair_history,
 +      system_traces.sessions and system_traces.events. As a result select 
queries
 +      againsts these tables will fail and generate an error in the log
 +      during upgrade when the cluster is mixed version. The tables can be made
 +      readable by following the instructions in CASSANDRA-14897 to add the
 +      new columns to the system tables before upgrading.
 +    - Timestamp ties between values resolve differently: if either value has 
a TTL,
 +      this value always wins. This is to provide consistent reconciliation 
before
 +      and after the value expires into a tombstone.
 +    - Cassandra 4.0 removed support for COMPACT STORAGE tables. All Compact 
Tables
 +      have to be migrated using `ALTER ... DROP COMPACT STORAGE` statement in 
3.0/3.11.
 +      Cassandra starting 4.0 will not start if flags indicate that the table 
is non-CQL.
 +      Syntax for creating compact tables is also deprecated.
 +    - Support for legacy auth tables in the system_auth keyspace (users,
 +      permissions, credentials) and the migration code has been removed. 
Migration
 +      of these legacy auth tables must have been completed before the upgrade 
to
 +      4.0 and the legacy tables must have been removed. See the 'Upgrading' 
section
 +      for version 2.2 for migration instructions.
 +    - Cassandra 4.0 removed support for the deprecated Thrift interface. 
Amongst
 +      other things, this implies the removal of all yaml options related to 
thrift
 +      ('start_rpc', rpc_port, ...).
 +    - Cassandra 4.0 removed support for any pre-3.0 format. This means you
 +      cannot upgrade from a 2.x version to 4.0 directly, you have to upgrade 
to
 +      a 3.0.x/3.x version first (and run upgradesstable). In particular, this
 +      mean Cassandra 4.0 cannot load or read pre-3.0 sstables in any way: you
 +      will need to upgrade those sstable in 3.0.x/3.x first.
 +    - Upgrades from 3.0.x or 3.x are supported since 3.0.13 or 3.11.0, 
previous
 +      versions will causes issues during rolling upgrades (CASSANDRA-13274).
 +    - Cassandra will no longer allow invalid keyspace replication options, 
such
 +      as invalid datacenter names for NetworkTopologyStrategy. Operators MUST
 +      add new nodes to a datacenter before they can set set ALTER or CREATE
 +      keyspace replication policies using that datacenter. Existing keyspaces
 +      will continue to operate, but CREATE and ALTER will validate that all
 +      datacenters specified exist in the cluster.
 +    - Cassandra 4.0 fixes a problem with incremental repair which caused 
repaired
 +      data to be inconsistent between nodes. The fix changes the behavior of 
both
 +      full and incremental repairs. For full repairs, data is no longer marked
 +      repaired. For incremental repairs, anticompaction is run at the 
beginning
 +      of the repair, instead of at the end. If incremental repair was being 
used
 +      prior to upgrading, a full repair should be run after upgrading to 
resolve
 +      any inconsistencies.
 +    - Config option index_interval has been removed (it was deprecated since 
2.0)
 +    - Deprecated repair JMX APIs are removed.
 +    - The version of snappy-java has been upgraded to 1.1.2.6
 +      - the miniumum value for internode message timeouts is 10ms. 
Previously, any
 +        positive value was allowed. See cassandra.yaml entries like
 +        read_request_timeout_in_ms for more details.
 +      - Cassandra 4.0 allows a single port to be used for both secure and 
insecure
 +        connections between cassandra nodes (CASSANDRA-10404). See the yaml 
for
 +        specific property changes, and see the security doc for full details.
 +    - Due to the parallelization of the initial build of materialized views,
 +      the per token range view building status is stored in the new table
 +      `system.view_builds_in_progress`. The old table 
`system.views_builds_in_progress`
 +      is no longer used and can be removed. See CASSANDRA-12245 for more 
details.
 +      - Config option commitlog_sync_batch_window_in_ms has been deprecated 
as it's
 +        documentation has been incorrect and the setting itself near useless.
 +        Batch mode remains a valid commit log mode, however.
 +      - There is a new commit log mode, group, which is similar to batch mode
 +        but blocks for up to a configurable number of milliseconds between 
disk flushes.
 +      - nodetool clearsnapshot now required the --all flag to remove all 
snapshots.
 +        Previous behavior would delete all snapshots by default.
 +    - Nodes are now identified by a combination of IP, and storage port.
 +      Existing JMX APIs, nodetool, and system tables continue to work
 +      and accept/return just an IP, but there is a new
 +      version of each that works with the full unambiguous identifier.
 +      You should prefer these over the deprecated ambiguous versions that only
 +      work with an IP. This was done to support multiple instances per IP.
 +      Additionally we are moving to only using a single port for encrypted and
 +      unencrypted traffic and if you want multiple instances per IP you must
 +      first switch encrypted traffic to the storage port and not a separate
 +      encrypted port. If you want to use multiple instances per IP
 +      with SSL you will need to use StartTLS on storage_port and set
 +      outgoing_encrypted_port_source to gossip outbound connections
 +      know what port to connect to for each instance. Before changing
 +      storage port or native port at nodes you must first upgrade the entire 
cluster
 +      and clients to 4.0 so they can handle the port not being consistent 
across
 +      the cluster.
 +    - Names of AWS regions/availability zones have been cleaned up to more 
correctly
 +      match the Amazon names. There is now a new option in 
conf/cassandra-rackdc.properties
 +      that lets users enable the correct names for new clusters, or use the 
legacy
 +      names for existing clusters. See conf/cassandra-rackdc.properties for 
details.
 +    - Background repair has been removed. dclocal_read_repair_chance and
 +      read_repair_chance table options have been removed and are now rejected.
 +      See CASSANDRA-13910 for details.
 +    - Internode TCP connections that do not ack segments for 30s will now
 +      be automatically detected and closed via the Linux TCP_USER_TIMEOUT
 +      socket option. This should be exceedingly rare, but AWS networks (and
 +      other stateful firewalls) apparently suffer from this issue. You can
 +      tune the timeouts on TCP connection and segment ack via the
 +      `cassandra.yaml:internode_tcp_connect_timeout_in_ms` and
 +      `cassandra.yaml:internode_tcp_user_timeout_in_ms` options respectively.
 +      See CASSANDRA-14358 for details.
++      - repair_session_space_in_mb setting has been added to cassandra.yaml 
to allow operators to reduce
++        merkle tree size if repair is creating too much heap pressure. The 
repair_session_max_tree_depth
++        setting added in 3.0.19 and 3.11.5 is deprecated in favor of this 
setting. See CASSANDRA-14096 
 +
 +Materialized Views
 +-------------------
 +   - Following a discussion regarding concerns about the design and safety of 
Materialized Views, the C* development
 +     community no longer recommends them for production use, and considers 
them experimental. Warnings messages will
 +     now be logged when they are created. (See 
https://www.mail-archive.com/dev@cassandra.apache.org/msg11511.html)
 +   - An 'enable_materialized_views' flag has been added to cassandra.yaml to 
allow operators to prevent creation of
 +     views
 +   - CREATE MATERIALIZED VIEW syntax has become stricter. Partition key 
columns are no longer implicitly considered
 +     to be NOT NULL, and no base primary key columns get automatically 
included in view definition. You have to
 +     specify them explicitly now.
  
  3.11.4
  ======
diff --cc conf/cassandra.yaml
index dde4296,a263d8a..78fb162
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@@ -506,6 -498,19 +506,17 @@@ concurrent_materialized_view_writes: 3
  #    off heap objects
  memtable_allocation_type: heap_buffers
  
 -# Limits the maximum Merkle tree depth to avoid consuming too much
 -# memory during repairs.
 -#
 -# The default setting of 18 generates trees of maximum size around
 -# 50 MiB / tree. If you are running out of memory during repairs consider
 -# lowering this to 15 (~6 MiB / tree) or lower, but try not to lower it
 -# too much past that or you will lose too much resolution and stream
 -# too much redundant data during repair. Cannot be set lower than 10.
++# Limit memory usage for Merkle tree calculations during repairs. The default
++# is 1/16th of the available heap. The main tradeoff is that smaller trees
++# have less resolution, which can lead to over-streaming data. If you see heap
++# pressure during repairs, consider lowering this, but you cannot go below
++# one megabyte. If you see lots of over-streaming, consider raising
++# this or using subrange repair.
+ #
+ # For more details see https://issues.apache.org/jira/browse/CASSANDRA-14096.
+ #
 -# repair_session_max_tree_depth: 18
++# repair_session_space_in_mb:
+ 
  # Total space to use for commit logs on disk.
  #
  # If space gets above this value, Cassandra will flush every dirty CF
diff --cc src/java/org/apache/cassandra/config/Config.java
index 4a55b2e,528cf4f..a95db23
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@@ -123,6 -123,9 +123,11 @@@ public class Confi
      public Integer memtable_offheap_space_in_mb;
      public Float memtable_cleanup_threshold = null;
  
+     // Limit the maximum depth of repair session merkle trees
 -    public volatile int repair_session_max_tree_depth = 18;
++    @Deprecated
++    public volatile Integer repair_session_max_tree_depth = null;
++    public volatile Integer repair_session_space_in_mb = null;
+ 
      public int storage_port = 7000;
      public int ssl_storage_port = 7001;
      public String listen_address;
diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 5c70c9a,069a17e..3f80d71
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@@ -461,7 -437,13 +461,28 @@@ public class DatabaseDescripto
          else
              logger.info("Global memtable off-heap threshold is enabled at 
{}MB", conf.memtable_offheap_space_in_mb);
  
 -        if (conf.repair_session_max_tree_depth < 10)
 -            throw new ConfigurationException("repair_session_max_tree_depth 
should not be < 10, but was " + conf.repair_session_max_tree_depth);
 -        if (conf.repair_session_max_tree_depth > 20)
 -            logger.warn("repair_session_max_tree_depth of " + 
conf.repair_session_max_tree_depth + " > 20 could lead to excessive memory 
usage");
++        if (conf.repair_session_max_tree_depth != null)
++        {
++            logger.warn("repair_session_max_tree_depth has been deprecated 
and should be removed from cassandra.yaml. Use repair_session_space_in_mb 
instead");
++            if (conf.repair_session_max_tree_depth < 10)
++                throw new 
ConfigurationException("repair_session_max_tree_depth should not be < 10, but 
was " + conf.repair_session_max_tree_depth);
++            if (conf.repair_session_max_tree_depth > 20)
++                logger.warn("repair_session_max_tree_depth of " + 
conf.repair_session_max_tree_depth + " > 20 could lead to excessive memory 
usage");
++        }
++        else
++        {
++            conf.repair_session_max_tree_depth = 20;
++        }
++
++        if (conf.repair_session_space_in_mb == null)
++            conf.repair_session_space_in_mb = Math.max(1, (int) 
(Runtime.getRuntime().maxMemory() / (16 * 1048576)));
++
++        if (conf.repair_session_space_in_mb < 1)
++            throw new ConfigurationException("repair_session_space_in_mb must 
be > 0, but was " + conf.repair_session_space_in_mb);
++        else if (conf.repair_session_space_in_mb > (int) 
(Runtime.getRuntime().maxMemory() / (4 * 1048576)))
++            logger.warn("A repair_session_space_in_mb of " + 
conf.repair_session_space_in_mb + " megabytes is likely to cause heap 
pressure");
+ 
 -        if (conf.thrift_framed_transport_size_in_mb <= 0)
 -            throw new 
ConfigurationException("thrift_framed_transport_size_in_mb must be positive, 
but was " + conf.thrift_framed_transport_size_in_mb, false);
 +        checkForLowestAcceptedTimeouts(conf);
  
          if (conf.native_transport_max_frame_size_in_mb <= 0)
              throw new 
ConfigurationException("native_transport_max_frame_size_in_mb must be positive, 
but was " + conf.native_transport_max_frame_size_in_mb, false);
@@@ -2382,11 -2280,27 +2403,44 @@@
          return conf.memtable_allocation_type;
      }
  
 -    public static Float getMemtableCleanupThreshold()
 -    {
 -        return conf.memtable_cleanup_threshold;
 -    }
 -
+     public static int getRepairSessionMaxTreeDepth()
+     {
+         return conf.repair_session_max_tree_depth;
+     }
+ 
+     public static void setRepairSessionMaxTreeDepth(int depth)
+     {
+         if (depth < 10)
+             throw new ConfigurationException("Cannot set 
repair_session_max_tree_depth to " + depth +
+                                              " which is < 10, doing nothing");
+         else if (depth > 20)
+             logger.warn("repair_session_max_tree_depth of " + depth + " > 20 
could lead to excessive memory usage");
+ 
+         conf.repair_session_max_tree_depth = depth;
+     }
+ 
++    public static int getRepairSessionSpaceInMegabytes()
++    {
++        return conf.repair_session_space_in_mb;
++    }
++
++    public static void setRepairSessionSpaceInMegabytes(int sizeInMegabytes)
++    {
++        if (sizeInMegabytes < 1)
++            throw new ConfigurationException("Cannot set 
repair_session_space_in_mb to " + sizeInMegabytes +
++                                             " < 1 megabyte");
++        else if (sizeInMegabytes > (int) (Runtime.getRuntime().maxMemory() / 
(4 * 1048576)))
++            logger.warn("A repair_session_space_in_mb of " + 
conf.repair_session_space_in_mb +
++                        " megabytes is likely to cause heap pressure.");
++
++        conf.repair_session_space_in_mb = sizeInMegabytes;
++    }
++
 +    public static Float getMemtableCleanupThreshold()
 +    {
 +        return conf.memtable_cleanup_threshold;
 +    }
 +
      public static int getIndexSummaryResizeIntervalInMinutes()
      {
          return conf.index_summary_resize_interval_in_minutes;
diff --cc src/java/org/apache/cassandra/repair/RepairJob.java
index ebeb1f9,6f89a86..a67aac0
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@@ -17,13 -17,10 +17,14 @@@
   */
  package org.apache.cassandra.repair;
  
 -import java.net.InetAddress;
  import java.util.*;
 +import java.util.function.Predicate;
 +import java.util.function.Function;
 +import java.util.stream.Collectors;
  
+ import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Preconditions;
 +import com.google.common.collect.ImmutableMap;
  import com.google.common.util.concurrent.*;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -239,96 -171,6 +240,97 @@@ public class RepairJob extends Abstract
          return syncTasks;
      }
  
 +    private ListenableFuture<List<SyncStat>> 
optimisedSyncing(List<TreeResponse> trees)
 +    {
 +        List<SyncTask> syncTasks = createOptimisedSyncingSyncTasks(desc,
 +                                                                   trees,
 +                                                                   
FBUtilities.getLocalAddressAndPort(),
 +                                                                   
this::isTransient,
 +                                                                   
this::getDC,
 +                                                                   
session.isIncremental,
 +                                                                   
session.previewKind);
 +
 +        return executeTasks(syncTasks);
 +    }
 +
-     private ListenableFuture<List<SyncStat>> executeTasks(List<SyncTask> 
syncTasks)
++    @VisibleForTesting
++    ListenableFuture<List<SyncStat>> executeTasks(List<SyncTask> syncTasks)
 +    {
 +        for (SyncTask task : syncTasks)
 +        {
 +            if (!task.isLocal())
 +                session.trackSyncCompletion(Pair.create(desc, 
task.nodePair()), (CompletableRemoteSyncTask) task);
 +            taskExecutor.submit(task);
 +        }
 +
 +        return Futures.allAsList(syncTasks);
 +    }
 +
 +    static List<SyncTask> createOptimisedSyncingSyncTasks(RepairJobDesc desc,
 +                                                          List<TreeResponse> 
trees,
 +                                                          InetAddressAndPort 
local,
 +                                                          
Predicate<InetAddressAndPort> isTransient,
 +                                                          
Function<InetAddressAndPort, String> getDC,
 +                                                          boolean 
isIncremental,
 +                                                          PreviewKind 
previewKind)
 +    {
 +        List<SyncTask> syncTasks = new ArrayList<>();
 +        // We need to difference all trees one against another
 +        DifferenceHolder diffHolder = new DifferenceHolder(trees);
 +
 +        logger.debug("diffs = {}", diffHolder);
 +        PreferedNodeFilter preferSameDCFilter = (streaming, candidates) ->
 +                                                candidates.stream()
 +                                                          .filter(node -> 
getDC.apply(streaming)
 +                                                                          
.equals(getDC.apply(node)))
 +                                                          
.collect(Collectors.toSet());
 +        ImmutableMap<InetAddressAndPort, HostDifferences> reducedDifferences 
= ReduceHelper.reduce(diffHolder, preferSameDCFilter);
 +
 +        for (int i = 0; i < trees.size(); i++)
 +        {
 +            InetAddressAndPort address = trees.get(i).endpoint;
 +
 +            // we don't stream to transient replicas
 +            if (isTransient.test(address))
 +                continue;
 +
 +            HostDifferences streamsFor = reducedDifferences.get(address);
 +            if (streamsFor != null)
 +            {
 +                
Preconditions.checkArgument(streamsFor.get(address).isEmpty(), "We should not 
fetch ranges from ourselves");
 +                for (InetAddressAndPort fetchFrom : streamsFor.hosts())
 +                {
 +                    List<Range<Token>> toFetch = streamsFor.get(fetchFrom);
 +                    assert !toFetch.isEmpty();
 +
 +                    logger.debug("{} is about to fetch {} from {}", address, 
toFetch, fetchFrom);
 +                    SyncTask task;
 +                    if (address.equals(local))
 +                    {
 +                        task = new LocalSyncTask(desc, address, fetchFrom, 
toFetch, isIncremental ? desc.parentSessionId : null,
 +                                                 true, false, previewKind);
 +                    }
 +                    else
 +                    {
 +                        task = new AsymmetricRemoteSyncTask(desc, address, 
fetchFrom, toFetch, previewKind);
 +                    }
 +                    syncTasks.add(task);
 +
 +                }
 +            }
 +            else
 +            {
 +                logger.debug("Node {} has nothing to stream", address);
 +            }
 +        }
 +        return syncTasks;
 +    }
 +
 +    private String getDC(InetAddressAndPort address)
 +    {
 +        return DatabaseDescriptor.getEndpointSnitch().getDatacenter(address);
 +    }
 +
      /**
       * Creates {@link ValidationTask} and submit them to task executor in 
parallel.
       *
diff --cc src/java/org/apache/cassandra/repair/RepairSession.java
index a38205e,3d25cbf..40f3dbe
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@@ -100,13 -96,12 +101,13 @@@ public class RepairSession extends Abst
      private final AtomicBoolean isFailed = new AtomicBoolean(false);
  
      // Each validation task waits response from replica in validating 
ConcurrentMap (keyed by CF name and endpoint address)
 -    private final ConcurrentMap<Pair<RepairJobDesc, InetAddress>, 
ValidationTask> validating = new ConcurrentHashMap<>();
 +    private final ConcurrentMap<Pair<RepairJobDesc, InetAddressAndPort>, 
ValidationTask> validating = new ConcurrentHashMap<>();
      // Remote syncing jobs wait response in syncingTasks map
 -    private final ConcurrentMap<Pair<RepairJobDesc, NodePair>, 
RemoteSyncTask> syncingTasks = new ConcurrentHashMap<>();
 +    private final ConcurrentMap<Pair<RepairJobDesc, SyncNodePair>, 
CompletableRemoteSyncTask> syncingTasks = new ConcurrentHashMap<>();
  
      // Tasks(snapshot, validate request, differencing, ...) are run on 
taskExecutor
-     public final ListeningExecutorService taskExecutor = 
MoreExecutors.listeningDecorator(DebuggableThreadPoolExecutor.createCachedThreadpoolWithMaxSize("RepairJobTask"));
+     public final ListeningExecutorService taskExecutor;
 +    public final boolean optimiseStreams;
  
      private volatile boolean terminated = false;
  
@@@ -140,38 -135,17 +141,44 @@@
          this.parallelismDegree = parallelismDegree;
          this.keyspace = keyspace;
          this.cfnames = cfnames;
 -        this.ranges = ranges;
 -        this.endpoints = endpoints;
 -        this.repairedAt = repairedAt;
 +
 +        //If force then filter out dead endpoints
 +        boolean forceSkippedReplicas = false;
 +        if (force)
 +        {
 +            logger.debug("force flag set, removing dead endpoints");
 +            final Set<InetAddressAndPort> removeCandidates = new HashSet<>();
 +            for (final InetAddressAndPort endpoint : commonRange.endpoints)
 +            {
 +                if (!FailureDetector.instance.isAlive(endpoint))
 +                {
 +                    logger.info("Removing a dead node from Repair due to 
-force {}", endpoint);
 +                    removeCandidates.add(endpoint);
 +                }
 +            }
 +            if (!removeCandidates.isEmpty())
 +            {
 +                // we shouldn't be recording a successful repair if
 +                // any replicas are excluded from the repair
 +                forceSkippedReplicas = true;
 +                Set<InetAddressAndPort> filteredEndpoints = new 
HashSet<>(commonRange.endpoints);
 +                filteredEndpoints.removeAll(removeCandidates);
 +                commonRange = new CommonRange(filteredEndpoints, 
commonRange.transEndpoints, commonRange.ranges);
 +            }
 +        }
 +
 +        this.commonRange = commonRange;
 +        this.isIncremental = isIncremental;
 +        this.previewKind = previewKind;
          this.pullRepair = pullRepair;
 +        this.skippedReplicas = forceSkippedReplicas;
 +        this.optimiseStreams = optimiseStreams;
+         this.taskExecutor = 
MoreExecutors.listeningDecorator(createExecutor());
+     }
+ 
 -    @VisibleForTesting
+     protected DebuggableThreadPoolExecutor createExecutor()
+     {
+         return 
DebuggableThreadPoolExecutor.createCachedThreadpoolWithMaxSize("RepairJobTask");
      }
  
      public UUID getId()
@@@ -228,20 -197,25 +235,26 @@@
       * @param nodes nodes that completed sync
       * @param success true if sync succeeded
       */
 -    public void syncComplete(RepairJobDesc desc, NodePair nodes, boolean 
success)
 +    public void syncComplete(RepairJobDesc desc, SyncNodePair nodes, boolean 
success, List<SessionSummary> summaries)
      {
-         CompletableRemoteSyncTask task = syncingTasks.get(Pair.create(desc, 
nodes));
 -        RemoteSyncTask task = syncingTasks.get(Pair.create(desc, nodes));
++        CompletableRemoteSyncTask task = 
syncingTasks.remove(Pair.create(desc, nodes));
          if (task == null)
          {
              assert terminated;
              return;
          }
  
 -        logger.debug("[repair #{}] Repair completed between {} and {} on {}", 
getId(), nodes.endpoint1, nodes.endpoint2, desc.columnFamily);
 -        task.syncComplete(success);
 +        if (logger.isDebugEnabled())
 +            logger.debug("{} Repair completed between {} and {} on {}", 
previewKind.logPrefix(getId()), nodes.coordinator, nodes.peer, 
desc.columnFamily);
 +        task.syncComplete(success, summaries);
      }
  
+     @VisibleForTesting
 -    Map<Pair<RepairJobDesc, NodePair>, RemoteSyncTask> getSyncingTasks()
++    Map<Pair<RepairJobDesc, SyncNodePair>, CompletableRemoteSyncTask> 
getSyncingTasks()
+     {
+         return Collections.unmodifiableMap(syncingTasks);
+     }
+ 
      private String repairedNodes()
      {
          StringBuilder sb = new StringBuilder();
diff --cc src/java/org/apache/cassandra/repair/ValidationManager.java
index d664c8a,0000000..bbd5219
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/repair/ValidationManager.java
+++ b/src/java/org/apache/cassandra/repair/ValidationManager.java
@@@ -1,163 -1,0 +1,176 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.repair;
 +
 +import java.io.IOException;
 +import java.util.Collection;
 +import java.util.Map;
 +import java.util.concurrent.Callable;
 +import java.util.concurrent.Future;
 +import java.util.concurrent.TimeUnit;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
++import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.metrics.TableMetrics;
 +import org.apache.cassandra.utils.FBUtilities;
++import org.apache.cassandra.utils.MerkleTree;
 +import org.apache.cassandra.utils.MerkleTrees;
 +
 +public class ValidationManager
 +{
 +    private static final Logger logger = 
LoggerFactory.getLogger(ValidationManager.class);
 +
 +    public static final ValidationManager instance = new ValidationManager();
 +
 +    private ValidationManager() {}
 +
 +    private static MerkleTrees createMerkleTrees(ValidationPartitionIterator 
validationIterator, Collection<Range<Token>> ranges, ColumnFamilyStore cfs)
 +    {
 +        MerkleTrees tree = new MerkleTrees(cfs.getPartitioner());
 +        long allPartitions = validationIterator.estimatedPartitions();
 +        Map<Range<Token>, Long> rangePartitionCounts = 
validationIterator.getRangePartitionCounts();
 +
++        // The repair coordinator must hold RF trees in memory at once, so a 
given validation compaction can only
++        // use 1 / RF of the allowed space.
++        long availableBytes = 
(DatabaseDescriptor.getRepairSessionSpaceInMegabytes() * 1048576) /
++                              
cfs.keyspace.getReplicationStrategy().getReplicationFactor().allReplicas;
++
 +        for (Range<Token> range : ranges)
 +        {
 +            long numPartitions = rangePartitionCounts.get(range);
 +            double rangeOwningRatio = allPartitions > 0 ? 
(double)numPartitions / allPartitions : 0;
 +            // determine max tree depth proportional to range size to avoid 
blowing up memory with multiple tress,
-             // capping at 20 to prevent large tree (CASSANDRA-11390)
-             int maxDepth = rangeOwningRatio > 0 ? (int) Math.floor(20 - 
Math.log(1 / rangeOwningRatio) / Math.log(2)) : 0;
++            // capping at a depth that does not exceed our memory budget 
(CASSANDRA-11390, CASSANDRA-14096)
++            int rangeAvailableBytes = Math.max(1, (int) (rangeOwningRatio * 
availableBytes));
++            // Try to estimate max tree depth that fits the space budget 
assuming hashes of 256 bits = 32 bytes
++            // note that estimatedMaxDepthForBytes cannot return a number 
lower than 1
++            int estimatedMaxDepth = 
MerkleTree.estimatedMaxDepthForBytes(cfs.getPartitioner(), rangeAvailableBytes, 
32);
++            int maxDepth = rangeOwningRatio > 0
++                           ? Math.min(estimatedMaxDepth, 
DatabaseDescriptor.getRepairSessionMaxTreeDepth())
++                           : 0;
 +            // determine tree depth from number of partitions, capping at max 
tree depth (CASSANDRA-5263)
 +            int depth = numPartitions > 0 ? (int) 
Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), maxDepth) : 0;
 +            tree.addMerkleTree((int) Math.pow(2, depth), range);
 +        }
 +        if (logger.isDebugEnabled())
 +        {
 +            // MT serialize may take time
 +            logger.debug("Created {} merkle trees with merkle trees size {}, 
{} partitions, {} bytes", tree.ranges().size(), tree.size(), allPartitions, 
MerkleTrees.serializer.serializedSize(tree, 0));
 +        }
 +
 +        return tree;
 +    }
 +
 +    private static ValidationPartitionIterator 
getValidationIterator(TableRepairManager repairManager, Validator validator) 
throws IOException
 +    {
 +        RepairJobDesc desc = validator.desc;
 +        return repairManager.getValidationIterator(desc.ranges, 
desc.parentSessionId, desc.sessionId, validator.isIncremental, 
validator.nowInSec);
 +    }
 +
 +    /**
 +     * Performs a readonly "compaction" of all sstables in order to validate 
complete rows,
 +     * but without writing the merge result
 +     */
 +    @SuppressWarnings("resource")
 +    private void doValidation(ColumnFamilyStore cfs, Validator validator) 
throws IOException
 +    {
 +        // this isn't meant to be race-proof, because it's not -- it won't 
cause bugs for a CFS to be dropped
 +        // mid-validation, or to attempt to validate a droped CFS.  this is 
just a best effort to avoid useless work,
 +        // particularly in the scenario where a validation is submitted 
before the drop, and there are compactions
 +        // started prior to the drop keeping some sstables alive.  Since 
validationCompaction can run
 +        // concurrently with other compactions, it would otherwise go ahead 
and scan those again.
 +        if (!cfs.isValid())
 +            return;
 +
 +        // Create Merkle trees suitable to hold estimated partitions for the 
given ranges.
 +        // We blindly assume that a partition is evenly distributed on all 
sstables for now.
 +        long start = System.nanoTime();
 +        long partitionCount = 0;
 +        long estimatedTotalBytes = 0;
 +        try (ValidationPartitionIterator vi = 
getValidationIterator(cfs.getRepairManager(), validator))
 +        {
 +            MerkleTrees tree = createMerkleTrees(vi, validator.desc.ranges, 
cfs);
 +            try
 +            {
 +                // validate the CF as we iterate over it
 +                validator.prepare(cfs, tree);
 +                while (vi.hasNext())
 +                {
 +                    try (UnfilteredRowIterator partition = vi.next())
 +                    {
 +                        validator.add(partition);
 +                        partitionCount++;
 +                    }
 +                }
 +                validator.complete();
 +            }
 +            finally
 +            {
 +                estimatedTotalBytes = vi.getEstimatedBytes();
 +                partitionCount = vi.estimatedPartitions();
 +            }
 +        }
 +        finally
 +        {
 +            cfs.metric.bytesValidated.update(estimatedTotalBytes);
 +            cfs.metric.partitionsValidated.update(partitionCount);
 +        }
 +        if (logger.isDebugEnabled())
 +        {
 +            long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
start);
 +            logger.debug("Validation of {} partitions (~{}) finished in {} 
msec, for {}",
 +                         partitionCount,
 +                         FBUtilities.prettyPrintMemory(estimatedTotalBytes),
 +                         duration,
 +                         validator.desc);
 +        }
 +    }
 +
 +    /**
 +     * Does not mutate data, so is not scheduled.
 +     */
 +    public Future<?> submitValidation(ColumnFamilyStore cfs, Validator 
validator)
 +    {
 +        Callable<Object> validation = new Callable<Object>()
 +        {
 +            public Object call() throws IOException
 +            {
 +                try (TableMetrics.TableTimer.Context c = 
cfs.metric.validationTime.time())
 +                {
 +                    doValidation(cfs, validator);
 +                }
 +                catch (Throwable e)
 +                {
 +                    // we need to inform the remote end of our failure, 
otherwise it will hang on repair forever
 +                    validator.fail();
 +                    throw e;
 +                }
 +                return this;
 +            }
 +        };
 +
 +        return cfs.getRepairManager().submitValidation(validation);
 +    }
 +}
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index 1a54e75,626aa91..525beba
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -156,40 -121,8 +156,52 @@@ public class ActiveRepairService implem
      {
          this.failureDetector = failureDetector;
          this.gossiper = gossiper;
 +        this.repairStatusByCmd = CacheBuilder.newBuilder()
 +                                             .expireAfterWrite(
 +                                             
Long.getLong("cassandra.parent_repair_status_expiry_seconds",
 +                                                          
TimeUnit.SECONDS.convert(1, TimeUnit.DAYS)), TimeUnit.SECONDS)
 +                                             // using weight wouldn't work so 
well, since it doesn't reflect mutation of cached data
 +                                             // see 
https://github.com/google/guava/wiki/CachesExplained
 +                                             // We assume each entry is 
unlikely to be much more than 100 bytes, so bounding the size should be 
sufficient.
 +                                             
.maximumSize(Long.getLong("cassandra.parent_repair_status_cache_size", 100_000))
 +                                             .build();
 +
 +        MBeanWrapper.instance.registerMBean(this, MBEAN_NAME);
 +    }
 +
 +    public void start()
 +    {
 +        consistent.local.start();
 +        
ScheduledExecutors.optionalTasks.scheduleAtFixedRate(consistent.local::cleanup, 
0,
 +                                                             
LocalSessions.CLEANUP_INTERVAL,
 +                                                             
TimeUnit.SECONDS);
 +    }
 +
 +    @Override
 +    public List<Map<String, String>> getSessions(boolean all)
 +    {
 +        return consistent.local.sessionInfo(all);
 +    }
 +
 +    @Override
 +    public void failSession(String session, boolean force)
 +    {
 +        UUID sessionID = UUID.fromString(session);
 +        consistent.local.cancelSession(sessionID, force);
 +    }
 +
++    @Override
++    public void setRepairSessionSpaceInMegabytes(int sizeInMegabytes)
++    {
++        DatabaseDescriptor.setRepairSessionSpaceInMegabytes(sizeInMegabytes);
++    }
++
++    @Override
++    public int getRepairSessionSpaceInMegabytes()
++    {
++        return DatabaseDescriptor.getRepairSessionSpaceInMegabytes();
+     }
+ 
      /**
       * Requests repairs for the given keyspace and column families.
       *
diff --cc src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
index 53b0acb,0000000..283d466
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
@@@ -1,30 -1,0 +1,33 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.service;
 +
 +import java.util.List;
 +import java.util.Map;
 +
 +public interface ActiveRepairServiceMBean
 +{
 +    public static final String MBEAN_NAME = 
"org.apache.cassandra.db:type=RepairService";
 +
 +    public List<Map<String, String>> getSessions(boolean all);
 +    public void failSession(String session, boolean force);
++
++    public void setRepairSessionSpaceInMegabytes(int sizeInMegabytes);
++    public int getRepairSessionSpaceInMegabytes();
 +}
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index ca453d0,8f4b1e7..6a57493
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -3868,15 -3688,16 +3868,24 @@@ public class StorageService extends Not
          ActiveRepairService.instance.terminateSessions();
      }
  
- 
 +    @Nullable
 +    public List<String> getParentRepairStatus(int cmd)
 +    {
 +        Pair<ActiveRepairService.ParentRepairStatus, List<String>> pair = 
ActiveRepairService.instance.getRepairStatus(cmd);
 +        return pair == null ? null :
 +               
ImmutableList.<String>builder().add(pair.left.name()).addAll(pair.right).build();
 +    }
 +
+     public void setRepairSessionMaxTreeDepth(int depth)
+     {
+         DatabaseDescriptor.setRepairSessionMaxTreeDepth(depth);
+     }
+ 
+     public int getRepairSessionMaxTreeDepth()
+     {
+         return DatabaseDescriptor.getRepairSessionMaxTreeDepth();
+     }
+ 
      /* End of MBean interface methods */
  
      /**
diff --cc src/java/org/apache/cassandra/service/StorageServiceMBean.java
index e82d1ba,bcf55cf..e74f002
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@@ -357,22 -330,65 +357,26 @@@ public interface StorageServiceMBean ex
       */
      public int repairAsync(String keyspace, Map<String, String> options);
  
 -    /**
 -     * @deprecated use {@link #repairAsync(String keyspace, Map options)} 
instead.
 -     */
 -    @Deprecated
 -    public int forceRepairAsync(String keyspace, boolean isSequential, 
Collection<String> dataCenters, Collection<String> hosts,  boolean 
primaryRange, boolean fullRepair, String... tableNames) throws IOException;
 -
 -    /**
 -     * Invoke repair asynchronously.
 -     * You can track repair progress by subscribing JMX notification sent 
from this StorageServiceMBean.
 -     * Notification format is:
 -     *   type: "repair"
 -     *   userObject: int array of length 2, [0]=command number, [1]=ordinal 
of ActiveRepairService.Status
 -     *
 -     * @deprecated use {@link #repairAsync(String keyspace, Map options)} 
instead.
 -     *
 -     * @param parallelismDegree 0: sequential, 1: parallel, 2: DC parallel
 -     * @return Repair command number, or 0 if nothing to repair
 -     */
 -    @Deprecated
 -    public int forceRepairAsync(String keyspace, int parallelismDegree, 
Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, 
boolean fullRepair, String... tableNames);
 -
 -    /**
 -     * @deprecated use {@link #repairAsync(String keyspace, Map options)} 
instead.
 -     */
 -    @Deprecated
 -    public int forceRepairRangeAsync(String beginToken, String endToken, 
String keyspaceName, boolean isSequential, Collection<String> dataCenters, 
Collection<String> hosts, boolean fullRepair, String... tableNames) throws 
IOException;
 -
 -    /**
 -     * Same as forceRepairAsync, but handles a specified range
 -     *
 -     * @deprecated use {@link #repairAsync(String keyspace, Map options)} 
instead.
 -     *
 -     * @param parallelismDegree 0: sequential, 1: parallel, 2: DC parallel
 -     */
 -    @Deprecated
 -    public int forceRepairRangeAsync(String beginToken, String endToken, 
String keyspaceName, int parallelismDegree, Collection<String> dataCenters, 
Collection<String> hosts, boolean fullRepair, String... tableNames);
 -
 -    /**
 -     * @deprecated use {@link #repairAsync(String keyspace, Map options)} 
instead.
 -     */
 -    @Deprecated
 -    public int forceRepairAsync(String keyspace, boolean isSequential, 
boolean isLocal, boolean primaryRange, boolean fullRepair, String... 
tableNames);
 -
 -    /**
 -     * @deprecated use {@link #repairAsync(String keyspace, Map options)} 
instead.
 -     */
 -    @Deprecated
 -    public int forceRepairRangeAsync(String beginToken, String endToken, 
String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, 
String... tableNames);
 -
      public void forceTerminateAllRepairSessions();
  
+     public void setRepairSessionMaxTreeDepth(int depth);
+ 
+     public int getRepairSessionMaxTreeDepth();
+ 
      /**
 +     * Get the status of a given parent repair session.
 +     * @param cmd the int reference returned when issuing the repair
 +     * @return status of parent repair from enum {@link 
org.apache.cassandra.repair.RepairRunnable.Status}
 +     * followed by final message or messages of the session
 +     */
 +    @Nullable
 +    public List<String> getParentRepairStatus(int cmd);
 +
 +    /**
       * transfer this node's data to other machines and remove it from service.
 +     * @param force Decommission even if this will reduce N to be less than 
RF.
       */
 -    public void decommission() throws InterruptedException;
 +    public void decommission(boolean force) throws InterruptedException;
  
      /**
       * @param newToken token to move this node to.
diff --cc src/java/org/apache/cassandra/utils/MerkleTree.java
index 143d839,9572a27..1d51f03
--- a/src/java/org/apache/cassandra/utils/MerkleTree.java
+++ b/src/java/org/apache/cassandra/utils/MerkleTree.java
@@@ -1155,4 -1190,4 +1155,50 @@@ public class MerkleTree implements Seri
              public TooDeep(){ super(); }
          }
      }
++
++    /**
++     * Estimate the allowable depth while keeping the resulting heap usage of 
this tree under the provided
++     * number of bytes. This is important for ensuring that we do not 
allocate overly large trees that could
++     * OOM the JVM and cause instability.
++     *
++     * Calculated using the following logic:
++     *
++     * Let T = size of a tree of depth n
++     *
++     * T = #leafs  * sizeof(leaf) + #inner  * sizeof(inner)
++     * T = 2^n     * L            + 2^n - 1 * I
++     *
++     * T = 2^n * L + 2^n * I - I;
++     *
++     * So to solve for n given sizeof(tree_n) T:
++     *
++     * n = floor(log_2((T + I) / (L + I))
++     *
++     * @param numBytes: The number of bytes to fit the tree within
++     * @param bytesPerHash: The number of bytes stored in a leaf node, for 
example 2 * murmur128 will be 256 bits
++     *                    or 32 bytes
++     * @return the estimated depth that will fit within the provided number 
of bytes
++     */
++    public static int estimatedMaxDepthForBytes(IPartitioner partitioner, 
long numBytes, int bytesPerHash)
++    {
++        byte[] hashLeft = new byte[bytesPerHash];
++        byte[] hashRigth = new byte[bytesPerHash];
++        Leaf left = new Leaf(hashLeft);
++        Leaf right = new Leaf(hashRigth);
++        Inner inner = new Inner(partitioner.getMinimumToken(), left, right);
++        inner.calc();
++
++        // Some partioners have variable token sizes, try to estimate as 
close as we can by using the same
++        // heap estimate as the memtables use.
++        long innerTokenSize = 
ObjectSizes.measureDeep(partitioner.getMinimumToken());
++        long realInnerTokenSize = partitioner.getMinimumToken().getHeapSize();
++
++        long sizeOfLeaf = ObjectSizes.measureDeep(left);
++        long sizeOfInner = ObjectSizes.measureDeep(inner) -
++                           (ObjectSizes.measureDeep(left) + 
ObjectSizes.measureDeep(right) + innerTokenSize) +
++                           realInnerTokenSize;
++
++        long adjustedBytes = Math.max(1, (numBytes + sizeOfInner) / 
(sizeOfLeaf + sizeOfInner));
++        return Math.max(1, (int) Math.floor(Math.log(adjustedBytes) / 
Math.log(2)));
++    }
  }
diff --cc test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
index 3d88164,4788289..209c35d
--- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
+++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java
@@@ -29,13 -31,23 +30,14 @@@ import org.junit.Assert
  import org.junit.BeforeClass;
  import org.junit.Test;
  import org.junit.runner.RunWith;
 +
  import org.apache.cassandra.OrderedJUnit4ClassRunner;
 -import org.apache.cassandra.SchemaLoader;
  import org.apache.cassandra.db.Keyspace;
  import org.apache.cassandra.exceptions.ConfigurationException;
 -import org.apache.cassandra.exceptions.InvalidRequestException;
 -import org.apache.cassandra.gms.Gossiper;
 -import org.apache.cassandra.schema.KeyspaceMetadata;
 -import org.apache.cassandra.schema.KeyspaceParams;
 -import org.apache.cassandra.service.MigrationManager;
 -import org.apache.cassandra.thrift.ThriftConversion;
  
  import static org.junit.Assert.assertEquals;
 -import static org.junit.Assert.assertNotNull;
 -import static org.junit.Assert.assertNull;
 -import static org.junit.Assert.fail;
 -
  import static org.junit.Assert.assertTrue;
++import static org.junit.Assert.fail;
  
  @RunWith(OrderedJUnit4ClassRunner.class)
  public class DatabaseDescriptorTest
@@@ -208,46 -284,41 +210,113 @@@
          assertTrue(DatabaseDescriptor.tokensFromString(null).isEmpty());
          Collection<String> tokens = DatabaseDescriptor.tokensFromString(" a,b 
,c , d, f,g,h");
          assertEquals(7, tokens.size());
 -        assertTrue(tokens.containsAll(Arrays.asList(new String[]{ "a", "b", 
"c", "d", "f", "g", "h" })));
 +        assertTrue(tokens.containsAll(Arrays.asList(new String[]{"a", "b", 
"c", "d", "f", "g", "h"})));
 +    }
 +
 +    @Test
 +    public void testLowestAcceptableTimeouts() throws ConfigurationException
 +    {
 +        Config testConfig = new Config();
 +        testConfig.read_request_timeout_in_ms = 
DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT + 1;
 +        testConfig.range_request_timeout_in_ms = 
DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT + 1;
 +        testConfig.write_request_timeout_in_ms = 
DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT + 1;
 +        testConfig.truncate_request_timeout_in_ms = 
DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT + 1;
 +        testConfig.cas_contention_timeout_in_ms = 
DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT + 1;
 +        testConfig.counter_write_request_timeout_in_ms = 
DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT + 1;
 +        testConfig.request_timeout_in_ms = 
DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT + 1;
 +        
 +        assertTrue(testConfig.read_request_timeout_in_ms > 
DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
 +        assertTrue(testConfig.range_request_timeout_in_ms > 
DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
 +        assertTrue(testConfig.write_request_timeout_in_ms > 
DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
 +        assertTrue(testConfig.truncate_request_timeout_in_ms > 
DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
 +        assertTrue(testConfig.cas_contention_timeout_in_ms > 
DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
 +        assertTrue(testConfig.counter_write_request_timeout_in_ms > 
DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
 +        assertTrue(testConfig.request_timeout_in_ms > 
DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
 +
 +        //set less than Lowest acceptable value
 +        testConfig.read_request_timeout_in_ms = 
DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT - 1;
 +        testConfig.range_request_timeout_in_ms = 
DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT - 1;
 +        testConfig.write_request_timeout_in_ms = 
DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT - 1;
 +        testConfig.truncate_request_timeout_in_ms = 
DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT - 1;
 +        testConfig.cas_contention_timeout_in_ms = 
DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT - 1;
 +        testConfig.counter_write_request_timeout_in_ms = 
DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT - 1;
 +        testConfig.request_timeout_in_ms = 
DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT - 1;
 +
 +        DatabaseDescriptor.checkForLowestAcceptedTimeouts(testConfig);
 +
 +        assertTrue(testConfig.read_request_timeout_in_ms == 
DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
 +        assertTrue(testConfig.range_request_timeout_in_ms == 
DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
 +        assertTrue(testConfig.write_request_timeout_in_ms == 
DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
 +        assertTrue(testConfig.truncate_request_timeout_in_ms == 
DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
 +        assertTrue(testConfig.cas_contention_timeout_in_ms == 
DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
 +        assertTrue(testConfig.counter_write_request_timeout_in_ms == 
DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
 +        assertTrue(testConfig.request_timeout_in_ms == 
DatabaseDescriptor.LOWEST_ACCEPTED_TIMEOUT);
 +    }
++
++    @Test
++    public void testRepairSessionMemorySizeToggles()
++    {
++        int previousSize = 
DatabaseDescriptor.getRepairSessionSpaceInMegabytes();
++        try
++        {
++            Assert.assertEquals((Runtime.getRuntime().maxMemory() / (1024 * 
1024) / 16),
++                                
DatabaseDescriptor.getRepairSessionSpaceInMegabytes());
++
++            int targetSize = (int) (Runtime.getRuntime().maxMemory() / (1024 
* 1024) / 4) + 1;
++
++            DatabaseDescriptor.setRepairSessionSpaceInMegabytes(targetSize);
++            Assert.assertEquals(targetSize, 
DatabaseDescriptor.getRepairSessionSpaceInMegabytes());
++
++            DatabaseDescriptor.setRepairSessionSpaceInMegabytes(10);
++            Assert.assertEquals(10, 
DatabaseDescriptor.getRepairSessionSpaceInMegabytes());
++
++            try
++            {
++                DatabaseDescriptor.setRepairSessionSpaceInMegabytes(0);
++                fail("Should have received a ConfigurationException for depth 
of 9");
++            }
++            catch (ConfigurationException ignored) { }
++
++            Assert.assertEquals(10, 
DatabaseDescriptor.getRepairSessionSpaceInMegabytes());
++        }
++        finally
++        {
++            DatabaseDescriptor.setRepairSessionSpaceInMegabytes(previousSize);
++        }
+     }
+ 
+     @Test
+     public void testRepairSessionSizeToggles()
+     {
+         int previousDepth = DatabaseDescriptor.getRepairSessionMaxTreeDepth();
+         try
+         {
 -            Assert.assertEquals(18, 
DatabaseDescriptor.getRepairSessionMaxTreeDepth());
++            Assert.assertEquals(20, 
DatabaseDescriptor.getRepairSessionMaxTreeDepth());
+             DatabaseDescriptor.setRepairSessionMaxTreeDepth(10);
+             Assert.assertEquals(10, 
DatabaseDescriptor.getRepairSessionMaxTreeDepth());
+ 
+             try
+             {
+                 DatabaseDescriptor.setRepairSessionMaxTreeDepth(9);
+                 fail("Should have received a ConfigurationException for depth 
of 9");
+             }
+             catch (ConfigurationException ignored) { }
+             Assert.assertEquals(10, 
DatabaseDescriptor.getRepairSessionMaxTreeDepth());
+ 
+             try
+             {
+                 DatabaseDescriptor.setRepairSessionMaxTreeDepth(-20);
+                 fail("Should have received a ConfigurationException for depth 
of -20");
+             }
+             catch (ConfigurationException ignored) { }
+             Assert.assertEquals(10, 
DatabaseDescriptor.getRepairSessionMaxTreeDepth());
+ 
+             DatabaseDescriptor.setRepairSessionMaxTreeDepth(22);
+             Assert.assertEquals(22, 
DatabaseDescriptor.getRepairSessionMaxTreeDepth());
+         }
+         finally
+         {
+             DatabaseDescriptor.setRepairSessionMaxTreeDepth(previousDepth);
+         }
+     }
  }
diff --cc test/unit/org/apache/cassandra/repair/RepairJobTest.java
index 263ecbc,e1dd5b3..6db29dc
--- a/test/unit/org/apache/cassandra/repair/RepairJobTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
@@@ -18,608 -18,309 +18,823 @@@
  
  package org.apache.cassandra.repair;
  
 -import java.net.InetAddress;
  import java.net.UnknownHostException;
+ import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.Collection;
+ import java.util.Collections;
  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.List;
  import java.util.Map;
  import java.util.Set;
  import java.util.UUID;
++import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.TimeUnit;
++import java.util.concurrent.TimeoutException;
 +import java.util.function.Predicate;
+ import java.util.stream.Collectors;
  
 -import com.google.common.util.concurrent.AsyncFunction;
 -import com.google.common.util.concurrent.Futures;
 +import com.google.common.collect.Sets;
+ import com.google.common.util.concurrent.ListenableFuture;
  import org.junit.After;
 +import org.junit.Assert;
+ import org.junit.Before;
+ import org.junit.BeforeClass;
  import org.junit.Test;
  
- import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.SchemaLoader;
+ import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 -import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.dht.ByteOrderedPartitioner;
  import org.apache.cassandra.dht.IPartitioner;
+ import org.apache.cassandra.dht.Murmur3Partitioner;
  import org.apache.cassandra.dht.Range;
  import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.locator.InetAddressAndPort;
+ import org.apache.cassandra.net.IMessageSink;
+ import org.apache.cassandra.net.MessageIn;
+ import org.apache.cassandra.net.MessageOut;
+ import org.apache.cassandra.net.MessagingService;
+ import org.apache.cassandra.repair.messages.RepairMessage;
+ import org.apache.cassandra.repair.messages.SyncRequest;
+ import org.apache.cassandra.schema.KeyspaceParams;
+ import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.streaming.PreviewKind;
 +import org.apache.cassandra.utils.ByteBufferUtil;
  import org.apache.cassandra.utils.FBUtilities;
  import org.apache.cassandra.utils.MerkleTree;
  import org.apache.cassandra.utils.MerkleTrees;
+ import org.apache.cassandra.utils.ObjectSizes;
+ import org.apache.cassandra.utils.UUIDGen;
  
  import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertNull;
  import static org.junit.Assert.assertTrue;
  
 -public class RepairJobTest extends SchemaLoader
 +public class RepairJobTest
  {
+     private static final long TEST_TIMEOUT_S = 10;
+     private static final long THREAD_TIMEOUT_MILLIS = 100;
 +    private static final IPartitioner PARTITIONER = 
ByteOrderedPartitioner.instance;
+     private static final IPartitioner MURMUR3_PARTITIONER = 
Murmur3Partitioner.instance;
+     private static final String KEYSPACE = "RepairJobTest";
+     private static final String CF = "Standard1";
+     private static final Object messageLock = new Object();
+ 
++    private static final Range<Token> range1 = range(0, 1);
++    private static final Range<Token> range2 = range(2, 3);
++    private static final Range<Token> range3 = range(4, 5);
++    private static final RepairJobDesc desc = new 
RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), KEYSPACE, CF, 
Arrays.asList());
+     private static final List<Range<Token>> fullRange = 
Collections.singletonList(new Range<>(MURMUR3_PARTITIONER.getMinimumToken(),
 -                                                                              
                MURMUR3_PARTITIONER.getRandomToken()));
 -    private static InetAddress addr1;
 -    private static InetAddress addr2;
 -    private static InetAddress addr3;
 -    private static InetAddress addr4;
++                                                                              
                MURMUR3_PARTITIONER.getMaximumToken()));
++    private static InetAddressAndPort addr1;
++    private static InetAddressAndPort addr2;
++    private static InetAddressAndPort addr3;
++    private static InetAddressAndPort addr4;
++    private static InetAddressAndPort addr5;
+     private RepairSession session;
+     private RepairJob job;
+     private RepairJobDesc sessionJobDesc;
+ 
+     // So that threads actually get recycled and we can have accurate memory 
accounting while testing
+     // memory retention from CASSANDRA-14096
+     private static class MeasureableRepairSession extends RepairSession
+     {
 -        public MeasureableRepairSession(UUID parentRepairSession, UUID id, 
Collection<Range<Token>> ranges,
 -                                        String keyspace,RepairParallelism 
parallelismDegree, Set<InetAddress> endpoints,
 -                                        long repairedAt, boolean pullRepair, 
String... cfnames)
++        public MeasureableRepairSession(UUID parentRepairSession, UUID id, 
CommonRange commonRange, String keyspace,
++                                        RepairParallelism parallelismDegree, 
boolean isIncremental, boolean pullRepair,
++                                        boolean force, PreviewKind 
previewKind, boolean optimiseStreams, String... cfnames)
+         {
 -            super(parentRepairSession, id, ranges, keyspace, 
parallelismDegree, endpoints, repairedAt, pullRepair, cfnames);
++            super(parentRepairSession, id, commonRange, keyspace, 
parallelismDegree, isIncremental, pullRepair, force, previewKind, 
optimiseStreams, cfnames);
+         }
+ 
+         protected DebuggableThreadPoolExecutor createExecutor()
+         {
+             DebuggableThreadPoolExecutor executor = super.createExecutor();
+             executor.setKeepAliveTime(THREAD_TIMEOUT_MILLIS, 
TimeUnit.MILLISECONDS);
 -            return executor;
 -        }
++            return executor;        }
+     }
 -
+     @BeforeClass
+     public static void setupClass() throws UnknownHostException
+     {
+         SchemaLoader.prepareServer();
+         SchemaLoader.createKeyspace(KEYSPACE,
+                                     KeyspaceParams.simple(1),
+                                     SchemaLoader.standardCFMD(KEYSPACE, CF));
 -        addr1 = InetAddress.getByName("127.0.0.1");
 -        addr2 = InetAddress.getByName("127.0.0.2");
 -        addr3 = InetAddress.getByName("127.0.0.3");
 -        addr4 = InetAddress.getByName("127.0.0.4");
++        addr1 = InetAddressAndPort.getByName("127.0.0.1");
++        addr2 = InetAddressAndPort.getByName("127.0.0.2");
++        addr3 = InetAddressAndPort.getByName("127.0.0.3");
++        addr4 = InetAddressAndPort.getByName("127.0.0.4");
++        addr5 = InetAddressAndPort.getByName("127.0.0.5");
+     }
  
-     static InetAddressAndPort addr1;
-     static InetAddressAndPort addr2;
-     static InetAddressAndPort addr3;
-     static InetAddressAndPort addr4;
-     static InetAddressAndPort addr5;
+     @Before
+     public void setup()
+     {
 -        Set<InetAddress> neighbors = new HashSet<>(Arrays.asList(addr2, 
addr3));
++        Set<InetAddressAndPort> neighbors = new 
HashSet<>(Arrays.asList(addr2, addr3));
  
-     static Range<Token> range1 = range(0, 1);
-     static Range<Token> range2 = range(2, 3);
-     static Range<Token> range3 = range(4, 5);
-     static RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), 
UUID.randomUUID(), "ks", "cf", Arrays.asList());
+         UUID parentRepairSession = UUID.randomUUID();
 -        
ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, 
FBUtilities.getBroadcastAddress(),
++        
ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, 
FBUtilities.getBroadcastAddressAndPort(),
+                                                                  
Collections.singletonList(Keyspace.open(KEYSPACE).getColumnFamilyStore(CF)), 
fullRange, false,
 -                                                                 
ActiveRepairService.UNREPAIRED_SSTABLE, false);
++                                                                 
ActiveRepairService.UNREPAIRED_SSTABLE, false, PreviewKind.NONE);
+ 
 -        this.session = new MeasureableRepairSession(parentRepairSession, 
UUIDGen.getTimeUUID(), fullRange,
 -                                                    KEYSPACE, 
RepairParallelism.SEQUENTIAL, neighbors,
 -                                                    
ActiveRepairService.UNREPAIRED_SSTABLE, false, CF);
++        this.session = new MeasureableRepairSession(parentRepairSession, 
UUIDGen.getTimeUUID(),
++                                                    new 
CommonRange(neighbors, Collections.emptySet(), fullRange),
++                                                    KEYSPACE, 
RepairParallelism.SEQUENTIAL,
++                                                    false, false, false,
++                                                    PreviewKind.NONE, false, 
CF);
+ 
+         this.job = new RepairJob(session, CF);
+         this.sessionJobDesc = new RepairJobDesc(session.parentRepairSession, 
session.getId(),
 -                                                session.keyspace, CF, 
session.getRanges());
++                                                session.keyspace, CF, 
session.ranges());
+ 
 -        DatabaseDescriptor.setBroadcastAddress(addr1);
++        FBUtilities.setBroadcastInetAddress(addr1.address);
+     }
  
      @After
      public void reset()
      {
+         ActiveRepairService.instance.terminateSessions();
+         MessagingService.instance().clearMessageSinks();
 +        FBUtilities.reset();
-         DatabaseDescriptor.setBroadcastAddress(addr1.address);
      }
  
-     static
+     /**
 -     * Ensure we can do an end to end repair of consistent data and get the 
messages we expect
++     * Ensure RepairJob issues the right messages in an end to end repair of 
consistent data
+      */
+     @Test
 -    public void testEndToEndNoDifferences() throws Exception
++    public void testEndToEndNoDifferences() throws InterruptedException, 
ExecutionException, TimeoutException
      {
-         try
-         {
-             addr1 = InetAddressAndPort.getByName("127.0.0.1");
-             addr2 = InetAddressAndPort.getByName("127.0.0.2");
-             addr3 = InetAddressAndPort.getByName("127.0.0.3");
-             addr4 = InetAddressAndPort.getByName("127.0.0.4");
-             addr5 = InetAddressAndPort.getByName("127.0.0.5");
-             DatabaseDescriptor.setBroadcastAddress(addr1.address);
-         }
-         catch (UnknownHostException e)
 -        Map<InetAddress, MerkleTrees> mockTrees = new HashMap<>();
 -        mockTrees.put(FBUtilities.getBroadcastAddress(), 
createInitialTree(false));
++        Map<InetAddressAndPort, MerkleTrees> mockTrees = new HashMap<>();
++        mockTrees.put(FBUtilities.getBroadcastAddressAndPort(), 
createInitialTree(false));
+         mockTrees.put(addr2, createInitialTree(false));
+         mockTrees.put(addr3, createInitialTree(false));
+ 
+         List<MessageOut> observedMessages = new ArrayList<>();
+         interceptRepairMessages(mockTrees, observedMessages);
+ 
+         job.run();
+ 
+         RepairResult result = job.get(TEST_TIMEOUT_S, TimeUnit.SECONDS);
+ 
 -        assertEquals(3, result.stats.size());
 -        // Should be one RemoteSyncTask left behind (other two should be 
local)
 -        assertExpectedDifferences(session.getSyncingTasks().values(), 0);
++        // Since there are no differences, there should be nothing to sync.
++        assertEquals(0, result.stats.size());
+ 
+         // RepairJob should send out SNAPSHOTS -> VALIDATIONS -> done
+         List<RepairMessage.Type> expectedTypes = new ArrayList<>();
+         for (int i = 0; i < 3; i++)
+             expectedTypes.add(RepairMessage.Type.SNAPSHOT);
+         for (int i = 0; i < 3; i++)
+             expectedTypes.add(RepairMessage.Type.VALIDATION_REQUEST);
+ 
+         assertEquals(expectedTypes, observedMessages.stream()
+                                                     .map(k -> 
((RepairMessage) k.payload).messageType)
+                                                     
.collect(Collectors.toList()));
+     }
+ 
+     /**
+      * Regression test for CASSANDRA-14096. We should not retain memory in 
the RepairSession once the
+      * ValidationTask -> SyncTask transform is done.
+      */
+     @Test
+     public void testNoTreesRetainedAfterDifference() throws Throwable
+     {
 -        Map<InetAddress, MerkleTrees> mockTrees = new HashMap<>();
 -        mockTrees.put(FBUtilities.getBroadcastAddress(), 
createInitialTree(false));
 -        mockTrees.put(addr2, createInitialTree(true));
++        Map<InetAddressAndPort, MerkleTrees> mockTrees = new HashMap<>();
++        mockTrees.put(addr1, createInitialTree(true));
++        mockTrees.put(addr2, createInitialTree(false));
+         mockTrees.put(addr3, createInitialTree(false));
+ 
 -        List<MessageOut> observedMessages = new ArrayList<>();
 -        interceptRepairMessages(mockTrees, observedMessages);
 -
+         List<TreeResponse> mockTreeResponses = mockTrees.entrySet().stream()
+                                                         .map(e -> new 
TreeResponse(e.getKey(), e.getValue()))
+                                                         
.collect(Collectors.toList());
++        List<MessageOut> messages = new ArrayList<>();
++        interceptRepairMessages(mockTrees, messages);
+ 
 -        long singleTreeSize = ObjectSizes.measureDeep(mockTrees.get(addr2));
++        long singleTreeSize = ObjectSizes.measureDeep(mockTrees.get(addr1));
+ 
 -        // Use a different local address so we get all RemoteSyncs (as 
LocalSyncs try to reach out over the network).
 -        List<SyncTask> syncTasks = job.createSyncTasks(mockTreeResponses, 
addr4);
++        // Use addr4 instead of one of the provided trees to force everything 
to be remote sync tasks as
++        // LocalSyncTasks try to reach over the network.
++        List<SyncTask> syncTasks = 
RepairJob.createStandardSyncTasks(sessionJobDesc, mockTreeResponses,
++                                                                     addr4, 
// local
++                                                                     
noTransient(),
++                                                                     
session.isIncremental,
++                                                                     
session.pullRepair,
++                                                                     
session.previewKind);
+ 
+         // SyncTasks themselves should not contain significant memory
 -        assertTrue(ObjectSizes.measureDeep(syncTasks) < 0.8 * singleTreeSize);
++        assertTrue(ObjectSizes.measureDeep(syncTasks) < 0.2 * singleTreeSize);
+ 
 -        ListenableFuture<List<SyncStat>> syncResults = 
Futures.transform(Futures.immediateFuture(mockTreeResponses), new 
AsyncFunction<List<TreeResponse>, List<SyncStat>>()
 -        {
 -            public ListenableFuture<List<SyncStat>> apply(List<TreeResponse> 
treeResponses)
 -            {
 -                return Futures.allAsList(syncTasks);
 -            }
 -        }, session.taskExecutor);
++        ListenableFuture<List<SyncStat>> syncResults = 
job.executeTasks(syncTasks);
+ 
 -        // The session can retain memory in the contained executor until the 
threads expire, so we wait for the threads
++        // Immediately following execution the internal execution queue 
should still retain the trees
++        assertTrue(ObjectSizes.measureDeep(session) > singleTreeSize);
++
++        // The session retains memory in the contained executor until the 
threads expire, so we wait for the threads
+         // that ran the Tree -> SyncTask conversions to die and release the 
memory
+         int millisUntilFreed;
+         for (millisUntilFreed = 0; millisUntilFreed < TEST_TIMEOUT_S * 1000; 
millisUntilFreed += THREAD_TIMEOUT_MILLIS)
          {
-             e.printStackTrace();
+             // The measured size of the syncingTasks, and result of the 
computation should be much smaller
++            TimeUnit.MILLISECONDS.sleep(THREAD_TIMEOUT_MILLIS);
+             if (ObjectSizes.measureDeep(session) < 0.8 * singleTreeSize)
+                 break;
 -            TimeUnit.MILLISECONDS.sleep(THREAD_TIMEOUT_MILLIS);
          }
+ 
+         assertTrue(millisUntilFreed < TEST_TIMEOUT_S * 1000);
+ 
+         List<SyncStat> results = syncResults.get(TEST_TIMEOUT_S, 
TimeUnit.SECONDS);
+ 
 -        assertTrue(ObjectSizes.measureDeep(results) < 0.8 * singleTreeSize);
++        assertTrue(ObjectSizes.measureDeep(results) < 0.2 * singleTreeSize);
++
++        assertEquals(2, results.size());
++        assertEquals(0, session.getSyncingTasks().size());
++        assertTrue(results.stream().allMatch(s -> s.numberOfDifferences == 
1));
++
++        assertEquals(2, messages.size());
++        assertTrue(messages.stream().allMatch(m -> ((RepairMessage) 
m.payload).messageType == RepairMessage.Type.SYNC_REQUEST));
 +    }
 +
 +    @Test
 +    public void testCreateStandardSyncTasks()
 +    {
 +        testCreateStandardSyncTasks(false);
 +    }
 +
 +    @Test
 +    public void testCreateStandardSyncTasksPullRepair()
 +    {
 +        testCreateStandardSyncTasks(true);
 +    }
 +
 +    public static void testCreateStandardSyncTasks(boolean pullRepair)
 +    {
 +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, 
range1, "same",      range2, "same", range3, "same"),
 +                                                         treeResponse(addr2, 
range1, "different", range2, "same", range3, "different"),
 +                                                         treeResponse(addr3, 
range1, "same",      range2, "same", range3, "same"));
 +
 +        Map<SyncNodePair, SyncTask> tasks = 
toMap(RepairJob.createStandardSyncTasks(desc,
 +                                                                              
      treeResponses,
 +                                                                              
      addr1, // local
 +                                                                              
      noTransient(), // transient
 +                                                                              
      false,
 +                                                                              
      pullRepair,
 +                                                                              
      PreviewKind.ALL));
 +
 +        Assert.assertEquals(2, tasks.size());
 +
 +        SyncTask task = tasks.get(pair(addr1, addr2));
 +        Assert.assertTrue(task.isLocal());
 +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
 +        Assert.assertEquals(!pullRepair, ((LocalSyncTask) 
task).transferRanges);
 +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
 +
 +        task = tasks.get(pair(addr2, addr3));
 +        Assert.assertFalse(task.isLocal());
 +        Assert.assertTrue(task instanceof SymmetricRemoteSyncTask);
 +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
 +
 +        Assert.assertNull(tasks.get(pair(addr1, addr3)));
 +    }
 +
 +    @Test
 +    public void testStandardSyncTransient()
 +    {
 +        // Do not stream towards transient nodes
 +        testStandardSyncTransient(true);
 +        testStandardSyncTransient(false);
 +    }
 +
 +    public void testStandardSyncTransient(boolean pullRepair)
 +    {
 +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, 
range1, "same", range2, "same", range3, "same"),
 +                                                         treeResponse(addr2, 
range1, "different", range2, "same", range3, "different"));
 +
 +        Map<SyncNodePair, SyncTask> tasks = 
toMap(RepairJob.createStandardSyncTasks(desc,
 +                                                                              
      treeResponses,
 +                                                                              
      addr1, // local
 +                                                                              
      transientPredicate(addr2),
 +                                                                              
      false,
 +                                                                              
      pullRepair,
 +                                                                              
      PreviewKind.ALL));
 +
 +        Assert.assertEquals(1, tasks.size());
 +
 +        SyncTask task = tasks.get(pair(addr1, addr2));
 +        Assert.assertTrue(task.isLocal());
 +        Assert.assertTrue(((LocalSyncTask) task).requestRanges);
 +        Assert.assertFalse(((LocalSyncTask) task).transferRanges);
 +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
 +    }
 +
 +    @Test
 +    public void testStandardSyncLocalTransient()
 +    {
 +        // Do not stream towards transient nodes
 +        testStandardSyncLocalTransient(true);
 +        testStandardSyncLocalTransient(false);
 +    }
 +
 +    public void testStandardSyncLocalTransient(boolean pullRepair)
 +    {
 +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, 
range1, "same", range2, "same", range3, "same"),
 +                                                         treeResponse(addr2, 
range1, "different", range2, "same", range3, "different"));
 +
 +        Map<SyncNodePair, SyncTask> tasks = 
toMap(RepairJob.createStandardSyncTasks(desc,
 +                                                                              
      treeResponses,
 +                                                                              
      addr1, // local
 +                                                                              
      transientPredicate(addr1),
 +                                                                              
      false,
 +                                                                              
      pullRepair,
 +                                                                              
      PreviewKind.ALL));
 +
 +        if (pullRepair)
 +        {
 +            Assert.assertTrue(tasks.isEmpty());
 +            return;
 +        }
 +
 +        Assert.assertEquals(1, tasks.size());
 +
 +        SyncTask task = tasks.get(pair(addr1, addr2));
 +        Assert.assertTrue(task.isLocal());
 +        Assert.assertFalse(((LocalSyncTask) task).requestRanges);
 +        Assert.assertTrue(((LocalSyncTask) task).transferRanges);
 +        Assert.assertEquals(Arrays.asList(range1, range3), task.rangesToSync);
 +    }
 +
 +    @Test
 +    public void testEmptyDifference()
 +    {
 +        // one of the nodes is a local coordinator
 +        testEmptyDifference(addr1, noTransient(), true);
 +        testEmptyDifference(addr1, noTransient(), false);
 +        testEmptyDifference(addr2, noTransient(), true);
 +        testEmptyDifference(addr2, noTransient(), false);
 +        testEmptyDifference(addr1, transientPredicate(addr1), true);
 +        testEmptyDifference(addr2, transientPredicate(addr1), true);
 +        testEmptyDifference(addr1, transientPredicate(addr1), false);
 +        testEmptyDifference(addr2, transientPredicate(addr1), false);
 +        testEmptyDifference(addr1, transientPredicate(addr2), true);
 +        testEmptyDifference(addr2, transientPredicate(addr2), true);
 +        testEmptyDifference(addr1, transientPredicate(addr2), false);
 +        testEmptyDifference(addr2, transientPredicate(addr2), false);
 +
 +        // nonlocal coordinator
 +        testEmptyDifference(addr3, noTransient(), true);
 +        testEmptyDifference(addr3, noTransient(), false);
 +        testEmptyDifference(addr3, noTransient(), true);
 +        testEmptyDifference(addr3, noTransient(), false);
 +        testEmptyDifference(addr3, transientPredicate(addr1), true);
 +        testEmptyDifference(addr3, transientPredicate(addr1), true);
 +        testEmptyDifference(addr3, transientPredicate(addr1), false);
 +        testEmptyDifference(addr3, transientPredicate(addr1), false);
 +        testEmptyDifference(addr3, transientPredicate(addr2), true);
 +        testEmptyDifference(addr3, transientPredicate(addr2), true);
 +        testEmptyDifference(addr3, transientPredicate(addr2), false);
 +        testEmptyDifference(addr3, transientPredicate(addr2), false);
 +    }
 +
 +    public void testEmptyDifference(InetAddressAndPort local, 
Predicate<InetAddressAndPort> isTransient, boolean pullRepair)
 +    {
 +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, 
range1, "same", range2, "same", range3, "same"),
 +                                                         treeResponse(addr2, 
range1, "same", range2, "same", range3, "same"));
 +
 +        Map<SyncNodePair, SyncTask> tasks = 
toMap(RepairJob.createStandardSyncTasks(desc,
 +                                                                              
      treeResponses,
 +                                                                              
      local, // local
 +                                                                              
      isTransient,
 +                                                                              
      false,
 +                                                                              
      pullRepair,
 +                                                                              
      PreviewKind.ALL));
 +
 +        Assert.assertTrue(tasks.isEmpty());
 +    }
 +
 +    @Test
 +    public void testCreateStandardSyncTasksAllDifferent()
 +    {
 +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, 
range1, "one",   range2, "one",   range3, "one"),
 +                                                         treeResponse(addr2, 
range1, "two",   range2, "two",   range3, "two"),
 +                                                         treeResponse(addr3, 
range1, "three", range2, "three", range3, "three"));
 +
 +        Map<SyncNodePair, SyncTask> tasks = 
toMap(RepairJob.createStandardSyncTasks(desc,
 +                                                                              
      treeResponses,
 +                                                                              
      addr1, // local
 +                                                                              
      ep -> ep.equals(addr3), // transient
 +                                                                              
      false,
 +                                                                              
      true,
 +                                                                              
      PreviewKind.ALL));
 +
 +        Assert.assertEquals(3, tasks.size());
 +        SyncTask task = tasks.get(pair(addr1, addr2));
 +        Assert.assertTrue(task.isLocal());
 +        Assert.assertEquals(Arrays.asList(range1, range2, range3), 
task.rangesToSync);
 +
 +        task = tasks.get(pair(addr2, addr3));
 +        Assert.assertFalse(task.isLocal());
 +        Assert.assertEquals(Arrays.asList(range1, range2, range3), 
task.rangesToSync);
 +
 +        task = tasks.get(pair(addr1, addr3));
 +        Assert.assertTrue(task.isLocal());
 +        Assert.assertEquals(Arrays.asList(range1, range2, range3), 
task.rangesToSync);
 +    }
 +
 +    @Test
 +    public void testCreate5NodeStandardSyncTasksWithTransient()
 +    {
 +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, 
range1, "one",   range2, "one",   range3, "one"),
 +                                                         treeResponse(addr2, 
range1, "two",   range2, "two",   range3, "two"),
 +                                                         treeResponse(addr3, 
range1, "three", range2, "three", range3, "three"),
 +                                                         treeResponse(addr4, 
range1, "four",  range2, "four",  range3, "four"),
 +                                                         treeResponse(addr5, 
range1, "five",  range2, "five",  range3, "five"));
 +
 +        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || 
ep.equals(addr5);
 +        Map<SyncNodePair, SyncTask> tasks = 
toMap(RepairJob.createStandardSyncTasks(desc,
 +                                                                              
      treeResponses,
 +                                                                              
      addr1, // local
 +                                                                              
      isTransient, // transient
 +                                                                              
      false,
 +                                                                              
      true,
 +                                                                              
      PreviewKind.ALL));
 +
 +        SyncNodePair[] pairs = new SyncNodePair[] {pair(addr1, addr2),
 +                                                   pair(addr1, addr3),
 +                                                   pair(addr1, addr4),
 +                                                   pair(addr1, addr5),
 +                                                   pair(addr2, addr4),
 +                                                   pair(addr2, addr4),
 +                                                   pair(addr2, addr5),
 +                                                   pair(addr3, addr4),
 +                                                   pair(addr3, addr5)};
 +
 +        for (SyncNodePair pair : pairs)
 +        {
 +            SyncTask task = tasks.get(pair);
 +            // Local only if addr1 is a coordinator
 +            assertEquals(task.isLocal(), pair.coordinator.equals(addr1));
 +
 +            boolean isRemote = !pair.coordinator.equals(addr1) && 
!pair.peer.equals(addr1);
 +            boolean involvesTransient = isTransient.test(pair.coordinator) || 
isTransient.test(pair.peer);
 +            assertEquals(String.format("Coordinator: %s\n, Peer: 
%s\n",pair.coordinator, pair.peer),
 +                         isRemote && involvesTransient,
 +                         task instanceof AsymmetricRemoteSyncTask);
 +
 +            // All ranges to be synchronised
 +            Assert.assertEquals(Arrays.asList(range1, range2, range3), 
task.rangesToSync);
 +        }
 +    }
  
 -        assertEquals(3, results.size());
 -        // Should be two RemoteSyncTasks with ranges and one empty one
 -        assertExpectedDifferences(new 
ArrayList<>(session.getSyncingTasks().values()), 1, 1, 0);
 +    @Test
 +    public void testLocalSyncWithTransient()
 +    {
-         try
-         {
-             for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, 
addr2, addr3 })
-             {
-                 FBUtilities.reset();
-                 DatabaseDescriptor.setBroadcastAddress(local.address);
-                 testLocalSyncWithTransient(local, false);
-             }
-         }
-         finally
++        for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, 
addr2, addr3 })
 +        {
 +            FBUtilities.reset();
-             DatabaseDescriptor.setBroadcastAddress(addr1.address);
++            FBUtilities.setBroadcastInetAddress(local.address);
++            testLocalSyncWithTransient(local, false);
 +        }
 +    }
  
 -        int numDifferent = 0;
 -        for (SyncStat stat : results)
 +    @Test
 +    public void testLocalSyncWithTransientPullRepair()
 +    {
-         try
-         {
-             for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, 
addr2, addr3 })
-             {
-                 FBUtilities.reset();
-                 DatabaseDescriptor.setBroadcastAddress(local.address);
-                 testLocalSyncWithTransient(local, true);
-             }
-         }
-         finally
++        for (InetAddressAndPort local : new InetAddressAndPort[]{ addr1, 
addr2, addr3 })
          {
 -            if (stat.nodes.endpoint1.equals(addr2) || 
stat.nodes.endpoint2.equals(addr2))
 -            {
 -                assertEquals(1, stat.numberOfDifferences);
 -                numDifferent++;
 -            }
 +            FBUtilities.reset();
-             DatabaseDescriptor.setBroadcastAddress(addr1.address);
++            FBUtilities.setBroadcastInetAddress(local.address);
++            testLocalSyncWithTransient(local, true);
 +        }
- 
 +    }
 +
 +    public static void testLocalSyncWithTransient(InetAddressAndPort local, 
boolean pullRepair)
 +    {
 +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, 
range1, "one",   range2, "one",   range3, "one"),
 +                                                         treeResponse(addr2, 
range1, "two",   range2, "two",   range3, "two"),
 +                                                         treeResponse(addr3, 
range1, "three", range2, "three", range3, "three"),
 +                                                         treeResponse(addr4, 
range1, "four",  range2, "four",  range3, "four"),
 +                                                         treeResponse(addr5, 
range1, "five",  range2, "five",  range3, "five"));
 +
 +        Predicate<InetAddressAndPort> isTransient = ep -> ep.equals(addr4) || 
ep.equals(addr5);
 +        Map<SyncNodePair, SyncTask> tasks = 
toMap(RepairJob.createStandardSyncTasks(desc,
 +                                                                              
      treeResponses,
 +                                                                              
      local, // local
 +                                                                              
      isTransient, // transient
 +                                                                              
      false,
 +                                                                              
      pullRepair,
 +                                                                              
      PreviewKind.ALL));
 +
 +        assertEquals(9, tasks.size());
 +        for (InetAddressAndPort addr : new InetAddressAndPort[]{ addr1, 
addr2, addr3 })
 +        {
 +            if (local.equals(addr))
 +                continue;
 +
 +            LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr));
 +            assertTrue(task.requestRanges);
 +            assertEquals(!pullRepair, task.transferRanges);
 +        }
 +
 +        LocalSyncTask task = (LocalSyncTask) tasks.get(pair(local, addr4));
 +        assertTrue(task.requestRanges);
 +        assertFalse(task.transferRanges);
 +
 +        task = (LocalSyncTask) tasks.get(pair(local, addr5));
 +        assertTrue(task.requestRanges);
 +        assertFalse(task.transferRanges);
 +    }
 +
 +    @Test
 +    public void testLocalAndRemoteTransient()
 +    {
 +        testLocalAndRemoteTransient(false);
 +    }
 +
 +    @Test
 +    public void testLocalAndRemoteTransientPullRepair()
 +    {
 +        testLocalAndRemoteTransient(true);
 +    }
 +
 +    private static void testLocalAndRemoteTransient(boolean pullRepair)
 +    {
-         DatabaseDescriptor.setBroadcastAddress(addr4.address);
++        FBUtilities.setBroadcastInetAddress(addr4.address);
 +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, 
range1, "one", range2, "one", range3, "one"),
 +                                                         treeResponse(addr2, 
range1, "two", range2, "two", range3, "two"),
 +                                                         treeResponse(addr3, 
range1, "three", range2, "three", range3, "three"),
 +                                                         treeResponse(addr4, 
range1, "four", range2, "four", range3, "four"),
 +                                                         treeResponse(addr5, 
range1, "five", range2, "five", range3, "five"));
 +
 +        Map<SyncNodePair, SyncTask> tasks = 
toMap(RepairJob.createStandardSyncTasks(desc,
 +                                                                              
      treeResponses,
 +                                                                              
      addr4, // local
 +                                                                              
      ep -> ep.equals(addr4) || ep.equals(addr5), // transient
 +                                                                              
      false,
 +                                                                              
      pullRepair,
 +                                                                              
      PreviewKind.ALL));
 +
 +        assertNull(tasks.get(pair(addr4, addr5)));
 +    }
 +
 +    @Test
 +    public void testOptimizedCreateStandardSyncTasksAllDifferent()
 +    {
 +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, 
range1, "one",   range2, "one",   range3, "one"),
 +                                                         treeResponse(addr2, 
range1, "two",   range2, "two",   range3, "two"),
 +                                                         treeResponse(addr3, 
range1, "three", range2, "three", range3, "three"));
 +
 +        Map<SyncNodePair, SyncTask> tasks = 
toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
 +                                                                              
              treeResponses,
 +                                                                              
              addr1, // local
 +                                                                              
              noTransient(),
 +                                                                              
              addr -> "DC1",
 +                                                                              
              false,
 +                                                                              
              PreviewKind.ALL));
 +
 +        for (SyncNodePair pair : new SyncNodePair[]{ pair(addr1, addr2),
 +                                                     pair(addr1, addr3),
 +                                                     pair(addr2, addr1),
 +                                                     pair(addr2, addr3),
 +                                                     pair(addr3, addr1),
 +                                                     pair(addr3, addr2) })
 +        {
 +            assertEquals(Arrays.asList(range1, range2, range3), 
tasks.get(pair).rangesToSync);
 +        }
 +    }
 +
 +    @Test
 +    public void testOptimizedCreateStandardSyncTasks()
 +    {
 +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, 
range1, "one",   range2, "one"),
 +                                                         treeResponse(addr2, 
range1, "one",   range2, "two"),
 +                                                         treeResponse(addr3, 
range1, "three", range2, "two"));
 +
 +        Map<SyncNodePair, SyncTask> tasks = 
toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
 +                                                                              
              treeResponses,
 +                                                                              
              addr4, // local
 +                                                                              
              noTransient(),
 +                                                                              
              addr -> "DC1",
 +                                                                              
              false,
 +                                                                              
              PreviewKind.ALL));
 +
 +        for (SyncTask task : tasks.values())
 +            assertTrue(task instanceof AsymmetricRemoteSyncTask);
 +
 +        assertEquals(Arrays.asList(range1), tasks.get(pair(addr1, 
addr3)).rangesToSync);
 +        // addr1 can get range2 from either addr2 or addr3 but not from both
 +        assertStreamRangeFromEither(tasks, Arrays.asList(range2),
 +                                    addr1, addr2, addr3);
 +
 +        assertEquals(Arrays.asList(range1), tasks.get(pair(addr2, 
addr3)).rangesToSync);
 +        assertEquals(Arrays.asList(range2), tasks.get(pair(addr2, 
addr1)).rangesToSync);
 +
 +        // addr3 can get range1 from either addr1 or addr2 but not from both
 +        assertStreamRangeFromEither(tasks, Arrays.asList(range1),
 +                                    addr3, addr2, addr1);
 +        assertEquals(Arrays.asList(range2), tasks.get(pair(addr3, 
addr1)).rangesToSync);
 +    }
 +
 +    @Test
 +    public void testOptimizedCreateStandardSyncTasksWithTransient()
 +    {
 +        List<TreeResponse> treeResponses = Arrays.asList(treeResponse(addr1, 
range1, "same",      range2, "same", range3, "same"),
 +                                                         treeResponse(addr2, 
range1, "different", range2, "same", range3, "different"),
 +                                                         treeResponse(addr3, 
range1, "same",      range2, "same", range3, "same"));
 +
 +        RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), 
UUID.randomUUID(), "ks", "cf", Arrays.asList());
 +        Map<SyncNodePair, SyncTask> tasks = 
toMap(RepairJob.createOptimisedSyncingSyncTasks(desc,
 +                                                                              
              treeResponses,
 +                                                                              
              addr1, // local
 +                                                                              
              ep -> ep.equals(addr3),
 +                                                                              
              addr -> "DC1",
 +                                                                              
              false,
 +                                                                              
              PreviewKind.ALL));
 +
 +        assertEquals(3, tasks.size());
 +        SyncTask task = tasks.get(pair(addr1, addr2));
 +        assertTrue(task.isLocal());
 +        assertElementEquals(Arrays.asList(range1, range3), task.rangesToSync);
 +        assertTrue(((LocalSyncTask)task).requestRanges);
 +        assertFalse(((LocalSyncTask)task).transferRanges);
 +
 +        assertStreamRangeFromEither(tasks, Arrays.asList(range3),
 +                                    addr2, addr1, addr3);
 +
-         task = tasks.get(pair(addr2, addr3));
-         assertFalse(task.isLocal());
-         assertElementEquals(Arrays.asList(range1), task.rangesToSync);
++        assertStreamRangeFromEither(tasks, Arrays.asList(range1),
++                                    addr2, addr1, addr3);
 +    }
 +
 +    // Asserts that ranges are streamed from one of the nodes but not from 
the both
 +    public static void assertStreamRangeFromEither(Map<SyncNodePair, 
SyncTask> tasks, List<Range<Token>> ranges,
 +                                                   InetAddressAndPort target, 
InetAddressAndPort either, InetAddressAndPort or)
 +    {
 +        InetAddressAndPort streamsFrom;
 +        InetAddressAndPort doesntStreamFrom;
-         if (tasks.containsKey(pair(target, either)))
++        if (tasks.containsKey(pair(target, either)) && tasks.get(pair(target, 
either)).rangesToSync.equals(ranges))
 +        {
 +            streamsFrom = either;
 +            doesntStreamFrom = or;
          }
 -        assertEquals(2, numDifferent);
 +        else
 +        {
 +            doesntStreamFrom = either;
 +            streamsFrom = or;
 +        }
 +
 +        SyncTask task = tasks.get(pair(target, streamsFrom));
 +        assertTrue(task instanceof AsymmetricRemoteSyncTask);
 +        assertElementEquals(ranges, task.rangesToSync);
 +        assertDoesntStreamRangeFrom(tasks, ranges, target, doesntStreamFrom);
 +    }
 +
 +    public static void assertDoesntStreamRangeFrom(Map<SyncNodePair, 
SyncTask> tasks, List<Range<Token>> ranges,
 +                                                   InetAddressAndPort target, 
InetAddressAndPort source)
 +    {
 +        Set<Range<Token>> rangeSet = new HashSet<>(ranges);
 +        SyncTask task = tasks.get(pair(target, source));
 +        if (task == null)
 +            return; // Doesn't stream anything
 +
 +        for (Range<Token> range : task.rangesToSync)
 +        {
 +            assertFalse(String.format("%s shouldn't stream %s from %s",
 +                                      target, range, source),
 +                        rangeSet.contains(range));
 +        }
 +    }
 +
 +    public static <T> void assertElementEquals(Collection<T> col1, 
Collection<T> col2)
 +    {
 +        Set<T> set1 = new HashSet<>(col1);
 +        Set<T> set2 = new HashSet<>(col2);
 +        Set<T> difference = Sets.difference(set1, set2);
 +        assertTrue("Expected empty difference but got: " + 
difference.toString(),
 +                   difference.isEmpty());
 +    }
 +
 +    public static Token tk(int i)
 +    {
 +        return PARTITIONER.getToken(ByteBufferUtil.bytes(i));
 +    }
 +
 +    public static Range<Token> range(int from, int to)
 +    {
 +        return new Range<>(tk(from), tk(to));
 +    }
 +
 +    public static TreeResponse treeResponse(InetAddressAndPort addr, 
Object... rangesAndHashes)
 +    {
 +        MerkleTrees trees = new MerkleTrees(PARTITIONER);
 +        for (int i = 0; i < rangesAndHashes.length; i += 2)
 +        {
 +            Range<Token> range = (Range<Token>) rangesAndHashes[i];
 +            String hash = (String) rangesAndHashes[i + 1];
 +            MerkleTree tree = trees.addMerkleTree(2, 
MerkleTree.RECOMMENDED_DEPTH, range);
 +            tree.get(range.left).hash(hash.getBytes());
 +        }
 +
 +        return new TreeResponse(addr, trees);
 +    }
 +
 +    public static SyncNodePair pair(InetAddressAndPort node1, 
InetAddressAndPort node2)
 +    {
 +        return new SyncNodePair(node1, node2);
 +    }
 +
 +    public static Map<SyncNodePair, SyncTask> toMap(List<SyncTask> tasks)
 +    {
 +        Map<SyncNodePair, SyncTask> map = new HashMap();
 +        for (SyncTask task : tasks)
 +        {
 +            SyncTask oldTask = map.put(task.nodePair, task);
 +            Assert.assertNull(String.format("\nNode pair: %s\nOld task:  
%s\nNew task:  %s\n",
 +                                            task.nodePair,
 +                                            oldTask,
 +                                            task),
 +                              oldTask);
 +        }
 +        return map;
 +    }
 +
 +    public static Predicate<InetAddressAndPort> 
transientPredicate(InetAddressAndPort... transientNodes)
 +    {
 +        Set<InetAddressAndPort> set = new HashSet<>();
 +        for (InetAddressAndPort node : transientNodes)
 +            set.add(node);
 +
 +        return set::contains;
      }
  
 -    private void assertExpectedDifferences(Collection<RemoteSyncTask> tasks, 
Integer ... differences)
 +    public static Predicate<InetAddressAndPort> noTransient()
      {
 -        List<Integer> expectedDifferences = new 
ArrayList<>(Arrays.asList(differences));
 -        List<Integer> observedDifferences = tasks.stream()
 -                                                 .map(t -> (int) 
t.getCurrentStat().numberOfDifferences)
 -                                                 
.collect(Collectors.toList());
 -        assertEquals(expectedDifferences.size(), observedDifferences.size());
 -        assertTrue(expectedDifferences.containsAll(observedDifferences));
 +        return node -> false;
      }
+ 
+     private MerkleTrees createInitialTree(boolean invalidate)
+     {
+         MerkleTrees tree = new MerkleTrees(MURMUR3_PARTITIONER);
+         tree.addMerkleTrees((int) Math.pow(2, 15), fullRange);
+         tree.init();
+         for (MerkleTree.TreeRange r : tree.invalids())
+         {
+             r.ensureHashInitialised();
+         }
+ 
+         if (invalidate)
+         {
+             // change a range in one of the trees
+             Token token = MURMUR3_PARTITIONER.midpoint(fullRange.get(0).left, 
fullRange.get(0).right);
+             tree.invalidate(token);
+             tree.get(token).hash("non-empty hash!".getBytes());
+         }
+ 
+         return tree;
+     }
+ 
 -    private void interceptRepairMessages(Map<InetAddress, MerkleTrees> 
mockTrees,
++    private void interceptRepairMessages(Map<InetAddressAndPort, MerkleTrees> 
mockTrees,
+                                          List<MessageOut> messageCapture)
+     {
+         MessagingService.instance().addMessageSink(new IMessageSink()
+         {
 -            public boolean allowOutgoingMessage(MessageOut message, int id, 
InetAddress to)
++            public boolean allowOutgoingMessage(MessageOut message, int id, 
InetAddressAndPort to)
+             {
+                 if (message == null || !(message.payload instanceof 
RepairMessage))
+                     return false;
+ 
+                 // So different Thread's messages don't overwrite each other.
+                 synchronized (messageLock)
+                 {
+                     messageCapture.add(message);
+                 }
+ 
+                 RepairMessage rm = (RepairMessage) message.payload;
+                 switch (rm.messageType)
+                 {
+                     case SNAPSHOT:
+                         MessageIn<?> messageIn = MessageIn.create(to, null,
+                                                                   
Collections.emptyMap(),
+                                                                   
MessagingService.Verb.REQUEST_RESPONSE,
+                                                                   
MessagingService.current_version);
+                         MessagingService.instance().receive(messageIn, id);
+                         break;
+                     case VALIDATION_REQUEST:
+                         session.validationComplete(sessionJobDesc, to, 
mockTrees.get(to));
+                         break;
+                     case SYNC_REQUEST:
+                         SyncRequest syncRequest = (SyncRequest) rm;
 -                        session.syncComplete(sessionJobDesc, new 
NodePair(syncRequest.src, syncRequest.dst), true);
++                        session.syncComplete(sessionJobDesc, new 
SyncNodePair(syncRequest.src, syncRequest.dst),
++                                             true, Collections.emptyList());
+                         break;
+                     default:
+                         break;
+                 }
+                 return false;
+             }
+ 
+             public boolean allowIncomingMessage(MessageIn message, int id)
+             {
+                 return message.verb == MessagingService.Verb.REQUEST_RESPONSE;
+             }
+         });
+     }
  }
diff --cc test/unit/org/apache/cassandra/repair/ValidatorTest.java
index 3b582a9,9c32cef..ff6b11c
--- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java
+++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java
@@@ -17,22 -17,22 +17,24 @@@
   */
  package org.apache.cassandra.repair;
  
--import java.net.InetAddress;
 +import java.nio.ByteBuffer;
++import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.Collections;
  import java.util.Iterator;
++import java.util.List;
  import java.util.Map;
  import java.util.UUID;
  import java.util.concurrent.CompletableFuture;
  import java.util.concurrent.TimeUnit;
  
 -import com.google.common.util.concurrent.ListenableFuture;
 -import com.google.common.util.concurrent.SettableFuture;
 +import com.google.common.hash.Hasher;
  
--import org.apache.cassandra.db.compaction.CompactionManager;
++import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.compaction.CompactionsTest;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.junit.After;
++import org.junit.Before;
  import org.junit.BeforeClass;
  import org.junit.Test;
  
@@@ -70,6 -68,6 +72,7 @@@ import static org.junit.Assert.assertTr
  public class ValidatorTest
  {
      private static final long TEST_TIMEOUT = 60; //seconds
++    private static int testSizeMegabytes;
  
      private static final String keyspace = "ValidatorTest";
      private static final String columnFamily = "Standard1";
@@@ -82,13 -80,13 +85,21 @@@
          SchemaLoader.createKeyspace(keyspace,
                                      KeyspaceParams.simple(1),
                                      SchemaLoader.standardCFMD(keyspace, 
columnFamily));
 -        partitioner = Schema.instance.getCFMetaData(keyspace, 
columnFamily).partitioner;
 +        partitioner = Schema.instance.getTableMetadata(keyspace, 
columnFamily).partitioner;
++        testSizeMegabytes = 
DatabaseDescriptor.getRepairSessionSpaceInMegabytes();
      }
  
      @After
      public void tearDown()
      {
          MessagingService.instance().clearMessageSinks();
++        
DatabaseDescriptor.setRepairSessionSpaceInMegabytes(testSizeMegabytes);
++    }
++
++    @Before
++    public void setup()
++    {
++        
DatabaseDescriptor.setRepairSessionSpaceInMegabytes(testSizeMegabytes);
      }
  
      @Test
@@@ -217,45 -215,6 +228,188 @@@
          assertEquals(trees.rowCount(), n);
      }
  
++    /*
++     * Test for CASSANDRA-14096 size limiting. We:
++     * 1. Limit the size of a repair session
++     * 2. Submit a validation
++     * 3. Check that the resulting tree is of limited depth
++     */
++    @Test
++    public void testSizeLimiting() throws Exception
++    {
++        Keyspace ks = Keyspace.open(keyspace);
++        ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnFamily);
++        cfs.clearUnsafe();
++
++        DatabaseDescriptor.setRepairSessionSpaceInMegabytes(1);
++
++        // disable compaction while flushing
++        cfs.disableAutoCompaction();
++
++        // 2 ** 14 rows would normally use 2^14 leaves, but with only 1 meg 
we should only use 2^12
++        CompactionsTest.populate(keyspace, columnFamily, 0, 1 << 14, 0);
++
++        cfs.forceBlockingFlush();
++        assertEquals(1, cfs.getLiveSSTables().size());
++
++        // wait enough to force single compaction
++        TimeUnit.SECONDS.sleep(5);
++
++        SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
++        UUID repairSessionId = UUIDGen.getTimeUUID();
++        final RepairJobDesc desc = new RepairJobDesc(repairSessionId, 
UUIDGen.getTimeUUID(), cfs.keyspace.getName(),
++                                                     cfs.getTableName(), 
Collections.singletonList(new Range<>(sstable.first.getToken(),
++                                                                              
                                 sstable.last.getToken())));
++
++        
ActiveRepairService.instance.registerParentRepairSession(repairSessionId, 
FBUtilities.getBroadcastAddressAndPort(),
++                                                                 
Collections.singletonList(cfs), desc.ranges, false, 
ActiveRepairService.UNREPAIRED_SSTABLE,
++                                                                 false, 
PreviewKind.NONE);
++
++        final CompletableFuture<MessageOut> outgoingMessageSink = 
registerOutgoingMessageSink();
++        Validator validator = new Validator(desc, 
FBUtilities.getBroadcastAddressAndPort(), 0, true, false, PreviewKind.NONE);
++        ValidationManager.instance.submitValidation(cfs, validator);
++
++        MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, 
TimeUnit.SECONDS);
++        MerkleTrees trees = ((ValidationComplete) message.payload).trees;
++
++        Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator = 
trees.iterator();
++        int numTrees = 0;
++        while (iterator.hasNext())
++        {
++            assertEquals(1 << 12, iterator.next().getValue().size(), 0.0);
++            numTrees++;
++        }
++        assertEquals(1, numTrees);
++
++        assertEquals(trees.rowCount(), 1 << 14);
++    }
++
++    /*
++     * Test for CASSANDRA-11390. When there are multiple subranges the trees 
should
++     * automatically size down to make each subrange fit in the provided 
memory
++     * 1. Limit the size of all the trees
++     * 2. Submit a validation against more than one range
++     * 3. Check that we have the right number and sizes of trees
++     */
++    @Test
++    public void testRangeSplittingTreeSizeLimit() throws Exception
++    {
++        Keyspace ks = Keyspace.open(keyspace);
++        ColumnFamilyStore cfs = ks.getColumnFamilyStore(columnFamily);
++        cfs.clearUnsafe();
++
++        DatabaseDescriptor.setRepairSessionSpaceInMegabytes(1);
++
++        // disable compaction while flushing
++        cfs.disableAutoCompaction();
++
++        // 2 ** 14 rows would normally use 2^14 leaves, but with only 1 meg 
we should only use 2^12
++        CompactionsTest.populate(keyspace, columnFamily, 0, 1 << 14, 0);
++
++        cfs.forceBlockingFlush();
++        assertEquals(1, cfs.getLiveSSTables().size());
++
++        // wait enough to force single compaction
++        TimeUnit.SECONDS.sleep(5);
++
++        SSTableReader sstable = cfs.getLiveSSTables().iterator().next();
++        UUID repairSessionId = UUIDGen.getTimeUUID();
++
++        List<Range<Token>> ranges = splitHelper(new 
Range<>(sstable.first.getToken(), sstable.last.getToken()), 2);
++
++
++        final RepairJobDesc desc = new RepairJobDesc(repairSessionId, 
UUIDGen.getTimeUUID(), cfs.keyspace.getName(),
++                                                     cfs.getTableName(), 
ranges);
++
++        
ActiveRepairService.instance.registerParentRepairSession(repairSessionId, 
FBUtilities.getBroadcastAddressAndPort(),
++                                                                 
Collections.singletonList(cfs), desc.ranges, false, 
ActiveRepairService.UNREPAIRED_SSTABLE,
++                                                                 false, 
PreviewKind.NONE);
++
++        final CompletableFuture<MessageOut> outgoingMessageSink = 
registerOutgoingMessageSink();
++        Validator validator = new Validator(desc, 
FBUtilities.getBroadcastAddressAndPort(), 0, true, false, PreviewKind.NONE);
++        ValidationManager.instance.submitValidation(cfs, validator);
++
++        MessageOut message = outgoingMessageSink.get(TEST_TIMEOUT, 
TimeUnit.SECONDS);
++        MerkleTrees trees = ((ValidationComplete) message.payload).trees;
++
++        // Should have 4 trees each with a depth of on average 10 (since each 
range should have gotten 0.25 megabytes)
++        Iterator<Map.Entry<Range<Token>, MerkleTree>> iterator = 
trees.iterator();
++        int numTrees = 0;
++        double totalResolution = 0;
++        while (iterator.hasNext())
++        {
++            long size = iterator.next().getValue().size();
++            // So it turns out that sstable range estimates are pretty 
variable, depending on the sampling we can
++            // get a wide range of values here. So we just make sure that 
we're smaller than in the single range
++            // case and have the right total size.
++            assertTrue(size <= (1 << 11));
++            assertTrue(size >= (1 << 9));
++            totalResolution += size;
++            numTrees += 1;
++        }
++
++        assertEquals(trees.rowCount(), 1 << 14);
++        assertEquals(4, numTrees);
++
++        // With a single tree and a megabyte we should had a total resolution 
of 2^12 leaves; with multiple
++        // ranges we should get similar overall resolution, but not more.
++        assertTrue(totalResolution > (1 << 11) && totalResolution < (1 << 
13));
++    }
++
++    private List<Range<Token>> splitHelper(Range<Token> range, int depth)
++    {
++        if (depth <= 0)
++        {
++            List<Range<Token>> tokens = new ArrayList<>();
++            tokens.add(range);
++            return tokens;
++        }
++        Token midpoint = partitioner.midpoint(range.left, range.right);
++        List<Range<Token>> left = splitHelper(new Range<>(range.left, 
midpoint), depth - 1);
++        List<Range<Token>> right = splitHelper(new Range<>(midpoint, 
range.right), depth - 1);
++        left.addAll(right);
++        return left;
++    }
++
 +    @Test
 +    public void testCountingHasher()
 +    {
 +        Hasher [] hashers = new Hasher[] {new Validator.CountingHasher(), 
Validator.CountingHasher.hashFunctions[0].newHasher(), 
Validator.CountingHasher.hashFunctions[1].newHasher() };
 +        byte [] random = UUIDGen.getTimeUUIDBytes();
 +
 +        // call all overloaded methods:
 +        for (Hasher hasher : hashers)
 +        {
 +            hasher.putByte((byte) 33)
 +                  .putBytes(random)
 +                  .putBytes(ByteBuffer.wrap(random))
 +                  .putBytes(random, 0, 3)
 +                  .putChar('a')
 +                  .putBoolean(false)
 +                  .putDouble(3.3)
 +                  .putInt(77)
 +                  .putFloat(99)
 +                  .putLong(101)
 +                  .putShort((short) 23);
 +        }
 +
 +        long len = Byte.BYTES
 +                   + random.length * 2 // both the byte[] and the ByteBuffer
 +                   + 3 // 3 bytes from the random byte[]
 +                   + Character.BYTES
 +                   + Byte.BYTES
 +                   + Double.BYTES
 +                   + Integer.BYTES
 +                   + Float.BYTES
 +                   + Long.BYTES
 +                   + Short.BYTES;
 +
 +        byte [] h = hashers[0].hash().asBytes();
 +        assertTrue(Arrays.equals(hashers[1].hash().asBytes(), 
Arrays.copyOfRange(h, 0, 16)));
 +        assertTrue(Arrays.equals(hashers[2].hash().asBytes(), 
Arrays.copyOfRange(h, 16, 32)));
 +        assertEquals(len, ((Validator.CountingHasher)hashers[0]).getCount());
 +    }
 +
      private CompletableFuture<MessageOut> registerOutgoingMessageSink()
      {
          final CompletableFuture<MessageOut> future = new 
CompletableFuture<>();
diff --cc test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
index 64aea24,64aea24..c213271
--- a/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
+++ b/test/unit/org/apache/cassandra/utils/MerkleTreeTest.java
@@@ -19,14 -19,14 +19,18 @@@
  package org.apache.cassandra.utils;
  
  import java.math.BigInteger;
++import java.nio.ByteBuffer;
  import java.util.*;
  
  import com.google.common.collect.Lists;
  
++import org.junit.Assert;
  import org.junit.Before;
  import org.junit.Test;
  import org.apache.cassandra.config.DatabaseDescriptor;
++import org.apache.cassandra.dht.ByteOrderedPartitioner;
  import org.apache.cassandra.dht.IPartitioner;
++import org.apache.cassandra.dht.Murmur3Partitioner;
  import org.apache.cassandra.dht.RandomPartitioner;
  import org.apache.cassandra.dht.RandomPartitioner.BigIntegerToken;
  import org.apache.cassandra.dht.Range;
@@@ -62,7 -62,7 +66,7 @@@ public class MerkleTreeTes
      }
  
      @Before
--    public void clear()
++    public void setup()
      {
          TOKEN_SCALE = new BigInteger("8");
          partitioner = RandomPartitioner.instance;
@@@ -92,7 -92,7 +96,7 @@@
      {
          if (i == -1)
              return new BigIntegerToken(new BigInteger("-1"));
--        BigInteger bint = 
RandomPartitioner.MAXIMUM.divide(TOKEN_SCALE).multiply(new BigInteger(""+i));
++        BigInteger bint = 
RandomPartitioner.MAXIMUM.divide(TOKEN_SCALE).multiply(new BigInteger("" + i));
          return new BigIntegerToken(bint);
      }
  
@@@ -113,10 -113,10 +117,10 @@@
          assertEquals(new Range<>(tok(6), tok(7)), mt.get(tok(7)));
  
          // check depths
--        assertEquals((byte)1, mt.get(tok(4)).depth);
--        assertEquals((byte)2, mt.get(tok(6)).depth);
--        assertEquals((byte)3, mt.get(tok(7)).depth);
--        assertEquals((byte)3, mt.get(tok(-1)).depth);
++        assertEquals((byte) 1, mt.get(tok(4)).depth);
++        assertEquals((byte) 2, mt.get(tok(6)).depth);
++        assertEquals((byte) 3, mt.get(tok(7)).depth);
++        assertEquals((byte) 3, mt.get(tok(-1)).depth);
  
          try
          {
@@@ -132,7 -132,7 +136,7 @@@
      @Test
      public void testSplitLimitDepth()
      {
--        mt = new MerkleTree(partitioner, fullRange(), (byte)2, 
Integer.MAX_VALUE);
++        mt = new MerkleTree(partitioner, fullRange(), (byte) 2, 
Integer.MAX_VALUE);
  
          assertTrue(mt.split(tok(4)));
          assertTrue(mt.split(tok(2)));
@@@ -472,7 -472,7 +476,7 @@@
  
          List<TreeRange> diffs = MerkleTree.difference(ltree, rtree);
          assertEquals(Lists.newArrayList(range), diffs);
--        assertEquals(MerkleTree.FULLY_INCONSISTENT, 
MerkleTree.differenceHelper(ltree, rtree, new ArrayList<>(), new 
MerkleTree.TreeDifference(ltree.fullRange.left, ltree.fullRange.right, 
(byte)0)));
++        assertEquals(MerkleTree.FULLY_INCONSISTENT, 
MerkleTree.differenceHelper(ltree, rtree, new ArrayList<>(), new 
MerkleTree.TreeDifference(ltree.fullRange.left, ltree.fullRange.right, (byte) 
0)));
      }
  
      /**
@@@ -530,7 -530,7 +534,7 @@@
              {
                  // consume the stack
                  hash = Hashable.binaryHash(hstack.pop(), hash);
--                depth = dstack.pop()-1;
++                depth = dstack.pop() - 1;
              }
              dstack.push(depth);
              hstack.push(hash);
@@@ -563,4 -563,4 +567,80 @@@
              return endOfData();
          }
      }
++
++    @Test
++    public void testEstimatedSizes()
++    {
++        // With no or negative allowed space we should still get a depth of 1
++        Assert.assertEquals(1, 
MerkleTree.estimatedMaxDepthForBytes(Murmur3Partitioner.instance, -20, 32));
++        Assert.assertEquals(1, 
MerkleTree.estimatedMaxDepthForBytes(Murmur3Partitioner.instance, 0, 32));
++        Assert.assertEquals(1, 
MerkleTree.estimatedMaxDepthForBytes(Murmur3Partitioner.instance, 1, 32));
++
++        // The minimum of 1 megabyte split between RF=3 should yield trees of 
around 10
++        Assert.assertEquals(10, 
MerkleTree.estimatedMaxDepthForBytes(Murmur3Partitioner.instance,
++                                                                     1048576 
/ 3, 32));
++
++        // With a single megabyte of space we should get 12
++        Assert.assertEquals(12, 
MerkleTree.estimatedMaxDepthForBytes(Murmur3Partitioner.instance,
++                                                                     1048576, 
32));
++
++        // With 100 megabytes we should get a limit of 19
++        Assert.assertEquals(19, 
MerkleTree.estimatedMaxDepthForBytes(Murmur3Partitioner.instance,
++                                                                     100 * 
1048576, 32));
++
++        // With 300 megabytes we should get the old limit of 20
++        Assert.assertEquals(20, 
MerkleTree.estimatedMaxDepthForBytes(Murmur3Partitioner.instance,
++                                                                     300 * 
1048576, 32));
++        Assert.assertEquals(20, 
MerkleTree.estimatedMaxDepthForBytes(RandomPartitioner.instance,
++                                                                     300 * 
1048576, 32));
++        Assert.assertEquals(20, 
MerkleTree.estimatedMaxDepthForBytes(ByteOrderedPartitioner.instance,
++                                                                     300 * 
1048576, 32));
++    }
++
++    @Test
++    public void testEstimatedSizesRealMeasurement()
++    {
++        // Use a fixed source of randomness so that the test does not flake.
++        Random random = new Random(1);
++        checkEstimatedSizes(RandomPartitioner.instance, random);
++        checkEstimatedSizes(Murmur3Partitioner.instance, random);
++    }
++
++    private void checkEstimatedSizes(IPartitioner partitioner, Random random)
++    {
++        Range<Token> fullRange = new Range<>(partitioner.getMinimumToken(), 
partitioner.getMinimumToken());
++        MerkleTree tree = new MerkleTree(partitioner, fullRange, 
RECOMMENDED_DEPTH, 0);
++
++        // Test 16 kilobyte -> 16 megabytes
++        for (int i = 14; i < 24; i ++)
++        {
++            long numBytes = 1 << i;
++            int maxDepth = MerkleTree.estimatedMaxDepthForBytes(partitioner, 
numBytes, 32);
++            long realSizeOfMerkleTree = measureTree(tree, fullRange, 
maxDepth, random);
++            long biggerTreeSize = measureTree(tree, fullRange, maxDepth + 1, 
random);
++
++            Assert.assertTrue(realSizeOfMerkleTree < numBytes);
++            Assert.assertTrue(biggerTreeSize > numBytes);
++        }
++    }
++
++    private long measureTree(MerkleTree tree, Range<Token> fullRange, int 
depth, Random random)
++    {
++        tree = new MerkleTree(tree.partitioner(), fullRange, 
RECOMMENDED_DEPTH, (long) Math.pow(2, depth));
++        // Initializes it as a fully balanced tree.
++        tree.init();
++
++        byte[] key = new byte[128];
++        // Try to actually allocate some hashes. Note that this is not 
guaranteed to actually populate the tree,
++        // but we re-use the source of randomness to try to make it 
reproducible.
++        for (int i = 0; i < tree.maxsize() * 8; i++)
++        {
++            random.nextBytes(key);
++            Token token = tree.partitioner().getToken(ByteBuffer.wrap(key));
++            tree.get(token).addHash(new RowHash(token, new byte[32], 32));
++        }
++
++        tree.hash(fullRange);
++        return ObjectSizes.measureDeep(tree);
++    }
  }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to