This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 3c37d9908dbfd64acc428738e1f9fe0a735294a4
Author:     Blake Eggleston <bl...@ultrablake.com>
AuthorDate: Fri Sep 20 13:12:20 2024 -0700

    CEP-15 (C*): Read accord repair cfk keys from sstable index.

    Patch by Blake Eggleston; Reviewed by David Capwell for CASSANDRA-19920
---
 .../db/marshal/AbstractCompositeType.java          |   5 +
 .../dht/LocalCompositePrefixPartitioner.java       | 341 ++++++++++++++++++++
 .../org/apache/cassandra/dht/LocalPartitioner.java |   2 +-
 .../dht/ReversedLongLocalPartitioner.java          |   4 +-
 .../apache/cassandra/io/sstable/KeyIterator.java   |  35 ++-
 .../cassandra/io/sstable/format/SSTableReader.java |   4 +-
 .../io/sstable/format/big/BigTableReader.java      |  26 +-
 .../io/sstable/format/bti/BtiTableReader.java      |  30 +-
 .../apache/cassandra/service/StorageService.java   |   4 +-
 .../cassandra/service/StorageServiceMBean.java     |   2 +-
 .../cassandra/service/accord/AccordKeyspace.java   | 350 ++++++++-------------
 .../service/accord/repair/AccordRepair.java        |   3 +-
 .../tools/nodetool/ConsensusMigrationAdmin.java    |   2 +-
 .../org/apache/cassandra/utils/MergeIterator.java  |  17 +
 .../io/sstable/format/ForwardingSSTableReader.java |   6 +
 .../cassandra/db/marshal/AbstractTypeTest.java     |   9 +
 .../org/apache/cassandra/dht/IPartitionerTest.java |  57 ++++
 .../dht/LocalCompositePrefixPartitionerTest.java   | 115 +++++++
 .../service/accord/AccordKeyspaceTest.java         |  24 +-
 .../apache/cassandra/utils/AccordGenerators.java   |   2 +-
 .../cassandra/utils/CassandraGenerators.java       |  81 ++++-
 .../cassandra/utils/CassandraGeneratorsTest.java   |  44 +++
 22 files changed, 884 insertions(+), 279 deletions(-)

diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
index dfa7496ea6..44e2158078 100644
--- a/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
+++ b/src/java/org/apache/cassandra/db/marshal/AbstractCompositeType.java
@@ -88,6 +88,11 @@ public abstract class AbstractCompositeType extends AbstractType<ByteBuffer>
             ++i;
         }
 
+        return compareCustomRemainder(left, accessorL, offsetL, right, accessorR, offsetR);
+    }
+
+    protected <VL, VR> int compareCustomRemainder(VL left, ValueAccessor<VL> accessorL, int offsetL, VR right, ValueAccessor<VR> accessorR, int offsetR)
+    {
         if (accessorL.isEmptyFromOffset(left, offsetL))
             return accessorR.sizeFromOffset(right, offsetR) == 0 ? 0 : -1;
 
diff --git a/src/java/org/apache/cassandra/dht/LocalCompositePrefixPartitioner.java b/src/java/org/apache/cassandra/dht/LocalCompositePrefixPartitioner.java
new file mode 100644
index 0000000000..c0a056e043
--- /dev/null
+++ b/src/java/org/apache/cassandra/dht/LocalCompositePrefixPartitioner.java
@@ -0,0 +1,341 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.dht;
+
+import accord.utils.Invariants;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.lifecycle.View;
+import org.apache.cassandra.db.marshal.AbstractType;
+import org.apache.cassandra.db.marshal.ByteBufferAccessor;
+import org.apache.cassandra.db.marshal.CompositeType;
+import org.apache.cassandra.db.marshal.ValueAccessor;
+import org.apache.cassandra.db.memtable.Memtable;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
+import org.apache.cassandra.io.sstable.SSTableReadsListener;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.utils.AbstractIterator;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.MergeIterator;
+import org.apache.cassandra.utils.bytecomparable.ByteComparable;
+import org.apache.cassandra.utils.bytecomparable.ByteSource;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Local partitioner that supports range scans of composite primary keys by composite prefix, via the iterator
+ * methods it provides. This is necessary for correctly handling exclusive start and inclusive end prefixes, since
+ * these won't work as intended given normal byte/component comparisons.
+ */
+public class LocalCompositePrefixPartitioner extends LocalPartitioner
+{
+    /**
+     * Composite type that only compares the components present in both values; a value that is a strict prefix
+     * of another compares as equal to it.
+     */
+    private static class PrefixCompositeType extends CompositeType
+    {
+        public PrefixCompositeType(List<AbstractType<?>> types)
+        {
+            super(types);
+        }
+
+        @Override
+        protected <VL, VR> int compareCustomRemainder(VL left, ValueAccessor<VL> accessorL, int offsetL, VR right, ValueAccessor<VR> accessorR, int offsetR)
+        {
+            return 0;
+        }
+    }
+
+    public abstract class AbstractCompositePrefixToken extends LocalToken
+    {
+        public AbstractCompositePrefixToken(ByteBuffer token)
+        {
+            super(token);
+        }
+
+        @Override
+        public int compareTo(Token o)
+        {
+            Invariants.checkArgument(o instanceof AbstractCompositePrefixToken);
+            AbstractCompositePrefixToken that = (AbstractCompositePrefixToken) o;
+            CompositeType comparator = comparatorForPrefixLength(Math.min(this.prefixSize(), that.prefixSize()));
+            return comparator.compare(this.token, that.token);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean equals(Object obj)
+        {
+            if (!(obj instanceof AbstractCompositePrefixToken))
+                return false;
+            return compareTo((AbstractCompositePrefixToken) obj) == 0;
+        }
+
+        @Override
+        public ByteSource asComparableBytes(ByteComparable.Version version)
+        {
+            return comparatorForPrefixLength(prefixSize()).asComparableBytes(ByteBufferAccessor.instance, token, version);
+        }
+
+        ByteBuffer token()
+        {
+            return token;
+        }
+
+        abstract int prefixSize();
+    }
+
+    public class FullToken extends AbstractCompositePrefixToken
+    {
+        public FullToken(ByteBuffer token)
+        {
+            super(token);
+        }
+
+        @Override
+        int prefixSize()
+        {
+            return prefixComparators.size();
+        }
+    }
+
+    public class PrefixToken extends AbstractCompositePrefixToken
+    {
+        final int prefixSize;
+        public
PrefixToken(ByteBuffer token, int prefixSize) + { + super(token); + Invariants.checkArgument(prefixSize > 0); + this.prefixSize = prefixSize; + } + + @Override + int prefixSize() + { + return prefixSize; + } + } + + private final Token.TokenFactory tokenFactory = new Token.TokenFactory() + { + @Override + public Token fromComparableBytes(ByteSource.Peekable comparableBytes, ByteComparable.Version version) + { + ByteBuffer tokenData = comparator.fromComparableBytes(ByteBufferAccessor.instance, comparableBytes, version); + return new FullToken(tokenData); + } + + @Override + public ByteBuffer toByteArray(Token token) + { + return ((FullToken)token).token(); + } + + @Override + public Token fromByteArray(ByteBuffer bytes) + { + return new FullToken(bytes); + } + + @Override + public String toString(Token token) + { + return comparator.getString(((FullToken)token).token()); + } + + @Override + public void validate(String token) + { + comparator.validate(comparator.fromString(token)); + } + + @Override + public Token fromString(String string) + { + return new FullToken(comparator.fromString(string)); + } + }; + + private final List<CompositeType> prefixComparators; + + public LocalCompositePrefixPartitioner(CompositeType comparator) + { + super(comparator); + ArrayList<CompositeType> comparators = new ArrayList<>(comparator.subTypes().size()); + comparators.add(comparator); + + List<AbstractType<?>> subtypes = comparator.subTypes(); + subtypes = subtypes.subList(0, subtypes.size() - 1); + while (!subtypes.isEmpty()) + { + comparators.add(new PrefixCompositeType(subtypes)); + subtypes = subtypes.subList(0, subtypes.size() - 1); + } + + prefixComparators = ImmutableList.copyOf(Lists.reverse(comparators)); + } + + + @SuppressWarnings("rawtypes") + public LocalCompositePrefixPartitioner(AbstractType... types) + { + this(CompositeType.getInstance(types)); + } + + private CompositeType comparatorForPrefixLength(int size) + { + return prefixComparators.get(size - 1); + } + + public ByteBuffer createPrefixKey(Object... values) + { + return comparatorForPrefixLength(values.length).decompose(values); + } + + public AbstractCompositePrefixToken createPrefixToken(Object... values) + { + ByteBuffer key = createPrefixKey(values); + return values.length == prefixComparators.size() ? new FullToken(key) : new PrefixToken(key, values.length); + } + + public DecoratedKey decoratedKey(Object... values) + { + Invariants.checkArgument(values.length == prefixComparators.size()); + ByteBuffer key = createPrefixKey(values); + return decorateKey(key); + } + + + @Override + public LocalToken getToken(ByteBuffer key) + { + return new FullToken(key); + } + + @Override + public LocalToken getMinimumToken() + { + return new FullToken(ByteBufferUtil.EMPTY_BYTE_BUFFER); + } + + @Override + public Token.TokenFactory getTokenFactory() + { + return tokenFactory; + } + + + /** + * Returns a DecoratedKey iterator for the given range. 
Skips reading data files for sstable formats with a partition index file + */ + private static CloseableIterator<DecoratedKey> keyIterator(Memtable memtable, AbstractBounds<PartitionPosition> range) + { + + AbstractBounds<PartitionPosition> memtableRange = range.withNewRight(memtable.metadata().partitioner.getMinimumToken().maxKeyBound()); + DataRange dataRange = new DataRange(memtableRange, new ClusteringIndexSliceFilter(Slices.ALL, false)); + UnfilteredPartitionIterator iter = memtable.partitionIterator(ColumnFilter.NONE, dataRange, SSTableReadsListener.NOOP_LISTENER); + + int rangeStartCmpMin = range.isStartInclusive() ? 0 : 1; + int rangeEndCmpMax = range.isEndInclusive() ? 0 : -1; + + return new AbstractIterator<>() + { + @Override + protected DecoratedKey computeNext() + { + while (iter.hasNext()) + { + DecoratedKey key = iter.next().partitionKey(); + if (key.compareTo(range.left) < rangeStartCmpMin) + continue; + + if (key.compareTo(range.right) > rangeEndCmpMax) + break; + + return key; + } + return endOfData(); + } + + @Override + public void close() + { + iter.close(); + } + }; + } + + public static CloseableIterator<DecoratedKey> keyIterator(TableMetadata metadata, AbstractBounds<PartitionPosition> range) throws IOException + { + ColumnFamilyStore cfs = Keyspace.openAndGetStore(metadata); + ColumnFamilyStore.ViewFragment view = cfs.select(View.selectLive(range)); + + List<CloseableIterator<?>> closeableIterators = new ArrayList<>(); + List<Iterator<DecoratedKey>> iterators = new ArrayList<>(); + + try + { + for (Memtable memtable : view.memtables) + { + CloseableIterator<DecoratedKey> iter = keyIterator(memtable, range); + iterators.add(iter); + closeableIterators.add(iter); + } + + for (SSTableReader sstable : view.sstables) + { + CloseableIterator<DecoratedKey> iter = sstable.keyIterator(range); + iterators.add(iter); + closeableIterators.add(iter); + } + } + catch (Throwable e) + { + for (CloseableIterator<?> iter: closeableIterators) + { + try + { + iter.close(); + } + catch (Throwable e2) + { + e.addSuppressed(e2); + } + } + throw e; + } + + return MergeIterator.get(iterators, DecoratedKey::compareTo, new MergeIterator.Reducer.Trivial<>()); + } +} diff --git a/src/java/org/apache/cassandra/dht/LocalPartitioner.java b/src/java/org/apache/cassandra/dht/LocalPartitioner.java index c2886fd539..0a1ede356b 100644 --- a/src/java/org/apache/cassandra/dht/LocalPartitioner.java +++ b/src/java/org/apache/cassandra/dht/LocalPartitioner.java @@ -40,7 +40,7 @@ public class LocalPartitioner implements IPartitioner { private static final long EMPTY_SIZE = ObjectSizes.measure(new LocalPartitioner(null).new LocalToken()); - final AbstractType<?> comparator; // package-private to avoid access workarounds in embedded LocalToken. 
+ protected final AbstractType<?> comparator; public LocalPartitioner(AbstractType<?> comparator) { diff --git a/src/java/org/apache/cassandra/dht/ReversedLongLocalPartitioner.java b/src/java/org/apache/cassandra/dht/ReversedLongLocalPartitioner.java index abee8868e7..f95f1e7764 100644 --- a/src/java/org/apache/cassandra/dht/ReversedLongLocalPartitioner.java +++ b/src/java/org/apache/cassandra/dht/ReversedLongLocalPartitioner.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Random; import java.util.function.Function; +import com.google.common.annotations.VisibleForTesting; import com.google.common.primitives.Longs; import accord.primitives.Ranges; @@ -163,7 +164,8 @@ public class ReversedLongLocalPartitioner implements IPartitioner throw new UnsupportedOperationException("Accord is not supported by " + getClass().getName()); } - private static class ReversedLongLocalToken extends Token + @VisibleForTesting + public static class ReversedLongLocalToken extends Token { private final long token; diff --git a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java index dbe501f36e..c8c1709503 100644 --- a/src/java/org/apache/cassandra/io/sstable/KeyIterator.java +++ b/src/java/org/apache/cassandra/io/sstable/KeyIterator.java @@ -21,12 +21,15 @@ import java.io.IOException; import java.util.concurrent.locks.ReadWriteLock; import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.utils.AbstractIterator; import org.apache.cassandra.utils.CloseableIterator; public class KeyIterator extends AbstractIterator<DecoratedKey> implements CloseableIterator<DecoratedKey> { + private final AbstractBounds<PartitionPosition> bounds; private final IPartitioner partitioner; private final KeyReader it; private final ReadWriteLock fileAccessLock; @@ -34,8 +37,9 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close private boolean initialized = false; - public KeyIterator(KeyReader it, IPartitioner partitioner, long totalBytes, ReadWriteLock fileAccessLock) + public KeyIterator(AbstractBounds<PartitionPosition> bounds, KeyReader it, IPartitioner partitioner, long totalBytes, ReadWriteLock fileAccessLock) { + this.bounds = bounds; this.it = it; this.partitioner = partitioner; this.totalBytes = totalBytes; @@ -48,19 +52,26 @@ public class KeyIterator extends AbstractIterator<DecoratedKey> implements Close fileAccessLock.readLock().lock(); try { - if (!initialized) + while (true) { - initialized = true; - return it.isExhausted() - ? endOfData() - : partitioner.decorateKey(it.key()); - } - else - { - return it.advance() - ? 
partitioner.decorateKey(it.key()) - : endOfData(); + if (!initialized) + { + initialized = true; + if (it.isExhausted()) + break; + } + else if (!it.advance()) + break; + + DecoratedKey key = partitioner.decorateKey(it.key()); + if (bounds == null || bounds.contains(key)) + return key; + + if (key.compareTo(bounds.right) >= 0) + break; } + + return endOfData(); } catch (IOException e) { diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index 716d9a8157..cfbe73d810 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -825,9 +825,11 @@ public abstract class SSTableReader extends SSTable implements UnfilteredSource, */ public KeyIterator keyIterator() throws IOException { - return new KeyIterator(keyReader(), getPartitioner(), uncompressedLength(), new ReentrantReadWriteLock()); + return new KeyIterator(null, keyReader(), getPartitioner(), uncompressedLength(), new ReentrantReadWriteLock()); } + public abstract KeyIterator keyIterator(AbstractBounds<PartitionPosition> range) throws IOException; + /** * Finds and returns the first key beyond a given token in this SSTable or null if no such key exists. */ diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java index b58dbc532e..b6038237bd 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java @@ -26,11 +26,14 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; +import org.apache.cassandra.io.sstable.*; +import org.apache.cassandra.utils.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,16 +52,6 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterators; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.sstable.AbstractRowIndexEntry; -import org.apache.cassandra.io.sstable.CorruptSSTableException; -import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.Downsampling; -import org.apache.cassandra.io.sstable.ISSTableScanner; -import org.apache.cassandra.io.sstable.IVerifier; -import org.apache.cassandra.io.sstable.IndexInfo; -import org.apache.cassandra.io.sstable.KeyReader; -import org.apache.cassandra.io.sstable.SSTable; -import org.apache.cassandra.io.sstable.SSTableReadsListener; import org.apache.cassandra.io.sstable.SSTableReadsListener.SelectionReason; import org.apache.cassandra.io.sstable.SSTableReadsListener.SkippingReason; import org.apache.cassandra.io.sstable.format.SSTableReader; @@ -74,9 +67,6 @@ import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.io.util.FileHandle; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.RandomAccessReader; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.IFilter; -import org.apache.cassandra.utils.OutputHandler; import static 
org.apache.cassandra.utils.concurrent.SharedCloseable.sharedCopyOrNull; @@ -157,6 +147,16 @@ public class BigTableReader extends SSTableReaderWithFilter implements IndexSumm return BigTableKeyReader.create(ifile, rowIndexEntrySerializer); } + @Override + public KeyIterator keyIterator(AbstractBounds<PartitionPosition> range) throws IOException + { + + RandomAccessReader ifileReader = ifile.createReader(); + ifileReader.seek(getIndexScanPosition(range.left)); + BigTableKeyReader keyReader = BigTableKeyReader.create(ifileReader, rowIndexEntrySerializer); + return new KeyIterator(range, keyReader, getPartitioner(), uncompressedLength(), new ReentrantReadWriteLock()); + } + /** * Direct I/O SSTableScanner over an iterator of bounds. * diff --git a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReader.java index c5571e7fbb..2ff3c2bc89 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiTableReader.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.concurrent.locks.ReentrantReadWriteLock; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -43,21 +44,14 @@ import org.apache.cassandra.db.rows.UnfilteredRowIterators; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.sstable.CorruptSSTableException; -import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.io.sstable.ISSTableScanner; -import org.apache.cassandra.io.sstable.IVerifier; -import org.apache.cassandra.io.sstable.SSTable; -import org.apache.cassandra.io.sstable.SSTableReadsListener; +import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.io.sstable.SSTableReadsListener.SelectionReason; import org.apache.cassandra.io.sstable.SSTableReadsListener.SkippingReason; import org.apache.cassandra.io.sstable.format.SSTableReaderWithFilter; import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.io.util.FileHandle; import org.apache.cassandra.io.util.RandomAccessReader; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.IFilter; -import org.apache.cassandra.utils.OutputHandler; +import org.apache.cassandra.utils.*; import static org.apache.cassandra.io.sstable.format.SSTableReader.Operator.EQ; import static org.apache.cassandra.io.sstable.format.SSTableReader.Operator.GE; @@ -487,6 +481,24 @@ public class BtiTableReader extends SSTableReaderWithFilter return BtiTableScanner.getScanner(this, columnFilter, dataRange, listener); } + @Override + public KeyIterator keyIterator(AbstractBounds<PartitionPosition> range) + { + PartitionIterator iter; + try + { + iter = PartitionIterator.create(partitionIndex, metadata().partitioner, rowIndexFile, dfile, + range.left, bounds.inclusiveLeft() ? 
-1 : 0, + null, 0, descriptor.version); + } + catch (IOException e) + { + throw new RuntimeException(e); + } + + return new KeyIterator(range, iter, metadata().partitioner, uncompressedLength(), new ReentrantReadWriteLock()); + } + @Override public IVerifier getVerifier(ColumnFamilyStore cfs, OutputHandler outputHandler, boolean isOffline, IVerifier.Options options) { diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index e8edb4d34e..97bab8ec5d 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -1678,10 +1678,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public Integer finishConsensusMigration(@Nonnull String keyspace, @Nullable List<String> maybeTableNames, @Nullable String maybeRangesStr, - @Nonnull ConsensusMigrationTarget target) + @Nonnull String target) { checkArgument(!keyspace.equals(SchemaConstants.METADATA_KEYSPACE_NAME)); - return finishMigrationToConsensusProtocol(keyspace, Optional.ofNullable(maybeTableNames), Optional.ofNullable(maybeRangesStr), target); + return finishMigrationToConsensusProtocol(keyspace, Optional.ofNullable(maybeTableNames), Optional.ofNullable(maybeRangesStr), ConsensusMigrationTarget.valueOf(target)); } @Override diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 8845a9850f..46d5de2a9e 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -1083,7 +1083,7 @@ public interface StorageServiceMBean extends NotificationEmitter Integer finishConsensusMigration(@Nonnull String keyspace, @Nullable List<String> maybeTableNames, @Nullable String maybeRangesStr, - @Nonnull ConsensusMigrationTarget target); + @Nonnull String target); String listConsensusMigrations(@Nullable Set<String> keyspaceNames, @Nullable Set<String> tableNames, @Nonnull String format); diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java index 8046b9d388..b32d50da86 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java +++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java @@ -21,7 +21,6 @@ package org.apache.cassandra.service.accord; import java.io.IOException; import java.io.UncheckedIOException; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -30,7 +29,6 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -43,12 +41,10 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import accord.api.Key; -import accord.local.cfk.CommandsForKey; import accord.impl.TimestampsForKey; import accord.local.Command; import accord.local.CommandStore; @@ -58,17 +54,15 @@ import accord.local.RedundantBefore; import 
accord.local.SaveStatus; import accord.local.Status; import accord.local.Status.Durability; +import accord.local.cfk.CommandsForKey; import accord.primitives.Ranges; -import accord.primitives.Routable; import accord.primitives.Route; import accord.primitives.Timestamp; -import accord.primitives.Txn; import accord.primitives.TxnId; import accord.topology.Topology; import accord.utils.Invariants; import accord.utils.ReducingRangeMap; import accord.utils.async.Observable; -import org.apache.cassandra.concurrent.DebuggableTask; import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.cql3.QueryOptions; @@ -86,9 +80,11 @@ import org.apache.cassandra.db.IMutation; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.LivenessInfo; import org.apache.cassandra.db.Mutation; +import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.db.ReadExecutionController; import org.apache.cassandra.db.RegularAndStaticColumns; import org.apache.cassandra.db.SinglePartitionReadCommand; +import org.apache.cassandra.db.WriteContext; import org.apache.cassandra.db.filter.ClusteringIndexFilter; import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter; import org.apache.cassandra.db.filter.ColumnFilter; @@ -111,10 +107,16 @@ import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.Row.Deletion; import org.apache.cassandra.db.rows.RowIterator; import org.apache.cassandra.db.transform.FilteredPartitions; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.dht.ByteOrderedPartitioner; +import org.apache.cassandra.dht.ExcludingBounds; import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.IncludingExcludingBounds; +import org.apache.cassandra.dht.LocalCompositePrefixPartitioner; import org.apache.cassandra.dht.LocalPartitioner; import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.index.accord.RouteIndex; import org.apache.cassandra.io.IVersionedSerializer; @@ -148,9 +150,11 @@ import org.apache.cassandra.service.accord.serializers.KeySerializers; import org.apache.cassandra.service.accord.serializers.TopologySerializers; import org.apache.cassandra.transport.Dispatcher; import org.apache.cassandra.utils.Clock.Global; +import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.btree.BTree; import org.apache.cassandra.utils.btree.BTreeSet; import org.apache.cassandra.utils.bytecomparable.ByteComparable; +import org.apache.cassandra.utils.concurrent.OpOrder; import static accord.utils.Invariants.checkArgument; import static accord.utils.Invariants.checkState; @@ -188,15 +192,23 @@ public class AccordKeyspace private static final TupleType KEY_TYPE = new TupleType(Arrays.asList(UUIDType.instance, BytesType.instance)); private static final String KEY_TUPLE = KEY_TYPE.asCQL3Type().toString(); - // shared LocalPartitioner for all *_for_key Accord tables with (store_id, key_token, key) partition key - private static final LocalPartitioner FOR_KEYS_LOCAL_PARTITIONER = - new LocalPartitioner(CompositeType.getInstance(Int32Type.instance, BytesType.instance, KEY_TYPE)); - private static final ClusteringIndexFilter FULL_PARTITION = new ClusteringIndexNamesFilter(BTreeSet.of(new ClusteringComparator(), Clustering.EMPTY), false); //TODO (now, performance): should this be partitioner 
rather than TableId? As of this patch distributed tables should only have 1 partitioner... private static final ConcurrentMap<TableId, AccordRoutingKeyByteSource.Serializer> TABLE_SERIALIZERS = new ConcurrentHashMap<>(); + private static AccordRoutingKeyByteSource.Serializer getRoutingKeySerializer(AccordRoutingKey key) + { + return TABLE_SERIALIZERS.computeIfAbsent(key.table(), ignore -> { + IPartitioner partitioner; + if (key.kindOfRoutingKey() == AccordRoutingKey.RoutingKeyKind.TOKEN) + partitioner = key.asTokenKey().token().getPartitioner(); + else + partitioner = SchemaHolder.schema.getTablePartitioner(key.table()); + return AccordRoutingKeyByteSource.variableLength(partitioner); + }); + } + // Schema needs all system keyspace, and this is a system keyspace! So can not touch schema in init private static class SchemaHolder { @@ -475,14 +487,13 @@ public class AccordKeyspace + format("last_write_timestamp %s, ", TIMESTAMP_TUPLE) + "PRIMARY KEY((store_id, key_token, key))" + ')') - .partitioner(FOR_KEYS_LOCAL_PARTITIONER) + .partitioner(new LocalPartitioner(CompositeType.getInstance(Int32Type.instance, BytesType.instance, KEY_TYPE))) .build(); public static class TimestampsForKeyColumns { static final ClusteringComparator keyComparator = TimestampsForKeys.partitionKeyAsClusteringComparator(); static final CompositeType partitionKeyType = (CompositeType) TimestampsForKeys.partitionKeyType; - static final ColumnFilter allColumns = ColumnFilter.all(TimestampsForKeys); static final ColumnMetadata store_id = getColumn(TimestampsForKeys, "store_id"); static final ColumnMetadata key_token = getColumn(TimestampsForKeys, "key_token"); static final ColumnMetadata key = getColumn(TimestampsForKeys, "key"); @@ -579,22 +590,24 @@ public class AccordKeyspace } } + private static final LocalCompositePrefixPartitioner CFKPartitioner = new LocalCompositePrefixPartitioner(Int32Type.instance, UUIDType.instance, BytesType.instance, BytesType.instance); private static final TableMetadata CommandsForKeys = commandsForKeysTable(COMMANDS_FOR_KEY); private static TableMetadata commandsForKeysTable(String tableName) { return parse(tableName, - "accord commands per key", - "CREATE TABLE %s (" - + "store_id int, " - + "key_token blob, " // can't use "token" as this is restricted word in CQL - + format("key %s, ", KEY_TUPLE) - + "data blob, " - + "PRIMARY KEY((store_id, key_token, key))" - + ')' - + " WITH compression = {'class':'NoopCompressor'};") - .partitioner(FOR_KEYS_LOCAL_PARTITIONER) - .build(); + "accord commands per key", + "CREATE TABLE %s (" + + "store_id int, " + + "table_id uuid, " + + "key_token blob, " // can't use "token" as this is restricted word in CQL + + "key blob, " + + "data blob, " + + "PRIMARY KEY((store_id, table_id, key_token, key))" + + ')' + + " WITH compression = {'class':'NoopCompressor'};") + .partitioner(CFKPartitioner) + .build(); } public static class CommandsForKeyAccessor @@ -604,6 +617,7 @@ public class AccordKeyspace final CompositeType partitionKeyType; final ColumnFilter allColumns; final ColumnMetadata store_id; + final ColumnMetadata table_id; final ColumnMetadata key_token; final ColumnMetadata key; final ColumnMetadata data; @@ -617,6 +631,7 @@ public class AccordKeyspace this.partitionKeyType = (CompositeType) table.partitionKeyType; this.allColumns = ColumnFilter.all(table); this.store_id = getColumn(table, "store_id"); + this.table_id = getColumn(table, "table_id"); this.key_token = getColumn(table, "key_token"); this.key = getColumn(table, "key"); this.data = 
getColumn(table, "data"); @@ -633,6 +648,11 @@ public class AccordKeyspace return Int32Type.instance.compose(partitionKeyComponents[store_id.position()]); } + public TableId getTableId(ByteBuffer[] partitionKeyComponents) + { + return TableId.fromUUID(UUIDType.instance.compose(partitionKeyComponents[table_id.position()])); + } + public PartitionKey getKey(DecoratedKey key) { return getKey(splitPartitionKey(key)); @@ -640,7 +660,12 @@ public class AccordKeyspace public PartitionKey getKey(ByteBuffer[] partitionKeyComponents) { - return deserializeKey(partitionKeyComponents[key.position()]); + TableId tableId = TableId.fromUUID(UUIDSerializer.instance.deserialize(partitionKeyComponents[table_id.position()])); + ByteBuffer keyBytes = partitionKeyComponents[key.position()]; + IPartitioner partitioner = SchemaHolder.schema.getTablePartitioner(tableId); + if (partitioner == null) + throw new IllegalStateException("Table with id " + tableId + " could not be found; was it deleted?"); + return new PartitionKey(tableId, partitioner.decorateKey(keyBytes)); } public CommandsForKey getCommandsForKey(PartitionKey key, Row row) @@ -652,6 +677,13 @@ public class AccordKeyspace return CommandsForKeySerializer.fromBytes(key, cell.buffer()); } + @VisibleForTesting + public ByteBuffer serializeKeyNoTable(AccordRoutingKey key) + { + byte[] bytes = getRoutingKeySerializer(key).serializeNoTable(key); + return ByteBuffer.wrap(bytes); + } + // TODO (expected): garbage-free filtering, reusing encoding public Row withoutRedundantCommands(PartitionKey key, Row row, RedundantBefore.Entry redundantBefore) { @@ -674,6 +706,19 @@ public class AccordKeyspace ByteBuffer buffer = CommandsForKeySerializer.toBytesWithoutKey(updated); return BTreeRow.singleCellRow(Clustering.EMPTY, BufferCell.live(data, cell.timestamp(), buffer)); } + + public LocalCompositePrefixPartitioner.AbstractCompositePrefixToken getPrefixToken(int commandStore, AccordRoutingKey key) + { + if (key.kindOfRoutingKey() == AccordRoutingKey.RoutingKeyKind.TOKEN) + { + ByteBuffer tokenBytes = ByteBuffer.wrap(getRoutingKeySerializer(key).serializeNoTable(key)); + return CFKPartitioner.createPrefixToken(commandStore, key.table().asUUID(), tokenBytes); + } + else + { + return CFKPartitioner.createPrefixToken(commandStore, key.table().asUUID()); + } + } } public static final CommandsForKeyAccessor CommandsForKeysAccessor = new CommandsForKeyAccessor(CommandsForKeys); @@ -940,201 +985,70 @@ public class AccordKeyspace txnId.msb, txnId.lsb, txnId.node.id); } - private static abstract class TableWalk implements Runnable, DebuggableTask + /** + * Calculates token bounds based on key prefixes. 
+ */ + public static void findAllKeysBetween(int commandStore, + AccordRoutingKey start, boolean startInclusive, + AccordRoutingKey end, boolean endInclusive, + Observable<PartitionKey> callback) { - private final long creationTimeNanos = Global.nanoTime(); - private final Executor executor; - private final Observable<UntypedResultSet.Row> callback; - private long startTimeNanos = -1; - private int numQueries = 0; - private UntypedResultSet.Row lastSeen = null; - private TableWalk(Executor executor, Observable<UntypedResultSet.Row> callback) - { - this.executor = executor; - this.callback = callback; - } + Token startToken = CommandsForKeysAccessor.getPrefixToken(commandStore, start); + Token endToken = CommandsForKeysAccessor.getPrefixToken(commandStore, end); - protected abstract UntypedResultSet query(UntypedResultSet.Row lastSeen); + if (start instanceof AccordRoutingKey.SentinelKey) + startInclusive = true; + if (end instanceof AccordRoutingKey.SentinelKey) + endInclusive = true; - public final void schedule() - { - executor.execute(this); - } + PartitionPosition startPosition = startInclusive ? startToken.minKeyBound() : startToken.maxKeyBound(); + PartitionPosition endPosition = endInclusive ? endToken.maxKeyBound() : endToken.minKeyBound(); + AbstractBounds<PartitionPosition> bounds; + if (startInclusive && endInclusive) + bounds = new Bounds<>(startPosition, endPosition); + else if (endInclusive) + bounds = new Range<>(startPosition, endPosition); + else if (startInclusive) + bounds = new IncludingExcludingBounds<>(startPosition, endPosition); + else + bounds = new ExcludingBounds<>(startPosition, endPosition); - @Override - public final void run() - { - try + Stage.READ.executor().submit(() -> { + ColumnFamilyStore baseCfs = Keyspace.openAndGetStore(CommandsForKeys); + try (OpOrder.Group baseOp = baseCfs.readOrdering.start(); + WriteContext writeContext = baseCfs.keyspace.getWriteHandler().createContextForRead(); + CloseableIterator<DecoratedKey> iter = LocalCompositePrefixPartitioner.keyIterator(CommandsForKeys, bounds)) { - if (startTimeNanos == -1) - startTimeNanos = Global.nanoTime(); - numQueries++; - UntypedResultSet result = query(lastSeen); - if (result.isEmpty()) + // Need the second try to handle callback errors vs read errors. 
+ // Callback will see the read errors, but if the callback fails the outer try will see those errors + try { + while (iter.hasNext()) + { + PartitionKey pk = CommandsForKeysAccessor.getKey(iter.next()); + callback.onNext(pk); + } callback.onCompleted(); - return; } - UntypedResultSet.Row lastRow = null; - for (UntypedResultSet.Row row : result) + catch (Exception e) { - callback.onNext(row); - lastRow = row; + callback.onError(e); } - lastSeen = lastRow; - schedule(); } - catch (Throwable t) - { - callback.onError(t); - } - } - - @Override - public long creationTimeNanos() - { - return creationTimeNanos; - } - - @Override - public long startTimeNanos() - { - return startTimeNanos; - } - - @Override - public String description() - { - return format("Table Walker for %s; queries = %d", getClass().getSimpleName(), numQueries); - } - } - - private static String selection(TableMetadata metadata, Set<String> requiredColumns, Set<String> forIteration) - { - StringBuilder selection = new StringBuilder(); - if (requiredColumns.isEmpty()) - selection.append("*"); - else - { - Sets.SetView<String> other = Sets.difference(requiredColumns, forIteration); - for (String name : other) - { - ColumnMetadata meta = metadata.getColumn(new ColumnIdentifier(name, true)); - if (meta == null) - throw new IllegalArgumentException("Unknown column: " + name); - } - List<String> names = new ArrayList<>(forIteration.size() + other.size()); - names.addAll(forIteration); - names.addAll(other); - // this sort is to make sure the CQL is determanistic - Collections.sort(names); - for (int i = 0; i < names.size(); i++) - { - if (i > 0) - selection.append(", "); - selection.append(names.get(i)); - } - } - return selection.toString(); - } - - private static class WalkCommandsForDomain extends TableWalk - { - private static final Set<String> COLUMNS_FOR_ITERATION = ImmutableSet.of("txn_id", "store_id", "domain"); - private final String cql; - private final int storeId, domain; - - private WalkCommandsForDomain(int commandStore, Routable.Domain domain, Set<String> requiredColumns, Executor executor, Observable<UntypedResultSet.Row> callback) - { - super(executor, callback); - this.storeId = commandStore; - this.domain = domain.ordinal(); - cql = format("SELECT %s " + - "FROM %s " + - "WHERE store_id = ? " + - " AND domain = ? " + - " AND token(store_id, domain, txn_id) > token(?, ?, (?, ?, ?)) " + - "ALLOW FILTERING", selection(Commands, requiredColumns, COLUMNS_FOR_ITERATION), Commands); - } - - @Override - protected UntypedResultSet query(UntypedResultSet.Row lastSeen) - { - TxnId lastTxnId = lastSeen == null ? - new TxnId(0, 0, Txn.Kind.Read, Routable.Domain.Key, Node.Id.NONE) - : deserializeTxnId(lastSeen); - return executeInternal(cql, storeId, domain, storeId, domain, lastTxnId.msb, lastTxnId.lsb, lastTxnId.node.id); - } - } - - public static void findAllKeysBetween(int commandStore, - AccordRoutingKey start, boolean startInclusive, - AccordRoutingKey end, boolean endInclusive, - Observable<PartitionKey> callback) - { - //TODO (optimize) : CQL doesn't look smart enough to only walk Index.db, and ends up walking the Data.db file for each row in the partitions found (for frequent keys, this cost adds up) - // it would be possible to find all SSTables that "could" intersect this range, then have a merge iterator over the Index.db (filtered to the range; index stores partition liveness)... 
- KeysBetween work = new KeysBetween(commandStore, - AccordKeyspace.serializeRoutingKey(start), startInclusive, - AccordKeyspace.serializeRoutingKey(end), endInclusive, - ImmutableSet.of("key"), - Stage.READ.executor(), Observable.distinct(callback).map(AccordKeyspace::deserializeKey)); - work.schedule(); - } - - private static class KeysBetween extends TableWalk - { - private static final Set<String> COLUMNS_FOR_ITERATION = ImmutableSet.of("store_id", "key_token"); - - private final int storeId; - private final ByteBuffer start, end; - private final String cqlFirst; - private final String cqlContinue; - - private KeysBetween(int storeId, - ByteBuffer start, boolean startInclusive, - ByteBuffer end, boolean endInclusive, - Set<String> requiredColumns, - Executor executor, Observable<UntypedResultSet.Row> callback) - { - super(executor, callback); - this.storeId = storeId; - this.start = start; - this.end = end; - - String selection = selection(CommandsForKeys, requiredColumns, COLUMNS_FOR_ITERATION); - this.cqlFirst = format("SELECT DISTINCT %s\n" + - "FROM %s\n" + - "WHERE store_id = ?\n" + - (startInclusive ? " AND key_token >= ?\n" : " AND key_token > ?\n") + - (endInclusive ? " AND key_token <= ?\n" : " AND key_token < ?\n") + - "ALLOW FILTERING", - selection, CommandsForKeys); - this.cqlContinue = format("SELECT DISTINCT %s\n" + - "FROM %s\n" + - "WHERE store_id = ?\n" + - " AND key_token > ?\n" + - " AND key > ?\n" + - (endInclusive ? " AND key_token <= ?\n" : " AND key_token < ?\n") + - "ALLOW FILTERING", - selection, CommandsForKeys); - } - - @Override - protected UntypedResultSet query(UntypedResultSet.Row lastSeen) - { - if (lastSeen == null) - { - return executeInternal(cqlFirst, storeId, start, end); - } - else + catch (IOException e) { - ByteBuffer previousToken = lastSeen.getBytes("key_token"); - ByteBuffer previousKey = lastSeen.getBytes("key"); - return executeInternal(cqlContinue, storeId, previousToken, previousKey, end); + try + { + callback.onError(e); + } + catch (Throwable t) + { + e.addSuppressed(t); + } + throw new RuntimeException(e); } - } + }); } public static TxnId deserializeTxnId(UntypedResultSet.Row row) @@ -1272,26 +1186,24 @@ public class AccordKeyspace private static DecoratedKey makeKey(CommandsForKeyAccessor accessor, int storeId, PartitionKey key) { ByteBuffer pk = accessor.keyComparator.make(storeId, + UUIDSerializer.instance.serialize(key.table().asUUID()), serializeRoutingKey(key.toUnseekable()), - serializeKey(key)).serializeAsPartitionKey(); + key.partitionKey().getKey()).serializeAsPartitionKey(); return accessor.table.partitioner.decorateKey(pk); } @VisibleForTesting public static ByteBuffer serializeRoutingKey(AccordRoutingKey routingKey) { - AccordRoutingKeyByteSource.Serializer serializer = TABLE_SERIALIZERS.computeIfAbsent(routingKey.table(), ignore -> { - IPartitioner partitioner; - if (routingKey.kindOfRoutingKey() == AccordRoutingKey.RoutingKeyKind.TOKEN) - partitioner = routingKey.asTokenKey().token().getPartitioner(); - else - partitioner = SchemaHolder.schema.getTablePartitioner(routingKey.table()); - return AccordRoutingKeyByteSource.variableLength(partitioner); - }); - byte[] bytes = serializer.serialize(routingKey); + byte[] bytes = getRoutingKeySerializer(routingKey).serialize(routingKey); return ByteBuffer.wrap(bytes); } + public static ByteBuffer serializeRoutingKeyNoTable(AccordRoutingKey key) + { + return CommandsForKeysAccessor.serializeKeyNoTable(key); + } + private static PartitionUpdate getCommandsForKeyPartitionUpdate(int 
storeId, PartitionKey key, CommandsForKey commandsForKey, long timestampMicros) { ByteBuffer bytes = CommandsForKeySerializer.toBytesWithoutKey(commandsForKey); @@ -1475,7 +1387,7 @@ public class AccordKeyspace /** * Update the disk state for this epoch, if it's higher than the one we have one disk. - * + * <p> * This is meant to be called before any update involving the new epoch, not after. This way if the update * fails, we can detect and cleanup. If we updated disk state after an update and it failed, we could "forget" * about (now acked) topology updates after a restart. @@ -1624,7 +1536,6 @@ public class AccordKeyspace Ranges redundant = row.has("redundant") ? blobMapToRanges(row.getMap("redundant", BytesType.instance, BytesType.instance)) : Ranges.EMPTY; consumer.load(epoch, topology, syncStatus, pendingSyncNotify, remoteSyncComplete, closed, redundant); - } public static EpochDiskState loadTopologies(TopologyLoadConsumer consumer) @@ -1710,8 +1621,8 @@ public class AccordKeyspace public interface CommandStoreMetadataConsumer { void accept(ReducingRangeMap<Timestamp> rejectBefore, DurableBefore durableBefore, RedundantBefore redundantBefore, NavigableMap<TxnId, Ranges> bootstrapBeganAt, NavigableMap<Timestamp, Ranges> safeToRead); - } + public static void loadCommandStoreMetadata(int id, CommandStoreMetadataConsumer consumer) { UntypedResultSet result = executeOnceInternal(format("SELECT * FROM %s.%s WHERE store_id=?", ACCORD_KEYSPACE_NAME, COMMAND_STORE_METADATA), id); @@ -1758,5 +1669,4 @@ public class AccordKeyspace TABLE_SERIALIZERS.clear(); SchemaHolder.schema = Schema.instance; } - } diff --git a/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java b/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java index 18fb045475..51497792cd 100644 --- a/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java +++ b/src/java/org/apache/cassandra/service/accord/repair/AccordRepair.java @@ -124,10 +124,11 @@ public class AccordRepair List<accord.primitives.Range> repairedRanges = new ArrayList<>(); int rangeStepUpdateInterval = ACCORD_REPAIR_RANGE_STEP_UPDATE_INTERVAL.getInt(); RoutingKey remainingStart = range.start(); + // TODO (expected): repair ranges should have a configurable lower limit of split size so already small repairs aren't broken up into excessively tiny ones BigInteger rangeSize = splitter.sizeOf(range); if (rangeStep == null) { - BigInteger divide = splitter.divide(rangeSize, 1000); + BigInteger divide = splitter.divide(rangeSize, 10000); rangeStep = divide.equals(BigInteger.ZERO) ? 
rangeSize : BigInteger.ONE.max(divide); } diff --git a/src/java/org/apache/cassandra/tools/nodetool/ConsensusMigrationAdmin.java b/src/java/org/apache/cassandra/tools/nodetool/ConsensusMigrationAdmin.java index e7e1fd9cfc..7eea5a021b 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/ConsensusMigrationAdmin.java +++ b/src/java/org/apache/cassandra/tools/nodetool/ConsensusMigrationAdmin.java @@ -119,7 +119,7 @@ public abstract class ConsensusMigrationAdmin extends NodeTool.NodeToolCmd @Override public Integer start() { - return probe.getStorageService().finishConsensusMigration(keyspace, maybeTableNames, maybeRangesStr, target); + return probe.getStorageService().finishConsensusMigration(keyspace, maybeTableNames, maybeRangesStr, target.toString()); } } diff --git a/src/java/org/apache/cassandra/utils/MergeIterator.java b/src/java/org/apache/cassandra/utils/MergeIterator.java index 1dd1f7833b..7ffb55a2db 100644 --- a/src/java/org/apache/cassandra/utils/MergeIterator.java +++ b/src/java/org/apache/cassandra/utils/MergeIterator.java @@ -421,6 +421,23 @@ public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implem /** Accumulator that collects values of type A, and outputs a value of type B. */ public static abstract class Reducer<In,Out> { + public static class Trivial<T> extends Reducer<T, T> + { + private T reduced = null; + + @Override + public boolean trivialReduceIsTrivial() { return true; } + + @Override + public void reduce(int idx, T current) { reduced = current; } + + @Override + protected T getReduced() { return reduced; } + + @Override + protected void onKeyChange() { reduced = null; } + } + /** * @return true if Out is the same as In for the case of a single source iterator */ diff --git a/test/distributed/org/apache/cassandra/io/sstable/format/ForwardingSSTableReader.java b/test/distributed/org/apache/cassandra/io/sstable/format/ForwardingSSTableReader.java index 710a426647..743583e362 100644 --- a/test/distributed/org/apache/cassandra/io/sstable/format/ForwardingSSTableReader.java +++ b/test/distributed/org/apache/cassandra/io/sstable/format/ForwardingSSTableReader.java @@ -249,6 +249,12 @@ public abstract class ForwardingSSTableReader extends SSTableReader return delegate.keyIterator(); } + @Override + public KeyIterator keyIterator(AbstractBounds<PartitionPosition> range) throws IOException + { + return delegate.keyIterator(range); + } + @Override public DecoratedKey firstKeyBeyond(PartitionPosition token) { diff --git a/test/unit/org/apache/cassandra/db/marshal/AbstractTypeTest.java b/test/unit/org/apache/cassandra/db/marshal/AbstractTypeTest.java index 35ecfab60d..d498323e85 100644 --- a/test/unit/org/apache/cassandra/db/marshal/AbstractTypeTest.java +++ b/test/unit/org/apache/cassandra/db/marshal/AbstractTypeTest.java @@ -233,6 +233,8 @@ public class AbstractTypeTest continue; if (isTestType(klass)) continue; + if (isPrefixCompositeType(klass)) + continue; String name = klass.getCanonicalName(); if (name == null) name = klass.getName(); @@ -259,6 +261,13 @@ public class AbstractTypeTest return "test".equals(new File(src.getLocation().getPath()).name()); } + @SuppressWarnings("rawtypes") + private boolean isPrefixCompositeType(Class<? 
extends AbstractType> klass) + { + String name = klass.getCanonicalName(); + return name.contains("PrefixCompositeType"); + } + @Test public void unsafeSharedSerializer() { diff --git a/test/unit/org/apache/cassandra/dht/IPartitionerTest.java b/test/unit/org/apache/cassandra/dht/IPartitionerTest.java index 5e46f09ed6..e531e25da5 100644 --- a/test/unit/org/apache/cassandra/dht/IPartitionerTest.java +++ b/test/unit/org/apache/cassandra/dht/IPartitionerTest.java @@ -18,21 +18,78 @@ package org.apache.cassandra.dht; +import java.lang.reflect.Modifier; +import java.security.CodeSource; +import java.security.ProtectionDomain; import java.util.Objects; +import java.util.Set; +import com.google.common.collect.Sets; import org.junit.Test; +import org.apache.cassandra.io.util.File; import org.apache.cassandra.utils.AbstractTypeGenerators; import org.apache.cassandra.utils.AccordGenerators; import org.apache.cassandra.utils.CassandraGenerators; import org.apache.cassandra.utils.bytecomparable.ByteComparable; import org.apache.cassandra.utils.bytecomparable.ByteSource; import org.assertj.core.api.Assertions; +import org.reflections.Reflections; +import org.reflections.scanners.Scanners; +import org.reflections.util.ConfigurationBuilder; import static accord.utils.Property.qt; public class IPartitionerTest { + //TODO (now, maintaince): this is copied from AbstractTypeTest + private static final Reflections reflections = new Reflections(new ConfigurationBuilder() + .forPackage("org.apache.cassandra") + .setScanners(Scanners.SubTypes) + .setExpandSuperTypes(true) + .setParallel(true)); + + @Test + public void allCovered() + { + Set<Class<? extends IPartitioner>> subTypes = reflections.getSubTypesOf(IPartitioner.class); + Set<Class<? extends IPartitioner>> coverage = CassandraGenerators.knownPartitioners(); + StringBuilder sb = new StringBuilder(); + for (Class<? extends IPartitioner> klass : Sets.difference(subTypes, coverage)) + { + if (Modifier.isAbstract(klass.getModifiers())) + continue; + if (isTestType(klass)) + continue; + if (ReversedLongLocalPartitioner.class.equals(klass)) + continue; + String name = klass.getCanonicalName(); + if (name == null) + name = klass.getName(); + sb.append(name).append('\n'); + } + if (sb.length() > 0) + throw new AssertionError("Uncovered types:\n" + sb); + } + + private boolean isTestType(Class<? extends IPartitioner> klass) + { + String name = klass.getCanonicalName(); + if (name == null) + name = klass.getName(); + if (name == null) + name = klass.toString(); + if (name.contains("Test")) + return true; + if (name.equals(LengthPartitioner.class.getCanonicalName())) + return true; + ProtectionDomain domain = klass.getProtectionDomain(); + if (domain == null) return false; + CodeSource src = domain.getCodeSource(); + if (src == null) return false; + return "test".equals(new File(src.getLocation().getPath()).name()); + } + @Test public void byteCompareSerde() { diff --git a/test/unit/org/apache/cassandra/dht/LocalCompositePrefixPartitionerTest.java b/test/unit/org/apache/cassandra/dht/LocalCompositePrefixPartitionerTest.java new file mode 100644 index 0000000000..565e5a59fd --- /dev/null +++ b/test/unit/org/apache/cassandra/dht/LocalCompositePrefixPartitionerTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.dht; + +import com.google.common.collect.Lists; +import org.apache.cassandra.CassandraTestBase; +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.cql3.statements.schema.CreateTableStatement; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.schema.KeyspaceParams; +import org.apache.cassandra.schema.TableMetadata; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static java.lang.String.format; +import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal; +import static org.apache.cassandra.utils.ByteBufferUtil.hexToBytes; + +public class LocalCompositePrefixPartitionerTest extends CassandraTestBase +{ + private static final String KEYSPACE = "ks"; + @BeforeClass + public static void setupClass() + { + SchemaLoader.prepareServer(); + } + + private static TableMetadata.Builder parse(String keyspace, String name, String cql) + { + return CreateTableStatement.parse(format(cql, name), KEYSPACE) + .gcGraceSeconds((int) TimeUnit.DAYS.toSeconds(90)); + } + + private static LocalCompositePrefixPartitioner partitioner(AbstractType... 
types) + { + return new LocalCompositePrefixPartitioner(types); + } + + private static void assertKeysMatch(Iterable<DecoratedKey> expected, Iterator<DecoratedKey> actual) + { + List<DecoratedKey> expectedList = Lists.newArrayList(expected); + List<DecoratedKey> actualList = Lists.newArrayList(actual); + Assert.assertEquals(expectedList, actualList); + } + + @Test + public void keyIteratorTest() throws Throwable + { + String keyspaceName = "ks"; + String tableName = "tbl"; + LocalCompositePrefixPartitioner partitioner = partitioner(Int32Type.instance, BytesType.instance, Int32Type.instance); + TableMetadata metadata = parse(keyspaceName, tableName, + "CREATE TABLE %s (" + + "p1 int," + + "p2 blob," + + "p3 int," + + "v int," + + "PRIMARY KEY ((p1, p2, p3))" + + ")").partitioner(partitioner).build(); + SchemaLoader.createKeyspace(keyspaceName, KeyspaceParams.local(), metadata); + + executeOnceInternal(String.format("INSERT INTO %s.%s (p1, p2, p3, v) VALUES (1, 0x00, 5, 0)", keyspaceName, tableName)); + executeOnceInternal(String.format("INSERT INTO %s.%s (p1, p2, p3, v) VALUES (1, 0x0000, 5, 0)", keyspaceName, tableName)); + executeOnceInternal(String.format("INSERT INTO %s.%s (p1, p2, p3, v) VALUES (2, 0x00, 5, 0)", keyspaceName, tableName)); + executeOnceInternal(String.format("INSERT INTO %s.%s (p1, p2, p3, v) VALUES (2, 0x0100, 5, 0)", keyspaceName, tableName)); + executeOnceInternal(String.format("INSERT INTO %s.%s (p1, p2, p3, v) VALUES (2, 0x02, 5, 0)", keyspaceName, tableName)); + executeOnceInternal(String.format("INSERT INTO %s.%s (p1, p2, p3, v) VALUES (2, 0x02, 6, 0)", keyspaceName, tableName)); + + Token startToken = partitioner.createPrefixToken(1, hexToBytes("0000")); + Token endToken1 = partitioner.createPrefixToken(2, hexToBytes("0100")); + Token endToken2 = partitioner.createPrefixToken(2, hexToBytes("02")); + + + assertKeysMatch(List.of(partitioner.decoratedKey(2, hexToBytes("00"), 5), + partitioner.decoratedKey(2, hexToBytes("0100"), 5) + ), partitioner.keyIterator(metadata, new Range<>(startToken.maxKeyBound(), endToken1.maxKeyBound()))); + + assertKeysMatch(List.of(partitioner.decoratedKey(1, hexToBytes("0000"), 5), + partitioner.decoratedKey(2, hexToBytes("00"), 5), + partitioner.decoratedKey(2, hexToBytes("0100"), 5), + partitioner.decoratedKey(2, hexToBytes("02"), 5), + partitioner.decoratedKey(2, hexToBytes("02"), 6) + ), partitioner.keyIterator(metadata, new Bounds<>(startToken.minKeyBound(), endToken2.maxKeyBound()))); + + assertKeysMatch(List.of(partitioner.decoratedKey(1, hexToBytes("0000"), 5), + partitioner.decoratedKey(2, hexToBytes("00"), 5), + partitioner.decoratedKey(2, hexToBytes("0100"), 5) + ), partitioner.keyIterator(metadata, new IncludingExcludingBounds<>(startToken.minKeyBound(), endToken2.minKeyBound()))); + + } +} diff --git a/test/unit/org/apache/cassandra/service/accord/AccordKeyspaceTest.java b/test/unit/org/apache/cassandra/service/accord/AccordKeyspaceTest.java index 717cda1de3..02d823cfd4 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordKeyspaceTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordKeyspaceTest.java @@ -60,6 +60,7 @@ import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.marshal.Int32Type; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.LocalPartitioner; +import org.apache.cassandra.dht.ReversedLongLocalPartitioner; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.schema.MemtableParams; import 
org.apache.cassandra.schema.Schema; @@ -183,8 +184,13 @@ public class AccordKeyspaceTest extends CQLTester.InMemory { TableId tableId = rs.pickOrderedSet(tables.navigableKeySet()); IPartitioner partitioner = tables.get(tableId); - ByteBuffer data = !(partitioner instanceof LocalPartitioner) ? Int32Type.instance.decompose(rs.nextInt()) - : fromQT(getTypeSupport(partitioner.getTokenValidator()).bytesGen()).next(rs); + ByteBuffer data; + if (partitioner instanceof ReversedLongLocalPartitioner) + data = fromQT(CassandraGenerators.reversedLongLocalKeys()).next(rs); + else if (partitioner instanceof LocalPartitioner) + data = fromQT(getTypeSupport(partitioner.getTokenValidator()).bytesGen()).next(rs); + else + data = Int32Type.instance.decompose(rs.nextInt()); PartitionKey key = new PartitionKey(tableId, tables.get(tableId).decorateKey(data)); if (keys.add(key)) { @@ -200,8 +206,8 @@ public class AccordKeyspaceTest extends CQLTester.InMemory // The memtable will allow the write, but it will be dropped when writing to the SSTable... //TODO (now, correctness): since we store the user token + user key, if a key is close to the PK limits then we could tip over and lose our CFK // new Mutation(AccordKeyspace.getCommandsForKeyPartitionUpdate(store, pk, 42, ByteBufferUtil.EMPTY_BYTE_BUFFER)).apply(); - execute("INSERT INTO system_accord.commands_for_key (store_id, key_token, key) VALUES (?, ?, ?)", - store, AccordKeyspace.serializeRoutingKey(pk.toUnseekable()), AccordKeyspace.serializeKey(pk)); + execute("INSERT INTO system_accord.commands_for_key (store_id, table_id, key_token, key) VALUES (?, ?, ?, ?)", + store, pk.table().asUUID(), AccordKeyspace.serializeRoutingKeyNoTable(pk.toUnseekable()), pk.partitionKey().getKey()); } catch (IllegalArgumentException | InvalidRequestException e) { @@ -235,17 +241,21 @@ public class AccordKeyspaceTest extends CQLTester.InMemory for (var e : storesToKeys.entrySet()) { int store = e.getKey(); - expectedCqlStoresToKeys.put(store, new TreeSet<>(e.getValue().stream().map(p -> AccordKeyspace.serializeRoutingKey(p.toUnseekable())).collect(Collectors.toList()))); + SortedSet<PartitionKey> keys = e.getValue(); + if (keys.isEmpty()) + continue; + expectedCqlStoresToKeys.put(store, new TreeSet<>(keys.stream().map(p -> AccordKeyspace.serializeRoutingKeyNoTable(p.toUnseekable())).collect(Collectors.toList()))); } // make sure no data loss... when this test was written the memtable had all the rows but the sstable didn't...
this // is mostly a santity check to detect that case early - var resultSet = execute("SELECT store_id, key_token FROM system_accord.commands_for_key ALLOW FILTERING"); + var resultSet = execute("SELECT store_id, table_id, key_token FROM system_accord.commands_for_key ALLOW FILTERING"); TreeMap<Integer, SortedSet<ByteBuffer>> cqlStoresToKeys = new TreeMap<>(); for (var row : resultSet) { int storeId = row.getInt("store_id"); ByteBuffer bb = row.getBytes("key_token"); + // FIXME: include table_id cqlStoresToKeys.computeIfAbsent(storeId, ignore -> new TreeSet<>()).add(bb); } Assertions.assertThat(cqlStoresToKeys).isEqualTo(expectedCqlStoresToKeys); @@ -255,6 +265,8 @@ public class AccordKeyspaceTest extends CQLTester.InMemory { int store = rs.pickOrderedSet(storesToKeys.navigableKeySet()); var keysForStore = new ArrayList<>(storesToKeys.get(store)); + if (keysForStore.isEmpty()) + continue; int offset; int offsetEnd; diff --git a/test/unit/org/apache/cassandra/utils/AccordGenerators.java b/test/unit/org/apache/cassandra/utils/AccordGenerators.java index e827bb33d6..cf23494d84 100644 --- a/test/unit/org/apache/cassandra/utils/AccordGenerators.java +++ b/test/unit/org/apache/cassandra/utils/AccordGenerators.java @@ -70,7 +70,7 @@ import static org.apache.cassandra.service.accord.AccordTestUtils.createPartialT public class AccordGenerators { - private static final Gen<IPartitioner> PARTITIONER_GEN = fromQT(CassandraGenerators.partitioners()); + private static final Gen<IPartitioner> PARTITIONER_GEN = fromQT(CassandraGenerators.nonLocalPartitioners()); private AccordGenerators() { diff --git a/test/unit/org/apache/cassandra/utils/CassandraGenerators.java b/test/unit/org/apache/cassandra/utils/CassandraGenerators.java index 2294183191..f45d05b7ce 100644 --- a/test/unit/org/apache/cassandra/utils/CassandraGenerators.java +++ b/test/unit/org/apache/cassandra/utils/CassandraGenerators.java @@ -37,6 +37,7 @@ import javax.annotation.Nullable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import org.apache.commons.lang3.builder.MultilineRecursiveToStringStyle; import org.apache.commons.lang3.builder.ReflectionToStringBuilder; @@ -57,11 +58,13 @@ import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.db.marshal.UserType; import org.apache.cassandra.dht.ByteOrderedPartitioner; import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.LocalCompositePrefixPartitioner; import org.apache.cassandra.dht.LocalPartitioner; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.dht.OrderPreservingPartitioner; import org.apache.cassandra.dht.RandomPartitioner; import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.ReversedLongLocalPartitioner; import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.ApplicationState; import org.apache.cassandra.gms.EndpointState; @@ -609,11 +612,21 @@ public final class CassandraGenerators return rs -> partitioner.getToken(bytes.generate(rs)); } - public static Gen<IPartitioner> localPartitioner() + public static Gen<LocalPartitioner> localPartitioner() { return AbstractTypeGenerators.safeTypeGen().map(LocalPartitioner::new); } + public static Gen<LocalCompositePrefixPartitioner> localCompositePrefixPartitioner() + { + return AbstractTypeGenerators.safeTypeGen().map(type -> { + if (type instanceof CompositeType) + return new LocalCompositePrefixPartitioner((CompositeType) type); + else + return 
new LocalCompositePrefixPartitioner(type); + }); + } + public static Gen<Token> localPartitionerToken() { var lpGen = localPartitioner(); @@ -624,6 +637,31 @@ public final class CassandraGenerators }; } + public static Gen<Token> localCompositePrefixPartitionerToken() + { + var lpGen = localCompositePrefixPartitioner(); + return rs -> { + var lp = lpGen.generate(rs); + var bytes = AbstractTypeGenerators.getTypeSupport(lp.getTokenValidator()).bytesGen(); + return lp.getToken(bytes.generate(rs)); + }; + } + + public static Gen<Token> reversedLongLocalToken() + { + Constraint range = Constraint.between(0, Long.MAX_VALUE); + return rs -> new ReversedLongLocalPartitioner.ReversedLongLocalToken(rs.next(range)); + } + + public static Gen<ByteBuffer> reversedLongLocalKeys() + { + Constraint range = Constraint.between(0, Long.MAX_VALUE); + return rs -> { + long value = rs.next(range); + return ByteBufferUtil.bytes(value); + }; + } + public static Gen<Token> orderPreservingToken() { // empty token only happens if partition key is byte[0], which isn't allowed @@ -640,23 +678,39 @@ public final class CassandraGenerators private enum SupportedPartitioners { - Murmur(ignore -> Murmur3Partitioner.instance), - ByteOrdered(ignore -> ByteOrderedPartitioner.instance), - Random(ignore -> RandomPartitioner.instance), - Local(localPartitioner()), - OrderPreserving(ignore -> OrderPreservingPartitioner.instance); + Murmur(Murmur3Partitioner.class, ignore -> Murmur3Partitioner.instance), + ByteOrdered(ByteOrderedPartitioner.class, ignore -> ByteOrderedPartitioner.instance), + Random(RandomPartitioner.class, ignore -> RandomPartitioner.instance), + Local(LocalPartitioner.class, localPartitioner()), + OrderPreserving(OrderPreservingPartitioner.class, ignore -> OrderPreservingPartitioner.instance), + LocalCompositePrefix(LocalCompositePrefixPartitioner.class, localCompositePrefixPartitioner()); - private final Gen<IPartitioner> partitioner; + private final Class<? extends IPartitioner> clazz; + private final Gen<? extends IPartitioner> partitioner; - SupportedPartitioners(Gen<IPartitioner> partitionerGen) + <T extends IPartitioner> SupportedPartitioners(Class<T> clazz, Gen<T> partitionerGen) { + this.clazz = clazz; partitioner = partitionerGen; } - public Gen<IPartitioner> partitioner() + public Gen<? extends IPartitioner> partitioner() { return partitioner; } + + public static Set<Class<? extends IPartitioner>> knownPartitioners() + { + ImmutableSet.Builder<Class<? extends IPartitioner>> builder = ImmutableSet.builder(); + for (SupportedPartitioners p : values()) + builder.add(p.clazz); + return builder.build(); + } + } + + public static Set<Class<? 
extends IPartitioner>> knownPartitioners() + { + return SupportedPartitioners.knownPartitioners(); } public static Gen<IPartitioner> partitioners() @@ -665,10 +719,12 @@ public final class CassandraGenerators .flatMap(SupportedPartitioners::partitioner); } + public static Gen<IPartitioner> nonLocalPartitioners() { return SourceDSL.arbitrary().enumValues(SupportedPartitioners.class) - .assuming(p -> p != SupportedPartitioners.Local) + .assuming(p -> p != SupportedPartitioners.Local && + p != SupportedPartitioners.LocalCompositePrefix) .flatMap(SupportedPartitioners::partitioner); } @@ -682,6 +738,7 @@ public final class CassandraGenerators if (partitioner instanceof Murmur3Partitioner) return murmurToken(); if (partitioner instanceof ByteOrderedPartitioner) return byteOrderToken(); if (partitioner instanceof RandomPartitioner) return randomPartitionerToken(); + if (partitioner instanceof LocalCompositePrefixPartitioner) return localCompositePrefixPartitionerToken(); if (partitioner instanceof LocalPartitioner) return localPartitionerToken((LocalPartitioner) partitioner); if (partitioner instanceof OrderPreservingPartitioner) return orderPreservingToken(); throw new UnsupportedOperationException("Unsupported partitioner: " + partitioner.getClass()); @@ -842,6 +899,10 @@ public final class CassandraGenerators LocalPartitioner lp = (LocalPartitioner) partitioner; valueGen = AbstractTypeGenerators.getTypeSupport(lp.getTokenValidator()).bytesGen(); } + else if (partitioner instanceof ReversedLongLocalPartitioner) + { + valueGen = reversedLongLocalKeys(); + } return partitioner.decorateKey(valueGen.generate(rs)); }; } diff --git a/test/unit/org/apache/cassandra/utils/CassandraGeneratorsTest.java b/test/unit/org/apache/cassandra/utils/CassandraGeneratorsTest.java new file mode 100644 index 0000000000..b4e037ef3a --- /dev/null +++ b/test/unit/org/apache/cassandra/utils/CassandraGeneratorsTest.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.utils; + +import org.junit.Test; + +import accord.utils.Gens; +import org.assertj.core.api.Assertions; + +import static accord.utils.Property.qt; +import static org.apache.cassandra.utils.Generators.toGen; + +public class CassandraGeneratorsTest +{ + @Test + public void partitionerToToken() + { + qt().forAll(Gens.random(), toGen(CassandraGenerators.partitioners())) + .check((rs, p) -> Assertions.assertThat(toGen(CassandraGenerators.token(p)).next(rs)).isNotNull()); + } + + @Test + public void partitionerKeys() + { + qt().forAll(Gens.random(), toGen(CassandraGenerators.partitioners())) + .check((rs, p) -> Assertions.assertThat(toGen(CassandraGenerators.decoratedKeys(i -> p)).next(rs)).isNotNull()); + } +} \ No newline at end of file
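
Reviewer note: a minimal usage sketch of the prefix-token API, based only on the signatures exercised in LocalCompositePrefixPartitionerTest above. The PrefixScanSketch class, its printKeysInPrefixRange method, and the table passed in are illustrative assumptions, not part of this patch; the table must already have been created with this partitioner and populated, as the test does via SchemaLoader and CQL.

import java.util.Iterator;

import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.dht.LocalCompositePrefixPartitioner;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.schema.TableMetadata;

import static org.apache.cassandra.utils.ByteBufferUtil.hexToBytes;

// Sketch only: relies on the constructor, createPrefixToken(...) and keyIterator(...)
// as used by the test above; class and method names here are illustrative.
public final class PrefixScanSketch
{
    public static void printKeysInPrefixRange(TableMetadata metadata) throws Exception
    {
        // Composite keys compare component by component, so a token built from a prefix
        // (p1, p2) bounds every full key sharing that prefix: its minKeyBound sorts
        // before those keys and its maxKeyBound after them.
        LocalCompositePrefixPartitioner partitioner =
            new LocalCompositePrefixPartitioner(Int32Type.instance, BytesType.instance, Int32Type.instance);

        Token start = partitioner.createPrefixToken(1, hexToBytes("0000"));
        Token end = partitioner.createPrefixToken(2, hexToBytes("02"));

        // Iterate only the decorated keys of the table that fall inside the token range,
        // rather than walking every partition key.
        Iterator<DecoratedKey> keys =
            partitioner.keyIterator(metadata, new Range<>(start.maxKeyBound(), end.maxKeyBound()));
        while (keys.hasNext())
            System.out.println(keys.next());
    }
}

The test above additionally exercises Bounds and IncludingExcludingBounds with min/max key bounds to pin down the inclusive/exclusive semantics; only the Range case is shown here.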