This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch cassandra-4.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.0 by this push: new c60ad61 Improve start up processing of Incremental Repair information read from system.repairs c60ad61 is described below commit c60ad61b3b6145af100578f2c652819f61729018 Author: Paul Chandler <p...@redshots.com> AuthorDate: Thu Feb 3 09:15:02 2022 +0000 Improve start up processing of Incremental Repair information read from system.repairs Patch by Paul Chandler, reviewed by Brandon Williams and Marcus Eriksson for CASSANDRA-17342 --- CHANGES.txt | 2 +- .../cassandra/repair/consistent/LocalSessions.java | 29 +++- .../cassandra/repair/consistent/RepairedState.java | 33 +---- .../repair/consistent/BulkRepairStateTest.java | 162 +++++++++++++++++++++ 4 files changed, 192 insertions(+), 34 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index de4876f..d41d293 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,5 +1,5 @@ 4.0.3 - + * Improve start up processing of Incremental Repair information read from system.repairs (CASSANDRA-17342) 4.0.2 * Full Java 11 support (CASSANDRA-16894) diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java index e6ca3ee..9ee0bb0 100644 --- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java +++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.time.Instant; +import java.util.ArrayList; import java.util.Collection; import java.util.Date; import java.util.HashMap; @@ -212,12 +213,7 @@ public class LocalSessions private void maybeUpdateRepairedState(LocalSession session) { - if (session.getState() != FINALIZED) - return; - - // if the session is finalized but has repairedAt set to 0, it was - // a forced repair, and we shouldn't update the repaired state - if (session.repairedAt == ActiveRepairService.UNREPAIRED_SSTABLE) + if (!shouldStoreSession(session)) return; for (TableId tid : session.tableIds) @@ -227,6 +223,16 @@ public class LocalSessions } } + private boolean shouldStoreSession(LocalSession session) + { + if (session.getState() != FINALIZED) + return false; + + // if the session is finalized but has repairedAt set to 0, it was + // a forced repair, and we shouldn't update the repaired state + return session.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE; + } + /** * Determine if all ranges and tables covered by this session * have since been re-repaired by a more recent session @@ -341,13 +347,19 @@ public class LocalSessions Preconditions.checkArgument(sessions.isEmpty(), "No sessions should be added before start"); UntypedResultSet rows = QueryProcessor.executeInternalWithPaging(String.format("SELECT * FROM %s.%s", keyspace, table), 1000); Map<UUID, LocalSession> loadedSessions = new HashMap<>(); + Map<TableId, List<RepairedState.Level>> initialLevels = new HashMap<>(); for (UntypedResultSet.Row row : rows) { try { LocalSession session = load(row); - maybeUpdateRepairedState(session); loadedSessions.put(session.sessionID, session); + if (shouldStoreSession(session)) + { + for (TableId tid : session.tableIds) + initialLevels.computeIfAbsent(tid, (t) -> new ArrayList<>()) + .add(new RepairedState.Level(session.ranges, session.repairedAt)); + } } catch (IllegalArgumentException | NullPointerException e) { @@ -356,6 +368,9 @@ public class LocalSessions deleteRow(row.getUUID("parent_id")); } } + for (Map.Entry<TableId, List<RepairedState.Level>> entry : initialLevels.entrySet()) + getRepairedState(entry.getKey()).addAll(entry.getValue()); + sessions = ImmutableMap.copyOf(loadedSessions); failOngoingRepairs(); started = true; diff --git a/src/java/org/apache/cassandra/repair/consistent/RepairedState.java b/src/java/org/apache/cassandra/repair/consistent/RepairedState.java index ac0e7cb..ea60eec 100644 --- a/src/java/org/apache/cassandra/repair/consistent/RepairedState.java +++ b/src/java/org/apache/cassandra/repair/consistent/RepairedState.java @@ -23,22 +23,16 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.Set; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; -import com.google.common.collect.PeekingIterator; import com.google.common.collect.Sets; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.service.ActiveRepairService; -import org.apache.cassandra.utils.UUIDGen; import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE; @@ -206,32 +200,20 @@ public class RepairedState return state; } - private static List<Section> levelsToSections(List<Level> levels) + public synchronized void add(Collection<Range<Token>> ranges, long repairedAt) { - List<Section> sections = new ArrayList<>(); - for (Level level : levels) - { - for (Range<Token> range : level.ranges) - { - sections.add(new Section(range, level.repairedAt)); - } - } - sections.sort(Section.tokenComparator); - return sections; + addAll(Collections.singletonList(new Level(ranges, repairedAt))); } - public synchronized void add(Collection<Range<Token>> ranges, long repairedAt) + public void addAll(List<Level> newLevels) { - Level newLevel = new Level(ranges, repairedAt); - State lastState = state; - - List<Level> tmp = new ArrayList<>(lastState.levels.size() + 1); + List<Level> tmp = new ArrayList<>(lastState.levels.size() + newLevels.size()); tmp.addAll(lastState.levels); - tmp.add(newLevel); + tmp.addAll(newLevels); tmp.sort(Level.timeComparator); - List<Level> levels = new ArrayList<>(lastState.levels.size() + 1); + List<Level> levels = new ArrayList<>(tmp.size()); List<Range<Token>> covered = new ArrayList<>(); for (Level level : tmp) @@ -255,9 +237,8 @@ public class RepairedState } } sections.sort(Section.tokenComparator); - state = new State(levels, covered, sections); - } + } public long minRepairedAt(Collection<Range<Token>> ranges) { diff --git a/test/unit/org/apache/cassandra/repair/consistent/BulkRepairStateTest.java b/test/unit/org/apache/cassandra/repair/consistent/BulkRepairStateTest.java new file mode 100644 index 0000000..91096f6 --- /dev/null +++ b/test/unit/org/apache/cassandra/repair/consistent/BulkRepairStateTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair.consistent; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; + +import org.junit.Test; + +import com.google.common.collect.Lists; + +import static org.junit.Assert.assertEquals; + +public class BulkRepairStateTest +{ + private static Token tk(long t) + { + return new Murmur3Partitioner.LongToken(t); + } + + private static Range<Token> range(long left, long right) + { + return new Range<>(tk(left), tk(right)); + } + + private static List<Range<Token>> ranges(long... tokens) + { + assert tokens.length % 2 == 0; + List<Range<Token>> ranges = new ArrayList<>(); + for (int i = 0; i < tokens.length; i += 2) + { + ranges.add(range(tokens[i], tokens[i + 1])); + } + return ranges; + } + + private static RepairedState.Level level(Collection<Range<Token>> ranges, long repairedAt) + { + return new RepairedState.Level(ranges, repairedAt); + } + + private static RepairedState.Section sect(Range<Token> range, long repairedAt) + { + return new RepairedState.Section(range, repairedAt); + } + + private static RepairedState.Section sect(int l, int r, long time) + { + return sect(range(l, r), time); + } + + private static <T> List<T> l(T... contents) + { + return Lists.newArrayList(contents); + } + + @Test + public void mergeOverlapping() + { + RepairedState repairs = new RepairedState(); + List<RepairedState.Level> list = new ArrayList<>(); + list.add(new RepairedState.Level(ranges(100, 300), 5)); + list.add(new RepairedState.Level(ranges(200, 400), 6)); + repairs.addAll(list); + + RepairedState.State state = repairs.state(); + assertEquals(l(level(ranges(200, 400), 6), level(ranges(100, 200), 5)), state.levels); + assertEquals(l(sect(range(100, 200), 5), sect(range(200, 400), 6)), state.sections); + assertEquals(ranges(100, 400), state.covered); + } + + @Test + public void mergeSameRange() + { + RepairedState repairs = new RepairedState(); + List<RepairedState.Level> list = new ArrayList<>(); + list.add(new RepairedState.Level(ranges(100, 400), 5)); + list.add(new RepairedState.Level(ranges(100, 400), 6)); + repairs.addAll(list); + + RepairedState.State state = repairs.state(); + assertEquals(l(level(ranges(100, 400), 6)), state.levels); + assertEquals(l(sect(range(100, 400), 6)), state.sections); + assertEquals(ranges(100, 400), state.covered); + } + + @Test + public void mergeLargeRange() + { + RepairedState repairs = new RepairedState(); + + List<RepairedState.Level> list = new ArrayList<>(); + list.add(new RepairedState.Level(ranges(200, 300), 5)); + list.add(new RepairedState.Level(ranges(100, 400), 6)); + repairs.addAll(list); + + RepairedState.State state = repairs.state(); + assertEquals(l(level(ranges(100, 400), 6)), state.levels); + assertEquals(l(sect(range(100, 400), 6)), state.sections); + assertEquals(ranges(100, 400), state.covered); + } + + @Test + public void mergeSmallRange() + { + RepairedState repairs = new RepairedState(); + + List<RepairedState.Level> list = new ArrayList<>(); + list.add(new RepairedState.Level(ranges(100, 400), 5)); + list.add(new RepairedState.Level(ranges(200, 300), 6)); + repairs.addAll(list); + + RepairedState.State state = repairs.state(); + assertEquals(l(level(ranges(200, 300), 6), level(ranges(100, 200, 300, 400), 5)), state.levels); + assertEquals(l(sect(range(100, 200), 5), sect(range(200, 300), 6), sect(range(300, 400), 5)), state.sections); + assertEquals(ranges(100, 400), state.covered); + } + + @Test + public void repairedAt() + { + RepairedState rs; + + // overlapping + rs = new RepairedState(); + List<RepairedState.Level> list = new ArrayList<>(); + list.add(new RepairedState.Level(ranges(100, 300), 5)); + list.add(new RepairedState.Level(ranges(200, 400), 6)); + rs.addAll(list); + + assertEquals(5, rs.minRepairedAt(ranges(150, 250))); + assertEquals(5, rs.minRepairedAt(ranges(150, 160))); + assertEquals(5, rs.minRepairedAt(ranges(100, 200))); + assertEquals(6, rs.minRepairedAt(ranges(200, 400))); + assertEquals(0, rs.minRepairedAt(ranges(200, 401))); + assertEquals(0, rs.minRepairedAt(ranges(99, 200))); + assertEquals(0, rs.minRepairedAt(ranges(50, 450))); + assertEquals(0, rs.minRepairedAt(ranges(50, 60))); + assertEquals(0, rs.minRepairedAt(ranges(450, 460))); + } +} + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org