This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
     new c60ad61  Improve start up processing of Incremental Repair information read from system.repairs
c60ad61 is described below

commit c60ad61b3b6145af100578f2c652819f61729018
Author: Paul Chandler <p...@redshots.com>
AuthorDate: Thu Feb 3 09:15:02 2022 +0000

    Improve start up processing of Incremental Repair information read from system.repairs
    
    Patch by Paul Chandler, reviewed by Brandon Williams and Marcus Eriksson for CASSANDRA-17342
---
 CHANGES.txt                                        |   2 +-
 .../cassandra/repair/consistent/LocalSessions.java |  29 +++-
 .../cassandra/repair/consistent/RepairedState.java |  33 +----
 .../repair/consistent/BulkRepairStateTest.java     | 162 +++++++++++++++++++++
 4 files changed, 192 insertions(+), 34 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index de4876f..d41d293 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,5 @@
 4.0.3
-
+ * Improve start up processing of Incremental Repair information read from system.repairs (CASSANDRA-17342)
 
 4.0.2
  * Full Java 11 support (CASSANDRA-16894)
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
index e6ca3ee..9ee0bb0 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.time.Instant;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
@@ -212,12 +213,7 @@ public class LocalSessions
 
     private void maybeUpdateRepairedState(LocalSession session)
     {
-        if (session.getState() != FINALIZED)
-            return;
-
-        // if the session is finalized but has repairedAt set to 0, it was
-        // a forced repair, and we shouldn't update the repaired state
-        if (session.repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE)
+        if (!shouldStoreSession(session))
             return;
 
         for (TableId tid : session.tableIds)
@@ -227,6 +223,16 @@ public class LocalSessions
         }
     }
 
