ifesdjeen commented on code in PR #3743:
URL: https://github.com/apache/cassandra/pull/3743#discussion_r1918072293


##########
src/java/org/apache/cassandra/service/accord/RouteInMemoryIndex.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+
+import accord.impl.CommandChange.Field;
+import accord.primitives.Routable;
+import accord.primitives.Route;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.primitives.Unseekable;
+import accord.utils.Invariants;
+import org.agrona.collections.Int2ObjectHashMap;
+import org.agrona.collections.Long2ObjectHashMap;
+import org.apache.cassandra.index.accord.OrderedRouteSerializer;
+import org.apache.cassandra.index.accord.RouteJournalIndex;
+import org.apache.cassandra.journal.Journal;
+import org.apache.cassandra.journal.RecordPointer;
+import org.apache.cassandra.journal.StaticSegment;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey;
+import org.apache.cassandra.utils.ByteArrayUtil;
+import org.apache.cassandra.utils.FastByteOperations;
+import org.apache.cassandra.utils.RTree;
+import org.apache.cassandra.utils.RangeTree;
+
+public class RouteInMemoryIndex<K extends JournalKey, V> implements 
RangeSearcher
+{
+    private final Long2ObjectHashMap<SegmentIndex> segmentIndexes = new 
Long2ObjectHashMap<>();
+
+    public boolean isSupported(JournalKey id)
+    {
+        return RouteJournalIndex.allowed(id);
+    }
+
+    public void onWrite(JournalKey id, Journal.Writer writer, RecordPointer 
pointer)
+    {
+        if (!RouteJournalIndex.allowed(id))
+            return;
+        AccordJournal.Writer saveCommandWriter = (AccordJournal.Writer) writer;
+        if (!saveCommandWriter.hasField(Field.PARTICIPANTS))
+            return;
+        Route<?> route = saveCommandWriter.after.participants().route();
+        if (route != null)
+            update(pointer.segment, id.commandStoreId, id.id, route);
+    }
+
+    public synchronized void update(long segment, int commandStoreId, TxnId 
id, Route<?> route)
+    {
+        if (!RouteJournalIndex.allowed(id))
+            return;
+        Invariants.nonNull(route, "route");
+        segmentIndexes.computeIfAbsent(segment, 
SegmentIndex::new).add(commandStoreId, id, route);
+    }
+
+    public synchronized void onCompact(Collection<StaticSegment<JournalKey, 
V>> oldSegments)
+    {
+        // As of this writing compact in accord journal takes StaticSegments, 
writes them to a SSTable, and pushes to a table;
+        // it then stops managing those segments... for this reason 
compactedSegments is normally empty and none of the
+        // oldSegments are expected to be tracked anymore, so this index 
should remove the reference (there is normal table 2i to pick up the job)
+        oldSegments.forEach(s -> segmentIndexes.remove(s.id()));
+    }
+
+    public NavigableMap<IndexRange, Set<TxnId>> search(int storeId, 
AccordRoutingKey key)
+    {
+        return search(storeId, key.table(), 
OrderedRouteSerializer.serializeRoutingKeyNoTable(key));
+    }
+
+    private synchronized NavigableMap<IndexRange, Set<TxnId>> search(int 
storeId, TableId tableId, byte[] key)
+    {
+        TreeMap<IndexRange, Set<TxnId>> matches = new TreeMap<>();
+        segmentIndexes.values().forEach(s -> s.search(storeId, tableId, key, e 
-> matches.computeIfAbsent(e.getKey(), i -> new 
HashSet<>()).add(e.getValue())));
+        return matches.isEmpty() ? Collections.emptyNavigableMap() : matches;
+    }
+
+    public NavigableMap<IndexRange, Set<TxnId>> search(int storeId, 
AccordRoutingKey start, AccordRoutingKey end)
+    {
+        return search(storeId, start.table(), 
OrderedRouteSerializer.serializeRoutingKeyNoTable(start), 
OrderedRouteSerializer.serializeRoutingKeyNoTable(end));
+    }
+
+    private synchronized NavigableMap<IndexRange, Set<TxnId>> search(int 
storeId, TableId tableId, byte[] start, byte[] end)
+    {
+        TreeMap<IndexRange, Set<TxnId>> matches = new TreeMap<>();
+        segmentIndexes.values().forEach(s -> s.search(storeId, tableId, start, 
end, e -> matches.computeIfAbsent(e.getKey(), i -> new 
HashSet<>()).add(e.getValue())));
+        return matches.isEmpty() ? Collections.emptyNavigableMap() : matches;
+    }
+
+    public synchronized void truncateForTesting()
+    {
+        segmentIndexes.clear();
+    }
+
+    @Override
+    public void intersects(int commandStoreId, TokenRange range, TxnId 
minTxnId, Timestamp maxTxnId, Consumer<TxnId> forEach)
+    {
+        NavigableMap<IndexRange, Set<TxnId>> result = search(commandStoreId, 
range.start(), range.end());
+        TreeSet<TxnId> matches = new TreeSet<>();
+        result.values().forEach(s -> matches.addAll(s));
+        consume(matches.iterator(), minTxnId, maxTxnId, forEach);
+    }
+
+    @Override
+    public void intersects(int commandStoreId, AccordRoutingKey key, TxnId 
minTxnId, Timestamp maxTxnId, Consumer<TxnId> forEach)
+    {
+        NavigableMap<IndexRange, Set<TxnId>> result = search(commandStoreId, 
key);
+        TreeSet<TxnId> matches = new TreeSet<>();
+        result.values().forEach(s -> matches.addAll(s));
+        consume(matches.iterator(), minTxnId, maxTxnId, forEach);
+    }
+
+    private void consume(Iterator<TxnId> it, TxnId minTxnId, Timestamp 
maxTxnId, Consumer<TxnId> forEach)
+    {
+        while (it.hasNext())
+        {
+            TxnId next = it.next();
+            if (next.compareTo(minTxnId) >= 0 && next.compareTo(maxTxnId) < 0)
+                forEach.accept(next);
+        }
+    }
+
+    private static class SegmentIndex
+    {
+        private final Int2ObjectHashMap<StoreIndex> storeIndexes = new 
Int2ObjectHashMap<>();
+
+        private SegmentIndex(long segment)
+        {
+        }
+
+        public void add(int commandStoreId, TxnId id, Route<?> route)
+        {
+            storeIndexes.computeIfAbsent(commandStoreId, 
StoreIndex::new).add(id, route);
+        }
+
+        public void search(int storeId, TableId tableId, byte[] start, byte[] 
end, Consumer<Map.Entry<IndexRange, TxnId>> fn)
+        {
+            StoreIndex idx = storeIndexes.get(storeId);
+            if (idx == null) return;
+            idx.search(tableId, start, end, fn);
+        }
+
+        public void search(int storeId, TableId tableId, byte[] key, 
Consumer<Map.Entry<IndexRange, TxnId>> fn)
+        {
+            StoreIndex idx = storeIndexes.get(storeId);
+            if (idx == null) return;
+            idx.search(tableId, key, fn);
+        }
+    }
+
+    private static class StoreIndex
+    {
+        private final Map<TableId, TableIndex> tableIndex = new HashMap<>();
+
+        private StoreIndex(int commandStoreId)

Review Comment:
   nit (can also be done on commit): the store id is unused, but it might be 
useful for debugging.



##########
src/java/org/apache/cassandra/journal/SegmentCompactor.java:
##########
@@ -30,5 +31,7 @@ static <K, V> SegmentCompactor<K, V> noop()
         return (SegmentCompactor<K, V>) NOOP;
     }
 
+    //TODO (review): org.apache.cassandra.journal.Compactor.run documents that 
this is nullable but no implementation returns null.... If we want to keep 
nullable I put it in the interface so you have to check, but if we want to 
remove I can go clean that assumption up.

Review Comment:
   I think at some point we were returning null, which helped to distinguish 
between a no-op compaction (i.e. one that should leave all segments as they 
were) and a compaction that replaces the segment collection with an empty list. 
Right now we always return an empty list. I suspect that other _potential_ 
implementations may have different opinions. Maybe @iamaleksey can comment?



##########
src/java/org/apache/cassandra/service/accord/RouteInMemoryIndex.java:
##########
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.function.Consumer;
+
+import accord.impl.CommandChange.Field;
+import accord.primitives.Routable;
+import accord.primitives.Route;
+import accord.primitives.Timestamp;
+import accord.primitives.TxnId;
+import accord.primitives.Unseekable;
+import accord.utils.Invariants;
+import org.agrona.collections.Int2ObjectHashMap;
+import org.agrona.collections.Long2ObjectHashMap;
+import org.apache.cassandra.index.accord.OrderedRouteSerializer;
+import org.apache.cassandra.index.accord.RouteJournalIndex;
+import org.apache.cassandra.journal.Journal;
+import org.apache.cassandra.journal.RecordPointer;
+import org.apache.cassandra.journal.StaticSegment;
+import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.accord.api.AccordRoutingKey;
+import org.apache.cassandra.utils.ByteArrayUtil;
+import org.apache.cassandra.utils.FastByteOperations;
+import org.apache.cassandra.utils.RTree;
+import org.apache.cassandra.utils.RangeTree;
+
+public class RouteInMemoryIndex<K extends JournalKey, V> implements 
RangeSearcher
+{
+    private final Long2ObjectHashMap<SegmentIndex> segmentIndexes = new 
Long2ObjectHashMap<>();
+
+    public boolean isSupported(JournalKey id)
+    {
+        return RouteJournalIndex.allowed(id);
+    }
+
+    public void onWrite(JournalKey id, Journal.Writer writer, RecordPointer 
pointer)
+    {
+        if (!RouteJournalIndex.allowed(id))
+            return;
+        AccordJournal.Writer saveCommandWriter = (AccordJournal.Writer) writer;
+        if (!saveCommandWriter.hasField(Field.PARTICIPANTS))
+            return;
+        Route<?> route = saveCommandWriter.after.participants().route();
+        if (route != null)
+            update(pointer.segment, id.commandStoreId, id.id, route);
+    }
+
+    public synchronized void update(long segment, int commandStoreId, TxnId 
id, Route<?> route)
+    {
+        if (!RouteJournalIndex.allowed(id))
+            return;
+        Invariants.nonNull(route, "route");
+        segmentIndexes.computeIfAbsent(segment, 
SegmentIndex::new).add(commandStoreId, id, route);
+    }
+
+    public synchronized void onCompact(Collection<StaticSegment<JournalKey, 
V>> oldSegments)
+    {
+        // As of this writing compact in accord journal takes StaticSegments, 
writes them to a SSTable, and pushes to a table;
+        // it then stops managing those segments... for this reason 
compactedSegments is normally empty and none of the
+        // oldSegments are expected to be tracked anymore, so this index 
should remove the reference (there is normal table 2i to pick up the job)
+        oldSegments.forEach(s -> segmentIndexes.remove(s.id()));
+    }
+
+    public NavigableMap<IndexRange, Set<TxnId>> search(int storeId, 
AccordRoutingKey key)
+    {
+        return search(storeId, key.table(), 
OrderedRouteSerializer.serializeRoutingKeyNoTable(key));
+    }
+
+    private synchronized NavigableMap<IndexRange, Set<TxnId>> search(int 
storeId, TableId tableId, byte[] key)
+    {
+        TreeMap<IndexRange, Set<TxnId>> matches = new TreeMap<>();
+        segmentIndexes.values().forEach(s -> s.search(storeId, tableId, key, e 
-> matches.computeIfAbsent(e.getKey(), i -> new 
HashSet<>()).add(e.getValue())));
+        return matches.isEmpty() ? Collections.emptyNavigableMap() : matches;
+    }
+
+    public NavigableMap<IndexRange, Set<TxnId>> search(int storeId, 
AccordRoutingKey start, AccordRoutingKey end)
+    {
+        return search(storeId, start.table(), 
OrderedRouteSerializer.serializeRoutingKeyNoTable(start), 
OrderedRouteSerializer.serializeRoutingKeyNoTable(end));
+    }
+
+    private synchronized NavigableMap<IndexRange, Set<TxnId>> search(int 
storeId, TableId tableId, byte[] start, byte[] end)
+    {
+        TreeMap<IndexRange, Set<TxnId>> matches = new TreeMap<>();
+        segmentIndexes.values().forEach(s -> s.search(storeId, tableId, start, 
end, e -> matches.computeIfAbsent(e.getKey(), i -> new 
HashSet<>()).add(e.getValue())));
+        return matches.isEmpty() ? Collections.emptyNavigableMap() : matches;
+    }
+
+    public synchronized void truncateForTesting()
+    {
+        segmentIndexes.clear();
+    }
+
+    @Override
+    public void intersects(int commandStoreId, TokenRange range, TxnId 
minTxnId, Timestamp maxTxnId, Consumer<TxnId> forEach)
+    {
+        NavigableMap<IndexRange, Set<TxnId>> result = search(commandStoreId, 
range.start(), range.end());
+        TreeSet<TxnId> matches = new TreeSet<>();
+        result.values().forEach(s -> matches.addAll(s));
+        consume(matches.iterator(), minTxnId, maxTxnId, forEach);
+    }
+
+    @Override
+    public void intersects(int commandStoreId, AccordRoutingKey key, TxnId 
minTxnId, Timestamp maxTxnId, Consumer<TxnId> forEach)
+    {
+        NavigableMap<IndexRange, Set<TxnId>> result = search(commandStoreId, 
key);
+        TreeSet<TxnId> matches = new TreeSet<>();
+        result.values().forEach(s -> matches.addAll(s));
+        consume(matches.iterator(), minTxnId, maxTxnId, forEach);
+    }
+
+    private void consume(Iterator<TxnId> it, TxnId minTxnId, Timestamp 
maxTxnId, Consumer<TxnId> forEach)
+    {
+        while (it.hasNext())
+        {
+            TxnId next = it.next();
+            if (next.compareTo(minTxnId) >= 0 && next.compareTo(maxTxnId) < 0)
+                forEach.accept(next);
+        }
+    }
+
+    private static class SegmentIndex
+    {
+        private final Int2ObjectHashMap<StoreIndex> storeIndexes = new 
Int2ObjectHashMap<>();
+
+        private SegmentIndex(long segment)
+        {
+        }
+
+        public void add(int commandStoreId, TxnId id, Route<?> route)
+        {
+            storeIndexes.computeIfAbsent(commandStoreId, 
StoreIndex::new).add(id, route);
+        }
+
+        public void search(int storeId, TableId tableId, byte[] start, byte[] 
end, Consumer<Map.Entry<IndexRange, TxnId>> fn)
+        {
+            StoreIndex idx = storeIndexes.get(storeId);
+            if (idx == null) return;
+            idx.search(tableId, start, end, fn);
+        }
+
+        public void search(int storeId, TableId tableId, byte[] key, 
Consumer<Map.Entry<IndexRange, TxnId>> fn)
+        {
+            StoreIndex idx = storeIndexes.get(storeId);
+            if (idx == null) return;
+            idx.search(tableId, key, fn);
+        }
+    }
+
+    private static class StoreIndex
+    {
+        private final Map<TableId, TableIndex> tableIndex = new HashMap<>();
+
+        private StoreIndex(int commandStoreId)
+        {
+        }
+
+        public void add(TxnId id, Route<?> route)
+        {
+            for (Unseekable keyOrRange : route)
+                add(id, keyOrRange);
+        }
+
+        private void add(TxnId id, Unseekable keyOrRange)
+        {
+            if (keyOrRange.domain() != Routable.Domain.Range)
+                throw new IllegalArgumentException("Unexpected domain: " + 
keyOrRange.domain());
+            TokenRange ts = (TokenRange) keyOrRange;
+            TableId tableId = ts.table();
+            tableIndex.computeIfAbsent(tableId, TableIndex::new).add(id, ts);
+        }
+
+        public void search(TableId tableId, byte[] start, byte[] end, 
Consumer<Map.Entry<IndexRange, TxnId>> fn)
+        {
+            TableIndex index = tableIndex.get(tableId);
+            if (index == null) return;
+            index.search(start, end, fn);
+        }
+
+        public void search(TableId tableId, byte[] key, 
Consumer<Map.Entry<IndexRange, TxnId>> fn)
+        {
+            TableIndex index = tableIndex.get(tableId);
+            if (index == null) return;
+            index.search(key, fn);
+        }
+    }
+
+    private static class TableIndex
+    {
+        private final RangeTree<byte[], IndexRange, TxnId> index = 
createRangeTree();
+
+        private TableIndex(TableId tableId)

Review Comment:
   nit: same as above with table id: unused, but might be useful as a field



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to