[ https://issues.apache.org/jira/browse/CASSANDRA-19336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17812834#comment-17812834 ]
David Capwell commented on CASSANDRA-19336:
-------------------------------------------

I did a quick POC of what I am describing, using a scheduler...

{code}
public interface Scheduler {
    void scheduleValidation(TimeUUID sessionId, ValidationTask task, Executor taskExecutor);
}

public static final class NoopScheduler implements Scheduler {
    @Override
    public void scheduleValidation(TimeUUID sessionId, ValidationTask task, Executor taskExecutor) {
        taskExecutor.execute(task);
    }
}

public final class LimitedConcurrentScheduler implements Scheduler {
    private final int concurrentValidations = 1;
    @GuardedBy("this")
    private int inflight = 0;
    @GuardedBy("this")
    private final Map<TimeUUID, Group> groups = new HashMap<>();

    @Override
    public synchronized void scheduleValidation(TimeUUID sessionId, ValidationTask task, Executor taskExecutor) {
        groups.computeIfAbsent(sessionId, ignore -> new Group(sessionId, taskExecutor)).add(task);
        maybeSchedule();
    }

    private synchronized void onDone(Group group, ValidationTask task, long durationNs) {
        group.update(durationNs);
        inflight--;
        maybeSchedule();
    }

    // Only called while holding "this": dispatches the next task from the
    // non-empty group with the smallest estimated remaining work.
    private void maybeSchedule() {
        if (inflight == concurrentValidations)
            return;
        Group smallest = null;
        long smallestScore = -1;
        for (var g : groups.values()) {
            if (g.isEmpty())
                continue;
            if (smallest == null) {
                smallest = g;
                smallestScore = g.score();
            } else {
                var score = g.score();
                if (score < smallestScore) {
                    smallest = g;
                    smallestScore = score;
                }
            }
        }
        if (smallest == null)
            return;
        inflight++;
        smallest.executeNext();
    }

    private class Group {
        private final TimeUUID sessionId;
        private final Executor taskExecutor;
        private final List<ValidationTask> tasks = new ArrayList<>();
        private final LongArrayList durations = new LongArrayList();
        private int inflight = 0;
        private int completed = 0;

        private Group(TimeUUID sessionId, Executor taskExecutor) {
            this.sessionId = sessionId;
            this.taskExecutor = taskExecutor;
        }

        // Estimated remaining work: queued tasks * average observed duration,
        // defaulting to 1h per task before the group has any completions.
        public long score() {
            if (tasks.isEmpty())
                return -1;
            long avgDuration = (long) durations.longStream().average().orElse(TimeUnit.HOURS.toNanos(1));
            return tasks.size() * avgDuration;
        }

        public void executeNext() {
            var task = tasks.remove(0);
            inflight++;
            var startNs = ctx.clock().nanoTime();
            task.addCallback((s, f) -> onDone(this, task, ctx.clock().nanoTime() - startNs));
            taskExecutor.execute(task);
        }

        public void add(ValidationTask task) {
            tasks.add(task);
        }

        private void update(long durationNs) {
            durations.add(durationNs);
            inflight--;
            completed++;
        }

        public boolean isEmpty() {
            return tasks.isEmpty();
        }

        @Override
        public String toString() {
            return "Group{" +
                   "sessionId=" + sessionId +
                   ", tasks=" + tasks.size() +
                   ", durations=" + durations.longStream().average().orElse(-1) +
                   ", score=" + score() +
                   ", inflight=" + inflight +
                   ", completed=" + completed +
                   '}';
        }
    }
}
{code}

I added that to RepairCoordinator and changed CassandraRepairJob to call

{code}
session.scheduler.scheduleValidation(session.state.id, task, taskExecutor);
{code}

rather than

{code}
taskExecutor.execute(task);
{code}

The existing simulation tests are passing, and I see that we limit to 1 concurrent validation for the whole repair... this allows concurrent streaming / snapshot / paxos / etc., but limits validation.
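To see the selection policy in isolation, here is a minimal standalone sketch of the same idea (at most one validation in flight, next task drawn from the session with the smallest estimated backlog). It is not Cassandra code: SchedulerDemo, the string session ids, and the single-slot deferred executor are made-up stand-ins for illustration only.

{code}
import java.util.*;
import java.util.concurrent.*;

public class SchedulerDemo {
    static final int MAX_INFLIGHT = 1;
    static int inflight = 0;
    static final Map<String, Deque<Runnable>> groups = new LinkedHashMap<>();
    static final Map<String, Long> avgNs = new HashMap<>();
    // Dispatched work is queued here and drained by main(), standing in for the real task executor.
    static final Deque<Runnable> pending = new ArrayDeque<>();

    static synchronized void schedule(String sessionId, Runnable task) {
        groups.computeIfAbsent(sessionId, k -> new ArrayDeque<>()).add(task);
        maybeSchedule();
    }

    static synchronized void maybeSchedule() {
        if (inflight >= MAX_INFLIGHT)
            return;
        String best = null;
        long bestScore = Long.MAX_VALUE;
        for (var e : groups.entrySet()) {
            if (e.getValue().isEmpty())
                continue;
            // Sessions with no completed tasks score a pessimistic 1h per task, as in the POC.
            long score = e.getValue().size() * avgNs.getOrDefault(e.getKey(), TimeUnit.HOURS.toNanos(1));
            if (score < bestScore) {
                best = e.getKey();
                bestScore = score;
            }
        }
        if (best == null)
            return;
        inflight++;
        Runnable task = groups.get(best).poll();
        String session = best;
        pending.add(() -> {
            long startNs = System.nanoTime();
            task.run();
            onDone(session, System.nanoTime() - startNs);
        });
    }

    static synchronized void onDone(String sessionId, long durationNs) {
        avgNs.merge(sessionId, durationNs, (a, b) -> (a + b) / 2); // crude running average
        inflight--;
        maybeSchedule();
    }

    public static void main(String[] args) {
        schedule("session-1", () -> System.out.println("validate range A"));
        schedule("session-1", () -> System.out.println("validate range B"));
        schedule("session-2", () -> System.out.println("validate range C"));
        while (!pending.isEmpty())
            pending.poll().run();
        // Prints A, then B (session-1 now has a small measured average, so its
        // backlog scores below session-2's pessimistic default), then C.
    }
}
{code}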
> Repair causes out of memory
> ---------------------------
>
>                 Key: CASSANDRA-19336
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-19336
>             Project: Cassandra
>          Issue Type: Bug
>          Components: Consistency/Repair
>            Reporter: Andres de la Peña
>            Priority: Normal
>             Fix For: 4.0.x, 4.1.x, 5.0.x, 5.x
>
> CASSANDRA-14096 introduced {{repair_session_space}} as a limit on the memory used for Merkle tree calculations during repairs. This limit is applied to the set of Merkle trees built for a received validation request ({{VALIDATION_REQ}}), divided by the replication factor so as not to overwhelm the repair coordinator, which will have requested RF sets of Merkle trees. That way the repair coordinator should only use {{repair_session_space}} for the RF sets of Merkle trees.
>
> However, a repair session without {{-pr}}/{{-partitioner-range}} will send RF*RF validation requests, because the repair coordinator node has RF-1 replicas and is also a replica of RF-1 other nodes. Since all the requests are sent at the same time, at some point the repair coordinator can hold up to RF*{{repair_session_space}} worth of Merkle trees if none of the validation responses is fully processed before the last response arrives.
>
> Even worse, if the cluster uses virtual nodes, many nodes can be replicas of the repair coordinator, and some nodes can be replicas of multiple token ranges. This means the repair coordinator can send even more than RF or RF*RF simultaneous validation requests.
>
> For example, in an 11-node cluster with RF=3 and 256 tokens, we have seen a repair session involving 44 groups of ranges to be repaired. This produces 44*3=132 validation requests contacting every node in the cluster. When the responses for all these requests start to arrive at the coordinator, each containing up to {{repair_session_space}}/3 of Merkle trees, they accumulate faster than they are consumed, greatly exceeding {{repair_session_space}} and OOMing the node.
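For reference, the worst case described in the last paragraph of the report can be put into numbers. This is a back-of-the-envelope sketch, not code from the ticket, and the 256 MiB value for {{repair_session_space}} is a made-up placeholder:

{code}
public class MerkleMemoryEstimate {
    public static void main(String[] args) {
        long repairSessionSpace = 256L << 20; // bytes; hypothetical setting
        int rf = 3;                           // replication factor
        int rangeGroups = 44;                 // range groups seen in the 11-node, 256-token example

        long requests = (long) rangeGroups * rf;    // 44 * 3 = 132 validation requests
        long perResponse = repairSessionSpace / rf; // Merkle trees per validation response
        long worstCase = requests * perResponse;    // = rangeGroups * repairSessionSpace

        System.out.printf("requests=%d, worst case=%d MiB (%.0fx repair_session_space)%n",
                          requests, worstCase >> 20, (double) worstCase / repairSessionSpace);
        // -> requests=132, worst case=11264 MiB (44x repair_session_space)
    }
}
{code}

That is, if none of the 132 responses is consumed before the last one arrives, the coordinator holds 44x its configured limit.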