ifesdjeen commented on code in PR #3743: URL: https://github.com/apache/cassandra/pull/3743#discussion_r1918072293
########## src/java/org/apache/cassandra/service/accord/RouteInMemoryIndex.java: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.accord; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.function.Consumer; + +import accord.impl.CommandChange.Field; +import accord.primitives.Routable; +import accord.primitives.Route; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; +import accord.primitives.Unseekable; +import accord.utils.Invariants; +import org.agrona.collections.Int2ObjectHashMap; +import org.agrona.collections.Long2ObjectHashMap; +import org.apache.cassandra.index.accord.OrderedRouteSerializer; +import org.apache.cassandra.index.accord.RouteJournalIndex; +import org.apache.cassandra.journal.Journal; +import org.apache.cassandra.journal.RecordPointer; +import org.apache.cassandra.journal.StaticSegment; +import org.apache.cassandra.schema.TableId; +import 
org.apache.cassandra.service.accord.api.AccordRoutingKey; +import org.apache.cassandra.utils.ByteArrayUtil; +import org.apache.cassandra.utils.FastByteOperations; +import org.apache.cassandra.utils.RTree; +import org.apache.cassandra.utils.RangeTree; + +public class RouteInMemoryIndex<K extends JournalKey, V> implements RangeSearcher +{ + private final Long2ObjectHashMap<SegmentIndex> segmentIndexes = new Long2ObjectHashMap<>(); + + public boolean isSupported(JournalKey id) + { + return RouteJournalIndex.allowed(id); + } + + public void onWrite(JournalKey id, Journal.Writer writer, RecordPointer pointer) + { + if (!RouteJournalIndex.allowed(id)) + return; + AccordJournal.Writer saveCommandWriter = (AccordJournal.Writer) writer; + if (!saveCommandWriter.hasField(Field.PARTICIPANTS)) + return; + Route<?> route = saveCommandWriter.after.participants().route(); + if (route != null) + update(pointer.segment, id.commandStoreId, id.id, route); + } + + public synchronized void update(long segment, int commandStoreId, TxnId id, Route<?> route) + { + if (!RouteJournalIndex.allowed(id)) + return; + Invariants.nonNull(route, "route"); + segmentIndexes.computeIfAbsent(segment, SegmentIndex::new).add(commandStoreId, id, route); + } + + public synchronized void onCompact(Collection<StaticSegment<JournalKey, V>> oldSegments) + { + // As of this writing compact in accord journal takes StaticSegments, writes them to a SSTable, and pushes to a table; + // it then stops managing those segments... 
for this reason compactedSegments is normally empty and none of the + // oldSegments are expected to be tracked anymore, so this index should remove the reference (there is normal table 2i to pick up the job) + oldSegments.forEach(s -> segmentIndexes.remove(s.id())); + } + + public NavigableMap<IndexRange, Set<TxnId>> search(int storeId, AccordRoutingKey key) + { + return search(storeId, key.table(), OrderedRouteSerializer.serializeRoutingKeyNoTable(key)); + } + + private synchronized NavigableMap<IndexRange, Set<TxnId>> search(int storeId, TableId tableId, byte[] key) + { + TreeMap<IndexRange, Set<TxnId>> matches = new TreeMap<>(); + segmentIndexes.values().forEach(s -> s.search(storeId, tableId, key, e -> matches.computeIfAbsent(e.getKey(), i -> new HashSet<>()).add(e.getValue()))); + return matches.isEmpty() ? Collections.emptyNavigableMap() : matches; + } + + public NavigableMap<IndexRange, Set<TxnId>> search(int storeId, AccordRoutingKey start, AccordRoutingKey end) + { + return search(storeId, start.table(), OrderedRouteSerializer.serializeRoutingKeyNoTable(start), OrderedRouteSerializer.serializeRoutingKeyNoTable(end)); + } + + private synchronized NavigableMap<IndexRange, Set<TxnId>> search(int storeId, TableId tableId, byte[] start, byte[] end) + { + TreeMap<IndexRange, Set<TxnId>> matches = new TreeMap<>(); + segmentIndexes.values().forEach(s -> s.search(storeId, tableId, start, end, e -> matches.computeIfAbsent(e.getKey(), i -> new HashSet<>()).add(e.getValue()))); + return matches.isEmpty() ? 
Collections.emptyNavigableMap() : matches; + } + + public synchronized void truncateForTesting() + { + segmentIndexes.clear(); + } + + @Override + public void intersects(int commandStoreId, TokenRange range, TxnId minTxnId, Timestamp maxTxnId, Consumer<TxnId> forEach) + { + NavigableMap<IndexRange, Set<TxnId>> result = search(commandStoreId, range.start(), range.end()); + TreeSet<TxnId> matches = new TreeSet<>(); + result.values().forEach(s -> matches.addAll(s)); + consume(matches.iterator(), minTxnId, maxTxnId, forEach); + } + + @Override + public void intersects(int commandStoreId, AccordRoutingKey key, TxnId minTxnId, Timestamp maxTxnId, Consumer<TxnId> forEach) + { + NavigableMap<IndexRange, Set<TxnId>> result = search(commandStoreId, key); + TreeSet<TxnId> matches = new TreeSet<>(); + result.values().forEach(s -> matches.addAll(s)); + consume(matches.iterator(), minTxnId, maxTxnId, forEach); + } + + private void consume(Iterator<TxnId> it, TxnId minTxnId, Timestamp maxTxnId, Consumer<TxnId> forEach) + { + while (it.hasNext()) + { + TxnId next = it.next(); + if (next.compareTo(minTxnId) >= 0 && next.compareTo(maxTxnId) < 0) + forEach.accept(next); + } + } + + private static class SegmentIndex + { + private final Int2ObjectHashMap<StoreIndex> storeIndexes = new Int2ObjectHashMap<>(); + + private SegmentIndex(long segment) + { + } + + public void add(int commandStoreId, TxnId id, Route<?> route) + { + storeIndexes.computeIfAbsent(commandStoreId, StoreIndex::new).add(id, route); + } + + public void search(int storeId, TableId tableId, byte[] start, byte[] end, Consumer<Map.Entry<IndexRange, TxnId>> fn) + { + StoreIndex idx = storeIndexes.get(storeId); + if (idx == null) return; + idx.search(tableId, start, end, fn); + } + + public void search(int storeId, TableId tableId, byte[] key, Consumer<Map.Entry<IndexRange, TxnId>> fn) + { + StoreIndex idx = storeIndexes.get(storeId); + if (idx == null) return; + idx.search(tableId, key, fn); + } + } + + private static 
class StoreIndex + { + private final Map<TableId, TableIndex> tableIndex = new HashMap<>(); + + private StoreIndex(int commandStoreId) Review Comment: nit (can also be done on commit): the store id is unused, but might be useful for debugging. ########## src/java/org/apache/cassandra/journal/SegmentCompactor.java: ########## @@ -30,5 +31,7 @@ static <K, V> SegmentCompactor<K, V> noop() return (SegmentCompactor<K, V>) NOOP; } + //TODO (review): org.apache.cassandra.journal.Compactor.run documents that this is nullable but no implementation returns null.... If we want to keep nullable I put it in the interface so you have to check, but if we want to remove I can go clean that assumption up. Review Comment: I think at some point we were returning null, which helped distinguish a no-op compaction (i.e. one that should leave all segments as they were) from a compaction that replaces the segment collection with an empty list. Right now we always return an empty list. I suspect that other _potential_ implementations may have different opinions. Maybe @iamaleksey can comment? ########## src/java/org/apache/cassandra/service/accord/RouteInMemoryIndex.java: ########## @@ -0,0 +1,279 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.accord; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.function.Consumer; + +import accord.impl.CommandChange.Field; +import accord.primitives.Routable; +import accord.primitives.Route; +import accord.primitives.Timestamp; +import accord.primitives.TxnId; +import accord.primitives.Unseekable; +import accord.utils.Invariants; +import org.agrona.collections.Int2ObjectHashMap; +import org.agrona.collections.Long2ObjectHashMap; +import org.apache.cassandra.index.accord.OrderedRouteSerializer; +import org.apache.cassandra.index.accord.RouteJournalIndex; +import org.apache.cassandra.journal.Journal; +import org.apache.cassandra.journal.RecordPointer; +import org.apache.cassandra.journal.StaticSegment; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.service.accord.api.AccordRoutingKey; +import org.apache.cassandra.utils.ByteArrayUtil; +import org.apache.cassandra.utils.FastByteOperations; +import org.apache.cassandra.utils.RTree; +import org.apache.cassandra.utils.RangeTree; + +public class RouteInMemoryIndex<K extends JournalKey, V> implements RangeSearcher +{ + private final Long2ObjectHashMap<SegmentIndex> segmentIndexes = new Long2ObjectHashMap<>(); + + public boolean isSupported(JournalKey id) + { + return RouteJournalIndex.allowed(id); + } + + public void onWrite(JournalKey id, Journal.Writer writer, RecordPointer pointer) + { + if (!RouteJournalIndex.allowed(id)) + return; + AccordJournal.Writer saveCommandWriter = (AccordJournal.Writer) writer; + if (!saveCommandWriter.hasField(Field.PARTICIPANTS)) + return; + Route<?> route = 
saveCommandWriter.after.participants().route(); + if (route != null) + update(pointer.segment, id.commandStoreId, id.id, route); + } + + public synchronized void update(long segment, int commandStoreId, TxnId id, Route<?> route) + { + if (!RouteJournalIndex.allowed(id)) + return; + Invariants.nonNull(route, "route"); + segmentIndexes.computeIfAbsent(segment, SegmentIndex::new).add(commandStoreId, id, route); + } + + public synchronized void onCompact(Collection<StaticSegment<JournalKey, V>> oldSegments) + { + // As of this writing compact in accord journal takes StaticSegments, writes them to a SSTable, and pushes to a table; + // it then stops managing those segments... for this reason compactedSegments is normally empty and none of the + // oldSegments are expected to be tracked anymore, so this index should remove the reference (there is normal table 2i to pick up the job) + oldSegments.forEach(s -> segmentIndexes.remove(s.id())); + } + + public NavigableMap<IndexRange, Set<TxnId>> search(int storeId, AccordRoutingKey key) + { + return search(storeId, key.table(), OrderedRouteSerializer.serializeRoutingKeyNoTable(key)); + } + + private synchronized NavigableMap<IndexRange, Set<TxnId>> search(int storeId, TableId tableId, byte[] key) + { + TreeMap<IndexRange, Set<TxnId>> matches = new TreeMap<>(); + segmentIndexes.values().forEach(s -> s.search(storeId, tableId, key, e -> matches.computeIfAbsent(e.getKey(), i -> new HashSet<>()).add(e.getValue()))); + return matches.isEmpty() ? 
Collections.emptyNavigableMap() : matches; + } + + public NavigableMap<IndexRange, Set<TxnId>> search(int storeId, AccordRoutingKey start, AccordRoutingKey end) + { + return search(storeId, start.table(), OrderedRouteSerializer.serializeRoutingKeyNoTable(start), OrderedRouteSerializer.serializeRoutingKeyNoTable(end)); + } + + private synchronized NavigableMap<IndexRange, Set<TxnId>> search(int storeId, TableId tableId, byte[] start, byte[] end) + { + TreeMap<IndexRange, Set<TxnId>> matches = new TreeMap<>(); + segmentIndexes.values().forEach(s -> s.search(storeId, tableId, start, end, e -> matches.computeIfAbsent(e.getKey(), i -> new HashSet<>()).add(e.getValue()))); + return matches.isEmpty() ? Collections.emptyNavigableMap() : matches; + } + + public synchronized void truncateForTesting() + { + segmentIndexes.clear(); + } + + @Override + public void intersects(int commandStoreId, TokenRange range, TxnId minTxnId, Timestamp maxTxnId, Consumer<TxnId> forEach) + { + NavigableMap<IndexRange, Set<TxnId>> result = search(commandStoreId, range.start(), range.end()); + TreeSet<TxnId> matches = new TreeSet<>(); + result.values().forEach(s -> matches.addAll(s)); + consume(matches.iterator(), minTxnId, maxTxnId, forEach); + } + + @Override + public void intersects(int commandStoreId, AccordRoutingKey key, TxnId minTxnId, Timestamp maxTxnId, Consumer<TxnId> forEach) + { + NavigableMap<IndexRange, Set<TxnId>> result = search(commandStoreId, key); + TreeSet<TxnId> matches = new TreeSet<>(); + result.values().forEach(s -> matches.addAll(s)); + consume(matches.iterator(), minTxnId, maxTxnId, forEach); + } + + private void consume(Iterator<TxnId> it, TxnId minTxnId, Timestamp maxTxnId, Consumer<TxnId> forEach) + { + while (it.hasNext()) + { + TxnId next = it.next(); + if (next.compareTo(minTxnId) >= 0 && next.compareTo(maxTxnId) < 0) + forEach.accept(next); + } + } + + private static class SegmentIndex + { + private final Int2ObjectHashMap<StoreIndex> storeIndexes = new 
Int2ObjectHashMap<>(); + + private SegmentIndex(long segment) + { + } + + public void add(int commandStoreId, TxnId id, Route<?> route) + { + storeIndexes.computeIfAbsent(commandStoreId, StoreIndex::new).add(id, route); + } + + public void search(int storeId, TableId tableId, byte[] start, byte[] end, Consumer<Map.Entry<IndexRange, TxnId>> fn) + { + StoreIndex idx = storeIndexes.get(storeId); + if (idx == null) return; + idx.search(tableId, start, end, fn); + } + + public void search(int storeId, TableId tableId, byte[] key, Consumer<Map.Entry<IndexRange, TxnId>> fn) + { + StoreIndex idx = storeIndexes.get(storeId); + if (idx == null) return; + idx.search(tableId, key, fn); + } + } + + private static class StoreIndex + { + private final Map<TableId, TableIndex> tableIndex = new HashMap<>(); + + private StoreIndex(int commandStoreId) + { + } + + public void add(TxnId id, Route<?> route) + { + for (Unseekable keyOrRange : route) + add(id, keyOrRange); + } + + private void add(TxnId id, Unseekable keyOrRange) + { + if (keyOrRange.domain() != Routable.Domain.Range) + throw new IllegalArgumentException("Unexpected domain: " + keyOrRange.domain()); + TokenRange ts = (TokenRange) keyOrRange; + TableId tableId = ts.table(); + tableIndex.computeIfAbsent(tableId, TableIndex::new).add(id, ts); + } + + public void search(TableId tableId, byte[] start, byte[] end, Consumer<Map.Entry<IndexRange, TxnId>> fn) + { + TableIndex index = tableIndex.get(tableId); + if (index == null) return; + index.search(start, end, fn); + } + + public void search(TableId tableId, byte[] key, Consumer<Map.Entry<IndexRange, TxnId>> fn) + { + TableIndex index = tableIndex.get(tableId); + if (index == null) return; + index.search(key, fn); + } + } + + private static class TableIndex + { + private final RangeTree<byte[], IndexRange, TxnId> index = createRangeTree(); + + private TableIndex(TableId tableId) Review Comment: nit: same as above with table id: unused, but might be useful as a field -- This is 
an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

