ifesdjeen commented on code in PR #3652:
URL: https://github.com/apache/cassandra/pull/3652#discussion_r1832813150
##########
src/java/org/apache/cassandra/db/marshal/CompositeType.java:
##########
@@ -250,7 +258,64 @@ public <V> ByteSource asComparableBytes(ValueAccessor<V>
accessor, V data, Versi
if (i * 2 + 1 < srcs.length)
srcs = Arrays.copyOfRange(srcs, 0, i * 2 + 1);
- return ByteSource.withTerminatorMaybeLegacy(version,
ByteSource.END_OF_STREAM, srcs);
+ return ByteSource.withTerminatorMaybeLegacy(version, END_OF_STREAM,
srcs);
+ }
+
+ @Override
+ public <V> byte[] asFlatComparableBytes(ValueAccessor<V> accessor, V data,
Version version)
+ {
+ if (data == null || accessor.isEmpty(data))
+ return null;
+
+ byte[] tmpBytes = tmpFlattenBuffer.get();
+ byte[] bytes = tmpBytes;
+ if (bytes == null) bytes = new byte[16];
+
+ int c = 0;
+ int length = accessor.size(data);
+
+ // statics go first
+ boolean isStatic = readIsStaticInternal(data, accessor);
+ int offset = startingOffsetInternal(isStatic);
+ bytes[c++] = (byte) (isStatic ? NEXT_COMPONENT_NULL : NEXT_COMPONENT);
+ bytes[c++] = (byte) (NEXT_COMPONENT);
+
+ int i = 0;
+ byte lastEoc = 0;
+ while (offset < length)
+ {
+ // Only the end-of-component byte of the last component of this
composite can be non-zero, so the
+ // component before can't have a non-zero end-of-component byte.
+ assert lastEoc == 0 : lastEoc;
+
+ int componentLength = accessor.getUnsignedShort(data, offset);
+ offset += 2;
+ ByteSource tmp = types.get(i).asComparableBytes(accessor,
accessor.slice(data, offset, componentLength), version);
+ while (true)
+ {
+ int b = tmp.next();
+ if (b == END_OF_STREAM) break;
+
+ if (c == bytes.length) bytes = Arrays.copyOf(bytes, c * 2);
+ bytes[c++] = (byte)b;
+ }
+ offset += componentLength;
+ lastEoc = accessor.getByte(data, offset);
+ offset += 1;
+ if (c == bytes.length) bytes = Arrays.copyOf(bytes, c * 2);
+ bytes[c++] = (byte) NEXT_COMPONENT;
+ bytes[c++] = (byte) (lastEoc & 0xFF ^ 0x80); // end-of-component
also takes part in comparison as signed byte
+ bytes[c++] = (byte) (offset < length ? NEXT_COMPONENT : version ==
Version.LEGACY ? END_OF_STREAM : TERMINATOR);
+ ++i;
+ }
+
+ byte[] result = Arrays.copyOf(bytes, c);
+ if (bytes != tmpBytes) tmpFlattenBuffer.set(bytes);
+ byte[] test = super.asFlatComparableBytes(accessor, data, version);
+ if (Invariants.isParanoid() && Invariants.testParanoia(LINEAR,
CONSTANT, LOW)) Invariants.checkState(Arrays.equals(test, result));
+ V roundtrip = fromComparableBytes(accessor,
ByteSource.peekable(ByteSource.of(result, version)), version);
Review Comment:
Do we want to compute `roundtrip` unconditionally here, or should it be guarded
by the paranoid invariants check?
##########
src/java/org/apache/cassandra/db/memtable/TrieMemtable.java:
##########
@@ -180,7 +180,7 @@ public void discard()
* commitLogSegmentPosition should only be null if this is a secondary
index, in which case it is *expected* to be null
*/
@Override
- public long put(PartitionUpdate update, UpdateTransaction indexer,
OpOrder.Group opGroup)
+ public long put(PartitionUpdate update, UpdateTransaction indexer,
OpOrder.Group opGroup, boolean assumeMissing)
Review Comment:
Should we throw here on `assumeMissing`?
##########
src/java/org/apache/cassandra/journal/ActiveSegment.java:
##########
@@ -488,12 +474,48 @@ void writeInternal(K id, ByteBuffer record, Set<Integer>
hosts)
}
}
- void awaitFlush(Timer waitingOnFlush)
+ void awaitDurable(Timer waitingOnFlush)
{
try (Timer.Context ignored = waitingOnFlush.time())
{
waitForFlush(start);
}
}
+
+ boolean isFsynced()
Review Comment:
nit: unused
##########
src/java/org/apache/cassandra/journal/Metadata.java:
##########
@@ -45,6 +46,7 @@ final class Metadata
private final Set<Integer> unmodifiableHosts;
private final Map<Integer, Integer> recordsPerHost;
+ private int fsyncLimit;
Review Comment:
This field never seems to be set?
##########
src/java/org/apache/cassandra/journal/ActiveSegment.java:
##########
@@ -44,16 +45,18 @@ final class ActiveSegment<K, V> extends Segment<K, V>
private final OpOrder appendOrder = new OpOrder();
// position in the buffer we are allocating from
- private volatile int allocateOffset = 0;
- private static final AtomicIntegerFieldUpdater<ActiveSegment>
allocateOffsetUpdater =
AtomicIntegerFieldUpdater.newUpdater(ActiveSegment.class, "allocateOffset");
+ private volatile long allocateOffset = 0;
Review Comment:
It would be great to add some information about this field, e.g. the fact that
the higher bits now indicate in-progress state. Maybe even add accessors to
make it easier to read the right part.
##########
src/java/org/apache/cassandra/service/accord/AccordCache.java:
##########
@@ -0,0 +1,1292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service.accord;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.IdentityHashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.ToLongFunction;
+import java.util.stream.Stream;
+
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.api.RoutingKey;
+import accord.local.Command;
+import accord.local.cfk.CommandsForKey;
+import accord.primitives.Routable;
+import accord.primitives.SaveStatus;
+import accord.primitives.Txn;
+import accord.primitives.TxnId;
+import accord.utils.IntrusiveLinkedList;
+import accord.utils.Invariants;
+import accord.utils.QuadFunction;
+import accord.utils.TriFunction;
+import org.agrona.collections.Object2ObjectHashMap;
+import org.apache.cassandra.cache.CacheSize;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.exceptions.UnknownTableException;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.metrics.AccordStateCacheMetrics;
+import org.apache.cassandra.metrics.CacheAccessMetrics;
+import org.apache.cassandra.service.accord.AccordCacheEntry.Status;
+import org.apache.cassandra.service.accord.events.CacheEvents;
+import
org.apache.cassandra.service.accord.serializers.CommandsForKeySerializer;
+import org.apache.cassandra.utils.NoSpamLogger;
+import org.apache.cassandra.utils.NoSpamLogger.NoSpamLogStatement;
+import org.apache.cassandra.utils.ObjectSizes;
+import org.apache.cassandra.utils.concurrent.Future;
+
+import static accord.utils.Invariants.checkState;
+import static org.apache.cassandra.net.MessagingService.current_version;
+import static
org.apache.cassandra.service.accord.AccordCacheEntry.Status.EVICTED;
+import static
org.apache.cassandra.service.accord.AccordCacheEntry.Status.LOADED;
+import static
org.apache.cassandra.service.accord.AccordCacheEntry.Status.MODIFIED;
+
+/**
+ * Cache for AccordCommand and AccordCommandsForKey, available memory is
shared between the two object types.
+ * </p>
+ * Supports dynamic object sizes. After each acquire/free cycle, the cacheable
objects size is recomputed to
+ * account for data added/removed during txn processing if it's modified flag
is set
+ *
+ * TODO (required): we only iterate over unreferenced entries
+ */
+public class AccordCache implements CacheSize
+{
+ private static final Logger logger =
LoggerFactory.getLogger(AccordCache.class);
+ private static final NoSpamLogStatement evictNoEvict =
NoSpamLogger.getStatement(logger, "Found and expired {} marked no evict, with
age {}, exceeding its expected max age of {}", 1L, TimeUnit.MINUTES);
+
+ // Debug mode to verify that loading from journal + system tables results
in
+ // functionally identical (or superceding) command to the one we've just
evicted.
+ private static boolean VALIDATE_LOAD_ON_EVICT = false;
+
+ @VisibleForTesting
+ public static void validateLoadOnEvict(boolean value)
+ {
+ VALIDATE_LOAD_ON_EVICT = value;
+ }
+
+ public interface Adapter<K, V, S>
+ {
+ @Nullable V load(AccordCommandStore commandStore, K key);
+ @Nullable Runnable save(AccordCommandStore commandStore, K key,
@Nullable V value, @Nullable Object shrunk);
+ // a result of null means we can immediately evict, without saving
+ @Nullable V quickShrink(V value);
Review Comment:
Could you add a small description of what shrinking is and rough rules of
thumb about how it functions?
##########
src/java/org/apache/cassandra/journal/ActiveSegment.java:
##########
@@ -364,10 +335,10 @@ boolean discardUnusedTail()
{
while (true)
{
- int prev = allocateOffset;
+ long prev = completeInProgress();
Review Comment:
nit: this can be `int`; I'm trying to weigh whether it's better to cast to long or
do two casts to int. Probably doesn't matter much.
##########
src/java/org/apache/cassandra/service/accord/AccordExecutor.java:
##########
@@ -0,0 +1,1002 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+
+import accord.api.Agent;
+import accord.api.RoutingKey;
+import accord.impl.TimestampsForKey;
+import accord.local.Command;
+import accord.local.cfk.CommandsForKey;
+import accord.primitives.TxnId;
+import accord.utils.ArrayBuffers.BufferList;
+import accord.utils.IntrusivePriorityHeap;
+import accord.utils.Invariants;
+import accord.utils.QuadConsumer;
+import accord.utils.QuadFunction;
+import accord.utils.QuintConsumer;
+import accord.utils.TriConsumer;
+import accord.utils.TriFunction;
+import org.agrona.collections.Object2ObjectHashMap;
+import org.apache.cassandra.cache.CacheSize;
+import org.apache.cassandra.concurrent.ExecutorPlus;
+import org.apache.cassandra.concurrent.ScheduledExecutors;
+import org.apache.cassandra.concurrent.Shutdownable;
+import org.apache.cassandra.metrics.AccordStateCacheMetrics;
+import org.apache.cassandra.utils.concurrent.AsyncPromise;
+import org.apache.cassandra.utils.concurrent.Future;
+
+import static
org.apache.cassandra.service.accord.AccordCacheEntry.Status.EVICTED;
+import static
org.apache.cassandra.service.accord.AccordCache.CommandAdapter.COMMAND_ADAPTER;
+import static
org.apache.cassandra.service.accord.AccordCache.CommandsForKeyAdapter.CFK_ADAPTER;
+import static
org.apache.cassandra.service.accord.AccordCache.registerJfrListener;
+import static org.apache.cassandra.service.accord.AccordTask.State.LOADING;
+import static
org.apache.cassandra.service.accord.AccordTask.State.SCANNING_RANGES;
+import static
org.apache.cassandra.service.accord.AccordTask.State.WAITING_TO_LOAD;
+import static
org.apache.cassandra.service.accord.AccordTask.State.WAITING_TO_RUN;
+import static org.apache.cassandra.utils.Clock.Global.nanoTime;
+
+public abstract class AccordExecutor implements CacheSize,
AccordCacheEntry.OnLoaded, AccordCacheEntry.OnSaved, Shutdownable
+{
+ public interface AccordExecutorFactory
+ {
+ AccordExecutor get(int executorId, Mode mode, int threads,
IntFunction<String> name, AccordStateCacheMetrics metrics,
ExecutorFunctionFactory loadExecutor, ExecutorFunctionFactory saveExecutor,
ExecutorFunctionFactory rangeLoadExecutor, Agent agent);
+ }
+
+ public enum Mode { RUN_WITH_LOCK, RUN_WITHOUT_LOCK }
+
+ public interface ExecutorFunction extends BiFunction<Task, Runnable,
Future<?>> {}
+ public interface ExecutorFunctionFactory extends Function<AccordExecutor,
ExecutorFunction> {}
+
+ // WARNING: this is a shared object, so close is NOT idempotent
+ public static final class ExclusiveGlobalCaches extends GlobalCaches
implements AutoCloseable
+ {
+ final Lock lock;
+
+ public ExclusiveGlobalCaches(Lock lock, AccordCache global,
AccordCache.Type<TxnId, Command, AccordSafeCommand> commands,
AccordCache.Type<RoutingKey, TimestampsForKey, AccordSafeTimestampsForKey>
timestampsForKey, AccordCache.Type<RoutingKey, CommandsForKey,
AccordSafeCommandsForKey> commandsForKey)
+ {
+ super(global, commands, timestampsForKey, commandsForKey);
+ this.lock = lock;
+ }
+
+ @Override
+ public void close()
+ {
+ lock.unlock();
+ }
+ }
+
+ public static class GlobalCaches
+ {
+ public final AccordCache global;
+ public final AccordCache.Type<TxnId, Command, AccordSafeCommand>
commands;
+ public final AccordCache.Type<RoutingKey, TimestampsForKey,
AccordSafeTimestampsForKey> timestampsForKey;
+ public final AccordCache.Type<RoutingKey, CommandsForKey,
AccordSafeCommandsForKey> commandsForKey;
+
+ public GlobalCaches(AccordCache global, AccordCache.Type<TxnId,
Command, AccordSafeCommand> commands, AccordCache.Type<RoutingKey,
TimestampsForKey, AccordSafeTimestampsForKey> timestampsForKey,
AccordCache.Type<RoutingKey, CommandsForKey, AccordSafeCommandsForKey>
commandsForKey)
+ {
+ this.global = global;
+ this.commands = commands;
+ this.timestampsForKey = timestampsForKey;
+ this.commandsForKey = commandsForKey;
+ }
+ }
+
+ final Lock lock;
+ final Agent agent;
+ final int executorId;
+ private final AccordCache cache;
+ private final ExecutorFunction loadExecutor;
+ private final ExecutorFunction rangeLoadExecutor;
+
+ private final TaskQueue<AccordTask<?>> scanningRanges = new
TaskQueue<>(SCANNING_RANGES); // never queried, just parked here while scanning
+ private final TaskQueue<AccordTask<?>> loading = new TaskQueue<>(LOADING);
// never queried, just parked here while loading
+
+ private final TaskQueue<AccordTask<?>> waitingToLoadRangeTxns = new
TaskQueue<>(WAITING_TO_LOAD);
+
+ private final TaskQueue<AccordTask<?>> waitingToLoad = new
TaskQueue<>(WAITING_TO_LOAD);
+ private final TaskQueue<Task> waitingToRun = new
TaskQueue<>(WAITING_TO_RUN);
+ private final Object2ObjectHashMap<AccordCommandStore, CommandStoreQueue>
commandStoreQueues = new Object2ObjectHashMap<>();
+
+ private final AccordCacheEntry.OnLoaded onRangeLoaded =
this::onRangeLoaded;
+ private final ExclusiveGlobalCaches caches;
+
+ /**
+ * The maximum total number of loads we can queue at once - this includes
loads for range transactions,
+ * which are subject to this limit as well as that imposed by {@link
#maxQueuedRangeLoads}
+ */
+ private int maxQueuedLoads = 64;
+ /**
+ * The maximum number of loads exclusively for range transactions we can
queue at once; the {@link #maxQueuedLoads} limit also applies.
+ */
+ private int maxQueuedRangeLoads = 8;
+
+ private long maxWorkingSetSizeInBytes;
+ private long maxWorkingCapacityInBytes;
+ private int nextPosition;
+ private int activeLoads, activeRangeLoads;
+ private boolean hasPausedLoading;
+ int tasks;
+ int running;
+
+ AccordExecutor(Lock lock, int executorId, AccordStateCacheMetrics metrics,
ExecutorFunctionFactory loadExecutor, ExecutorFunctionFactory saveExecutor,
ExecutorFunctionFactory rangeLoadExecutor, Agent agent)
+ {
+ this.lock = lock;
+ this.executorId = executorId;
+ this.cache = new AccordCache(alwaysNullTask(saveExecutor.apply(this)),
this, 0, metrics);
+ this.loadExecutor = loadExecutor.apply(this);
+ this.rangeLoadExecutor = rangeLoadExecutor.apply(this);
+ this.agent = agent;
+
+ final AccordCache.Type<TxnId, Command, AccordSafeCommand> commands;
+ final AccordCache.Type<RoutingKey, TimestampsForKey,
AccordSafeTimestampsForKey> timestampsForKey;
+ final AccordCache.Type<RoutingKey, CommandsForKey,
AccordSafeCommandsForKey> commandsForKey;
+ commands = cache.newType(TxnId.class, COMMAND_ADAPTER);
+ registerJfrListener(executorId, commands, "Command");
+ timestampsForKey = cache.newType(RoutingKey.class,
Review Comment:
nit: maybe we could add an implementation for the TFK cache, just like the ones for
commands and CFK, and move the functional adapter to test code?
##########
src/java/org/apache/cassandra/service/accord/SavedCommand.java:
##########
@@ -296,6 +298,17 @@ static boolean getFieldChanged(Fields field, int oldFlags)
return (oldFlags & (0x10000 << field.ordinal())) != 0;
}
+ static EnumSet<Fields> getFieldsChanged(int flags)
Review Comment:
nit: seems to be unused
##########
src/java/org/apache/cassandra/hints/HintsBuffer.java:
##########
@@ -179,7 +179,19 @@ private Allocation allocate(int totalSize, OpOrder.Group
opGroup)
return new Allocation(offset, totalSize, opGroup);
}
- // allocate bytes in the slab, or return negative if not enough space
+ /**
Review Comment:
It looks like you have added this description to a different `allocateBytes`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]