+    private boolean shouldStoreSession(LocalSession session)
+    {
+        if (session.getState() != FINALIZED)
+            return false;
+
+        // if the session is finalized but has repairedAt set to 0, it was
+        // a forced repair, and we shouldn't update the repaired state
+        return session.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE;
+    }
+
     /**
      * Determine if all ranges and tables covered by this session
      * have since been re-repaired by a more recent session
@@ -341,13 +347,19 @@ public class LocalSessions
         Preconditions.checkArgument(sessions.isEmpty(), "No sessions should be added before start");
         UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(String.format("SELECT * FROM %s.%s", keyspace, table), 1000);
         Map<UUID, LocalSession> loadedSessions = new HashMap<>();
+        Map<TableId, List<RepairedState.Level>> initialLevels = new HashMap<>();
         for (UntypedResultSet.Row row : rows)
         {
             try
             {
                 LocalSession session = load(row);
-                maybeUpdateRepairedState(session);
                 loadedSessions.put(session.sessionID, session);
+                if (shouldStoreSession(session))
+                {
+                    for (TableId tid : session.tableIds)
+                        initialLevels.computeIfAbsent(tid, (t) -> new ArrayList<>())
+                                     .add(new RepairedState.Level(session.ranges, session.repairedAt));
+                }
             }
             catch (IllegalArgumentException | NullPointerException e)
             {
@@ -356,6 +368,9 @@ public class LocalSessions
                     deleteRow(row.getUUID("parent_id"));
             }
         }
+        for (Map.Entry<TableId, List<RepairedState.Level>> entry : initialLevels.entrySet())
+            getRepairedState(entry.getKey()).addAll(entry.getValue());
+
         sessions = ImmutableMap.copyOf(loadedSessions);
         failOngoingRepairs();
         started = true;
diff --git a/src/java/org/apache/cassandra/repair/consistent/RepairedState.java b/src/java/org/apache/cassandra/repair/consistent/RepairedState.java
index ac0e7cb..ea60eec 100644
--- a/src/java/org/apache/cassandra/repair/consistent/RepairedState.java
+++ b/src/java/org/apache/cassandra/repair/consistent/RepairedState.java
@@ -23,22 +23,16 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.PeekingIterator;
 import com.google.common.collect.Sets;
 
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.service.ActiveRepairService;
-import org.apache.cassandra.utils.UUIDGen;
 
 import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
 
@@ -206,32 +200,20 @@ public class RepairedState
         return state;
     }
 
-    private static List<Section> levelsToSections(List<Level> levels)
+    public synchronized void add(Collection<Range<Token>> ranges, long repairedAt)
     {
-        List<Section> sections = new ArrayList<>();
-        for (Level level : levels)
-        {
-            for (Range<Token> range : level.ranges)
-            {
-                sections.add(new Section(range, level.repairedAt));
-            }
-        }
-        sections.sort(Section.tokenComparator);
-        return sections;
+        addAll(Collections.singletonList(new Level(ranges, repairedAt)));
     }
 
-    public synchronized void add(Collection<Range<Token>> ranges, long repairedAt)
+    public void addAll(List<Level> newLevels)
     {
-        Level newLevel = new Level(ranges, repairedAt);
-
         State lastState = state;
-
-        List<Level> tmp = new ArrayList<>(lastState.levels.size() + 1);
+        List<Level> tmp = new ArrayList<>(lastState.levels.size() + newLevels.size());
         tmp.addAll(lastState.levels);
-        tmp.add(newLevel);
+        tmp.addAll(newLevels);
         tmp.sort(Level.timeComparator);
 
-        List<Level> levels = new ArrayList<>(lastState.levels.size() + 1);
+        List<Level> levels = new ArrayList<>(tmp.size());
         List<Range<Token>> covered = new ArrayList<>();
 
         for (Level level : tmp)
@@ -255,9 +237,8 @@ public class RepairedState
             }
         }
         sections.sort(Section.tokenComparator);
-
         state = new State(levels, covered, sections);
-    }
+       }
 
     public long minRepairedAt(Collection<Range<Token>> ranges)
     {
diff --git a/test/unit/org/apache/cassandra/repair/consistent/BulkRepairStateTest.java b/test/unit/org/apache/cassandra/repair/consistent/BulkRepairStateTest.java
new file mode 100644
index 0000000..91096f6
--- /dev/null
+++ b/test/unit/org/apache/cassandra/repair/consistent/BulkRepairStateTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.repair.consistent;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+import static org.junit.Assert.assertEquals;
+
+public class BulkRepairStateTest
+{
+    private static Token tk(long t)
+    {
+        return new Murmur3Partitioner.LongToken(t);
+    }
+
+    private static Range<Token> range(long left, long right)
+    {
+        return new Range<>(tk(left), tk(right));
+    }
+
+    private static List<Range<Token>> ranges(long... tokens)
+    {
+        assert tokens.length % 2 == 0;
+        List<Range<Token>> ranges = new ArrayList<>();
+        for (int i = 0; i < tokens.length; i += 2)
+        {
+            ranges.add(range(tokens[i], tokens[i + 1]));
+        }
+        return ranges;
+    }
+
+    private static RepairedState.Level level(Collection<Range<Token>> ranges, long repairedAt)
+    {
+        return new RepairedState.Level(ranges, repairedAt);
+    }
+
+    private static RepairedState.Section sect(Range<Token> range, long repairedAt)
+    {
+        return new RepairedState.Section(range, repairedAt);
+    }
+
+    private static RepairedState.Section sect(int l, int r, long time)
+    {
+        return sect(range(l, r), time);
+    }
+
+    private static <T> List<T> l(T... contents)
+    {
+        return Lists.newArrayList(contents);
+    }
+
+    @Test
+    public void mergeOverlapping()
+    {
+        RepairedState repairs = new RepairedState();
+        List<RepairedState.Level> list = new ArrayList<>();
+        list.add(new RepairedState.Level(ranges(100, 300), 5));
+        list.add(new RepairedState.Level(ranges(200, 400), 6));
+        repairs.addAll(list);
+
+        RepairedState.State state = repairs.state();
+        assertEquals(l(level(ranges(200, 400), 6), level(ranges(100, 200), 5)), state.levels);
+        assertEquals(l(sect(range(100, 200), 5), sect(range(200, 400), 6)), state.sections);
+        assertEquals(ranges(100, 400), state.covered);
+    }
+
+    @Test
+    public void mergeSameRange()
+    {
+        RepairedState repairs = new RepairedState();
+        List<RepairedState.Level> list = new ArrayList<>();
+        list.add(new RepairedState.Level(ranges(100, 400), 5));
+        list.add(new RepairedState.Level(ranges(100, 400), 6));
+        repairs.addAll(list);
+
+        RepairedState.State state = repairs.state();
+        assertEquals(l(level(ranges(100, 400), 6)), state.levels);
+        assertEquals(l(sect(range(100, 400), 6)), state.sections);
+        assertEquals(ranges(100, 400), state.covered);
+    }
+
+    @Test
+    public void mergeLargeRange()
+    {
+        RepairedState repairs = new RepairedState();
+
+        List<RepairedState.Level> list = new ArrayList<>();
+        list.add(new RepairedState.Level(ranges(200, 300), 5));
+        list.add(new RepairedState.Level(ranges(100, 400), 6));
+        repairs.addAll(list);
+
+        RepairedState.State state = repairs.state();
+        assertEquals(l(level(ranges(100, 400), 6)), state.levels);
+        assertEquals(l(sect(range(100, 400), 6)), state.sections);
+        assertEquals(ranges(100, 400), state.covered);
+    }
+
+    @Test
+    public void mergeSmallRange()
+    {
+        RepairedState repairs = new RepairedState();
+
+        List<RepairedState.Level> list = new ArrayList<>();
+        list.add(new RepairedState.Level(ranges(100, 400), 5));
+        list.add(new RepairedState.Level(ranges(200, 300), 6));
+        repairs.addAll(list);
+
+        RepairedState.State state = repairs.state();
+        assertEquals(l(level(ranges(200, 300), 6), level(ranges(100, 200, 300, 400), 5)), state.levels);
+        assertEquals(l(sect(range(100, 200), 5), sect(range(200, 300), 6), sect(range(300, 400), 5)), state.sections);
+        assertEquals(ranges(100, 400), state.covered);
+    }
+
+    @Test
+    public void repairedAt()
+    {
+        RepairedState rs;
+
+        // overlapping
+        rs = new RepairedState();
+        List<RepairedState.Level> list = new ArrayList<>();
+        list.add(new RepairedState.Level(ranges(100, 300), 5));
+        list.add(new RepairedState.Level(ranges(200, 400), 6));
+        rs.addAll(list);
+
+        assertEquals(5, rs.minRepairedAt(ranges(150, 250)));
+        assertEquals(5, rs.minRepairedAt(ranges(150, 160)));
+        assertEquals(5, rs.minRepairedAt(ranges(100, 200)));
+        assertEquals(6, rs.minRepairedAt(ranges(200, 400)));
+        assertEquals(0, rs.minRepairedAt(ranges(200, 401)));
+        assertEquals(0, rs.minRepairedAt(ranges(99, 200)));
+        assertEquals(0, rs.minRepairedAt(ranges(50, 450)));
+        assertEquals(0, rs.minRepairedAt(ranges(50, 60)));
+        assertEquals(0, rs.minRepairedAt(ranges(450, 460)));
+    }
+}
+

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to