This is an automated email from the ASF dual-hosted git repository. maedhroz pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit f9f2d93447add98dc4784a331310245138312183 Author: Benedict Elliott Smith <bened...@apache.org> AuthorDate: Sun Jan 8 22:06:38 2023 +0000 Shard local CommandStores on contiguous ranges patch by Benedict; reviewed by Aleksey Yeschenko for CASSANDRA-18142 --- .build/include-accord.sh | 2 +- .../cassandra/db/marshal/ByteArrayAccessor.java | 2 + .../apache/cassandra/dht/AccordBytesSplitter.java | 89 ++++ .../org/apache/cassandra/dht/AccordSplitter.java | 103 ++++ .../cassandra/dht/ByteOrderedPartitioner.java | 8 + .../org/apache/cassandra/dht/IPartitioner.java | 3 + .../org/apache/cassandra/dht/LocalPartitioner.java | 8 + .../apache/cassandra/dht/Murmur3Partitioner.java | 22 + .../cassandra/dht/OrderPreservingPartitioner.java | 65 ++- .../apache/cassandra/dht/RandomPartitioner.java | 20 + src/java/org/apache/cassandra/dht/Splitter.java | 8 +- src/java/org/apache/cassandra/schema/TableId.java | 2 +- .../cassandra/service/accord/AccordCommand.java | 16 +- .../service/accord/AccordCommandStore.java | 516 +++++++++++---------- .../service/accord/AccordCommandStores.java | 51 +- .../cassandra/service/accord/AccordKeyspace.java | 22 +- .../service/accord/AccordObjectSizes.java | 3 +- .../service/accord/AccordSerializers.java | 2 +- .../cassandra/service/accord/AccordService.java | 6 + .../service/accord/AccordTopologyUtils.java | 52 +-- .../cassandra/service/accord/ListenerProxy.java | 9 +- .../cassandra/service/accord/TokenRange.java | 5 +- .../service/accord/api/AccordRoutableKey.java | 36 +- .../service/accord/api/AccordRoutingKey.java | 105 +++-- .../cassandra/service/accord/api/PartitionKey.java | 26 +- .../service/accord/async/AsyncOperation.java | 4 +- .../cassandra/service/accord/txn/TxnData.java | 17 +- .../cassandra/service/accord/txn/TxnNamedRead.java | 2 +- .../cassandra/service/accord/txn/TxnUpdate.java | 2 +- .../cassandra/dht/ByteOrderedPartitionerTest.java | 16 + .../apache/cassandra/dht/LengthPartitioner.java | 34 +- 
.../dht/OrderPreservingPartitionerTest.java | 20 +- .../apache/cassandra/dht/PartitionerTestCase.java | 115 +++++ .../service/accord/AccordCommandTest.java | 7 +- .../cassandra/service/accord/AccordTestUtils.java | 81 +--- .../service/accord/AccordTopologyTest.java | 4 +- .../service/accord/api/AccordKeyTest.java | 40 +- .../service/accord/async/AsyncOperationTest.java | 4 +- .../accord/serializers/CommandSerializersTest.java | 5 +- .../service/accord/txn/AbstractKeySortedTest.java | 2 +- 40 files changed, 1005 insertions(+), 529 deletions(-) diff --git a/.build/include-accord.sh b/.build/include-accord.sh index b1409603fb..37bdcbe079 100755 --- a/.build/include-accord.sh +++ b/.build/include-accord.sh @@ -25,7 +25,7 @@ set -o nounset bin="$(cd "$(dirname "$0")" > /dev/null; pwd)" accord_repo='https://github.com/apache/cassandra-accord.git' -accord_branch='63c37e20cfe66a421c1b07ba1f430a9e6aabe4c5' +accord_branch='804a77d32c8ae45751a3a7f450b372560f08cacc' accord_src="$bin/cassandra-accord" checkout() { diff --git a/src/java/org/apache/cassandra/db/marshal/ByteArrayAccessor.java b/src/java/org/apache/cassandra/db/marshal/ByteArrayAccessor.java index d7108992da..4926027a7b 100644 --- a/src/java/org/apache/cassandra/db/marshal/ByteArrayAccessor.java +++ b/src/java/org/apache/cassandra/db/marshal/ByteArrayAccessor.java @@ -25,6 +25,7 @@ import java.nio.charset.Charset; import java.util.Arrays; import java.util.UUID; +import accord.utils.Invariants; import org.apache.cassandra.db.Digest; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.io.util.DataInputPlus; @@ -107,6 +108,7 @@ public class ByteArrayAccessor implements ValueAccessor<byte[]> @Override public byte[] slice(byte[] input, int offset, int length) { + Invariants.checkArgument(offset + length <= input.length); return Arrays.copyOfRange(input, offset, offset + length); } diff --git a/src/java/org/apache/cassandra/dht/AccordBytesSplitter.java 
b/src/java/org/apache/cassandra/dht/AccordBytesSplitter.java new file mode 100644 index 0000000000..c27bc43599 --- /dev/null +++ b/src/java/org/apache/cassandra/dht/AccordBytesSplitter.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.dht; + +import java.math.BigInteger; + +import accord.api.RoutingKey; +import accord.primitives.Ranges; +import accord.utils.Invariants; +import org.apache.cassandra.service.accord.api.AccordRoutingKey; + +import static accord.utils.Invariants.checkArgument; +import static java.math.BigInteger.ONE; +import static java.math.BigInteger.ZERO; + +public class AccordBytesSplitter extends AccordSplitter +{ + final int byteLength; + + protected AccordBytesSplitter(Ranges ranges) + { + int bytesLength = 0; + for (accord.primitives.Range range : ranges) + { + bytesLength = Integer.max(bytesLength, byteLength(range.start())); + bytesLength = Integer.max(bytesLength, byteLength(range.end())); + } + this.byteLength = bytesLength; + } + + @Override + BigInteger minimumValue() + { + return ZERO; + } + + @Override + BigInteger maximumValue() + { + return ONE.shiftLeft(8 * byteLength).subtract(ONE); + } + + @Override + BigInteger valueForToken(Token token) + { + byte[] bytes = ((ByteOrderedPartitioner.BytesToken) token).token; + checkArgument(bytes.length <= byteLength); + BigInteger value = ZERO; + for (int i = 0 ; i < bytes.length ; ++i) + value = value.add(BigInteger.valueOf(bytes[i] & 0xffL).shiftLeft((byteLength - 1 - i) * 8)); + return value; + } + + @Override + Token tokenForValue(BigInteger value) + { + Invariants.checkArgument(value.compareTo(ZERO) >= 0); + byte[] bytes = new byte[byteLength]; + for (int i = 0 ; i < bytes.length ; ++i) + bytes[i] = value.shiftRight((byteLength - 1 - i) * 8).byteValue(); + return new ByteOrderedPartitioner.BytesToken(bytes); + } + + private static int byteLength(RoutingKey routingKey) + { + return byteLength(((AccordRoutingKey) routingKey).token()); + } + + private static int byteLength(Token token) + { + return ((ByteOrderedPartitioner.BytesToken) token).token.length; + } +} diff --git a/src/java/org/apache/cassandra/dht/AccordSplitter.java b/src/java/org/apache/cassandra/dht/AccordSplitter.java new 
file mode 100644 index 0000000000..232a47d454 --- /dev/null +++ b/src/java/org/apache/cassandra/dht/AccordSplitter.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.dht; + +import java.math.BigInteger; + +import accord.local.ShardDistributor; +import org.apache.cassandra.service.accord.TokenRange; +import org.apache.cassandra.service.accord.api.AccordRoutingKey; +import org.apache.cassandra.service.accord.api.AccordRoutingKey.SentinelKey; +import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey; + +import static java.math.BigInteger.ZERO; + +public abstract class AccordSplitter implements ShardDistributor.EvenSplit.Splitter<BigInteger> +{ + abstract BigInteger valueForToken(Token token); + abstract Token tokenForValue(BigInteger value); + abstract BigInteger minimumValue(); + abstract BigInteger maximumValue(); + + @Override + public BigInteger sizeOf(accord.primitives.Range range) + { + // note: minimum value + BigInteger start = range.start() instanceof SentinelKey ? minimumValue() : valueForToken(((AccordRoutingKey)range.start()).token()); + BigInteger end = range.end() instanceof SentinelKey ? 
maximumValue() : valueForToken(((AccordRoutingKey)range.end()).token()); + return end.subtract(start); + } + + @Override + public accord.primitives.Range subRange(accord.primitives.Range range, BigInteger startOffset, BigInteger endOffset) + { + AccordRoutingKey startBound = (AccordRoutingKey)range.start(); + AccordRoutingKey endBound = (AccordRoutingKey)range.end(); + + BigInteger start = startBound instanceof SentinelKey ? minimumValue() : valueForToken(startBound.token()); + BigInteger end = endBound instanceof SentinelKey ? maximumValue() : valueForToken(endBound.token()); + BigInteger sizeOfRange = end.subtract(start); + + String keyspace = startBound.keyspace(); + return new TokenRange(startOffset.equals(ZERO) ? startBound : new TokenKey(keyspace, tokenForValue(start.add(startOffset))), + endOffset.equals(sizeOfRange) ? endBound : new TokenKey(keyspace, tokenForValue(start.add(endOffset)))); + } + + @Override + public BigInteger zero() + { + return ZERO; + } + + @Override + public BigInteger add(BigInteger a, BigInteger b) + { + return a.add(b); + } + + @Override + public BigInteger subtract(BigInteger a, BigInteger b) + { + return a.subtract(b); + } + + @Override + public BigInteger divide(BigInteger a, int i) + { + return a.divide(BigInteger.valueOf(i)); + } + + @Override + public BigInteger multiply(BigInteger a, int i) + { + return a.multiply(BigInteger.valueOf(i)); + } + + @Override + public int min(BigInteger v, int i) + { + return v.min(BigInteger.valueOf(i)).intValue(); + } + + @Override + public int compare(BigInteger a, BigInteger b) + { + return a.compareTo(b); + } +} diff --git a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java index 788763635a..9732da9475 100644 --- a/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java +++ b/src/java/org/apache/cassandra/dht/ByteOrderedPartitioner.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.dht; +import 
accord.primitives.Ranges; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.db.BufferDecoratedKey; @@ -44,6 +45,7 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; import com.google.common.collect.Maps; @@ -386,4 +388,10 @@ public class ByteOrderedPartitioner implements IPartitioner { return BytesType.instance; } + + @Override + public Function<Ranges, AccordSplitter> accordSplitter() + { + return AccordBytesSplitter::new; + } } diff --git a/src/java/org/apache/cassandra/dht/IPartitioner.java b/src/java/org/apache/cassandra/dht/IPartitioner.java index 0d32dc9c31..a2596f22ed 100644 --- a/src/java/org/apache/cassandra/dht/IPartitioner.java +++ b/src/java/org/apache/cassandra/dht/IPartitioner.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Random; +import java.util.function.Function; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.marshal.AbstractType; @@ -136,6 +137,8 @@ public interface IPartitioner return Optional.empty(); } + Function<accord.primitives.Ranges, AccordSplitter> accordSplitter(); + default boolean isFixedLength() { return false; diff --git a/src/java/org/apache/cassandra/dht/LocalPartitioner.java b/src/java/org/apache/cassandra/dht/LocalPartitioner.java index ad8461962d..2daf45e7ac 100644 --- a/src/java/org/apache/cassandra/dht/LocalPartitioner.java +++ b/src/java/org/apache/cassandra/dht/LocalPartitioner.java @@ -22,7 +22,9 @@ import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.function.Function; +import accord.primitives.Ranges; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.CachedHashDecoratedKey; import org.apache.cassandra.db.marshal.AbstractType; @@ -207,4 +209,10 @@ public class 
LocalPartitioner implements IPartitioner return EMPTY_SIZE + ObjectSizes.sizeOnHeapOf(token); } } + + @Override + public Function<Ranges, AccordSplitter> accordSplitter() + { + return AccordBytesSplitter::new; + } } diff --git a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java index 1f7f3605e9..4101449b1c 100644 --- a/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java +++ b/src/java/org/apache/cassandra/dht/Murmur3Partitioner.java @@ -23,7 +23,9 @@ import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; +import accord.primitives.Ranges; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.PreHashedDecoratedKey; import org.apache.cassandra.db.TypeSizes; @@ -58,6 +60,8 @@ public class Murmur3Partitioner implements IPartitioner private final Splitter splitter = new Splitter(this) { + final BigInteger MAX = BigInteger.valueOf(Long.MAX_VALUE), MIN = BigInteger.valueOf(Long.MIN_VALUE); + public Token tokenForValue(BigInteger value) { return new LongToken(value.longValue()); @@ -67,6 +71,18 @@ public class Murmur3Partitioner implements IPartitioner { return BigInteger.valueOf(((LongToken) token).token); } + + @Override + BigInteger minimumValue() + { + return MIN; + } + + @Override + BigInteger maximumValue() + { + return MAX; + } }; public DecoratedKey decorateKey(ByteBuffer key) @@ -448,4 +464,10 @@ public class Murmur3Partitioner implements IPartitioner { return Optional.of(splitter); } + + @Override + public Function<Ranges, AccordSplitter> accordSplitter() + { + return ignore -> splitter; + } } diff --git a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java index 18cd94c388..edceee7d66 100644 --- a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java +++ 
b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java @@ -22,7 +22,10 @@ import java.nio.ByteBuffer; import java.nio.charset.CharacterCodingException; import java.util.*; import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; +import accord.api.RoutingKey; +import accord.primitives.Ranges; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.CachedHashDecoratedKey; import org.apache.cassandra.db.marshal.AbstractType; @@ -32,6 +35,7 @@ import org.apache.cassandra.gms.VersionedValue; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.accord.api.AccordRoutingKey; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.bytecomparable.ByteComparable; import org.apache.cassandra.utils.bytecomparable.ByteSource; @@ -40,6 +44,11 @@ import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.utils.Pair; +import static accord.utils.Invariants.checkArgument; +import static java.lang.Integer.max; +import static java.math.BigInteger.ONE; +import static java.math.BigInteger.ZERO; + public class OrderPreservingPartitioner implements IPartitioner { private static final String rndchars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"; @@ -81,7 +90,7 @@ public class OrderPreservingPartitioner implements IPartitioner { assert str.length() <= sigchars; - BigInteger big = BigInteger.ZERO; + BigInteger big = ZERO; for (int i = 0; i < str.length(); i++) { int charpos = 16 * (sigchars - (i + 1)); @@ -276,4 +285,58 @@ public class OrderPreservingPartitioner implements IPartitioner { return UTF8Type.instance; } + + @Override + public Function<Ranges, AccordSplitter> accordSplitter() + { + return ranges -> new AccordSplitter() + { + final int charLength = ranges.stream().mapToInt(range -> 
max(charLength(range.start()), charLength(range.end()))) + .max().orElse(0); + + @Override + BigInteger valueForToken(Token token) + { + String chars = ((StringToken) token).token; + checkArgument(chars.length() <= charLength); + BigInteger value = ZERO; + for (int i = 0 ; i < chars.length() ; ++i) + value = value.add(BigInteger.valueOf(chars.charAt(i) & 0xffffL).shiftLeft((charLength - 1 - i) * 16)); + return value; + } + + @Override + Token tokenForValue(BigInteger value) + { + // TODO (required): test + checkArgument(value.compareTo(ZERO) >= 0); + char[] chars = new char[charLength]; + for (int i = 0 ; i < chars.length ; ++i) + chars[i] = (char) value.shiftRight((charLength - 1 - i) * 16).shortValue(); + return new StringToken(new String(chars)); + } + + @Override + BigInteger minimumValue() + { + return ZERO; + } + + @Override + BigInteger maximumValue() + { + return ONE.shiftLeft(charLength * 16).subtract(ONE); + } + }; + } + + private static int charLength(RoutingKey routingKey) + { + return charLength(((AccordRoutingKey) routingKey).token()); + } + + private static int charLength(Token token) + { + return ((StringToken) token).token.length(); + } } diff --git a/src/java/org/apache/cassandra/dht/RandomPartitioner.java b/src/java/org/apache/cassandra/dht/RandomPartitioner.java index 7d1e7505a1..180726348d 100644 --- a/src/java/org/apache/cassandra/dht/RandomPartitioner.java +++ b/src/java/org/apache/cassandra/dht/RandomPartitioner.java @@ -23,9 +23,11 @@ import java.math.BigInteger; import java.nio.ByteBuffer; import java.security.MessageDigest; import java.util.*; +import java.util.function.Function; import com.google.common.annotations.VisibleForTesting; +import accord.primitives.Ranges; import org.apache.cassandra.db.CachedHashDecoratedKey; import org.apache.cassandra.db.marshal.ByteArrayAccessor; import org.apache.cassandra.db.marshal.ByteBufferAccessor; @@ -92,6 +94,18 @@ public class RandomPartitioner implements IPartitioner { return 
((BigIntegerToken)token).getTokenValue(); } + + @Override + BigInteger minimumValue() + { + return MINIMUM.getTokenValue(); + } + + @Override + BigInteger maximumValue() + { + return MAXIMUM; + } }; public DecoratedKey decorateKey(ByteBuffer key) @@ -368,6 +382,12 @@ public class RandomPartitioner implements IPartitioner return Optional.of(splitter); } + @Override + public Function<Ranges, AccordSplitter> accordSplitter() + { + return ignore -> splitter; + } + private static BigInteger hashToBigInteger(ByteBuffer data) { MessageDigest messageDigest = localMD5Digest.get(); diff --git a/src/java/org/apache/cassandra/dht/Splitter.java b/src/java/org/apache/cassandra/dht/Splitter.java index e410a9cb29..91782f467a 100644 --- a/src/java/org/apache/cassandra/dht/Splitter.java +++ b/src/java/org/apache/cassandra/dht/Splitter.java @@ -36,7 +36,7 @@ import static java.util.stream.Collectors.toSet; /** * Partition splitter. */ -public abstract class Splitter +public abstract class Splitter extends AccordSplitter { private final IPartitioner partitioner; @@ -45,12 +45,6 @@ public abstract class Splitter this.partitioner = partitioner; } - @VisibleForTesting - protected abstract Token tokenForValue(BigInteger value); - - @VisibleForTesting - protected abstract BigInteger valueForToken(Token token); - @VisibleForTesting protected BigInteger tokensInRange(Range<Token> range) { diff --git a/src/java/org/apache/cassandra/schema/TableId.java b/src/java/org/apache/cassandra/schema/TableId.java index 53b4465bf3..cc4fd39c39 100644 --- a/src/java/org/apache/cassandra/schema/TableId.java +++ b/src/java/org/apache/cassandra/schema/TableId.java @@ -131,7 +131,7 @@ public class TableId implements Comparable<TableId> return position - offset; } - public static int serializedSize() + public final int serializedSize() { return 16; } diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommand.java b/src/java/org/apache/cassandra/service/accord/AccordCommand.java index 
2b4f36863d..2003e77ae1 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommand.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommand.java @@ -56,6 +56,7 @@ import accord.primitives.Txn; import accord.primitives.TxnId; import accord.primitives.Writes; import accord.utils.DeterministicIdentitySet; +import org.apache.cassandra.service.accord.AccordCommandStore.SafeAccordCommandStore; import org.apache.cassandra.service.accord.api.PartitionKey; import org.apache.cassandra.service.accord.async.AsyncContext; import org.apache.cassandra.service.accord.store.StoredNavigableMap; @@ -342,7 +343,7 @@ public class AccordCommand extends Command implements AccordState<TxnId> blockingCommitOn.clearModifiedFlag(); waitingOnApply.clearModifiedFlag(); blockingApplyOn.clearModifiedFlag(); - storedListeners.clearModifiedFlag();; + storedListeners.clearModifiedFlag(); } @Override @@ -628,7 +629,7 @@ public class AccordCommand extends Command implements AccordState<TxnId> @Override protected void postApply(SafeCommandStore safeStore) { - AccordStateCache.Instance<TxnId, AccordCommand> cache = ((AccordCommandStore) safeStore).commandCache(); + AccordStateCache.Instance<TxnId, AccordCommand> cache = ((SafeAccordCommandStore) safeStore).commandStore().commandCache(); cache.cleanupWriteFuture(txnId); super.postApply(safeStore); } @@ -640,10 +641,7 @@ public class AccordCommand extends Command implements AccordState<TxnId> for (int i=0,mi=keys.size(); i<mi; i++) { PartitionKey key = (PartitionKey) keys.get(i); - if (((AccordCommandStore)safeStore).isCommandsForKeyInContext(key)) - continue; - - if (!safeStore.commandStore().hashIntersects(key)) + if (((SafeAccordCommandStore) safeStore).commandStore().isCommandsForKeyInContext(key)) continue; if (!ranges.contains(key)) @@ -672,7 +670,7 @@ public class AccordCommand extends Command implements AccordState<TxnId> private Future<Void> apply(SafeCommandStore safeStore, boolean canReschedule) { - 
AccordStateCache.Instance<TxnId, AccordCommand> cache = ((AccordCommandStore) safeStore).commandCache(); + AccordStateCache.Instance<TxnId, AccordCommand> cache = ((SafeAccordCommandStore) safeStore).commandStore().commandCache(); Future<Void> future = cache.getWriteFuture(txnId); if (future != null) return future; @@ -700,7 +698,7 @@ public class AccordCommand extends Command implements AccordState<TxnId> @Override public Future<Data> read(SafeCommandStore safeStore) { - AccordStateCache.Instance<TxnId, AccordCommand> cache = ((AccordCommandStore) safeStore).commandCache(); + AccordStateCache.Instance<TxnId, AccordCommand> cache = ((SafeAccordCommandStore) safeStore).commandStore().commandCache(); Future<Data> future = cache.getReadFuture(txnId); if (future != null) return future; @@ -756,7 +754,7 @@ public class AccordCommand extends Command implements AccordState<TxnId> storedListeners.getView().forEach(l -> l.onChange(safeStore, this)); transientListeners.forEach(listener -> { PreLoadContext ctx = listener.listenerPreLoadContext(txnId()); - AsyncContext context = ((AccordCommandStore)safeStore).getContext(); + AsyncContext context = ((SafeAccordCommandStore)safeStore).context(); if (context.containsScopedItems(ctx)) { logger.trace("{}: synchronously updating listener {}", txnId(), listener); diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java index 068b08eff8..f2b54492fa 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java @@ -27,8 +27,6 @@ import java.util.function.BinaryOperator; import java.util.function.Consumer; import java.util.function.Function; -import com.google.common.base.Preconditions; - import accord.api.Agent; import accord.api.DataStore; import accord.api.Key; @@ -36,6 +34,8 @@ import accord.api.ProgressLog; import accord.local.Command; import 
accord.local.CommandListener; import accord.local.CommandStore; +import accord.local.CommandStores.RangesForEpoch; +import accord.local.CommandStores.RangesForEpochHolder; import accord.local.CommandsForKey; import accord.local.NodeTimeService; import accord.local.PreLoadContext; @@ -48,6 +48,7 @@ import accord.primitives.Seekables; import accord.primitives.Timestamp; import accord.primitives.AbstractKeys; import accord.primitives.TxnId; +import accord.utils.Invariants; import org.apache.cassandra.service.accord.api.PartitionKey; import org.apache.cassandra.service.accord.async.AsyncContext; import org.apache.cassandra.service.accord.async.AsyncOperation; @@ -55,11 +56,257 @@ import org.apache.cassandra.utils.Clock; import org.apache.cassandra.utils.concurrent.Future; import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; -public class AccordCommandStore extends CommandStore implements SafeCommandStore +import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory; + +public class AccordCommandStore extends CommandStore { - public static long maxCacheSize() + public class SafeAccordCommandStore implements SafeCommandStore { - return 5 << 20; // TODO: make configurable + final RangesForEpoch rangesForEpoch; + final AsyncContext context; + + SafeAccordCommandStore(RangesForEpoch rangesForEpoch, AsyncContext context) + { + this.rangesForEpoch = rangesForEpoch; + this.context = context; + } + + public AsyncContext context() + { + return context; + } + + @Override + public Command command(TxnId txnId) + { + AccordCommand command = getCommandInternal(txnId); + if (command.isEmpty()) + command.initialize(); + return command; + } + + @Override + public Command ifPresent(TxnId txnId) + { + AccordCommand command = getCommandInternal(txnId); + return !command.isEmpty() ? 
command : null; + } + + @Override + public Command ifLoaded(TxnId txnId) + { + AccordCommand command = commandCache.getOrNull(txnId); + if (command != null && command.isLoaded()) + { + getContext().commands.add(command); + return command; + } + return null; + } + + @Override + public CommandsForKey commandsForKey(Key key) + { + AccordCommandsForKey commandsForKey = getCommandsForKeyInternal(key); + if (commandsForKey.isEmpty()) + commandsForKey.initialize(); + return commandsForKey; + } + + @Override + public CommandsForKey maybeCommandsForKey(Key key) + { + AccordCommandsForKey commandsForKey = getCommandsForKeyInternal(key); + return !commandsForKey.isEmpty() ? commandsForKey : null; + } + + @Override + public void addAndInvokeListener(TxnId txnId, CommandListener listener) + { + AccordCommand.WriteOnly command = (AccordCommand.WriteOnly) getContext().commands.getOrCreateWriteOnly(txnId, (ignore, id) -> new AccordCommand.WriteOnly(id), commandStore()); + command.addListener(listener); + execute(listener.listenerPreLoadContext(txnId), store -> { + listener.onChange(store, store.command(txnId)); + }); + } + + @Override + public <T> T mapReduce(Routables<?, ?> keysOrRanges, Ranges slice, Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue) + { + switch (keysOrRanges.kindOfContents()) { + default: + throw new AssertionError(); + case Key: + // TODO: efficiency + AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges; + return keys.stream() + .filter(slice::contains) + .map(this::commandsForKey) + .map(map) + .reduce(initialValue, reduce); + case Range: + // TODO: implement + throw new UnsupportedOperationException(); + } + } + + public <T> T mapReduce(Routables<?, ?> keysOrRanges, Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue) + { + switch (keysOrRanges.kindOfContents()) { + default: + throw new AssertionError(); + case Key: + AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges; + return 
keys.stream() + .map(this::commandsForKey) + .map(map) + .reduce(initialValue, reduce); + case Range: + // TODO: implement + throw new UnsupportedOperationException(); + } + } + + public void forEach(Routables<?, ?> keysOrRanges, Consumer<CommandsForKey> forEach) + { + switch (keysOrRanges.kindOfContents()) { + default: + throw new AssertionError(); + case Key: + AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges; + keys.forEach(key -> forEach.accept(commandsForKey(key))); + break; + case Range: + // TODO: implement + throw new UnsupportedOperationException(); + } + } + + public void forEach(Routable keyOrRange, Consumer<CommandsForKey> forEach) + { + switch (keyOrRange.kind()) + { + default: throw new AssertionError(); + case Key: + forEach.accept(commandsForKey((Key) keyOrRange)); + break; + case Range: + // TODO: implement + throw new UnsupportedOperationException(); + } + } + + @Override + public void forEach(Routables<?, ?> keysOrRanges, Ranges slice, Consumer<CommandsForKey> forEach) + { + switch (keysOrRanges.kindOfContents()) { + default: + throw new AssertionError(); + case Key: + AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges; + keys.forEach(slice, key -> { + forEach.accept(commandsForKey(key)); + }); + break; + case Range: + // TODO: implement + throw new UnsupportedOperationException(); + } + } + + @Override + public void forEach(Routable keyOrRange, Ranges slice, Consumer<CommandsForKey> forEach) + { + switch (keyOrRange.kind()) + { + default: throw new AssertionError(); + case Key: + Key key = (Key) keyOrRange; + if (slice.contains(key)) + forEach.accept(commandsForKey(key)); + break; + case Range: + // TODO: implement + throw new UnsupportedOperationException(); + } + } + + @Override + public AccordCommandStore commandStore() + { + return AccordCommandStore.this; + } + + @Override + public DataStore dataStore() + { + return dataStore; + } + + @Override + public Agent agent() + { + return agent; + } + + @Override + public 
ProgressLog progressLog() + { + return progressLog; + } + + @Override + public RangesForEpoch ranges() + { + return rangesForEpoch; + } + + @Override + public long latestEpoch() + { + return time.epoch(); + } + + @Override + public Timestamp preaccept(TxnId txnId, Seekables<?, ?> keys) + { + Timestamp max = maxConflict(keys); + long epoch = latestEpoch(); + if (txnId.compareTo(max) > 0 && txnId.epoch >= epoch && !agent.isExpired(txnId, time.now())) + return txnId; + + return time.uniqueNow(max); + } + + @Override + public Future<Void> execute(PreLoadContext context, Consumer<? super SafeCommandStore> consumer) + { + return AccordCommandStore.this.execute(context, consumer); + } + + @Override + public <T> Future<T> submit(PreLoadContext context, Function<? super SafeCommandStore, T> function) + { + return AccordCommandStore.this.submit(context, function); + } + + @Override + public NodeTimeService time() + { + return time; + } + + public Timestamp maxConflict(Seekables<?, ?> keys) + { + // TODO: Seekables + // TODO: efficiency + return ((Keys)keys).stream() + .map(this::maybeCommandsForKey) + .filter(Objects::nonNull) + .map(CommandsForKey::max) + .max(Comparator.naturalOrder()) + .orElse(Timestamp.NONE); + } } private static long getThreadId(ExecutorService executor) @@ -91,29 +338,25 @@ public class AccordCommandStore extends CommandStore implements SafeCommandStore private final Agent agent; private final DataStore dataStore; private final ProgressLog progressLog; - private final RangesForEpoch rangesForEpoch; + private final RangesForEpochHolder rangesForEpochHolder; public AccordCommandStore(int id, - int generation, - int index, - int numShards, NodeTimeService time, Agent agent, DataStore dataStore, ProgressLog.Factory progressLogFactory, - RangesForEpoch rangesForEpoch, - ExecutorService executor) + RangesForEpochHolder rangesForEpoch) { - super(id, generation, index, numShards); + super(id); this.time = time; this.agent = agent; this.dataStore = dataStore; 
this.progressLog = progressLogFactory.create(this); - this.rangesForEpoch = rangesForEpoch; - this.loggingId = String.format("[%s:%s]", generation, index); - this.executor = executor; + this.rangesForEpochHolder = rangesForEpoch; + this.loggingId = String.format("[%s]", id); + this.executor = executorFactory().sequential(CommandStore.class.getSimpleName() + '[' + id + ']'); this.threadId = getThreadId(this.executor); - this.stateCache = new AccordStateCache(maxCacheSize() / numShards); + this.stateCache = new AccordStateCache(0); this.commandCache = stateCache.instance(TxnId.class, AccordCommand.class, AccordCommand::new); @@ -128,14 +371,19 @@ public class AccordCommandStore extends CommandStore implements SafeCommandStore stateCache.setMaxSize(bytes); } + public SafeAccordCommandStore safeStore(AsyncContext context) + { + return new SafeAccordCommandStore(rangesForEpochHolder.get(), context); + } + public void checkInStoreThread() { - Preconditions.checkState(Thread.currentThread().getId() == threadId); + Invariants.checkState(Thread.currentThread().getId() == threadId); } public void checkNotInStoreThread() { - Preconditions.checkState(Thread.currentThread().getId() != threadId); + Invariants.checkState(Thread.currentThread().getId() != threadId); } public ExecutorService executor() @@ -155,19 +403,19 @@ public class AccordCommandStore extends CommandStore implements SafeCommandStore public void setContext(AsyncContext context) { - Preconditions.checkState(currentCtx == null); + Invariants.checkState(currentCtx == null); currentCtx = context; } public AsyncContext getContext() { - Preconditions.checkState(currentCtx != null); + Invariants.checkState(currentCtx != null); return currentCtx; } public void unsetContext(AsyncContext context) { - Preconditions.checkState(currentCtx == context); + Invariants.checkState(currentCtx == context); currentCtx = null; } @@ -179,44 +427,14 @@ public class AccordCommandStore extends CommandStore implements SafeCommandStore 
private AccordCommand getCommandInternal(TxnId txnId) { - Preconditions.checkState(currentCtx != null); + Invariants.checkState(currentCtx != null); AccordCommand command = currentCtx.commands.get(txnId); if (command == null) throw new IllegalArgumentException("No command in context for txnId " + txnId); - - Preconditions.checkState(command.isLoaded() || (command.isReadOnly() && command.isPartiallyLoaded())); - - return command; - } - - @Override - public Command command(TxnId txnId) - { - AccordCommand command = getCommandInternal(txnId); - if (command.isEmpty()) - command.initialize(); + Invariants.checkState(command.isLoaded() || (command.isReadOnly() && command.isPartiallyLoaded())); return command; } - @Override - public Command ifPresent(TxnId txnId) - { - AccordCommand command = getCommandInternal(txnId); - return !command.isEmpty() ? command : null; - } - - @Override - public Command ifLoaded(TxnId txnId) - { - AccordCommand command = commandCache.getOrNull(txnId); - if (command != null && command.isLoaded()) - { - getContext().commands.add(command); - return command; - } - return null; - } - public boolean isCommandsForKeyInContext(PartitionKey key) { return currentCtx.commandsForKey.get(key) != null; @@ -230,102 +448,10 @@ public class AccordCommandStore extends CommandStore implements SafeCommandStore AccordCommandsForKey commandsForKey = currentCtx.commandsForKey.get((PartitionKey) key); if (commandsForKey == null) throw new IllegalArgumentException("No commandsForKey in context for key " + key); - Preconditions.checkState(commandsForKey.isLoaded()); + Invariants.checkState(commandsForKey.isLoaded()); return commandsForKey; } - @Override - public CommandsForKey commandsForKey(Key key) - { - AccordCommandsForKey commandsForKey = getCommandsForKeyInternal(key); - if (commandsForKey.isEmpty()) - commandsForKey.initialize(); - return commandsForKey; - } - - @Override - public CommandsForKey maybeCommandsForKey(Key key) - { - AccordCommandsForKey 
commandsForKey = getCommandsForKeyInternal(key); - return !commandsForKey.isEmpty() ? commandsForKey : null; - } - - @Override - public void addAndInvokeListener(TxnId txnId, CommandListener listener) - { - AccordCommand.WriteOnly command = (AccordCommand.WriteOnly) getContext().commands.getOrCreateWriteOnly(txnId, (ignore, id) -> new AccordCommand.WriteOnly(id), this); - command.addListener(listener); - execute(listener.listenerPreLoadContext(txnId), store -> { - listener.onChange(store, store.command(txnId)); - }); - } - - public <T> T mapReduce(Routables<?, ?> keysOrRanges, Ranges slice, Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue) - { - switch (keysOrRanges.kindOfContents()) { - default: - throw new AssertionError(); - case Key: - // TODO: efficiency - AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges; - return keys.stream() - .filter(slice::contains) - .filter(this::hashIntersects) - .map(this::commandsForKey) - .map(map) - .reduce(initialValue, reduce); - case Range: - // TODO: - throw new UnsupportedOperationException(); - } - } - - public void forEach(Routables<?, ?> keysOrRanges, Ranges slice, Consumer<CommandsForKey> forEach) - { - switch (keysOrRanges.kindOfContents()) { - default: - throw new AssertionError(); - case Key: - AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges; - keys.forEach(slice, key -> { - if (hashIntersects(key)) - forEach.accept(commandsForKey(key)); - }); - break; - case Range: - // TODO: - throw new UnsupportedOperationException(); - } - } - - public void forEach(Routable keyOrRange, Ranges slice, Consumer<CommandsForKey> forEach) - { - switch (keyOrRange.kind()) - { - default: throw new AssertionError(); - case Key: - Key key = (Key) keyOrRange; - if (slice.contains(key)) - forEach.accept(commandsForKey(key)); - break; - case Range: - // TODO: - throw new UnsupportedOperationException(); - } - } - - @Override - public CommandStore commandStore() - { - return this; - } - - 
@Override - public DataStore dataStore() - { - return dataStore; - } - @Override public <T> Future<T> submit(PreLoadContext loadCtx, Function<? super SafeCommandStore, T> function) { @@ -340,53 +466,6 @@ public class AccordCommandStore extends CommandStore implements SafeCommandStore return agent; } - @Override - public ProgressLog progressLog() - { - return progressLog; - } - - @Override - public RangesForEpoch ranges() - { - return rangesForEpoch; - } - - @Override - public long latestEpoch() - { - return time.epoch(); - } - - @Override - public Timestamp preaccept(TxnId txnId, Seekables<?, ?> keys) - { - Timestamp max = maxConflict(keys); - long epoch = latestEpoch(); - if (txnId.compareTo(max) > 0 && txnId.epoch >= epoch && !agent.isExpired(txnId, time.now())) - return txnId; - - return time.uniqueNow(max); - } - - @Override - public NodeTimeService time() - { - return time; - } - - public Timestamp maxConflict(Seekables<?, ?> keys) - { - // TODO: Seekables - // TODO: efficiency - return ((Keys)keys).stream() - .map(this::maybeCommandsForKey) - .filter(Objects::nonNull) - .map(CommandsForKey::max) - .max(Comparator.naturalOrder()) - .orElse(Timestamp.NONE); - } - @Override public Future<Void> execute(PreLoadContext preLoadContext, Consumer<? 
super SafeCommandStore> consumer) { @@ -411,58 +490,9 @@ public class AccordCommandStore extends CommandStore implements SafeCommandStore } } - public <T> T mapReduce(Routables<?, ?> keysOrRanges, Function<CommandsForKey, T> map, BinaryOperator<T> reduce, T initialValue) { - switch (keysOrRanges.kindOfContents()) { - default: - throw new AssertionError(); - case Key: - AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges; - return keys.stream() - .filter(this::hashIntersects) - .map(this::commandsForKey) - .map(map) - .reduce(initialValue, reduce); - case Range: - // TODO: implement - throw new UnsupportedOperationException(); - } - } - - public void forEach(Routables<?, ?> keysOrRanges, Consumer<CommandsForKey> forEach) - { - switch (keysOrRanges.kindOfContents()) { - default: - throw new AssertionError(); - case Key: - AbstractKeys<Key, ?> keys = (AbstractKeys<Key, ?>) keysOrRanges; - keys.forEach(key -> { - if (hashIntersects(key)) - forEach.accept(commandsForKey(key)); - }); - break; - case Range: - // TODO: implement - throw new UnsupportedOperationException(); - } - } - - public void forEach(Routable keyOrRange, Consumer<CommandsForKey> forEach) - { - switch (keyOrRange.kind()) - { - default: throw new AssertionError(); - case Key: - forEach.accept(commandsForKey((Key) keyOrRange)); - break; - case Range: - // TODO: implement - throw new UnsupportedOperationException(); - } - } - @Override public void shutdown() { - // executors are shutdown by AccordCommandStores + executor.shutdown(); } } diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java index 24b77bcc5c..14dd0851c2 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStores.java @@ -18,50 +18,50 @@ package org.apache.cassandra.service.accord; -import java.util.concurrent.ExecutorService; - import 
accord.api.Agent; import accord.api.DataStore; import accord.api.ProgressLog; import accord.local.AsyncCommandStores; -import accord.local.CommandStore; -import accord.local.Node; import accord.local.NodeTimeService; -import org.apache.cassandra.concurrent.ExecutorFactory; -import org.apache.cassandra.utils.ExecutorUtils; +import accord.local.ShardDistributor; +import accord.topology.Topology; +import org.apache.cassandra.service.accord.AccordCommandStore.SafeAccordCommandStore; public class AccordCommandStores extends AsyncCommandStores { - private final ExecutorService[] executors; + private long cacheSize; + AccordCommandStores(NodeTimeService time, Agent agent, DataStore store, + ShardDistributor shardDistributor, ProgressLog.Factory progressLogFactory) + { + super(time, agent, store, shardDistributor, progressLogFactory, AccordCommandStore::new); + setCacheSize(maxCacheSize()); + } - public AccordCommandStores(int numShards, Node node, Agent agent, DataStore store, - ProgressLog.Factory progressLogFactory) + synchronized void setCacheSize(long bytes) { - this(numShards, node, agent, store, progressLogFactory, executors(node, numShards)); + cacheSize = bytes; + refreshCacheSizes(); } - private AccordCommandStores(int numShards, NodeTimeService time, Agent agent, DataStore store, - ProgressLog.Factory progressLogFactory, ExecutorService[] executors) + synchronized void refreshCacheSizes() { - super(numShards, time, agent, store, progressLogFactory, - (id, generation, index, numShards1, time1, agent1, store1, progressLogFactory1, rangesForEpoch) - -> new AccordCommandStore(id, generation, index, numShards1, time1, agent1, store1, progressLogFactory1, rangesForEpoch, executors[index])); - this.executors = executors; + if (count() == 0) + return; + long perStore = cacheSize / count(); + // TODO (low priority, safety): we might transiently breach our limit if we increase one store before decreasing another + forEach(commandStore -> ((SafeAccordCommandStore) 
commandStore).commandStore().setCacheSize(perStore)); } - private static ExecutorService[] executors(Node node, int count) + private static long maxCacheSize() { - ExecutorService[] executors = new ExecutorService[count]; - for (int i=0; i<count; i++) - { - executors[i] = ExecutorFactory.Global.executorFactory().sequential(CommandStore.class.getSimpleName() + '[' + node + ':' + i + ']'); - } - return executors; + return 5 << 20; // TODO (required): make configurable } - void setCacheSize(long bytes) + @Override + public synchronized void updateTopology(Topology newTopology) { - forEach(commandStore -> ((AccordCommandStore) commandStore).setCacheSize(bytes)); + super.updateTopology(newTopology); + refreshCacheSizes(); } @Override @@ -69,6 +69,5 @@ public class AccordCommandStores extends AsyncCommandStores { super.shutdown(); //TODO shutdown isn't useful by itself, we need a way to "wait" as well. Should be AutoCloseable or offer awaitTermination as well (think Shutdownable interface) - ExecutorUtils.shutdown(executors); } } diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java index f3382e17cf..12da1ffc10 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java +++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java @@ -133,8 +133,7 @@ public class AccordKeyspace parse(COMMANDS, "accord commands", "CREATE TABLE %s (" - + "store_generation int," - + "store_index int," + + "store_id int," + format("txn_id %s,", TIMESTAMP_TUPLE) + "status int," + "home_key blob," @@ -154,7 +153,7 @@ public class AccordKeyspace + "listeners set<blob>, " + format("blocking_commit_on set<%s>, ", TIMESTAMP_TUPLE) + format("blocking_apply_on set<%s>, ", TIMESTAMP_TUPLE) - + "PRIMARY KEY((store_generation, store_index, txn_id))" + + "PRIMARY KEY((store_id, txn_id))" + ')'); // TODO: naming is not very clearly distinct from the base serializers @@ -208,8 +207,7 @@ 
public class AccordKeyspace parse(COMMANDS_FOR_KEY, "accord commands per key", "CREATE TABLE %s (" - + "store_generation int, " - + "store_index int, " + + "store_id int, " + format("key %s, ", KEY_TUPLE) + format("max_timestamp %s static, ", TIMESTAMP_TUPLE) + format("last_executed_timestamp %s static, ", TIMESTAMP_TUPLE) @@ -219,7 +217,7 @@ public class AccordKeyspace + "series int, " + format("timestamp %s, ", TIMESTAMP_TUPLE) + "data blob, " - + "PRIMARY KEY((store_generation, store_index, key), series, timestamp)" + + "PRIMARY KEY((store_id, key), series, timestamp)" + ')'); private static class CommandsForKeyColumns @@ -504,8 +502,7 @@ public class AccordKeyspace timestampMicros, nowInSeconds, command.storedListeners, ListenerProxy::identifier); } - ByteBuffer key = CommandsColumns.keyComparator.make(commandStore.generation(), - commandStore.index(), + ByteBuffer key = CommandsColumns.keyComparator.make(commandStore.id(), serializeTimestamp(command.txnId())).serializeAsPartitionKey(); PartitionUpdate update = PartitionUpdate.singleRowUpdate(Commands, key, builder.build()); return new Mutation(update); @@ -563,13 +560,11 @@ public class AccordKeyspace public static UntypedResultSet loadCommandRow(CommandStore commandStore, TxnId txnId) { String cql = "SELECT * FROM %s.%s " + - "WHERE store_generation=? " + - "AND store_index=? " + + "WHERE store_id = ? 
" + "AND txn_id=(?, ?, ?, ?)"; return executeOnceInternal(String.format(cql, ACCORD_KEYSPACE_NAME, COMMANDS), - commandStore.generation(), - commandStore.index(), + commandStore.id(), txnId.epoch, txnId.real, txnId.logical, txnId.node.id); } @@ -650,8 +645,7 @@ public class AccordKeyspace private static DecoratedKey makeKey(CommandStore commandStore, PartitionKey key) { - ByteBuffer pk = CommandsForKeyColumns.keyComparator.make(commandStore.generation(), - commandStore.index(), + ByteBuffer pk = CommandsForKeyColumns.keyComparator.make(commandStore.id(), serializeKey(key)).serializeAsPartitionKey(); return CommandsForKey.partitioner.decorateKey(pk); } diff --git a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java index 983641d5a9..7f7a8a86e3 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java +++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java @@ -40,7 +40,6 @@ import accord.primitives.Timestamp; import accord.primitives.TxnId; import accord.primitives.Unseekables; import accord.primitives.Writes; -import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.accord.api.PartitionKey; import org.apache.cassandra.service.accord.api.AccordRoutingKey; import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey; @@ -62,7 +61,7 @@ public class AccordObjectSizes return ((AccordRoutingKey) key).estimatedSizeOnHeap(); } - private static final long EMPTY_KEY_RANGE_SIZE = ObjectSizes.measure(TokenRange.fullRange(TableId.generate())); + private static final long EMPTY_KEY_RANGE_SIZE = ObjectSizes.measure(TokenRange.fullRange("")); public static long range(Range range) { return EMPTY_KEY_RANGE_SIZE + key(range.start()) + key(range.end()); diff --git a/src/java/org/apache/cassandra/service/accord/AccordSerializers.java b/src/java/org/apache/cassandra/service/accord/AccordSerializers.java index 
3eb3b9705e..dc48d4bae7 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/AccordSerializers.java @@ -168,7 +168,7 @@ public class AccordSerializers @Override public long serializedSize(TableMetadata metadata, int version) { - return TableId.serializedSize(); + return metadata.id.serializedSize(); } }; } diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index fcc8a8cebc..0e99689d5a 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -31,6 +31,7 @@ import accord.coordinate.Timeout; import accord.impl.SimpleProgressLog; import accord.impl.SizeOfIntersectionSorter; import accord.local.Node; +import accord.local.ShardDistributor.EvenSplit; import accord.messages.Request; import accord.primitives.Txn; import org.apache.cassandra.config.DatabaseDescriptor; @@ -41,6 +42,7 @@ import org.apache.cassandra.concurrent.Shutdownable; import org.apache.cassandra.exceptions.WriteTimeoutException; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.service.accord.api.AccordAgent; +import org.apache.cassandra.service.accord.api.AccordRoutingKey.KeyspaceSplitter; import org.apache.cassandra.service.accord.api.AccordScheduler; import org.apache.cassandra.service.accord.txn.TxnData; import org.apache.cassandra.utils.Clock; @@ -49,6 +51,9 @@ import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.concurrent.Future; import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException; +import static org.apache.cassandra.config.DatabaseDescriptor.getConcurrentAccordOps; +import static org.apache.cassandra.config.DatabaseDescriptor.getPartitioner; + public class AccordService implements Shutdownable { public final Node node; @@ -84,6 +89,7 @@ public class AccordService 
implements Shutdownable configService, AccordService::uniqueNow, () -> null, + new KeyspaceSplitter(new EvenSplit<>(getConcurrentAccordOps(), getPartitioner().accordSplitter())), new AccordAgent(), new Random(), scheduler, diff --git a/src/java/org/apache/cassandra/service/accord/AccordTopologyUtils.java b/src/java/org/apache/cassandra/service/accord/AccordTopologyUtils.java index 22bd631e31..bc6ec7e7b6 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordTopologyUtils.java +++ b/src/java/org/apache/cassandra/service/accord/AccordTopologyUtils.java @@ -25,8 +25,6 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; -import com.google.common.base.Preconditions; - import accord.topology.Shard; import accord.topology.Topology; import org.apache.cassandra.db.Keyspace; @@ -35,9 +33,6 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.EndpointsForToken; import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.schema.Schema; -import org.apache.cassandra.schema.SchemaConstants; -import org.apache.cassandra.schema.TableId; -import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.accord.api.AccordRoutingKey.SentinelKey; import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey; @@ -52,26 +47,23 @@ public class AccordTopologyUtils pending.stream().map(EndpointMapping::getId).collect(Collectors.toSet())); } - private static TokenRange minRange(TableId tableId, Token token) + private static TokenRange minRange(String keyspace, Token token) { - return new TokenRange(SentinelKey.min(tableId), new TokenKey(tableId, token)); + return new TokenRange(SentinelKey.min(keyspace), new TokenKey(keyspace, token)); } - private static TokenRange maxRange(TableId tableId, Token token) + private static TokenRange maxRange(String keyspace, Token token) { - return new TokenRange(new 
TokenKey(tableId, token), SentinelKey.max(tableId)); + return new TokenRange(new TokenKey(keyspace, token), SentinelKey.max(keyspace)); } - private static TokenRange range(TableId tableId, Token left, Token right) + private static TokenRange range(String keyspace, Token left, Token right) { - return new TokenRange(new TokenKey(tableId, left), new TokenKey(tableId, right)); + return new TokenRange(new TokenKey(keyspace, left), new TokenKey(keyspace, right)); } - public static List<Shard> createShards(TableMetadata tableMetadata, TokenMetadata tokenMetadata) + public static List<Shard> createShards(String keyspace, TokenMetadata tokenMetadata) { - TableId tableId = tableMetadata.id; - String keyspace = tableMetadata.keyspace; - AbstractReplicationStrategy replication = Keyspace.open(keyspace).getReplicationStrategy(); Set<Token> tokenSet = new HashSet<>(tokenMetadata.sortedTokens()); tokenSet.addAll(tokenMetadata.getBootstrapTokens().keySet()); @@ -88,13 +80,13 @@ public class AccordTopologyUtils EndpointsForToken pending = tokenMetadata.pendingEndpointsForToken(token, keyspace); if (i == 0) { - shards.add(createShard(minRange(tableId, token), natural, pending)); - finalShard = createShard(maxRange(tableId, tokens.get(mi-1)), natural, pending); + shards.add(createShard(minRange(keyspace, token), natural, pending)); + finalShard = createShard(maxRange(keyspace, tokens.get(mi-1)), natural, pending); } else { Token prev = tokens.get(i - 1); - shards.add(createShard(range(tableId, prev, token), natural, pending)); + shards.add(createShard(range(keyspace, prev, token), natural, pending)); } } shards.add(finalShard); @@ -104,31 +96,11 @@ public class AccordTopologyUtils public static Topology createTopology(long epoch) { - List<TableId> tableIds = new ArrayList<>(); TokenMetadata tokenMetadata = StorageService.instance.getTokenMetadata(); - for (String ksname: Schema.instance.getKeyspaces()) - { - // TODO: add a table metadata flag to enable and enforce accord use - if 
(SchemaConstants.LOCAL_SYSTEM_KEYSPACE_NAMES.contains(ksname)) - continue; - if (SchemaConstants.REPLICATED_SYSTEM_KEYSPACE_NAMES.contains(ksname)) - continue; - - Keyspace keyspace = Keyspace.open(ksname); - for (TableMetadata tableMetadata : keyspace.getMetadata().tables) - { - tableIds.add(tableMetadata.id); - } - } - - tableIds.sort(Comparator.naturalOrder()); - List<Shard> shards = new ArrayList<>(); - for (TableId tableId : tableIds) + for (String keyspace : Schema.instance.distributedKeyspaces().names()) { - TableMetadata tableMetadata = Schema.instance.getTableMetadata(tableId); - Preconditions.checkNotNull(tableMetadata); - shards.addAll(createShards(tableMetadata, tokenMetadata)); + shards.addAll(createShards(keyspace, tokenMetadata)); } return new Topology(epoch, shards.toArray(new Shard[0])); diff --git a/src/java/org/apache/cassandra/service/accord/ListenerProxy.java b/src/java/org/apache/cassandra/service/accord/ListenerProxy.java index bd9061d350..ea7a74c0c6 100644 --- a/src/java/org/apache/cassandra/service/accord/ListenerProxy.java +++ b/src/java/org/apache/cassandra/service/accord/ListenerProxy.java @@ -34,6 +34,7 @@ import accord.primitives.Keys; import accord.primitives.TxnId; import org.apache.cassandra.db.marshal.ByteBufferAccessor; import org.apache.cassandra.db.marshal.ValueAccessor; +import org.apache.cassandra.service.accord.AccordCommandStore.SafeAccordCommandStore; import org.apache.cassandra.service.accord.api.PartitionKey; import org.apache.cassandra.service.accord.async.AsyncContext; import org.apache.cassandra.service.accord.serializers.CommandSerializers; @@ -128,8 +129,8 @@ public abstract class ListenerProxy implements CommandListener, Comparable<Liste public void onChange(SafeCommandStore safeStore, Command c) { AccordCommand command = (AccordCommand) c; - AccordCommandStore commandStore = (AccordCommandStore) safeStore; - AsyncContext context = commandStore.getContext(); + SafeAccordCommandStore commandStore = 
(SafeAccordCommandStore) safeStore; + AsyncContext context = commandStore.context(); PreLoadContext loadCtx = PreLoadContext.contextFor(ImmutableList.of(command.txnId(), txnId), Keys.EMPTY); if (context.containsScopedItems(loadCtx)) { @@ -228,8 +229,8 @@ public abstract class ListenerProxy implements CommandListener, Comparable<Liste public void onChange(SafeCommandStore safeStore, Command c) { AccordCommand command = (AccordCommand) c; - AccordCommandStore commandStore = (AccordCommandStore) safeStore; - AsyncContext context = commandStore.getContext(); + SafeAccordCommandStore commandStore = (SafeAccordCommandStore) safeStore; + AsyncContext context = commandStore.context(); PreLoadContext loadCtx = PreLoadContext.contextFor(ImmutableList.of(command.txnId()), Keys.of(key)); if (context.containsScopedItems(loadCtx)) { diff --git a/src/java/org/apache/cassandra/service/accord/TokenRange.java b/src/java/org/apache/cassandra/service/accord/TokenRange.java index 7fb1ca8f34..81cb329f59 100644 --- a/src/java/org/apache/cassandra/service/accord/TokenRange.java +++ b/src/java/org/apache/cassandra/service/accord/TokenRange.java @@ -25,7 +25,6 @@ import accord.primitives.Range; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.accord.api.AccordRoutingKey; import org.apache.cassandra.service.accord.api.AccordRoutingKey.SentinelKey; @@ -36,9 +35,9 @@ public class TokenRange extends Range.EndInclusive super(start, end); } - public static TokenRange fullRange(TableId tableId) + public static TokenRange fullRange(String keyspace) { - return new TokenRange(SentinelKey.min(tableId), SentinelKey.max(tableId)); + return new TokenRange(SentinelKey.min(keyspace), SentinelKey.max(keyspace)); } @Override diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordRoutableKey.java 
b/src/java/org/apache/cassandra/service/accord/api/AccordRoutableKey.java index f4066e9c1c..d19f832ace 100644 --- a/src/java/org/apache/cassandra/service/accord/api/AccordRoutableKey.java +++ b/src/java/org/apache/cassandra/service/accord/api/AccordRoutableKey.java @@ -22,32 +22,28 @@ import java.util.Objects; import accord.primitives.RoutableKey; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.service.accord.api.AccordRoutingKey.SentinelKey; +import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey; public abstract class AccordRoutableKey implements RoutableKey { - final TableId tableId; + final String keyspace; // TODO (desired): use an id (TrM) - protected AccordRoutableKey(TableId tableId) + protected AccordRoutableKey(String keyspace) { - this.tableId = tableId; + this.keyspace = keyspace; } - public final TableId tableId() { return tableId; } + public final String keyspace() { return keyspace; } public abstract Token token(); - @Override - public final int routingHash() - { - return token().tokenHash(); - } - @Override public int hashCode() { - return Objects.hash(tableId, routingHash()); + return Objects.hash(keyspace, token().tokenHash()); } + @Override public final int compareTo(RoutableKey that) { return compareTo((AccordRoutableKey) that); @@ -55,18 +51,24 @@ public abstract class AccordRoutableKey implements RoutableKey public final int compareTo(AccordRoutableKey that) { - int cmp = this.tableId().compareTo(that.tableId()); + int cmp = this.keyspace().compareTo(that.keyspace()); if (cmp != 0) return cmp; - if (this instanceof AccordRoutingKey.SentinelKey || that instanceof AccordRoutingKey.SentinelKey) + if (this.getClass() == SentinelKey.class || that.getClass() == SentinelKey.class) { - int leftInt = this instanceof AccordRoutingKey.SentinelKey ? ((AccordRoutingKey.SentinelKey) this).asInt() : 0; - int rightInt = that instanceof AccordRoutingKey.SentinelKey ? 
((AccordRoutingKey.SentinelKey) that).asInt() : 0; + int leftInt = this.getClass() == SentinelKey.class ? ((SentinelKey) this).asInt() : 0; + int rightInt = that.getClass() == SentinelKey.class ? ((SentinelKey) that).asInt() : 0; return Integer.compare(leftInt, rightInt); } - return this.token().compareTo(that.token()); + cmp = this.token().compareTo(that.token()); + if (cmp != 0) + return cmp; + + if (this.getClass() == TokenKey.class) + return that.getClass() == TokenKey.class ? 0 : 1; + return that.getClass() == TokenKey.class ? -1 : ((PartitionKey)this).tableId.compareTo(((PartitionKey)that).tableId); } @Override diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java index e9c600ae4a..8a763f5e61 100644 --- a/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java +++ b/src/java/org/apache/cassandra/service/accord/api/AccordRoutingKey.java @@ -19,23 +19,29 @@ package org.apache.cassandra.service.accord.api; import java.io.IOException; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.TreeMap; import accord.api.Key; import accord.api.RoutingKey; -import org.apache.cassandra.config.DatabaseDescriptor; +import accord.local.ShardDistributor; +import accord.primitives.Range; +import accord.primitives.Ranges; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.IVersionedSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.schema.Schema; -import org.apache.cassandra.schema.TableId; -import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.ObjectSizes; +import static 
org.apache.cassandra.config.DatabaseDescriptor.getPartitioner; + public abstract class AccordRoutingKey extends AccordRoutableKey implements RoutingKey { enum RoutingKeyKind @@ -43,9 +49,9 @@ public abstract class AccordRoutingKey extends AccordRoutableKey implements Rout TOKEN, SENTINEL } - protected AccordRoutingKey(TableId tableId) + protected AccordRoutingKey(String keyspace) { - super(tableId); + super(keyspace); } public abstract RoutingKeyKind kindOfRoutingKey(); @@ -56,22 +62,23 @@ public abstract class AccordRoutingKey extends AccordRoutableKey implements Rout return (AccordRoutingKey) key; } - public static class SentinelKey extends AccordRoutingKey + // final in part because we refer to its class directly in AccordRoutableKey.compareTo + public static final class SentinelKey extends AccordRoutingKey { private static final long EMPTY_SIZE = ObjectSizes.measure(new SentinelKey(null, true)); private final boolean isMin; - private SentinelKey(TableId tableId, boolean isMin) + private SentinelKey(String keyspace, boolean isMin) { - super(tableId); + super(keyspace); this.isMin = isMin; } @Override public int hashCode() { - return Objects.hash(tableId, isMin); + return Objects.hash(keyspace, isMin); } @Override @@ -86,20 +93,20 @@ public abstract class AccordRoutingKey extends AccordRoutableKey implements Rout return EMPTY_SIZE; } - public static SentinelKey min(TableId tableId) + public static SentinelKey min(String keyspace) { - return new SentinelKey(tableId, true); + return new SentinelKey(keyspace, true); } - public static SentinelKey max(TableId tableId) + public static SentinelKey max(String keyspace) { - return new SentinelKey(tableId, false); + return new SentinelKey(keyspace, false); } public TokenKey toTokenKey() { - IPartitioner partitioner = DatabaseDescriptor.getPartitioner(); - return new TokenKey(tableId, isMin ? + IPartitioner partitioner = getPartitioner(); + return new TokenKey(keyspace, isMin ? 
partitioner.getMinimumToken().increaseSlightly() : partitioner.getMaximumToken().decreaseSlightly()); } @@ -119,7 +126,7 @@ public abstract class AccordRoutingKey extends AccordRoutableKey implements Rout public String toString() { return "SentinelKey{" + - "tableId=" + tableId + + "keyspace=" + keyspace + ", key=" + (isMin ? "min": "max") + '}'; } @@ -130,39 +137,40 @@ public abstract class AccordRoutingKey extends AccordRoutableKey implements Rout public void serialize(SentinelKey key, DataOutputPlus out, int version) throws IOException { out.writeBoolean(key.isMin); - key.tableId().serialize(out); + out.writeUTF(key.keyspace); } @Override public SentinelKey deserialize(DataInputPlus in, int version) throws IOException { boolean isMin = in.readBoolean(); - TableId tableId = TableId.deserialize(in); - return new SentinelKey(tableId, isMin); + String keyspace = in.readUTF(); + return new SentinelKey(keyspace, isMin); } @Override public long serializedSize(SentinelKey key, int version) { - return TypeSizes.BOOL_SIZE + TableId.serializedSize(); + return TypeSizes.BOOL_SIZE + TypeSizes.sizeof(key.keyspace); } }; } - public static class TokenKey extends AccordRoutingKey + // final in part because we refer to its class directly in AccordRoutableKey.compareTo + public static final class TokenKey extends AccordRoutingKey { private static final long EMPTY_SIZE; static { - Token key = DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.EMPTY_BYTE_BUFFER).getToken(); + Token key = getPartitioner().decorateKey(ByteBufferUtil.EMPTY_BYTE_BUFFER).getToken(); EMPTY_SIZE = ObjectSizes.measureDeep(new TokenKey(null, key)); } final Token token; - public TokenKey(TableId tableId, Token token) + public TokenKey(String keyspace, Token token) { - super(tableId); + super(keyspace); this.token = token; } @@ -182,7 +190,7 @@ public abstract class AccordRoutingKey extends AccordRoutableKey implements Rout public String toString() { return "TokenKey{" + - "tableId=" + tableId() + 
+ "keyspace=" + keyspace() + ", key=" + token() + '}'; } @@ -200,24 +208,22 @@ public abstract class AccordRoutingKey extends AccordRoutableKey implements Rout @Override public void serialize(TokenKey key, DataOutputPlus out, int version) throws IOException { - key.tableId().serialize(out); + out.writeUTF(key.keyspace); Token.compactSerializer.serialize(key.token, out, version); } @Override public TokenKey deserialize(DataInputPlus in, int version) throws IOException { - TableId tableId = TableId.deserialize(in); - TableMetadata metadata = Schema.instance.getTableMetadata(tableId); - // TODO: metadata might be null here if the table was dropped? - Token token = Token.compactSerializer.deserialize(in, metadata.partitioner, version); - return new TokenKey(tableId, token); + String keyspace = in.readUTF(); + Token token = Token.compactSerializer.deserialize(in, getPartitioner(), version); + return new TokenKey(keyspace, token); } @Override public long serializedSize(TokenKey key, int version) { - return TableId.serializedSize() + Token.compactSerializer.serializedSize(key.token(), version); + return TypeSizes.sizeof(key.keyspace) + Token.compactSerializer.serializedSize(key.token(), version); } } } @@ -275,4 +281,37 @@ public abstract class AccordRoutingKey extends AccordRoutableKey implements Rout return size; } }; + + public static class KeyspaceSplitter implements ShardDistributor + { + final EvenSplit<BigInteger> subSplitter; + public KeyspaceSplitter(EvenSplit<BigInteger> subSplitter) + { + this.subSplitter = subSplitter; + } + + @Override + public List<Ranges> split(Ranges ranges) + { + Map<String, List<Range>> byKeyspace = new TreeMap<>(); + for (Range range : ranges) + { + byKeyspace.computeIfAbsent(((AccordRoutableKey)range.start()).keyspace, ignore -> new ArrayList<>()) + .add(range); + } + + List<Ranges> results = new ArrayList<>(); + for (List<Range> keyspaceRanges : byKeyspace.values()) + { + List<Ranges> splits = 
subSplitter.split(Ranges.ofSortedAndDeoverlapped(keyspaceRanges.toArray(new Range[0]))); + + for (int i = 0; i < splits.size(); i++) + { + if (i == results.size()) results.add(Ranges.EMPTY); + results.set(i, results.get(i).union(splits.get(i))); + } + } + return results; + } + } } diff --git a/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java b/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java index 18296d4adc..13e5439802 100644 --- a/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java +++ b/src/java/org/apache/cassandra/service/accord/api/PartitionKey.java @@ -20,7 +20,6 @@ package org.apache.cassandra.service.accord.api; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Objects; import com.google.common.base.Preconditions; @@ -45,21 +44,24 @@ import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.ObjectSizes; -public class PartitionKey extends AccordRoutableKey implements Key +// final in part because we refer to its class directly in AccordRoutableKey.compareTo +public final class PartitionKey extends AccordRoutableKey implements Key { private static final long EMPTY_SIZE; static { DecoratedKey key = DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.EMPTY_BYTE_BUFFER); - EMPTY_SIZE = ObjectSizes.measureDeep(new PartitionKey(null, key)); + EMPTY_SIZE = ObjectSizes.measureDeep(new PartitionKey(null, null, key)); } + final TableId tableId; // TODO (expected): move to PartitionKey final DecoratedKey key; - public PartitionKey(TableId tableId, DecoratedKey key) + public PartitionKey(String keyspace, TableId tableId, DecoratedKey key) { - super(tableId); + super(keyspace); + this.tableId = tableId; this.key = key; } @@ -70,14 +72,16 @@ public class PartitionKey extends AccordRoutableKey implements Key public static PartitionKey of(Partition partition) { - return new 
PartitionKey(partition.metadata().id, partition.partitionKey()); + return new PartitionKey(partition.metadata().keyspace, partition.metadata().id, partition.partitionKey()); } public static PartitionKey of(SinglePartitionReadCommand command) { - return new PartitionKey(command.metadata().id, command.partitionKey()); + return new PartitionKey(command.metadata().keyspace, command.metadata().id, command.partitionKey()); } + public final TableId tableId() { return tableId; } + @Override public Token token() { @@ -92,7 +96,7 @@ public class PartitionKey extends AccordRoutableKey implements Key @Override public RoutingKey toUnseekable() { - return new TokenKey(tableId(), token()); + return new TokenKey(keyspace, token()); } public long estimatedSizeOnHeap() @@ -147,20 +151,20 @@ public class PartitionKey extends AccordRoutableKey implements Key TableId tableId = TableId.deserialize(in); TableMetadata metadata = Schema.instance.getExistingTableMetadata(tableId); DecoratedKey key = metadata.partitioner.decorateKey(ByteBufferUtil.readWithShortLength(in)); - return new PartitionKey(tableId, key); + return new PartitionKey(metadata.keyspace, tableId, key); } public <V> PartitionKey deserialize(V src, ValueAccessor<V> accessor, int offset) throws IOException { TableId tableId = TableId.deserialize(src, accessor, offset); - offset += TableId.serializedSize(); + offset += tableId.serializedSize(); TableMetadata metadata = Schema.instance.getTableMetadata(tableId); int numBytes = accessor.getShort(src, offset); offset += TypeSizes.SHORT_SIZE; ByteBuffer bytes = ByteBuffer.allocate(numBytes); accessor.copyTo(src, offset, bytes, ByteBufferAccessor.instance, 0, numBytes); DecoratedKey key = metadata.partitioner.decorateKey(bytes); - return new PartitionKey(tableId, key); + return new PartitionKey(metadata.keyspace, tableId, key); } @Override diff --git a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java 
b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java index d52fcb1439..f03eab2f0d 100644 --- a/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java +++ b/src/java/org/apache/cassandra/service/accord/async/AsyncOperation.java @@ -33,6 +33,7 @@ import accord.local.SafeCommandStore; import accord.primitives.Seekables; import accord.primitives.TxnId; import org.apache.cassandra.service.accord.AccordCommandStore; +import org.apache.cassandra.service.accord.AccordCommandStore.SafeAccordCommandStore; import org.apache.cassandra.service.accord.api.PartitionKey; import org.apache.cassandra.utils.concurrent.AsyncPromise; @@ -140,6 +141,7 @@ public abstract class AsyncOperation<R> extends AsyncPromise<R> implements Runna protected void runInternal() { + SafeAccordCommandStore safeStore = commandStore.safeStore(context); switch (state) { case INITIALIZED: @@ -149,7 +151,7 @@ public abstract class AsyncOperation<R> extends AsyncPromise<R> implements Runna return; state = State.RUNNING; - result = apply(commandStore); + result = apply(safeStore); state = State.SAVING; case SAVING: diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnData.java b/src/java/org/apache/cassandra/service/accord/txn/TxnData.java index 55f1400075..20b56b4c9c 100644 --- a/src/java/org/apache/cassandra/service/accord/txn/TxnData.java +++ b/src/java/org/apache/cassandra/service/accord/txn/TxnData.java @@ -24,8 +24,6 @@ import java.util.Iterator; import java.util.Map; import java.util.Set; -import com.google.common.base.Preconditions; - import accord.api.Data; import accord.api.Result; import org.apache.cassandra.db.TypeSizes; @@ -131,20 +129,16 @@ public class TxnData implements Data, Result, Iterable<FilteredPartition> public void serialize(FilteredPartition partition, DataOutputPlus out, int version) throws IOException { partition.metadata().id.serialize(out); - TableMetadata metadata = Schema.instance.getTableMetadata(partition.metadata().id); - try 
(UnfilteredRowIterator iterator = partition.unfilteredIterator()) { - // TODO: Will metadata be null if we've dropped a table? - UnfilteredRowIteratorSerializer.serializer.serialize(iterator, ColumnFilter.all(metadata), out, version, partition.rowCount()); + UnfilteredRowIteratorSerializer.serializer.serialize(iterator, ColumnFilter.all(partition.metadata()), out, version, partition.rowCount()); } } @Override public FilteredPartition deserialize(DataInputPlus in, int version) throws IOException { - TableMetadata metadata = Schema.instance.getTableMetadata(TableId.deserialize(in)); - Preconditions.checkState(metadata != null); + TableMetadata metadata = Schema.instance.getExistingTableMetadata(TableId.deserialize(in)); try (UnfilteredRowIterator partition = UnfilteredRowIteratorSerializer.serializer.deserialize(in, version, metadata, ColumnFilter.all(metadata), DeserializationHelper.Flag.FROM_REMOTE)) { return new FilteredPartition(UnfilteredRowIterators.filter(partition, 0)); @@ -154,12 +148,11 @@ public class TxnData implements Data, Result, Iterable<FilteredPartition> @Override public long serializedSize(FilteredPartition partition, int version) { - long size = TableId.serializedSize(); - TableMetadata metadata = Schema.instance.getTableMetadata(partition.metadata().id); - Preconditions.checkState(metadata != null); + TableId tableId = partition.metadata().id; + long size = tableId.serializedSize(); try (UnfilteredRowIterator iterator = partition.unfilteredIterator()) { - return size + UnfilteredRowIteratorSerializer.serializer.serializedSize(iterator, ColumnFilter.all(metadata), version, partition.rowCount()); + return size + UnfilteredRowIteratorSerializer.serializer.serializedSize(iterator, ColumnFilter.all(partition.metadata()), version, partition.rowCount()); } } }; diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java index 089e1cda34..fe98121f3f 100644 --- 
a/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java +++ b/src/java/org/apache/cassandra/service/accord/txn/TxnNamedRead.java @@ -58,7 +58,7 @@ public class TxnNamedRead extends AbstractSerialized<ReadCommand> { super(value); this.name = name; - this.key = new PartitionKey(value.metadata().id, value.partitionKey()); + this.key = new PartitionKey(value.metadata().keyspace, value.metadata().id, value.partitionKey()); } private TxnNamedRead(TxnDataName name, PartitionKey key, ByteBuffer bytes) diff --git a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java index 7b26b2acdd..0eb3e781aa 100644 --- a/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java +++ b/src/java/org/apache/cassandra/service/accord/txn/TxnUpdate.java @@ -161,7 +161,7 @@ public class TxnUpdate implements Update else if (ByteBufferUtil.compareUnsigned(left[l], right[r]) != 0) { throw new IllegalStateException("The same keys have different values in each input"); } else { out[o++] = left[l++]; r++; } } - while (l < leftKeys.size()) { out[o++] = left[l]; } + while (l < leftKeys.size()) { out[o++] = left[l++]; } while (r < rightKeys.size()) { out[o++] = right[r++]; } return out; } diff --git a/test/unit/org/apache/cassandra/dht/ByteOrderedPartitionerTest.java b/test/unit/org/apache/cassandra/dht/ByteOrderedPartitionerTest.java index f40c284b0e..e4ff1bc66e 100644 --- a/test/unit/org/apache/cassandra/dht/ByteOrderedPartitionerTest.java +++ b/test/unit/org/apache/cassandra/dht/ByteOrderedPartitionerTest.java @@ -17,6 +17,12 @@ */ package org.apache.cassandra.dht; +import java.util.Arrays; + +import org.junit.Assert; + +import org.apache.cassandra.dht.ByteOrderedPartitioner.BytesToken; + public class ByteOrderedPartitionerTest extends PartitionerTestCase { public void initPartitioner() @@ -28,4 +34,14 @@ public class ByteOrderedPartitionerTest extends PartitionerTestCase { return false; } + + @Override + 
protected void checkRoundTrip(Token original, Token roundTrip) + { + BytesToken orig = (BytesToken) original; + BytesToken rt = (BytesToken) roundTrip; + Assert.assertArrayEquals(orig.token, Arrays.copyOf(rt.token, orig.token.length)); + for (int i = orig.token.length ; i < rt.token.length ; ++i) + Assert.assertEquals((byte)0, rt.token[i]); + } } diff --git a/test/unit/org/apache/cassandra/dht/LengthPartitioner.java b/test/unit/org/apache/cassandra/dht/LengthPartitioner.java index ca6504ced8..626d8e0c41 100644 --- a/test/unit/org/apache/cassandra/dht/LengthPartitioner.java +++ b/test/unit/org/apache/cassandra/dht/LengthPartitioner.java @@ -21,7 +21,9 @@ import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; +import accord.primitives.Ranges; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.db.BufferDecoratedKey; @@ -37,7 +39,7 @@ import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.bytecomparable.ByteComparable; import org.apache.cassandra.utils.bytecomparable.ByteSource; -public class LengthPartitioner implements IPartitioner +public class LengthPartitioner extends AccordSplitter implements IPartitioner { public static final BigInteger ZERO = new BigInteger("0"); public static final BigIntegerToken MINIMUM = new BigIntegerToken("-1"); @@ -179,4 +181,34 @@ public class LengthPartitioner implements IPartitioner { return new PartitionerDefinedOrder(this); } + + @Override + public Function<Ranges, AccordSplitter> accordSplitter() + { + return ignore -> this; + } + + @Override + BigInteger valueForToken(Token token) + { + return ((BigIntegerToken)token).token; + } + + @Override + Token tokenForValue(BigInteger value) + { + return new BigIntegerToken(value); + } + + @Override + BigInteger minimumValue() + { + throw new UnsupportedOperationException(); + } + + @Override + 
BigInteger maximumValue() + { + throw new UnsupportedOperationException(); + } } diff --git a/test/unit/org/apache/cassandra/dht/OrderPreservingPartitionerTest.java b/test/unit/org/apache/cassandra/dht/OrderPreservingPartitionerTest.java index 6ab5b456b3..f8e65bd89f 100644 --- a/test/unit/org/apache/cassandra/dht/OrderPreservingPartitionerTest.java +++ b/test/unit/org/apache/cassandra/dht/OrderPreservingPartitionerTest.java @@ -18,11 +18,16 @@ package org.apache.cassandra.dht; import java.io.IOException; +import java.math.BigInteger; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.dht.OrderPreservingPartitioner.StringToken; +import org.apache.cassandra.service.accord.TokenRange; +import org.apache.cassandra.service.accord.api.AccordRoutingKey; public class OrderPreservingPartitionerTest extends PartitionerTestCase { @@ -44,15 +49,12 @@ public class OrderPreservingPartitionerTest extends PartitionerTestCase return false; } - @Test - public void testCompare() + @Override + protected void checkRoundTrip(Token original, Token roundTrip) { - assert tok("").compareTo(tok("asdf")) < 0; - assert tok("asdf").compareTo(tok("")) > 0; - assert tok("").compareTo(tok("")) == 0; - assert tok("z").compareTo(tok("a")) > 0; - assert tok("a").compareTo(tok("z")) < 0; - assert tok("asdf").compareTo(tok("asdf")) == 0; - assert tok("asdz").compareTo(tok("asdf")) > 0; + StringToken orig = (StringToken) original; + StringToken rt = (StringToken) roundTrip; + Assert.assertEquals(orig.token, rt.token.substring(0, orig.token.length())); + Assert.assertTrue(rt.token.substring(orig.token.length()).matches("\0*")); } } diff --git a/test/unit/org/apache/cassandra/dht/PartitionerTestCase.java b/test/unit/org/apache/cassandra/dht/PartitionerTestCase.java index ec535b0d6a..a4346b8a0c 100644 --- a/test/unit/org/apache/cassandra/dht/PartitionerTestCase.java +++ 
b/test/unit/org/apache/cassandra/dht/PartitionerTestCase.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.dht; +import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; @@ -24,13 +25,18 @@ import java.util.List; import java.util.Map; import java.util.Random; +import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import accord.primitives.Ranges; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.accord.TokenRange; +import org.apache.cassandra.service.accord.api.AccordRoutingKey; +import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -215,4 +221,113 @@ public abstract class PartitionerTestCase totalOwnership += ownership; assertEquals(1.0, totalOwnership, 0.001); } + + @Test + public void testCompare() + { + if (!partitioner.preservesOrder()) + return; + + assert tok("").compareTo(tok("asdf")) < 0; + assert tok("asdf").compareTo(tok("")) > 0; + assert tok("").compareTo(tok("")) == 0; + assert tok("z").compareTo(tok("a")) > 0; + assert tok("a").compareTo(tok("z")) < 0; + assert tok("asdf").compareTo(tok("asdf")) == 0; + assert tok("asdz").compareTo(tok("asdf")) > 0; + } + + @Test + public void testCompareSplitter() + { + for (int i = 0 ; i < 16 ; ++i) + { + Token a = partitioner.getRandomToken(), b = partitioner.getRandomToken(); + while (a.equals(b)) + b = partitioner.getRandomToken(); + if (a.compareTo(b) > 0) { Token tmp = a; a = b; b = tmp; } + testCompareSplitter(a, b); + } + + if (!partitioner.preservesOrder()) + return; + + testCompareSplitter(tok(""), tok("asdf")); + testCompareSplitter(tok(""), tok("")); + testCompareSplitter(tok("a"), tok("z")); + testCompareSplitter(tok("asdf"), tok("asdf")); + 
testCompareSplitter(tok("asd"), tok("asdf")); + testCompareSplitter(tok("asdf"), tok("asf")); + testCompareSplitter(tok("asdf"), tok("asdz")); + } + + @Test + public void testSplitter() + { + for (int i = 0 ; i < 1024 ; ++i) + { + Token a = partitioner.getRandomToken(), b = partitioner.getRandomToken(); + while (a.equals(b)) + b = partitioner.getRandomToken(); + if (a.compareTo(b) > 0) { Token tmp = a; a = b; b = tmp; } + testSplitter(a, b); + } + + if (!partitioner.preservesOrder()) + return; + + testSplitter(tok(""), tok("asdf")); + testSplitter(tok("a"), tok("z")); + testSplitter(tok("asd"), tok("asdf")); + testSplitter(tok("asdf"), tok("asdz")); + } + + void testCompareSplitter(Token less, Token more) + { + Ranges ranges; + if (less.equals(more) && less.isMinimum()) + ranges = Ranges.EMPTY; + else if (less.equals(more)) + ranges = Ranges.of(new TokenRange(new TokenKey("", partitioner.getMinimumToken()), new TokenKey("", less))); + else + ranges = Ranges.of(new TokenRange(new TokenKey("", less), new TokenKey("", more))); + + AccordSplitter splitter = partitioner.accordSplitter().apply(ranges); + BigInteger lv = splitter.valueForToken(less); + BigInteger rv = splitter.valueForToken(more); + Assert.assertEquals(less.equals(more) ? 0 : -1, normaliseCompare(lv.compareTo(rv))); + Assert.assertEquals(less.equals(more) ? 
0 : 1, normaliseCompare(rv.compareTo(lv))); + checkRoundTrip(less, splitter.tokenForValue(lv)); + checkRoundTrip(more, splitter.tokenForValue(rv)); + } + + void testSplitter(Token start, Token end) + { + accord.primitives.Range range = new TokenRange(new TokenKey("", start), new TokenKey("", end)); + AccordSplitter splitter = partitioner.accordSplitter().apply(Ranges.of(range)); + if (!start.isMinimum()) + testSplitter(new TokenRange(new TokenKey("", partitioner.getMinimumToken()), new TokenKey("", start))); + testSplitter(new TokenRange(new TokenKey("", start), new TokenKey("", splitter.tokenForValue(splitter.maximumValue())))); + checkRoundTrip(start, splitter.tokenForValue(splitter.valueForToken(start))); + checkRoundTrip(end, splitter.tokenForValue(splitter.valueForToken(end))); + } + + void testSplitter(accord.primitives.Range range) + { + AccordSplitter splitter = partitioner.accordSplitter().apply(Ranges.of(range)); + BigInteger size = splitter.sizeOf(range); + Assert.assertEquals(range, splitter.subRange(range, BigInteger.ZERO, size)); + } + + protected void checkRoundTrip(Token original, Token roundTrip) + { + Assert.assertEquals(original, roundTrip); + } + + static int normaliseCompare(int compareResult) + { + if (compareResult < 0) return -1; + if (compareResult > 0) return 1; + return 0; + } } diff --git a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java index 67683661e1..fa16a474f8 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandTest.java @@ -49,6 +49,7 @@ import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.accord.AccordCommandStore.SafeAccordCommandStore; import 
org.apache.cassandra.service.accord.api.PartitionKey; import org.apache.cassandra.utils.ByteBufferUtil; @@ -75,7 +76,7 @@ public class AccordCommandTest private static PartitionKey key(int k) { TableMetadata metadata = Schema.instance.getTableMetadata("ks", "tbl"); - return new PartitionKey(metadata.id, metadata.partitioner.decorateKey(ByteBufferUtil.bytes(k))); + return new PartitionKey(metadata.keyspace, metadata.id, metadata.partitioner.decorateKey(ByteBufferUtil.bytes(k))); } /** @@ -85,7 +86,7 @@ public class AccordCommandTest public void basicCycleTest() throws ExecutionException, InterruptedException { AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl"); - commandStore.execute(PreLoadContext.empty(), instance -> { ((AccordCommandStore) instance).setCacheSize(0); }).get(); + commandStore.execute(PreLoadContext.empty(), instance -> { ((SafeAccordCommandStore) instance).commandStore().setCacheSize(0); }).get(); TxnId txnId = txnId(1, clock.incrementAndGet(), 0, 1); @@ -167,7 +168,7 @@ public class AccordCommandTest public void computeDeps() throws Throwable { AccordCommandStore commandStore = createAccordCommandStore(clock::incrementAndGet, "ks", "tbl"); - commandStore.execute(PreLoadContext.empty(), instance -> { ((AccordCommandStore) instance).setCacheSize(0); }).get(); + commandStore.execute(PreLoadContext.empty(), instance -> { ((SafeAccordCommandStore) instance).commandStore().setCacheSize(0); }).get(); TxnId txnId1 = txnId(1, clock.incrementAndGet(), 0, 1); Txn txn = createTxn(2); diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java index 20142c439b..b835e0fbf5 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordTestUtils.java @@ -18,14 +18,9 @@ package org.apache.cassandra.service.accord; -import java.util.Arrays; import 
java.util.Collections; -import java.util.Comparator; -import java.util.List; import java.util.Set; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.function.LongSupplier; import javax.annotation.Nullable; @@ -41,7 +36,7 @@ import accord.api.RoutingKey; import accord.api.Write; import accord.impl.InMemoryCommandStore; import accord.local.Command; -import accord.local.CommandStore; +import accord.local.CommandStores; import accord.local.Node; import accord.local.Node.Id; import accord.local.NodeTimeService; @@ -51,7 +46,6 @@ import accord.primitives.Ballot; import accord.primitives.Ranges; import accord.primitives.Keys; import accord.primitives.PartialTxn; -import accord.primitives.Range; import accord.primitives.Timestamp; import accord.primitives.Txn; import accord.primitives.TxnId; @@ -63,7 +57,6 @@ import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.statements.TransactionStatement; import org.apache.cassandra.schema.Schema; -import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.accord.api.AccordAgent; @@ -97,23 +90,6 @@ public class AccordTestUtils @Override public void waiting(TxnId blockedBy, Known blockedUntil, Unseekables<?, ?> blockedOn) {} }; - public static Topology simpleTopology(TableId... 
tables) - { - Arrays.sort(tables, Comparator.naturalOrder()); - Id node = localNodeId(); - Shard[] shards = new Shard[tables.length]; - - List<Id> nodes = Lists.newArrayList(node); - Set<Id> fastPath = Sets.newHashSet(node); - for (int i=0; i<tables.length; i++) - { - Range range = TokenRange.fullRange(tables[i]); - shards[i] = new Shard(range, nodes, fastPath, Collections.emptySet()); - } - - return new Topology(1, shards); - } - public static TxnId txnId(long epoch, long real, int logical, long node) { return new TxnId(epoch, real, logical, new Node.Id(node)); @@ -143,7 +119,7 @@ public class AccordTestUtils .map(key -> { try { - return read.read(key, command.kind(), commandStore, command.executeAt(), null).get(); + return read.read(key, command.kind(), instance, command.executeAt(), null).get(); } catch (InterruptedException e) { @@ -202,8 +178,8 @@ public class AccordTestUtils public static Ranges fullRange(Txn txn) { - TableId tableId = ((PartitionKey)txn.keys().get(0)).tableId(); - return Ranges.of(TokenRange.fullRange(tableId)); + PartitionKey key = (PartitionKey) txn.keys().get(0); + return Ranges.of(TokenRange.fullRange(key.keyspace())); } public static PartialTxn createPartialTxn(int key) @@ -213,46 +189,21 @@ public class AccordTestUtils return new PartialTxn.InMemory(ranges, txn.kind(), txn.keys(), txn.read(), txn.query(), txn.update()); } - private static class SingleEpochRanges implements CommandStore.RangesForEpoch + private static class SingleEpochRanges extends CommandStores.RangesForEpochHolder { private final Ranges ranges; public SingleEpochRanges(Ranges ranges) { this.ranges = ranges; - } - - @Override - public Ranges at(long epoch) - { - assert epoch == 1; - return ranges; - } - - @Override - public Ranges between(long fromInclusive, long toInclusive) - { - return ranges; - } - - @Override - public Ranges since(long epoch) - { - assert epoch == 1; - return ranges; - } - - @Override - public boolean owns(long epoch, RoutingKey key) - { - return 
ranges.contains(key); + this.current = new CommandStores.RangesForEpoch(1, ranges); } } public static InMemoryCommandStore.Synchronized createInMemoryCommandStore(LongSupplier now, String keyspace, String table) { TableMetadata metadata = Schema.instance.getTableMetadata(keyspace, table); - TokenRange range = TokenRange.fullRange(metadata.id); + TokenRange range = TokenRange.fullRange(metadata.keyspace); Node.Id node = EndpointMapping.endpointToId(FBUtilities.getBroadcastAddressAndPort()); Topology topology = new Topology(1, new Shard(range, Lists.newArrayList(node), Sets.newHashSet(node), Collections.emptySet())); NodeTimeService time = new NodeTimeService() @@ -262,7 +213,7 @@ public class AccordTestUtils @Override public long now() {return now.getAsLong(); } @Override public Timestamp uniqueNow(Timestamp atLeast) { return new Timestamp(1, now.getAsLong(), 0, node); } }; - return new InMemoryCommandStore.Synchronized(0, 0, 1, 8, + return new InMemoryCommandStore.Synchronized(0, time, new AccordAgent(), null, @@ -272,11 +223,6 @@ public class AccordTestUtils public static AccordCommandStore createAccordCommandStore(Node.Id node, LongSupplier now, Topology topology) { - ExecutorService executor = Executors.newSingleThreadExecutor(r -> { - Thread thread = new Thread(r); - thread.setName(CommandStore.class.getSimpleName() + '[' + node + ':' + 0 + ']'); - return thread; - }); NodeTimeService time = new NodeTimeService() { @Override public Id id() { return node;} @@ -284,21 +230,22 @@ public class AccordTestUtils @Override public long now() {return now.getAsLong(); } @Override public Timestamp uniqueNow(Timestamp atLeast) { return new Timestamp(1, now.getAsLong(), 0, node); } }; - return new AccordCommandStore(0, 0, 0, 1, + return new AccordCommandStore(0, time, new AccordAgent(), null, cs -> NOOP_PROGRESS_LOG, - new SingleEpochRanges(topology.rangesForNode(node)), - executor); + new SingleEpochRanges(topology.rangesForNode(node))); } public static AccordCommandStore 
createAccordCommandStore(LongSupplier now, String keyspace, String table) { TableMetadata metadata = Schema.instance.getTableMetadata(keyspace, table); - TokenRange range = TokenRange.fullRange(metadata.id); + TokenRange range = TokenRange.fullRange(metadata.keyspace); Node.Id node = EndpointMapping.endpointToId(FBUtilities.getBroadcastAddressAndPort()); Topology topology = new Topology(1, new Shard(range, Lists.newArrayList(node), Sets.newHashSet(node), Collections.emptySet())); - return createAccordCommandStore(node, now, topology); + AccordCommandStore store = createAccordCommandStore(node, now, topology); + store.execute(PreLoadContext.empty(), safeStore -> ((AccordCommandStore)safeStore.commandStore()).setCacheSize(1 << 20)); + return store; } public static void execute(AccordCommandStore commandStore, Runnable runnable) diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTopologyTest.java b/test/unit/org/apache/cassandra/service/accord/AccordTopologyTest.java index a7a015a7c1..8b34803d6a 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordTopologyTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordTopologyTest.java @@ -62,10 +62,10 @@ public class AccordTopologyTest Token maxToken = partitioner.getMaximumToken(); // topology.forKey(new AccordKey.TokenKey(tableId, minToken.minKeyBound())); - topology.forKey(new PartitionKey(tableId, new BufferDecoratedKey(minToken, ByteBufferUtil.bytes(0))).toUnseekable()); + topology.forKey(new PartitionKey("ks", tableId, new BufferDecoratedKey(minToken, ByteBufferUtil.bytes(0))).toUnseekable()); // topology.forKey(new AccordKey.TokenKey(tableId, minToken.maxKeyBound())); // topology.forKey(new AccordKey.TokenKey(tableId, maxToken.minKeyBound())); - topology.forKey(new PartitionKey(tableId, new BufferDecoratedKey(maxToken, ByteBufferUtil.bytes(0))).toUnseekable()); + topology.forKey(new PartitionKey("ks", tableId, new BufferDecoratedKey(maxToken, 
ByteBufferUtil.bytes(0))).toUnseekable()); // topology.forKey(new AccordKey.TokenKey(tableId, maxToken.maxKeyBound())); } } diff --git a/test/unit/org/apache/cassandra/service/accord/api/AccordKeyTest.java b/test/unit/org/apache/cassandra/service/accord/api/AccordKeyTest.java index f520a6bb6c..64feaedc98 100644 --- a/test/unit/org/apache/cassandra/service/accord/api/AccordKeyTest.java +++ b/test/unit/org/apache/cassandra/service/accord/api/AccordKeyTest.java @@ -59,7 +59,7 @@ public class AccordKeyTest public void partitionKeyTest() { DecoratedKey dk = partitioner(TABLE1).decorateKey(ByteBufferUtil.bytes(5)); - PartitionKey pk = new PartitionKey(TABLE1, dk); + PartitionKey pk = new PartitionKey("ks", TABLE1, dk); SerializerTestUtils.assertSerializerIOEquality(pk, PartitionKey.serializer); } @@ -67,7 +67,7 @@ public class AccordKeyTest public void tokenKeyTest() { DecoratedKey dk = partitioner(TABLE1).decorateKey(ByteBufferUtil.bytes(5)); - TokenKey pk = new TokenKey(TABLE1, dk.getToken()); + TokenKey pk = new TokenKey("", dk.getToken()); SerializerTestUtils.assertSerializerIOEquality(pk, TokenKey.serializer); } @@ -75,12 +75,12 @@ public class AccordKeyTest public void comparisonTest() { DecoratedKey dk = partitioner(TABLE1).decorateKey(ByteBufferUtil.bytes(5)); - PartitionKey pk = new PartitionKey(TABLE1, dk); - TokenKey tk = new TokenKey(TABLE1, dk.getToken()); - TokenKey tkLow = new TokenKey(TABLE1, dk.getToken().decreaseSlightly()); - TokenKey tkHigh = new TokenKey(TABLE1, dk.getToken().increaseSlightly()); + PartitionKey pk = new PartitionKey("", TABLE1, dk); + TokenKey tk = new TokenKey("", dk.getToken()); + TokenKey tkLow = new TokenKey("", dk.getToken().decreaseSlightly()); + TokenKey tkHigh = new TokenKey("", dk.getToken().increaseSlightly()); - Assert.assertTrue(tk.compareTo(pk) == 0); + Assert.assertTrue(tk.compareTo(pk) > 0); Assert.assertTrue(tkLow.compareTo(pk) < 0); Assert.assertTrue(pk.compareTo(tkHigh) < 0); } @@ -91,10 +91,22 @@ public class 
AccordKeyTest Assert.assertTrue(TABLE1.compareTo(TABLE2) < 0); DecoratedKey dk1 = partitioner(TABLE1).decorateKey(ByteBufferUtil.bytes(5)); - PartitionKey pk1 = new PartitionKey(TABLE1, dk1); + PartitionKey pk1 = new PartitionKey("", TABLE1, dk1); DecoratedKey dk2 = partitioner(TABLE2).decorateKey(ByteBufferUtil.bytes(5)); - PartitionKey pk2 = new PartitionKey(TABLE2, dk2); + PartitionKey pk2 = new PartitionKey("", TABLE2, dk2); + + Assert.assertTrue(pk1.compareTo(pk2) < 0); + } + + @Test + public void keyspaceComparisonTest() + { + DecoratedKey dk1 = partitioner(TABLE1).decorateKey(ByteBufferUtil.bytes(5)); + PartitionKey pk1 = new PartitionKey("a", TABLE1, dk1); + + DecoratedKey dk2 = partitioner(TABLE1).decorateKey(ByteBufferUtil.bytes(5)); + PartitionKey pk2 = new PartitionKey("b", TABLE1, dk2); Assert.assertTrue(pk1.compareTo(pk2) < 0); } @@ -104,16 +116,18 @@ public class AccordKeyTest { Assert.assertTrue(TABLE1.compareTo(TABLE2) < 0); DecoratedKey dk1 = partitioner(TABLE1).decorateKey(ByteBufferUtil.bytes(5)); - PartitionKey pk1 = new PartitionKey(TABLE1, dk1); + PartitionKey pk1 = new PartitionKey("a", TABLE1, dk1); DecoratedKey dk2 = partitioner(TABLE2).decorateKey(ByteBufferUtil.bytes(5)); - PartitionKey pk2 = new PartitionKey(TABLE2, dk2); + PartitionKey pk2 = new PartitionKey("b", TABLE2, dk2); - SentinelKey loSentinel = SentinelKey.min(TABLE1); - SentinelKey hiSentinel = SentinelKey.max(TABLE1); + SentinelKey loSentinel = SentinelKey.min("a"); + SentinelKey hiSentinel = SentinelKey.max("a"); Assert.assertTrue(loSentinel.compareTo(hiSentinel) < 0); Assert.assertTrue(pk1.compareTo(loSentinel) > 0); Assert.assertTrue(loSentinel.compareTo(pk1) < 0); + Assert.assertTrue(pk1.compareTo(hiSentinel) < 0); + Assert.assertTrue(hiSentinel.compareTo(pk1) > 0); Assert.assertTrue(hiSentinel.compareTo(pk2) < 0); } } diff --git a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java 
b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java index 30edd5f8af..f12943bbb5 100644 --- a/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java +++ b/test/unit/org/apache/cassandra/service/accord/async/AsyncOperationTest.java @@ -109,8 +109,8 @@ public class AsyncOperationTest Txn txn = createTxn((int)clock.incrementAndGet()); PartitionKey key = (PartitionKey) Iterables.getOnlyElement(txn.keys()); - commandStore.execute(contextFor(Collections.emptyList(), Keys.of(key)),instance -> { - CommandsForKey cfk = commandStore.maybeCommandsForKey(key); + commandStore.execute(contextFor(Collections.emptyList(), Keys.of(key)), instance -> { + CommandsForKey cfk = instance.maybeCommandsForKey(key); Assert.assertNull(cfk); }).get(); diff --git a/test/unit/org/apache/cassandra/service/accord/serializers/CommandSerializersTest.java b/test/unit/org/apache/cassandra/service/accord/serializers/CommandSerializersTest.java index 3a4435b8c6..34fda94017 100644 --- a/test/unit/org/apache/cassandra/service/accord/serializers/CommandSerializersTest.java +++ b/test/unit/org/apache/cassandra/service/accord/serializers/CommandSerializersTest.java @@ -26,7 +26,6 @@ import accord.primitives.Ranges; import accord.primitives.Txn; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.schema.KeyspaceParams; -import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.accord.AccordTestUtils; import org.apache.cassandra.service.accord.TokenRange; import org.apache.cassandra.service.accord.api.PartitionKey; @@ -55,8 +54,8 @@ public class CommandSerializersTest " INSERT INTO ks.tbl (k, c, v) VALUES (0, 0, 1);\n" + " END IF\n" + "COMMIT TRANSACTION"); - TableId tableId = ((PartitionKey) txn.keys().get(0)).tableId(); - PartialTxn expected = txn.slice(Ranges.of(TokenRange.fullRange(tableId)), true); + PartitionKey key = (PartitionKey) txn.keys().get(0); + PartialTxn expected = 
txn.slice(Ranges.of(TokenRange.fullRange(key.keyspace())), true); SerializerTestUtils.assertSerializerIOEquality(expected, CommandSerializers.partialTxn); } } diff --git a/test/unit/org/apache/cassandra/service/accord/txn/AbstractKeySortedTest.java b/test/unit/org/apache/cassandra/service/accord/txn/AbstractKeySortedTest.java index 001206d858..890c760b0d 100644 --- a/test/unit/org/apache/cassandra/service/accord/txn/AbstractKeySortedTest.java +++ b/test/unit/org/apache/cassandra/service/accord/txn/AbstractKeySortedTest.java @@ -114,7 +114,7 @@ public class AbstractKeySortedTest private static PartitionKey key(int k) { DecoratedKey dk = ByteOrderedPartitioner.instance.decorateKey(ByteBufferUtil.bytes(k)); - return new PartitionKey(TABLE1, dk); + return new PartitionKey("", TABLE1, dk); } private static Item item(int k, int v) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org