[ https://issues.apache.org/jira/browse/CASSANDRA-19336?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17812834#comment-17812834 ]

David Capwell commented on CASSANDRA-19336:
-------------------------------------------

I did a quick POC of what I am proposing, using a scheduler...

{code}
    public interface Scheduler
    {

        void scheduleValidation(TimeUUID sessionId, ValidationTask task, Executor taskExecutor);
    }

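    // Baseline scheduler: hands each validation straight to its executor,
    // preserving the current behaviour.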
    public static final class NoopScheduler implements Scheduler
    {
        @Override
        public void scheduleValidation(TimeUUID sessionId, ValidationTask task, Executor taskExecutor)
        {
            taskExecutor.execute(task);
        }
    }

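    // Caps how many validations run at once across the whole repair (hard-coded
    // to 1 in this POC), choosing which session runs next by estimated remaining work.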
    public final class LimitedConcurrentScheduler implements Scheduler
    {
        private final int concurrentValidations = 1;
        @GuardedBy("this")
        private int inflight = 0;
        @GuardedBy("this")
        private final Map<TimeUUID, Group> groups = new HashMap<>();

        @Override
        public synchronized void scheduleValidation(TimeUUID sessionId, ValidationTask task, Executor taskExecutor)
        {
            groups.computeIfAbsent(sessionId, ignore -> new Group(sessionId, taskExecutor)).add(task);
            maybeSchedule();
        }

        private synchronized void onDone(Group group, ValidationTask task, long durationNs)
        {
            group.update(durationNs);
            inflight--;
            maybeSchedule();
        }

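        // If we are below the concurrency cap, run the next task from the
        // non-empty session with the lowest score (least estimated remaining work).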
        private void maybeSchedule()
        {
            if (inflight == concurrentValidations)
                return;
            Group smallest = null;
            long smallestScore = -1;
            for (var g : groups.values())
            {
                if (g.isEmpty())
                    continue;
                if (smallest == null)
                {
                    smallest = g;
                    smallestScore = g.score();
                }
                else
                {
                    var score = g.score();
                    if (score < smallestScore)
                    {
                        smallest = g;
                        smallestScore = score;
                    }
                }
            }
            if (smallest == null)
                return;
            inflight++;
            smallest.executeNext();
        }

        private class Group
        {
            private final TimeUUID sessionId;
            private final Executor taskExecutor;
            private final List<ValidationTask> tasks = new ArrayList<>();
            private final LongArrayList durations = new LongArrayList();
            private int inflight = 0;
            private int completed = 0;

            private Group(TimeUUID sessionId, Executor taskExecutor)
            {
                this.sessionId = sessionId;
                this.taskExecutor = taskExecutor;
            }

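            // Estimated remaining work for this session: pending tasks times the
            // average observed validation duration; with no samples yet, fall back
            // to a pessimistic one-hour average.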
            public long score()
            {
                if (tasks.isEmpty())
                    return -1;
                long avgDuration = (long) durations.longStream().average().orElse(TimeUnit.HOURS.toNanos(1));
                return tasks.size() * avgDuration;
            }

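            // Pop the next task and measure wall-clock time from submission to
            // callback completion (this includes any executor queueing time).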
            public void executeNext()
            {
                var task = tasks.remove(0);
                inflight++;
                var startNs = ctx.clock().nanoTime();
                task.addCallback((s, f) -> onDone(this, task, ctx.clock().nanoTime() - startNs));
                taskExecutor.execute(task);
            }

            public void add(ValidationTask task)
            {
                tasks.add(task);
            }

            private void update(long durationNs)
            {
                durations.add(durationNs);
                inflight--;
                completed++;
            }

            public boolean isEmpty()
            {
                return tasks.isEmpty();
            }

            @Override
            public String toString()
            {
                return "Group{" +
                       "sessionId=" + sessionId +
                       ", tasks=" + tasks.size() +
                       ", durations=" + 
durations.longStream().average().orElse(-1) +
                       ", score=" + score() +
                       ", inflight=" + inflight +
                       ", completed=" + completed +
                       '}';
            }
        }
    }
{code}
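
The selection rule, as written: {{score()}} estimates a session's remaining work as pending tasks times the average observed validation duration, and {{maybeSchedule()}} runs the next task from the lowest-scoring non-empty session. For example, a session with 10 pending validations averaging 2s (score 20s) waits behind one with 3 pending validations averaging 5s (score 15s), so smaller sessions drain first.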

I added that to RepairCoordinator and changed CassandraRepairJob to call

{code}
session.scheduler.scheduleValidation(session.state.id, task, taskExecutor);
{code}

rather than

{code}
taskExecutor.execute(task);
{code}


The existing simulation tests are passing, and I can see that we limit the whole 
repair to 1 concurrent validation...  this still allows concurrent streaming / 
snapshot / paxos / etc., but caps validation.
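
If it helps to poke at the selection policy in isolation, here is a minimal self-contained sketch of the same "least estimated remaining work" rule; Session is a hypothetical stand-in for the per-session queue state, not a Cassandra class:

{code}
import java.util.*;
import java.util.concurrent.TimeUnit;

public final class SchedulerPolicySketch
{
    // Hypothetical stand-in for a session's queue state: pending validation
    // count and average observed validation duration in nanoseconds.
    record Session(String id, int pending, long avgNanos)
    {
        // Same scoring rule as LimitedConcurrentScheduler.Group.score()
        long score() { return pending * avgNanos; }
    }

    // Mirrors maybeSchedule(): skip empty sessions, pick the lowest score.
    static Optional<Session> next(Collection<Session> sessions)
    {
        return sessions.stream()
                       .filter(s -> s.pending() > 0)
                       .min(Comparator.comparingLong(Session::score));
    }

    public static void main(String[] args)
    {
        var a = new Session("A", 10, TimeUnit.SECONDS.toNanos(2)); // score 20s
        var b = new Session("B", 3, TimeUnit.SECONDS.toNanos(5));  // score 15s
        System.out.println(next(List.of(a, b)).map(Session::id).orElse("none")); // prints "B"
    }
}
{code}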



> Repair causes out of memory
> ---------------------------
>
>                 Key: CASSANDRA-19336
>                 URL: https://issues.apache.org/jira/browse/CASSANDRA-19336
>             Project: Cassandra
>          Issue Type: Bug
>          Components: Consistency/Repair
>            Reporter: Andres de la Peña
>            Priority: Normal
>             Fix For: 4.0.x, 4.1.x, 5.0.x, 5.x
>
>
> CASSANDRA-14096 introduced {{repair_session_space}} as a limit on the memory 
> used for Merkle tree calculations during repairs. The limit is applied to 
> the set of Merkle trees built for a received validation request 
> ({{VALIDATION_REQ}}), divided by the replication factor so as not to 
> overwhelm the repair coordinator, which will have requested RF sets of 
> Merkle trees. That way the repair coordinator should only use 
> {{repair_session_space}} for the RF sets of Merkle trees.
> However, a repair session without {{-pr}}/{{-partitioner-range}} 
> will send RF*RF validation requests, because the repair coordinator node has 
> RF-1 replicas and is also the replica of RF-1 nodes. Since all the requests 
> are sent at the same time, at some point the repair coordinator can have up 
> to RF*{{repair_session_space}} worth of Merkle trees if none of the 
> validation responses is fully processed before the last response arrives.
> Even worse, if the cluster uses virtual nodes, many nodes can be replicas of 
> the repair coordinator, and some nodes can be replicas of multiple token 
> ranges. This means that the repair coordinator can send more than RF or 
> RF*RF simultaneous validation requests.
> For example, in an 11-node cluster with RF=3 and 256 tokens, we have seen a 
> repair session involving 44 groups of ranges to be repaired. This produces 
> 44*3=132 validation requests contacting all the nodes in the cluster. When 
> the responses for all these requests start to arrive at the coordinator, 
> each containing up to {{repair_session_space}}/3 of Merkle trees, they 
> accumulate faster than they are consumed; in the worst case the coordinator 
> holds 132*{{repair_session_space}}/3 = 44*{{repair_session_space}} of 
> Merkle trees at once, greatly exceeding {{repair_session_space}} and 
> OOMing the node.


