ifesdjeen commented on code in PR #3743:
URL: https://github.com/apache/cassandra/pull/3743#discussion_r1910450616


##########
src/java/org/apache/cassandra/journal/Journal.java:
##########
@@ -906,8 +941,19 @@ private String maybeAddDiskSpaceContext(String message)
     @VisibleForTesting
     public void truncateForTesting()
     {
-        advanceSegment(null);
-        segments.set(Segments.none());
+        ActiveSegment<?, ?> discarding = currentSegment;
+        if (discarding.index.size() > 0) // if there is no data in the segment then ignore it
+        {
+            closeCurrentSegmentForTestingIfNonEmpty();
+            // wait for the ActiveSegment to get released, else we can see weird race conditions;
+            // this thread will see the static segment and will release it (which will delete the file),
+            // and the sync thread will then try to release and will fail as the file no longer exists...
+            while (discarding.selfRef().globalCount() > 0) {}

Review Comment:
   I think we need to investigate the race conditions rather than try to patch them here: if, after waiting for `isSwitched`, we still have lingering references, we had best check which ones are there and let them be scheduled appropriately.



##########
src/java/org/apache/cassandra/service/accord/IJournal.java:
##########
@@ -37,4 +41,12 @@ default SavedCommand.MinimalCommand loadMinimal(int commandStoreId, TxnId txnId,
     }
 
     Persister<DurableBefore, DurableBefore> durableBeforePersister();
+
+    RangeSearcher rangeSearcher();
+
+    interface RangeSearcher

Review Comment:
   This will probably go away during the rebase, since we no longer have a separate interface in the integration. You can just pull this up to the top level on rebase, or make it part of one of the searcher subclasses. Either way works for me.



##########
src/java/org/apache/cassandra/service/accord/ParticipantsInMemoryIndex.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+
+import accord.local.StoreParticipants;
+import accord.primitives.Routable;
+import accord.primitives.Route;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.primitives.Unseekable;
+import accord.utils.Invariants;
+import org.agrona.collections.Int2ObjectHashMap;
+import org.agrona.collections.Long2ObjectHashMap;
+import org.apache.cassandra.index.accord.OrderedRouteSerializer;
+import org.apache.cassandra.index.accord.ParticipantsJournalIndex;
+import org.apache.cassandra.index.accord.RouteIndexFormat;
+import org.apache.cassandra.journal.Journal;
+import org.apache.cassandra.journal.RecordPointer;
+import org.apache.cassandra.journal.StaticSegment;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey;
+import org.apache.cassandra.utils.ByteArrayUtil;
+import org.apache.cassandra.utils.FastByteOperations;
+import org.apache.cassandra.utils.RTree;
+import org.apache.cassandra.utils.RangeTree;
+
+public class ParticipantsInMemoryIndex<K extends JournalKey, V> implements Journal.Listener<K, V>, IJournal.RangeSearcher
+{
+    private final Long2ObjectHashMap<ParticipantsInMemorySegmentIndex> segmentIndexes = new Long2ObjectHashMap<>();
+
+    @Override
+    public void onWrite(K id, Journal.Writer writer, Set<Integer> hosts, RecordPointer pointer)
+    {
+        if (!ParticipantsJournalIndex.allowed(id))
+            return;
+        SavedCommand.Writer saveCommandWriter = (SavedCommand.Writer) writer;
+        if (!saveCommandWriter.hasField(SavedCommand.Fields.PARTICIPANTS))
+            return;
+        StoreParticipants participants = saveCommandWriter.after().participants();
+        Route<?> route = participants.route();
+        if (route != null)
+            update(pointer.segment, id.commandStoreId, id.id, route);
+    }
+
+    public synchronized void update(long segment, int commandStoreId, TxnId id, Route<?> route)
+    {
+        if (!ParticipantsJournalIndex.allowed(id))
+            return;
+        Invariants.nonNull(route, "route");
+        segmentIndexes.computeIfAbsent(segment, ParticipantsInMemorySegmentIndex::new).add(commandStoreId, id, route);
+    }
+
+    public void update(long segment, K id, ByteBuffer buffer, int userVersion)
+    {
+        if (!ParticipantsJournalIndex.allowed(id))
+            return;
+        var participants = RouteIndexFormat.extract(id.id, buffer, userVersion).participants();
+        if (participants == null || participants.route() == null)
+            return;
+        update(segment, id.commandStoreId, id.id, participants.route());
+    }
+
+    @Override
+    public synchronized void onCompact(Collection<StaticSegment<K, V>> oldSegments, Collection<StaticSegment<K, V>> compactedSegments)
+    {
+        oldSegments.forEach(s -> segmentIndexes.remove(s.id()));
+    }
+
+    public NavigableMap<IndexRange, Set<TxnId>> search(int storeId, AccordRoutingKey key)
+    {
+        return search(storeId, key.table(), OrderedRouteSerializer.serializeRoutingKeyNoTable(key));
+    }
+
+    private synchronized NavigableMap<IndexRange, Set<TxnId>> search(int storeId, TableId tableId, byte[] key)
+    {
+        TreeMap<IndexRange, Set<TxnId>> matches = new TreeMap<>();
+        segmentIndexes.values().forEach(s -> s.search(storeId, tableId, key, e -> matches.computeIfAbsent(e.getKey(), i -> new HashSet<>()).add(e.getValue())));
+        return matches.isEmpty() ? Collections.emptyNavigableMap() : matches;
+    }
+
+    public NavigableMap<IndexRange, Set<TxnId>> search(int storeId, AccordRoutingKey start, AccordRoutingKey end)
+    {
+        return search(storeId, start.table(), OrderedRouteSerializer.serializeRoutingKeyNoTable(start), OrderedRouteSerializer.serializeRoutingKeyNoTable(end));
+    }
+
+    private synchronized NavigableMap<IndexRange, Set<TxnId>> search(int storeId, TableId tableId, byte[] start, byte[] end)
+    {
+        TreeMap<IndexRange, Set<TxnId>> matches = new TreeMap<>();
+        segmentIndexes.values().forEach(s -> s.search(storeId, tableId, start, end, e -> matches.computeIfAbsent(e.getKey(), i -> new HashSet<>()).add(e.getValue())));
+        return matches.isEmpty() ? Collections.emptyNavigableMap() : matches;
+    }
+
+    public synchronized void truncateForTesting()
+    {
+        segmentIndexes.clear();
+    }
+
+    @Override
+    public void intersects(int commandStoreId, TokenRange range, TxnId minTxnId, Timestamp maxTxnId, Consumer<TxnId> forEach)
+    {
+        var result = search(commandStoreId, range.start(), range.end());
+        TreeSet<TxnId> matches = new TreeSet<>();
+        result.values().forEach(s -> matches.addAll(s));
+        consume(matches.iterator(), minTxnId, maxTxnId, forEach);
+    }
+
+    @Override
+    public void intersects(int commandStoreId, AccordRoutingKey key, TxnId minTxnId, Timestamp maxTxnId, Consumer<TxnId> forEach)
+    {
+        var result = search(commandStoreId, key);
+        TreeSet<TxnId> matches = new TreeSet<>();
+        result.values().forEach(s -> matches.addAll(s));
+        consume(matches.iterator(), minTxnId, maxTxnId, forEach);
+    }
+
+    private void consume(Iterator<TxnId> it, TxnId minTxnId, Timestamp maxTxnId, Consumer<TxnId> forEach)
+    {
+        while (it.hasNext())
+        {
+            TxnId next = it.next();
+            if (next.compareTo(minTxnId) >= 0 && next.compareTo(maxTxnId) < 0)
+                forEach.accept(next);
+        }
+    }
+
+    private static class ParticipantsInMemorySegmentIndex
+    {
+        private final Int2ObjectHashMap<ParticipantsInMemoryStoreIndex> storeIndexes = new Int2ObjectHashMap<>();
+
+        private ParticipantsInMemorySegmentIndex(long segment)
+        {
+        }
+
+        public void add(int commandStoreId, TxnId id, Route<?> route)
+        {
+            storeIndexes.computeIfAbsent(commandStoreId, ParticipantsInMemoryStoreIndex::new).add(id, route);
+        }
+
+        public void search(int storeId, TableId tableId, byte[] start, byte[] end, Consumer<Map.Entry<IndexRange, TxnId>> fn)
+        {
+            ParticipantsInMemoryStoreIndex idx = storeIndexes.get(storeId);
+            if (idx == null) return;
+            idx.search(tableId, start, end, fn);
+        }
+
+        public void search(int storeId, TableId tableId, byte[] key, Consumer<Map.Entry<IndexRange, TxnId>> fn)
+        {
+            ParticipantsInMemoryStoreIndex idx = storeIndexes.get(storeId);
+            if (idx == null) return;
+            idx.search(tableId, key, fn);
+        }
+    }
+
+    private static class ParticipantsInMemoryStoreIndex

Review Comment:
   Since we are already scoped in `ParticipantsInMemoryIndex`, it might be best to just call this `StoreIndex`.



##########
src/java/org/apache/cassandra/service/accord/ParticipantsInMemoryIndex.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+
+import accord.local.StoreParticipants;
+import accord.primitives.Routable;
+import accord.primitives.Route;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.primitives.Unseekable;
+import accord.utils.Invariants;
+import org.agrona.collections.Int2ObjectHashMap;
+import org.agrona.collections.Long2ObjectHashMap;
+import org.apache.cassandra.index.accord.OrderedRouteSerializer;
+import org.apache.cassandra.index.accord.ParticipantsJournalIndex;
+import org.apache.cassandra.index.accord.RouteIndexFormat;
+import org.apache.cassandra.journal.Journal;
+import org.apache.cassandra.journal.RecordPointer;
+import org.apache.cassandra.journal.StaticSegment;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey;
+import org.apache.cassandra.utils.ByteArrayUtil;
+import org.apache.cassandra.utils.FastByteOperations;
+import org.apache.cassandra.utils.RTree;
+import org.apache.cassandra.utils.RangeTree;
+
+public class ParticipantsInMemoryIndex<K extends JournalKey, V> implements Journal.Listener<K, V>, IJournal.RangeSearcher
+{
+    private final Long2ObjectHashMap<ParticipantsInMemorySegmentIndex> segmentIndexes = new Long2ObjectHashMap<>();
+
+    @Override
+    public void onWrite(K id, Journal.Writer writer, Set<Integer> hosts, RecordPointer pointer)
+    {
+        if (!ParticipantsJournalIndex.allowed(id))
+            return;
+        SavedCommand.Writer saveCommandWriter = (SavedCommand.Writer) writer;
+        if (!saveCommandWriter.hasField(SavedCommand.Fields.PARTICIPANTS))
+            return;
+        StoreParticipants participants = saveCommandWriter.after().participants();
+        Route<?> route = participants.route();
+        if (route != null)
+            update(pointer.segment, id.commandStoreId, id.id, route);
+    }
+
+    public synchronized void update(long segment, int commandStoreId, TxnId id, Route<?> route)
+    {
+        if (!ParticipantsJournalIndex.allowed(id))
+            return;
+        Invariants.nonNull(route, "route");
+        segmentIndexes.computeIfAbsent(segment, ParticipantsInMemorySegmentIndex::new).add(commandStoreId, id, route);
+    }
+
+    public void update(long segment, K id, ByteBuffer buffer, int userVersion)
+    {
+        if (!ParticipantsJournalIndex.allowed(id))
+            return;
+        var participants = RouteIndexFormat.extract(id.id, buffer, userVersion).participants();
+        if (participants == null || participants.route() == null)
+            return;
+        update(segment, id.commandStoreId, id.id, participants.route());
+    }
+
+    @Override
+    public synchronized void onCompact(Collection<StaticSegment<K, V>> oldSegments, Collection<StaticSegment<K, V>> compactedSegments)
+    {
+        oldSegments.forEach(s -> segmentIndexes.remove(s.id()));
+    }
+
+    public NavigableMap<IndexRange, Set<TxnId>> search(int storeId, AccordRoutingKey key)
+    {
+        return search(storeId, key.table(), OrderedRouteSerializer.serializeRoutingKeyNoTable(key));
+    }
+
+    private synchronized NavigableMap<IndexRange, Set<TxnId>> search(int storeId, TableId tableId, byte[] key)
+    {
+        TreeMap<IndexRange, Set<TxnId>> matches = new TreeMap<>();
+        segmentIndexes.values().forEach(s -> s.search(storeId, tableId, key, e -> matches.computeIfAbsent(e.getKey(), i -> new HashSet<>()).add(e.getValue())));
+        return matches.isEmpty() ? Collections.emptyNavigableMap() : matches;
+    }
+
+    public NavigableMap<IndexRange, Set<TxnId>> search(int storeId, AccordRoutingKey start, AccordRoutingKey end)
+    {
+        return search(storeId, start.table(), OrderedRouteSerializer.serializeRoutingKeyNoTable(start), OrderedRouteSerializer.serializeRoutingKeyNoTable(end));
+    }
+
+    private synchronized NavigableMap<IndexRange, Set<TxnId>> search(int storeId, TableId tableId, byte[] start, byte[] end)
+    {
+        TreeMap<IndexRange, Set<TxnId>> matches = new TreeMap<>();
+        segmentIndexes.values().forEach(s -> s.search(storeId, tableId, start, end, e -> matches.computeIfAbsent(e.getKey(), i -> new HashSet<>()).add(e.getValue())));
+        return matches.isEmpty() ? Collections.emptyNavigableMap() : matches;
+    }
+
+    public synchronized void truncateForTesting()
+    {
+        segmentIndexes.clear();
+    }
+
+    @Override
+    public void intersects(int commandStoreId, TokenRange range, TxnId minTxnId, Timestamp maxTxnId, Consumer<TxnId> forEach)
+    {
+        var result = search(commandStoreId, range.start(), range.end());
+        TreeSet<TxnId> matches = new TreeSet<>();
+        result.values().forEach(s -> matches.addAll(s));
+        consume(matches.iterator(), minTxnId, maxTxnId, forEach);
+    }
+
+    @Override
+    public void intersects(int commandStoreId, AccordRoutingKey key, TxnId minTxnId, Timestamp maxTxnId, Consumer<TxnId> forEach)
+    {
+        var result = search(commandStoreId, key);
+        TreeSet<TxnId> matches = new TreeSet<>();
+        result.values().forEach(s -> matches.addAll(s));
+        consume(matches.iterator(), minTxnId, maxTxnId, forEach);
+    }
+
+    private void consume(Iterator<TxnId> it, TxnId minTxnId, Timestamp maxTxnId, Consumer<TxnId> forEach)
+    {
+        while (it.hasNext())
+        {
+            TxnId next = it.next();
+            if (next.compareTo(minTxnId) >= 0 && next.compareTo(maxTxnId) < 0)
+                forEach.accept(next);
+        }
+    }
+
+    private static class ParticipantsInMemorySegmentIndex
+    {
+        private final Int2ObjectHashMap<ParticipantsInMemoryStoreIndex> storeIndexes = new Int2ObjectHashMap<>();
+
+        private ParticipantsInMemorySegmentIndex(long segment)
+        {
+        }
+
+        public void add(int commandStoreId, TxnId id, Route<?> route)
+        {
+            storeIndexes.computeIfAbsent(commandStoreId, ParticipantsInMemoryStoreIndex::new).add(id, route);
+        }
+
+        public void search(int storeId, TableId tableId, byte[] start, byte[] end, Consumer<Map.Entry<IndexRange, TxnId>> fn)
+        {
+            ParticipantsInMemoryStoreIndex idx = storeIndexes.get(storeId);
+            if (idx == null) return;
+            idx.search(tableId, start, end, fn);
+        }
+
+        public void search(int storeId, TableId tableId, byte[] key, Consumer<Map.Entry<IndexRange, TxnId>> fn)
+        {
+            ParticipantsInMemoryStoreIndex idx = storeIndexes.get(storeId);
+            if (idx == null) return;
+            idx.search(tableId, key, fn);
+        }
+    }
+
+    private static class ParticipantsInMemoryStoreIndex
+    {
+        private final Map<TableId, ParticipantsInMemoryTableIndex> tableIndex = new HashMap<>();
+
+        private ParticipantsInMemoryStoreIndex(int commandStoreId)
+        {
+        }
+
+        public void add(TxnId id, Route<?> route)
+        {
+            for (var keyOrRange : route)
+                add(id, keyOrRange);
+        }
+
+        private void add(TxnId id, Unseekable keyOrRange)
+        {
+            if (keyOrRange.domain() != Routable.Domain.Range)
+                throw new IllegalArgumentException("Unexpected domain: " + keyOrRange.domain());
+            TokenRange ts = (TokenRange) keyOrRange;
+            TableId tableId = ts.table();
+            tableIndex.computeIfAbsent(tableId, ParticipantsInMemoryTableIndex::new).add(id, ts);
+        }
+
+        public void search(TableId tableId, byte[] start, byte[] end, Consumer<Map.Entry<IndexRange, TxnId>> fn)
+        {
+            var index = tableIndex.get(tableId);
+            if (index == null) return;
+            index.search(start, end, fn);
+        }
+
+        public void search(TableId tableId, byte[] key, Consumer<Map.Entry<IndexRange, TxnId>> fn)
+        {
+            var index = tableIndex.get(tableId);
+            if (index == null) return;
+            index.search(key, fn);
+        }
+    }
+
+    private static class ParticipantsInMemoryTableIndex

Review Comment:
   Same as above, just `TableIndex`



##########
test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java:
##########
@@ -216,11 +207,12 @@ public void test()
    {
        cfs().disableAutoCompaction(); // let the test control compaction
        //TODO (coverage): include with the ability to mark ranges as durable for compaction cleanup
-        AccordService.unsafeSetNoop(); // disable accord service since compaction touches it.  It would be nice to include this for cleanup support....
-        stateful().withExamples(50).check(commands(() -> State::new, i -> cfs())
-                                          .destroySut(sut -> sut.truncateBlocking())
-                                          .add(FLUSH)
+        stateful().withExamples(10).withSteps(500).check(commands(() -> State::new, Sut::new)
+                                          .destroyState(State::close)
+                                          .destroySut(Sut::close)
+                                          .addIf(State::mayFlush, FLUSH)

Review Comment:
   May I suggest a slight change in the testing semantics?
   
   ```
   diff --git a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
   index 6807269c95..a4fc38a708 100644
   --- a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
   +++ b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java
   @@ -160,7 +160,6 @@ public class RouteIndexTest extends CQLTester.InMemory
   
        private Command<State, Sut, ?> insert(RandomSource rs, State state)
        {
   -//        int storeId = rs.nextInt(0, state.numStores);
            Domain domain = state.domainGen.next(rs);
            TxnId txnId = state.nextTxnId(domain);
            Route<?> route = createRoute(state, rs, domain, rs.nextInt(1, 20));
   @@ -210,8 +209,9 @@ public class RouteIndexTest extends CQLTester.InMemory
            stateful().withExamples(10).withSteps(500).check(commands(() -> State::new, Sut::new)
                                              .destroyState(State::close)
                                              .destroySut(Sut::close)
   -                                          .addIf(State::mayFlush, FLUSH)
   -                                          .add(COMPACT)
   +                                          .add(CLOSE)
   +                                          .add(FLUSH)
   +                                          .add(PURGE)
                                              .add(RESTART)
                                              .add(this::insert)
                                              .add(RouteIndexTest::rangeSearch)
   @@ -378,7 +378,7 @@ public class RouteIndexTest extends CQLTester.InMemory
            }
        }
   
   -    private static final CassandraCommand FLUSH = new CassandraCommand("Flush")
   +    private static final CassandraCommand CLOSE = new CassandraCommand("Close")
        {
            @Override
            public void runUnit(Sut sut)
   @@ -387,24 +387,24 @@ public class RouteIndexTest extends CQLTester.InMemory
            }
        };
   
   -    private static final CassandraCommand COMPACT = new CassandraCommand("Compact")
   +    private static final CassandraCommand FLUSH = new CassandraCommand("Flush")
        {
            @Override
            public void runUnit(Sut sut)
            {
                sut.journal.get().runCompactorForTesting();
   -            try
   -            {
   -                sut.cfs.enableAutoCompaction();
   -                FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(sut.cfs));
   -            }
   -            finally
   -            {
   -                sut.cfs.disableAutoCompaction();
   -            }
            }
        };
   
   +    private static final CassandraCommand PURGE = new CassandraCommand("Purge")
   +    {
   +        @Override
   +        public void runUnit(Sut sut)
   +        {
   +            sut.journal.get().purge(sut.stores.get());
   +        }
   +    };
   +
        private static final UnitCommand<State, Sut> RESTART = new UnitCommand<State, Sut>()
        {
            @Override
   @@ -544,10 +544,12 @@ public class RouteIndexTest extends CQLTester.InMemory
        {
            private final ColumnFamilyStore cfs;
            private final Supplier<AccordJournal> journal;
   +        private final Supplier<CommandStores> stores;
   
            public Sut(State state)
            {
                cfs = cfs();
   +            this.stores = () -> state.accordService.node().commandStores();
                this.journal = () -> state.accordService.journal();
   ```
   
   In short, the idea here is that the Journal's flow is a bit different: we first _flush_ multiple segments into one SSTable, then we _compact_ multiple SSTables together (which would be great to have as a separate event), and then we _purge_ SSTables (i.e. compact them and clean up / throw away the keys that the state machine tells us we can throw away).
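   To make that event ordering concrete, the flush/compact/purge distinction can be modeled with plain collections (a toy sketch only; nothing here is the real Journal or CommandStores API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class JournalFlowSketch
{
    public final List<List<String>> activeSegments = new ArrayList<>(); // in-memory segments
    public final List<List<String>> sstables = new ArrayList<>();       // flushed, on-disk tables

    // append a record to the newest active segment
    public void write(String key)
    {
        if (activeSegments.isEmpty())
            activeSegments.add(new ArrayList<>());
        activeSegments.get(activeSegments.size() - 1).add(key);
    }

    // close the current segment and start a new one
    public void close()
    {
        activeSegments.add(new ArrayList<>());
    }

    // flush: fold all segments into a single new SSTable
    public void flush()
    {
        List<String> flushed = new ArrayList<>();
        activeSegments.forEach(flushed::addAll);
        activeSegments.clear();
        if (!flushed.isEmpty())
            sstables.add(flushed);
    }

    // compact: merge all existing SSTables into one
    public void compact()
    {
        List<String> merged = new ArrayList<>();
        sstables.forEach(merged::addAll);
        sstables.clear();
        sstables.add(merged);
    }

    // purge: compact, and additionally drop keys the state machine
    // has told us are safe to throw away
    public void purge(Set<String> reclaimable)
    {
        compact();
        sstables.get(0).removeIf(reclaimable::contains);
    }
}
```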



##########
src/java/org/apache/cassandra/service/accord/ParticipantsInMemoryIndex.java:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+
+import accord.local.StoreParticipants;
+import accord.primitives.Routable;
+import accord.primitives.Route;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.primitives.Unseekable;
+import accord.utils.Invariants;
+import org.agrona.collections.Int2ObjectHashMap;
+import org.agrona.collections.Long2ObjectHashMap;
+import org.apache.cassandra.index.accord.OrderedRouteSerializer;
+import org.apache.cassandra.index.accord.ParticipantsJournalIndex;
+import org.apache.cassandra.index.accord.RouteIndexFormat;
+import org.apache.cassandra.journal.Journal;
+import org.apache.cassandra.journal.RecordPointer;
+import org.apache.cassandra.journal.StaticSegment;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey;
+import org.apache.cassandra.utils.ByteArrayUtil;
+import org.apache.cassandra.utils.FastByteOperations;
+import org.apache.cassandra.utils.RTree;
+import org.apache.cassandra.utils.RangeTree;
+
+public class ParticipantsInMemoryIndex<K extends JournalKey, V> implements Journal.Listener<K, V>, IJournal.RangeSearcher
+{
+    private final Long2ObjectHashMap<ParticipantsInMemorySegmentIndex> segmentIndexes = new Long2ObjectHashMap<>();
+
+    @Override
+    public void onWrite(K id, Journal.Writer writer, Set<Integer> hosts, RecordPointer pointer)
+    {
+        if (!ParticipantsJournalIndex.allowed(id))
+            return;
+        SavedCommand.Writer saveCommandWriter = (SavedCommand.Writer) writer;
+        if (!saveCommandWriter.hasField(SavedCommand.Fields.PARTICIPANTS))
+            return;
+        StoreParticipants participants = saveCommandWriter.after().participants();
+        Route<?> route = participants.route();
+        if (route != null)
+            update(pointer.segment, id.commandStoreId, id.id, route);
+    }
+
+    public synchronized void update(long segment, int commandStoreId, TxnId id, Route<?> route)
+    {
+        if (!ParticipantsJournalIndex.allowed(id))
+            return;
+        Invariants.nonNull(route, "route");
+        segmentIndexes.computeIfAbsent(segment, ParticipantsInMemorySegmentIndex::new).add(commandStoreId, id, route);
+    }
+
+    public void update(long segment, K id, ByteBuffer buffer, int userVersion)
+    {
+        if (!ParticipantsJournalIndex.allowed(id))
+            return;
+        var participants = RouteIndexFormat.extract(id.id, buffer, userVersion).participants();

Review Comment:
   nit: var



##########
src/java/org/apache/cassandra/service/accord/ParticipantsInMemoryIndex.java:
##########


Review Comment:
   There are a few `var`s in this file; would you mind asking IDEA to replace them with full types?



##########
src/java/org/apache/cassandra/index/accord/ParticipantsJournalIndex.java:
##########


Review Comment:
   There is a TODO saying `flesh this stuff out...` in the middle of this class. Does it still apply? It is unlikely we will return to this class, or at least that is not on the roadmap; maybe best to address it now, or at least describe what fleshing it out would mean?



##########
src/java/org/apache/cassandra/journal/Journal.java:
##########
@@ -124,6 +126,36 @@ public class Journal<K, V> implements Shutdownable
 
     final OpOrder readOrder = new OpOrder();
 
+    public interface Listener<K, V>
+    {
+        void onWrite(K id, Writer writer, Set<Integer> hosts, RecordPointer pointer);

Review Comment:
   We already have a `flush` notification, which is probably the only thing we need in the Journal API. Moreover, this functionality probably belongs only to `AccordJournal`, not to Journal. One option would be to update the in-memory index on flush (in `saveCommand`).
   
   As regards compaction, I think the best integration point is `AccordSegmentCompactor`, since we have both the old and new sets there, and it is accord-only, so we would not be modifying the Journal API.
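   A rough shape for such an accord-only hook, just to illustrate the suggestion (the `FlushListener`/`InMemoryIndex` names below are hypothetical, not the existing API):

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class FlushHookSketch
{
    // Hypothetical accord-side flush hook; keeping it out of the generic
    // Journal API means only AccordJournal needs to know about indexing.
    public interface FlushListener<K>
    {
        void onFlush(long segment, K key);
    }

    public static class InMemoryIndex<K> implements FlushListener<K>
    {
        public final Map<Long, Set<K>> bySegment = new HashMap<>();

        @Override
        public void onFlush(long segment, K key)
        {
            bySegment.computeIfAbsent(segment, s -> new HashSet<>()).add(key);
        }

        // Compaction integration (e.g. from AccordSegmentCompactor):
        // old segments are replaced wholesale, so drop their index entries.
        public void onCompact(Collection<Long> oldSegments)
        {
            oldSegments.forEach(bySegment::remove);
        }
    }
}
```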



##########
src/java/org/apache/cassandra/journal/Journal.java:
##########
@@ -906,8 +941,19 @@ private String maybeAddDiskSpaceContext(String message)
     @VisibleForTesting
     public void truncateForTesting()
     {
-        advanceSegment(null);
-        segments.set(Segments.none());
+        ActiveSegment<?, ?> discarding = currentSegment;
+        if (discarding.index.size() > 0) // if there is no data in the segment then ignore it

Review Comment:
   Is there any reason not to use `discarding.isEmpty()`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

