This is an automated email from the ASF dual-hosted git repository. bdeggleston pushed a commit to branch cassandra-3.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push: new b30c8c9 Improve merkle tree size and time on heap b30c8c9 is described below commit b30c8c98a594a5682f6ea1f0b5511463b700b6e8 Author: Joseph Lynch <joe.e.ly...@gmail.com> AuthorDate: Tue Dec 18 14:52:33 2018 -0800 Improve merkle tree size and time on heap Patch by Joseph Lynch; Reviewed by Blake Eggleston for CASSANDRA-14096 --- CHANGES.txt | 1 + NEWS.txt | 4 +- conf/cassandra.yaml | 13 + src/java/org/apache/cassandra/config/Config.java | 3 + .../cassandra/config/DatabaseDescriptor.java | 21 ++ .../cassandra/db/compaction/CompactionManager.java | 8 +- .../org/apache/cassandra/repair/LocalSyncTask.java | 8 +- .../apache/cassandra/repair/RemoteSyncTask.java | 8 +- .../org/apache/cassandra/repair/RepairJob.java | 67 +++-- .../org/apache/cassandra/repair/RepairSession.java | 20 +- src/java/org/apache/cassandra/repair/SyncTask.java | 34 +-- .../apache/cassandra/service/StorageService.java | 10 + .../cassandra/service/StorageServiceMBean.java | 4 + .../cassandra/config/DatabaseDescriptorTest.java | 37 +++ .../apache/cassandra/repair/LocalSyncTaskTest.java | 8 +- .../org/apache/cassandra/repair/RepairJobTest.java | 325 +++++++++++++++++++++ 16 files changed, 509 insertions(+), 62 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 5e8845f..b755751 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.19 + * Improve merkle tree size and time on heap (CASSANDRA-14096) * Add missing commands to nodetool-completion (CASSANDRA-14916) * Anti-compaction temporarily corrupts sstable state for readers (CASSANDRA-15004) Merged from 2.2: diff --git a/NEWS.txt b/NEWS.txt index 54dc63a..704fde1 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -47,8 +47,8 @@ using the provided 'sstableupgrade' tool. Upgrading --------- - - Nothing specific to this release, but please see previous upgrading sections, - especially if you are upgrading from 2.2. 
+ - repair_session_max_tree_depth setting has been added to cassandra.yaml to allow operators to reduce + merkle tree size if repair is creating too much heap pressure. See CASSANDRA-14096 for details. 3.0.18 ====== diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 84664fe..c321a72 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -405,6 +405,19 @@ concurrent_materialized_view_writes: 32 # https://issues.apache.org/jira/browse/CASSANDRA-11039 memtable_allocation_type: heap_buffers +# Limits the maximum Merkle tree depth to avoid consuming too much +# memory during repairs. +# +# The default setting of 18 generates trees of maximum size around +# 50 MiB / tree. If you are running out of memory during repairs consider +# lowering this to 15 (~6 MiB / tree) or lower, but try not to lower it +# too much past that or you will lose too much resolution and stream +# too much redundant data during repair. Cannot be set lower than 10. +# +# For more details see https://issues.apache.org/jira/browse/CASSANDRA-14096. +# +# repair_session_max_tree_depth: 18 + # Total space to use for commit logs on disk. 
# # If space gets above this value, Cassandra will flush every dirty CF diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java index c882e40..de158bd 100644 --- a/src/java/org/apache/cassandra/config/Config.java +++ b/src/java/org/apache/cassandra/config/Config.java @@ -116,6 +116,9 @@ public class Config public Integer memtable_offheap_space_in_mb; public Float memtable_cleanup_threshold = null; + // Limit the maximum depth of repair session merkle trees + public volatile Integer repair_session_max_tree_depth = 18; + public Integer storage_port = 7000; public Integer ssl_storage_port = 7001; public String listen_address; diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 3145df6..8f4b338 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -437,6 +437,11 @@ public class DatabaseDescriptor else logger.info("Global memtable off-heap threshold is enabled at {}MB", conf.memtable_offheap_space_in_mb); + if (conf.repair_session_max_tree_depth < 10) + throw new ConfigurationException("repair_session_max_tree_depth should not be < 10, but was " + conf.repair_session_max_tree_depth); + if (conf.repair_session_max_tree_depth > 20) + logger.warn("repair_session_max_tree_depth of " + conf.repair_session_max_tree_depth + " > 20 could lead to excessive memory usage"); + applyAddressConfig(config); if (conf.thrift_framed_transport_size_in_mb <= 0) @@ -1949,6 +1954,22 @@ public class DatabaseDescriptor } } + public static int getRepairSessionMaxTreeDepth() + { + return conf.repair_session_max_tree_depth; + } + + public static void setRepairSessionMaxTreeDepth(int depth) + { + if (depth < 10) + throw new ConfigurationException("Cannot set repair_session_max_tree_depth to " + depth + + " which is < 10, doing nothing"); + else if (depth > 20) + 
logger.warn("repair_session_max_tree_depth of " + depth + " > 20 could lead to excessive memory usage"); + + conf.repair_session_max_tree_depth = depth; + } + public static boolean getOutboundBindAny() { return Config.outboundBindAny || conf.listen_on_broadcast_address; diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 4295c7a..54233f2 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -1210,8 +1210,12 @@ public class CompactionManager implements CompactionManagerMBean long numPartitions = rangePartitionCounts.get(range); double rangeOwningRatio = allPartitions > 0 ? (double)numPartitions / allPartitions : 0; // determine max tree depth proportional to range size to avoid blowing up memory with multiple tress, - // capping at 20 to prevent large tree (CASSANDRA-11390) - int maxDepth = rangeOwningRatio > 0 ? (int) Math.floor(20 - Math.log(1 / rangeOwningRatio) / Math.log(2)) : 0; + // capping at a configurable depth (default 18) to prevent large tree (CASSANDRA-11390, CASSANDRA-14096) + int maxDepth = rangeOwningRatio > 0 + ? (int) Math.floor(Math.max(0.0, DatabaseDescriptor.getRepairSessionMaxTreeDepth() - + Math.log(1 / rangeOwningRatio) / Math.log(2))) + : 0; + // determine tree depth from number of partitions, capping at max tree depth (CASSANDRA-5263) int depth = numPartitions > 0 ? 
(int) Math.min(Math.ceil(Math.log(numPartitions) / Math.log(2)), maxDepth) : 0; tree.addMerkleTree((int) Math.pow(2, depth), range); diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java index daace01..5d43868 100644 --- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java +++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java @@ -47,9 +47,9 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler private final long repairedAt; - public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, long repairedAt) + public LocalSyncTask(RepairJobDesc desc, InetAddress firstEndpoint, InetAddress secondEndpoint, List<Range<Token>> rangesToSync, long repairedAt) { - super(desc, r1, r2); + super(desc, firstEndpoint, secondEndpoint, rangesToSync); this.repairedAt = repairedAt; } @@ -61,7 +61,7 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler { InetAddress local = FBUtilities.getBroadcastAddress(); // We can take anyone of the node as source or destination, however if one is localhost, we put at source to avoid a forwarding - InetAddress dst = r2.endpoint.equals(local) ? r1.endpoint : r2.endpoint; + InetAddress dst = secondEndpoint.equals(local) ? 
firstEndpoint : secondEndpoint; InetAddress preferred = SystemKeyspace.getPreferredIP(dst); String message = String.format("Performing streaming repair of %d ranges with %s", differences.size(), dst); @@ -110,7 +110,7 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler public void onSuccess(StreamState result) { - String message = String.format("Sync complete using session %s between %s and %s on %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily); + String message = String.format("Sync complete using session %s between %s and %s on %s", desc.sessionId, firstEndpoint, secondEndpoint, desc.columnFamily); logger.info("[repair #{}] {}", desc.sessionId, message); Tracing.traceRepair(message); set(stat); diff --git a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java index ededc40..5af815a 100644 --- a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java +++ b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java @@ -41,15 +41,15 @@ public class RemoteSyncTask extends SyncTask { private static final Logger logger = LoggerFactory.getLogger(RemoteSyncTask.class); - public RemoteSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2) + public RemoteSyncTask(RepairJobDesc desc, InetAddress firstEndpoint, InetAddress secondEndpoint, List<Range<Token>> rangesToSync) { - super(desc, r1, r2); + super(desc, firstEndpoint, secondEndpoint, rangesToSync); } protected void startSync(List<Range<Token>> differences) { InetAddress local = FBUtilities.getBroadcastAddress(); - SyncRequest request = new SyncRequest(desc, local, r1.endpoint, r2.endpoint, differences); + SyncRequest request = new SyncRequest(desc, local, firstEndpoint, secondEndpoint, differences); String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.src, request.dst); logger.info("[repair #{}] {}", desc.sessionId, message); 
Tracing.traceRepair(message); @@ -64,7 +64,7 @@ public class RemoteSyncTask extends SyncTask } else { - setException(new RepairException(desc, String.format("Sync failed between %s and %s", r1.endpoint, r2.endpoint))); + setException(new RepairException(desc, String.format("Sync failed between %s and %s", firstEndpoint, secondEndpoint))); } } } diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java index cba176c..5443bf8 100644 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -20,14 +20,18 @@ package org.apache.cassandra.repair; import java.net.InetAddress; import java.util.*; +import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MerkleTrees; import org.apache.cassandra.utils.Pair; /** @@ -103,35 +107,9 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable // When all validations complete, submit sync tasks ListenableFuture<List<SyncStat>> syncResults = Futures.transform(validations, new AsyncFunction<List<TreeResponse>, List<SyncStat>>() { - public ListenableFuture<List<SyncStat>> apply(List<TreeResponse> trees) throws Exception + public ListenableFuture<List<SyncStat>> apply(List<TreeResponse> trees) { - InetAddress local = FBUtilities.getLocalAddress(); - - List<SyncTask> syncTasks = new ArrayList<>(); - // We need to difference all trees one against another - for (int i = 0; i < trees.size() - 1; ++i) - { - TreeResponse r1 = trees.get(i); - for (int j = i + 1; j < trees.size(); ++j) - { - TreeResponse 
r2 = trees.get(j); - SyncTask task; - if (r1.endpoint.equals(local) || r2.endpoint.equals(local)) - { - task = new LocalSyncTask(desc, r1, r2, repairedAt); - } - else - { - task = new RemoteSyncTask(desc, r1, r2); - // RemoteSyncTask expects SyncComplete message sent back. - // Register task to RepairSession to receive response. - session.waitForSync(Pair.create(desc, new NodePair(r1.endpoint, r2.endpoint)), (RemoteSyncTask) task); - } - syncTasks.add(task); - taskExecutor.submit(task); - } - } - return Futures.allAsList(syncTasks); + return Futures.allAsList(createSyncTasks(trees, FBUtilities.getLocalAddress())); } }, taskExecutor); @@ -160,6 +138,39 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable Futures.getUnchecked(validations); } + @VisibleForTesting + List<SyncTask> createSyncTasks(List<TreeResponse> trees, InetAddress local) + { + List<SyncTask> syncTasks = new ArrayList<>(); + // We need to difference all trees one against another + for (int i = 0; i < trees.size() - 1; ++i) + { + TreeResponse r1 = trees.get(i); + for (int j = i + 1; j < trees.size(); ++j) + { + TreeResponse r2 = trees.get(j); + SyncTask task; + + List<Range<Token>> differences = MerkleTrees.difference(r1.trees, r2.trees); + + if (r1.endpoint.equals(local) || r2.endpoint.equals(local)) + { + task = new LocalSyncTask(desc, r1.endpoint, r2.endpoint, differences, repairedAt); + } + else + { + task = new RemoteSyncTask(desc, r1.endpoint, r2.endpoint, differences); + // RemoteSyncTask expects SyncComplete message sent back. + // Register task to RepairSession to receive response. + session.waitForSync(Pair.create(desc, new NodePair(r1.endpoint, r2.endpoint)), (RemoteSyncTask) task); + } + syncTasks.add(task); + taskExecutor.submit(task); + } + } + return syncTasks; + } + /** * Creates {@link ValidationTask} and submit them to task executor in parallel. 
* diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java index 5fe306d..ac8e0a9 100644 --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.util.concurrent.*; import org.slf4j.Logger; @@ -53,8 +54,8 @@ import org.apache.cassandra.utils.Pair; * validationComplete()). * </li> * <li>Synchronization phase: once all trees are received, the job compares each tree with - * all the other using a so-called {@link SyncTask}. If there is difference between 2 trees, the - * concerned SyncTask will start a streaming of the difference between the 2 endpoint concerned. + * all the others and creates a {@link SyncTask} for each diverging replica. If there are differences + * between 2 trees, the concerned SyncTask stream the differences between the 2 endpoints concerned. * </li> * </ol> * The job is done once all its SyncTasks are done (i.e. have either computed no differences @@ -99,7 +100,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement private final ConcurrentMap<Pair<RepairJobDesc, NodePair>, RemoteSyncTask> syncingTasks = new ConcurrentHashMap<>(); // Tasks(snapshot, validate request, differencing, ...) 
are run on taskExecutor - public final ListeningExecutorService taskExecutor = MoreExecutors.listeningDecorator(DebuggableThreadPoolExecutor.createCachedThreadpoolWithMaxSize("RepairJobTask")); + public final ListeningExecutorService taskExecutor; private volatile boolean terminated = false; @@ -134,6 +135,13 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement this.ranges = ranges; this.endpoints = endpoints; this.repairedAt = repairedAt; + this.taskExecutor = MoreExecutors.listeningDecorator(createExecutor()); + } + + @VisibleForTesting + protected DebuggableThreadPoolExecutor createExecutor() + { + return DebuggableThreadPoolExecutor.createCachedThreadpoolWithMaxSize("RepairJobTask"); } public UUID getId() @@ -198,6 +206,12 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement task.syncComplete(success); } + @VisibleForTesting + Map<Pair<RepairJobDesc, NodePair>, RemoteSyncTask> getSyncingTasks() + { + return Collections.unmodifiableMap(syncingTasks); + } + private String repairedNodes() { StringBuilder sb = new StringBuilder(); diff --git a/src/java/org/apache/cassandra/repair/SyncTask.java b/src/java/org/apache/cassandra/repair/SyncTask.java index 8adec6f..c96caf4 100644 --- a/src/java/org/apache/cassandra/repair/SyncTask.java +++ b/src/java/org/apache/cassandra/repair/SyncTask.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.repair; +import java.net.InetAddress; import java.util.List; import com.google.common.util.concurrent.AbstractFuture; @@ -26,10 +27,9 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.utils.MerkleTrees; /** - * SyncTask will calculate the difference of MerkleTree between two nodes + * SyncTask takes the difference of MerkleTrees between two nodes * and perform necessary operation to repair replica. 
*/ public abstract class SyncTask extends AbstractFuture<SyncStat> implements Runnable @@ -37,16 +37,19 @@ public abstract class SyncTask extends AbstractFuture<SyncStat> implements Runna private static Logger logger = LoggerFactory.getLogger(SyncTask.class); protected final RepairJobDesc desc; - protected final TreeResponse r1; - protected final TreeResponse r2; + protected final InetAddress firstEndpoint; + protected final InetAddress secondEndpoint; + + private final List<Range<Token>> rangesToSync; protected volatile SyncStat stat; - public SyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2) + public SyncTask(RepairJobDesc desc, InetAddress firstEndpoint, InetAddress secondEndpoint, List<Range<Token>> rangesToSync) { this.desc = desc; - this.r1 = r1; - this.r2 = r2; + this.firstEndpoint = firstEndpoint; + this.secondEndpoint = secondEndpoint; + this.rangesToSync = rangesToSync; } /** @@ -54,25 +57,22 @@ public abstract class SyncTask extends AbstractFuture<SyncStat> implements Runna */ public void run() { - // compare trees, and collect differences - List<Range<Token>> differences = MerkleTrees.difference(r1.trees, r2.trees); - - stat = new SyncStat(new NodePair(r1.endpoint, r2.endpoint), differences.size()); + stat = new SyncStat(new NodePair(firstEndpoint, secondEndpoint), rangesToSync.size()); // choose a repair method based on the significance of the difference - String format = String.format("[repair #%s] Endpoints %s and %s %%s for %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily); - if (differences.isEmpty()) + String format = String.format("[repair #%s] Endpoints %s and %s %%s for %s", desc.sessionId, firstEndpoint, secondEndpoint, desc.columnFamily); + if (rangesToSync.isEmpty()) { logger.info(String.format(format, "are consistent")); - Tracing.traceRepair("Endpoint {} is consistent with {} for {}", r1.endpoint, r2.endpoint, desc.columnFamily); + Tracing.traceRepair("Endpoint {} is consistent with {} for {}", firstEndpoint, 
secondEndpoint, desc.columnFamily); set(stat); return; } // non-0 difference: perform streaming repair - logger.info(String.format(format, "have " + differences.size() + " range(s) out of sync")); - Tracing.traceRepair("Endpoint {} has {} range(s) out of sync with {} for {}", r1.endpoint, differences.size(), r2.endpoint, desc.columnFamily); - startSync(differences); + logger.info(String.format(format, "have " + rangesToSync.size() + " range(s) out of sync")); + Tracing.traceRepair("Endpoint {} has {} range(s) out of sync with {} for {}", firstEndpoint, rangesToSync.size(), secondEndpoint, desc.columnFamily); + startSync(rangesToSync); } public SyncStat getCurrentStat() diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 1769970..b07401a 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -3348,6 +3348,16 @@ public class StorageService extends NotificationBroadcasterSupport implements IE ActiveRepairService.instance.terminateSessions(); } + public void setRepairSessionMaxTreeDepth(int depth) + { + DatabaseDescriptor.setRepairSessionMaxTreeDepth(depth); + } + + public int getRepairSessionMaxTreeDepth() + { + return DatabaseDescriptor.getRepairSessionMaxTreeDepth(); + } + /* End of MBean interface methods */ /** diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 5692754..ddd2da0 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -364,6 +364,10 @@ public interface StorageServiceMBean extends NotificationEmitter public void forceTerminateAllRepairSessions(); + public void setRepairSessionMaxTreeDepth(int depth); + + public int getRepairSessionMaxTreeDepth(); + /** * transfer this node's data to other machines and 
remove it from service. */ diff --git a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java b/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java index c078e7b..4a43388 100644 --- a/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java +++ b/test/unit/org/apache/cassandra/config/DatabaseDescriptorTest.java @@ -25,6 +25,7 @@ import java.net.InetAddress; import java.net.NetworkInterface; import java.util.Enumeration; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; @@ -42,6 +43,7 @@ import org.apache.cassandra.thrift.ThriftConversion; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; @RunWith(OrderedJUnit4ClassRunner.class) public class DatabaseDescriptorTest @@ -271,4 +273,39 @@ public class DatabaseDescriptorTest DatabaseDescriptor.applyAddressConfig(testConfig); } + + @Test + public void testRepairSessionSizeToggles() + { + int previousDepth = DatabaseDescriptor.getRepairSessionMaxTreeDepth(); + try + { + Assert.assertEquals(18, DatabaseDescriptor.getRepairSessionMaxTreeDepth()); + DatabaseDescriptor.setRepairSessionMaxTreeDepth(10); + Assert.assertEquals(10, DatabaseDescriptor.getRepairSessionMaxTreeDepth()); + + try + { + DatabaseDescriptor.setRepairSessionMaxTreeDepth(9); + fail("Should have received a ConfigurationException for depth of 9"); + } + catch (ConfigurationException ignored) { } + Assert.assertEquals(10, DatabaseDescriptor.getRepairSessionMaxTreeDepth()); + + try + { + DatabaseDescriptor.setRepairSessionMaxTreeDepth(-20); + fail("Should have received a ConfigurationException for depth of -20"); + } + catch (ConfigurationException ignored) { } + Assert.assertEquals(10, DatabaseDescriptor.getRepairSessionMaxTreeDepth()); + + DatabaseDescriptor.setRepairSessionMaxTreeDepth(22); + Assert.assertEquals(22, 
DatabaseDescriptor.getRepairSessionMaxTreeDepth()); + } + finally + { + DatabaseDescriptor.setRepairSessionMaxTreeDepth(previousDepth); + } + } } diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java index 6aacae6..b891296 100644 --- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java +++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java @@ -76,7 +76,9 @@ public class LocalSyncTaskTest extends SchemaLoader // note: we reuse the same endpoint which is bogus in theory but fine here TreeResponse r1 = new TreeResponse(ep1, tree1); TreeResponse r2 = new TreeResponse(ep2, tree2); - LocalSyncTask task = new LocalSyncTask(desc, r1, r2, ActiveRepairService.UNREPAIRED_SSTABLE); + LocalSyncTask task = new LocalSyncTask(desc, r1.endpoint, r2.endpoint, + MerkleTrees.difference(r1.trees, r2.trees), + ActiveRepairService.UNREPAIRED_SSTABLE); task.run(); assertEquals(0, task.get().numberOfDifferences); @@ -111,7 +113,9 @@ public class LocalSyncTaskTest extends SchemaLoader // note: we reuse the same endpoint which is bogus in theory but fine here TreeResponse r1 = new TreeResponse(InetAddress.getByName("127.0.0.1"), tree1); TreeResponse r2 = new TreeResponse(InetAddress.getByName("127.0.0.2"), tree2); - LocalSyncTask task = new LocalSyncTask(desc, r1, r2, ActiveRepairService.UNREPAIRED_SSTABLE); + LocalSyncTask task = new LocalSyncTask(desc, r1.endpoint, r2.endpoint, + MerkleTrees.difference(r1.trees, r2.trees), + ActiveRepairService.UNREPAIRED_SSTABLE); task.run(); // ensure that the changed range was recorded diff --git a/test/unit/org/apache/cassandra/repair/RepairJobTest.java b/test/unit/org/apache/cassandra/repair/RepairJobTest.java new file mode 100644 index 0000000..2f77a34 --- /dev/null +++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.repair; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import com.google.common.util.concurrent.AsyncFunction; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.net.IMessageSink; +import org.apache.cassandra.net.MessageIn; +import org.apache.cassandra.net.MessageOut; +import 
org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.repair.messages.RepairMessage; +import org.apache.cassandra.repair.messages.SyncRequest; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MerkleTree; +import org.apache.cassandra.utils.MerkleTrees; +import org.apache.cassandra.utils.ObjectSizes; +import org.apache.cassandra.utils.UUIDGen; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class RepairJobTest extends SchemaLoader +{ + private static final long TEST_TIMEOUT_S = 10; + private static final long THREAD_TIMEOUT_MILLIS = 100; + private static final IPartitioner MURMUR3_PARTITIONER = Murmur3Partitioner.instance; + private static final String KEYSPACE = "RepairJobTest"; + private static final String CF = "Standard1"; + private static final Object messageLock = new Object(); + + private static final List<Range<Token>> fullRange = Collections.singletonList(new Range<>(MURMUR3_PARTITIONER.getMinimumToken(), + MURMUR3_PARTITIONER.getRandomToken())); + private static InetAddress addr1; + private static InetAddress addr2; + private static InetAddress addr3; + private static InetAddress addr4; + private RepairSession session; + private RepairJob job; + private RepairJobDesc sessionJobDesc; + + // So that threads actually get recycled and we can have accurate memory accounting while testing + // memory retention from CASSANDRA-14096 + private static class MeasureableRepairSession extends RepairSession + { + public MeasureableRepairSession(UUID parentRepairSession, UUID id, Collection<Range<Token>> ranges, String keyspace, + RepairParallelism parallelismDegree, Set<InetAddress> endpoints, long repairedAt, String... 
cfnames) + { + super(parentRepairSession, id, ranges, keyspace, parallelismDegree, endpoints, repairedAt, cfnames); + } + + /** + * Creates the executor through the superclass and sets its keep-alive time to THREAD_TIMEOUT_MILLIS + * milliseconds before returning it. + */ protected DebuggableThreadPoolExecutor createExecutor() + { + DebuggableThreadPoolExecutor executor = super.createExecutor(); + executor.setKeepAliveTime(THREAD_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + return executor; + } + } + + /** + * One-time fixture: prepares the server through SchemaLoader, creates KEYSPACE with table CF using + * KeyspaceParams.simple(1), and resolves addr1..addr4 to the loopback addresses 127.0.0.1..127.0.0.4. + */ @BeforeClass + public static void setupClass() throws UnknownHostException + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE, + KeyspaceParams.simple(1), + SchemaLoader.standardCFMD(KEYSPACE, CF)); + addr1 = InetAddress.getByName("127.0.0.1"); + addr2 = InetAddress.getByName("127.0.0.2"); + addr3 = InetAddress.getByName("127.0.0.3"); + addr4 = InetAddress.getByName("127.0.0.4"); + } + + /** + * Per-test fixture: registers a parent repair session for CF over fullRange with ActiveRepairService, + * creates a MeasureableRepairSession whose endpoints are addr2 and addr3, builds the RepairJob and a + * RepairJobDesc from that session, and finally sets the broadcast address to addr1. + */ @Before + public void setup() + { + Set<InetAddress> neighbors = new HashSet<>(Arrays.asList(addr2, addr3)); + + UUID parentRepairSession = UUID.randomUUID(); + ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddress(), + Collections.singletonList(Keyspace.open(KEYSPACE).getColumnFamilyStore(CF)), fullRange, false, + ActiveRepairService.UNREPAIRED_SSTABLE, false); + + this.session = new MeasureableRepairSession(parentRepairSession, UUIDGen.getTimeUUID(), fullRange, + KEYSPACE, RepairParallelism.SEQUENTIAL, neighbors, + ActiveRepairService.UNREPAIRED_SSTABLE, CF); + + this.job = new RepairJob(session, CF); + this.sessionJobDesc = new RepairJobDesc(session.parentRepairSession, session.getId(), + session.keyspace, CF, session.getRanges()); + + DatabaseDescriptor.setBroadcastAddress(addr1); + } + + /** + * Per-test cleanup: calls terminateSessions() on ActiveRepairService and clearMessageSinks() on + * MessagingService, so a sink installed by one test is not left in place for the next. + */ @After + public void reset() + { + ActiveRepairService.instance.terminateSessions(); + MessagingService.instance().clearMessageSinks(); + } + + /** + * Ensure we can do an end to end repair of consistent data and get the messages we expect + */ + @Test + public void testEndToEndNoDifferences() throws Exception + { + Map<InetAddress, MerkleTrees> mockTrees = new 
HashMap<>(); + mockTrees.put(FBUtilities.getBroadcastAddress(), createInitialTree(false)); + mockTrees.put(addr2, createInitialTree(false)); + mockTrees.put(addr3, createInitialTree(false)); + + List<MessageOut> observedMessages = new ArrayList<>(); + interceptRepairMessages(mockTrees, observedMessages); + + /* All three trees are built with invalidate=false; the sink installed above answers requests from mockTrees. */ job.run(); + + RepairResult result = job.get(TEST_TIMEOUT_S, TimeUnit.SECONDS); + + assertEquals(3, result.stats.size()); + // Should be one RemoteSyncTask left behind (other two should be local) + assertExpectedDifferences(session.getSyncingTasks().values(), 0); + + // RepairJob should send out SNAPSHOTS -> VALIDATIONS -> done + List<RepairMessage.Type> expectedTypes = new ArrayList<>(); + for (int i = 0; i < 3; i++) + expectedTypes.add(RepairMessage.Type.SNAPSHOT); + for (int i = 0; i < 3; i++) + expectedTypes.add(RepairMessage.Type.VALIDATION_REQUEST); + + assertEquals(expectedTypes, observedMessages.stream() + .map(k -> ((RepairMessage) k.payload).messageType) + .collect(Collectors.toList())); + } + + /** + * Regression test for CASSANDRA-14096. We should not retain memory in the RepairSession once the + * ValidationTask -> SyncTask transform is done. + */ + @Test + public void testNoTreesRetainedAfterDifference() throws Throwable + { + Map<InetAddress, MerkleTrees> mockTrees = new HashMap<>(); + mockTrees.put(FBUtilities.getBroadcastAddress(), createInitialTree(false)); + /* Only this tree is built with invalidate=true, so addr2 is the one replica that differs. */ mockTrees.put(addr2, createInitialTree(true)); + mockTrees.put(addr3, createInitialTree(false)); + + List<MessageOut> observedMessages = new ArrayList<>(); + interceptRepairMessages(mockTrees, observedMessages); + + List<TreeResponse> mockTreeResponses = mockTrees.entrySet().stream() + .map(e -> new TreeResponse(e.getKey(), e.getValue())) + .collect(Collectors.toList()); + + /* Deep size of a single tree; every later size assertion is relative to 0.8 times this value. */ long singleTreeSize = ObjectSizes.measureDeep(mockTrees.get(addr2)); + + // Use a different local address so we get all RemoteSyncs (as LocalSyncs try to reach out over the network). 
+ List<SyncTask> syncTasks = job.createSyncTasks(mockTreeResponses, addr4); + + // SyncTasks themselves should not contain significant memory + assertTrue(ObjectSizes.measureDeep(syncTasks) < 0.8 * singleTreeSize); + + ListenableFuture<List<SyncStat>> syncResults = Futures.transform(Futures.immediateFuture(mockTreeResponses), new AsyncFunction<List<TreeResponse>, List<SyncStat>>() + { + public ListenableFuture<List<SyncStat>> apply(List<TreeResponse> treeResponses) + { + return Futures.allAsList(syncTasks); + } + }, session.taskExecutor); + + // The session can retain memory in the contained executor until the threads expire, so we wait for the threads + // that ran the Tree -> SyncTask conversions to die and release the memory + int millisUntilFreed; + for (millisUntilFreed = 0; millisUntilFreed < TEST_TIMEOUT_S * 1000; millisUntilFreed += THREAD_TIMEOUT_MILLIS) + { + // The measured size of the syncingTasks, and result of the computation should be much smaller + if (ObjectSizes.measureDeep(session) < 0.8 * singleTreeSize) + break; + TimeUnit.MILLISECONDS.sleep(THREAD_TIMEOUT_MILLIS); + } + + /* Fails when the loop above ran to its limit without the session ever measuring below the threshold. */ assertTrue(millisUntilFreed < TEST_TIMEOUT_S * 1000); + + List<SyncStat> results = syncResults.get(TEST_TIMEOUT_S, TimeUnit.SECONDS); + + assertTrue(ObjectSizes.measureDeep(results) < 0.8 * singleTreeSize); + + assertEquals(3, results.size()); + // Should be two RemoteSyncTasks with ranges and one empty one + assertExpectedDifferences(new ArrayList<>(session.getSyncingTasks().values()), 1, 1, 0); + + int numDifferent = 0; + for (SyncStat stat : results) + { + if (stat.nodes.endpoint1.equals(addr2) || stat.nodes.endpoint2.equals(addr2)) + { + assertEquals(1, stat.numberOfDifferences); + numDifferent++; + } + } + assertEquals(2, numDifferent); + } + + /** + * Asserts that the given tasks report as many difference counts as were passed in, and that every + * observed count (numberOfDifferences of each task's current stat, cast to int) occurs among the + * expected values. + * NOTE(review): containsAll ignores how often a value occurs, so observed [0, 0, 0] would satisfy + * expected [1, 1, 0]; comparing sorted lists would be stricter - confirm whether that is wanted. + * + * @param tasks the sync tasks whose current stats are inspected + * @param differences the expected difference counts, in any order + */ private void assertExpectedDifferences(Collection<RemoteSyncTask> tasks, Integer ... 
differences) + { + List<Integer> expectedDifferences = new ArrayList<>(Arrays.asList(differences)); + List<Integer> observedDifferences = tasks.stream() + .map(t -> (int) t.getCurrentStat().numberOfDifferences) + .collect(Collectors.toList()); + assertEquals(expectedDifferences.size(), observedDifferences.size()); + /* Membership check only: order and multiplicity are not compared. */ assertTrue(expectedDifferences.containsAll(observedDifferences)); + } + + /** + * Builds a MerkleTrees instance for MURMUR3_PARTITIONER covering fullRange (2^15 is passed as the first + * argument of addMerkleTrees), initialises it and calls ensureHashInitialised() on every range returned + * by invalids(). When invalidate is true, the midpoint token of the first entry of fullRange is + * invalidated and then hashed with the bytes of a fixed string, so the tree differs from one built + * with invalidate=false. + * NOTE(review): getBytes() is called without a charset and therefore uses the platform default; + * harmless for this ASCII literal, but an explicit charset would be safer - confirm. + * + * @param invalidate whether to alter one range so the tree differs from an unaltered one + * @return the initialised tree + */ private MerkleTrees createInitialTree(boolean invalidate) + { + MerkleTrees tree = new MerkleTrees(MURMUR3_PARTITIONER); + tree.addMerkleTrees((int) Math.pow(2, 15), fullRange); + tree.init(); + for (MerkleTree.TreeRange r : tree.invalids()) + { + r.ensureHashInitialised(); + } + + if (invalidate) + { + // change a range in one of the trees + Token token = MURMUR3_PARTITIONER.midpoint(fullRange.get(0).left, fullRange.get(0).right); + tree.invalidate(token); + tree.get(token).hash("non-empty hash!".getBytes()); + } + + return tree; + } + + /** + * Installs a message sink on MessagingService. For outgoing messages whose payload is a RepairMessage + * the sink appends the message to messageCapture (under messageLock) and then reacts by type: + * SNAPSHOT hands a REQUEST_RESPONSE back to MessagingService.receive, VALIDATION_REQUEST calls + * session.validationComplete with the mock tree stored for the target, SYNC_REQUEST calls + * session.syncComplete with the request's src/dst pair and true. allowOutgoingMessage always returns + * false; allowIncomingMessage returns true only for the REQUEST_RESPONSE verb. + * + * @param mockTrees the tree to report for each endpoint when it is asked to validate + * @param messageCapture receives every intercepted outgoing repair message + */ private void interceptRepairMessages(Map<InetAddress, MerkleTrees> mockTrees, + List<MessageOut> messageCapture) + { + MessagingService.instance().addMessageSink(new IMessageSink() + { + public boolean allowOutgoingMessage(MessageOut message, int id, InetAddress to) + { + if (message == null || !(message.payload instanceof RepairMessage)) + return false; + + // So different Thread's messages don't overwrite each other. 
+ synchronized (messageLock) + { + messageCapture.add(message); + } + + RepairMessage rm = (RepairMessage) message.payload; + switch (rm.messageType) + { + case SNAPSHOT: + /* Hand MessagingService a REQUEST_RESPONSE built with a null second argument (presumably the payload) for the same message id. */ MessageIn<?> messageIn = MessageIn.create(to, null, + Collections.emptyMap(), + MessagingService.Verb.REQUEST_RESPONSE, + MessagingService.current_version); + MessagingService.instance().receive(messageIn, id, System.currentTimeMillis(), false); + break; + case VALIDATION_REQUEST: + /* Complete validation right away with the mock tree stored for the target endpoint. */ session.validationComplete(sessionJobDesc, to, mockTrees.get(to)); + break; + case SYNC_REQUEST: + SyncRequest syncRequest = (SyncRequest) rm; + /* Report the sync between the request's src and dst as complete, passing true as the last argument. */ session.syncComplete(sessionJobDesc, new NodePair(syncRequest.src, syncRequest.dst), true); + break; + default: + break; + } + /* Always false; judging by the method name this stops the real message from being sent - confirm against IMessageSink. */ return false; + } + + /** Lets an incoming message through only when its verb is REQUEST_RESPONSE. */ public boolean allowIncomingMessage(MessageIn message, int id) + { + return message.verb == MessagingService.Verb.REQUEST_RESPONSE; + } + }); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org