This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit a8e7cfbc0e146ea82154654ba43b613b058f99d1 Author: Sam Tunnicliffe <s...@beobal.com> AuthorDate: Tue Feb 11 09:59:31 2020 +0000 Ensure repaired data tracking reads a consistent amount of data across replicas Patch by Sam Tunnicliffe; reviewed by Aleksey Yeschenko for CASSANDRA-15601 --- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/ReadCommand.java | 310 ++++--------------- .../org/apache/cassandra/db/RepairedDataInfo.java | 336 +++++++++++++++++++++ .../apache/cassandra/metrics/KeyspaceMetrics.java | 12 + .../org/apache/cassandra/metrics/TableMetrics.java | 18 +- .../distributed/test/RepairDigestTrackingTest.java | 169 ++++++++++- .../org/apache/cassandra/db/ReadCommandTest.java | 120 +++++++- .../apache/cassandra/db/RepairedDataInfoTest.java | 303 +++++++++++++++++++ 8 files changed, 1003 insertions(+), 266 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 96eeed4..4586c71 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0-alpha4 + * Ensure repaired data tracking reads a consistent amount of data across replicas (CASSANDRA-15601) * Fix CQLSH to avoid arguments being evaluated (CASSANDRA-15660) * Correct Visibility and Improve Safety of Methods in LatencyMetrics (CASSANDRA-15597) * Allow cqlsh to run with Python2.7/Python3.6+ (CASSANDRA-15659,CASSANDRA-15573) diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index 4f8ea3e..4c4c833 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -23,6 +23,7 @@ import java.util.*; import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.function.LongPredicate; +import java.util.function.Function; import javax.annotation.Nullable; @@ -62,12 +63,12 @@ import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.ClientWarn; import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import static com.google.common.collect.Iterables.any; import static com.google.common.collect.Iterables.filter; import static org.apache.cassandra.utils.MonotonicClock.approxTime; +import static org.apache.cassandra.db.partitions.UnfilteredPartitionIterators.MergeListener.NOOP; /** * General interface for storage-engine read commands (common to both range and @@ -91,17 +92,7 @@ public abstract class ReadCommand extends AbstractReadQuery // for data queries, coordinators may request information on the repaired data used in constructing the response private boolean trackRepairedStatus = false; // tracker for repaired data, initialized to singleton null object - private static final RepairedDataInfo NULL_REPAIRED_DATA_INFO = new RepairedDataInfo() - { - void trackPartitionKey(DecoratedKey key){} - void trackDeletion(DeletionTime deletion){} - void trackRangeTombstoneMarker(RangeTombstoneMarker marker){} - void trackRow(Row row){} - boolean isConclusive(){ return true; } - ByteBuffer getDigest(){ return ByteBufferUtil.EMPTY_BYTE_BUFFER; } - }; - - private RepairedDataInfo repairedDataInfo = NULL_REPAIRED_DATA_INFO; + private RepairedDataInfo repairedDataInfo = RepairedDataInfo.NULL_REPAIRED_DATA_INFO; int oldestUnrepairedTombstone = Integer.MAX_VALUE; @@ -450,7 +441,13 @@ public abstract class ReadCommand extends AbstractReadQuery } if (isTrackingRepairedStatus()) - repairedDataInfo = new RepairedDataInfo(); + { + final 
DataLimits.Counter repairedReadCount = limits().newCounter(nowInSec(), + false, + selectsFullPartition(), + metadata().enforceStrictLiveness()).onlyCount(); + repairedDataInfo = new RepairedDataInfo(repairedReadCount); + } UnfilteredPartitionIterator iterator = (null == searcher) ? queryStorage(cfs, executionController) : searcher.search(executionController); iterator = RTBoundValidator.validate(iterator, Stage.MERGED, false); @@ -475,7 +472,22 @@ public abstract class ReadCommand extends AbstractReadQuery // apply the limits/row counter; this transformation is stopping and would close the iterator as soon // as the count is observed; if that happens in the middle of an open RT, its end bound will not be included. - iterator = limits().filter(iterator, nowInSec(), selectsFullPartition()); + // If tracking repaired data, the counter is needed for overreading repaired data, otherwise we can + // optimise the case where this.limit = DataLimits.NONE which skips an unnecessary transform + if (isTrackingRepairedStatus()) + { + DataLimits.Counter limit = + limits().newCounter(nowInSec(), false, selectsFullPartition(), metadata().enforceStrictLiveness()); + iterator = limit.applyTo(iterator); + // ensure that a consistent amount of repaired data is read on each replica. This causes silent + // overreading from the repaired data set, up to limits(). The extra data is not visible to + // the caller, only iterated to produce the repaired data digest. + iterator = repairedDataInfo.extend(iterator, limit); + } + else + { + iterator = limits().filter(iterator, nowInSec(), selectsFullPartition()); + } // because of the above, we need to append an artificial end bound if the source iterator was stopped short by a counter. return RTBoundCloser.close(iterator); @@ -723,254 +735,37 @@ public abstract class ReadCommand extends AbstractReadQuery return toCQLString(); } - private static UnfilteredPartitionIterator withRepairedDataInfo(final UnfilteredPartitionIterator iterator, - final RepairedDataInfo repairedDataInfo) - { - class WithRepairedDataTracking extends Transformation<UnfilteredRowIterator> - { - protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) - { - return withRepairedDataInfo(partition, repairedDataInfo); - } - } - - return Transformation.apply(iterator, new WithRepairedDataTracking()); - } - - private static UnfilteredRowIterator withRepairedDataInfo(final UnfilteredRowIterator iterator, - final RepairedDataInfo repairedDataInfo) - { - class WithTracking extends Transformation - { - protected DecoratedKey applyToPartitionKey(DecoratedKey key) - { - repairedDataInfo.onNewPartition(iterator); - repairedDataInfo.trackPartitionKey(key); - return key; - } - - protected DeletionTime applyToDeletion(DeletionTime deletionTime) - { - repairedDataInfo.trackDeletion(deletionTime); - return deletionTime; - } - - protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) - { - repairedDataInfo.trackRangeTombstoneMarker(marker); - return marker; - } - - protected Row applyToStatic(Row row) - { - repairedDataInfo.trackStaticRow(row); - return row; - } - - protected Row applyToRow(Row row) - { - repairedDataInfo.trackRow(row); - return row; - } - - protected void onPartitionClose() - { - repairedDataInfo.onPartitionClose(); - } - } - return Transformation.apply(iterator, new WithTracking()); - } - - private static class RepairedDataInfo - { - // Keeps a digest of the partition currently being processed.
Since we won't know - // whether a partition will be fully purged from a read result until it's been - // consumed, we buffer this per-partition digest and add it to the final digest - // when the partition is closed (if it wasn't fully purged). - private Digest perPartitionDigest; - private Digest perCommandDigest; - private boolean isConclusive = true; - - // Doesn't actually purge from the underlying iterators, but excludes from the digest - // the purger can't be initialized until we've iterated all the sstables for the query - // as it requires the oldest repaired tombstone - private RepairedDataPurger purger; - private boolean isFullyPurged = true; - - ByteBuffer getDigest() - { - return perCommandDigest == null - ? ByteBufferUtil.EMPTY_BYTE_BUFFER - : ByteBuffer.wrap(perCommandDigest.digest()); - } - - protected void onNewPartition(UnfilteredRowIterator partition) - { - assert purger != null; - purger.setCurrentKey(partition.partitionKey()); - purger.setIsReverseOrder(partition.isReverseOrder()); - } - - protected void setPurger(RepairedDataPurger purger) - { - this.purger = purger; - } - - boolean isConclusive() - { - return isConclusive; - } - - void markInconclusive() - { - isConclusive = false; - } - - void trackPartitionKey(DecoratedKey key) - { - getPerPartitionDigest().update(key.getKey()); - } - - void trackDeletion(DeletionTime deletion) - { - assert purger != null; - DeletionTime purged = purger.applyToDeletion(deletion); - if (!purged.isLive()) - isFullyPurged = false; - - purged.digest(getPerPartitionDigest()); - } - - void trackRangeTombstoneMarker(RangeTombstoneMarker marker) - { - assert purger != null; - RangeTombstoneMarker purged = purger.applyToMarker(marker); - if (purged != null) - { - isFullyPurged = false; - purged.digest(getPerPartitionDigest()); - } - } - - void trackStaticRow(Row row) - { - assert purger != null; - Row purged = purger.applyToRow(row); - if (!purged.isEmpty()) - { - isFullyPurged = false; - purged.digest(getPerPartitionDigest()); - } - } - - void trackRow(Row row) - { - assert purger != null; - Row purged = purger.applyToRow(row); - if (purged != null) - { - isFullyPurged = false; - purged.digest(getPerPartitionDigest()); - } - } - - private Digest getPerPartitionDigest() - { - if (perPartitionDigest == null) - perPartitionDigest = Digest.forRepairedDataTracking(); - - return perPartitionDigest; - } - - private void onPartitionClose() - { - if (perPartitionDigest != null) - { - // If the partition wasn't completely emptied by the purger, - // calculate the digest for the partition and use it to - // update the overall digest - if (!isFullyPurged) - { - if (perCommandDigest == null) - perCommandDigest = Digest.forRepairedDataTracking(); - - byte[] partitionDigest = perPartitionDigest.digest(); - perCommandDigest.update(partitionDigest, 0, partitionDigest.length); - isFullyPurged = true; - } - - perPartitionDigest = null; - } - } - } - - /** - * Although PurgeFunction extends Transformation, this is never applied to an iterator. - * Instead, it is used by RepairedDataInfo during the generation of a repaired data - * digest to exclude data which will actually be purged later on in the read pipeline. 
- */ - private static class RepairedDataPurger extends PurgeFunction - { - RepairedDataPurger(ColumnFamilyStore cfs, - int nowInSec, - int oldestUnrepairedTombstone) - { - super(nowInSec, - cfs.gcBefore(nowInSec), - oldestUnrepairedTombstone, - cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones(), - cfs.metadata.get().enforceStrictLiveness()); - } - - protected LongPredicate getPurgeEvaluator() - { - return (time) -> true; - } - - void setCurrentKey(DecoratedKey key) - { - super.onNewPartition(key); - } - - void setIsReverseOrder(boolean isReverseOrder) - { - super.setReverseOrder(isReverseOrder); - } - - public DeletionTime applyToDeletion(DeletionTime deletionTime) - { - return super.applyToDeletion(deletionTime); - } - - public Row applyToRow(Row row) - { - return super.applyToRow(row); - } - - public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) - { - return super.applyToMarker(marker); - } - } - @SuppressWarnings("resource") // resultant iterators are closed by their callers InputCollector<UnfilteredRowIterator> iteratorsForPartition(ColumnFamilyStore.ViewFragment view) { - BiFunction<List<UnfilteredRowIterator>, RepairedDataInfo, UnfilteredRowIterator> merge = - (unfilteredRowIterators, repairedDataInfo) -> - withRepairedDataInfo(UnfilteredRowIterators.merge(unfilteredRowIterators), repairedDataInfo); + final BiFunction<List<UnfilteredRowIterator>, RepairedDataInfo, UnfilteredRowIterator> merge = + (unfilteredRowIterators, repairedDataInfo) -> { + UnfilteredRowIterator repaired = UnfilteredRowIterators.merge(unfilteredRowIterators); + return repairedDataInfo.withRepairedDataInfo(repaired); + }; - return new InputCollector<>(view, repairedDataInfo, merge, isTrackingRepairedStatus()); + // For single partition reads, after reading up to the command's DataLimit nothing extra is required. + // The merged & repaired row iterator will be consumed until it's exhausted or the RepairedDataInfo's + // internal counter is satisfied + final Function<UnfilteredRowIterator, UnfilteredPartitionIterator> postLimitPartitions = + (rows) -> EmptyIterators.unfilteredPartition(metadata()); + return new InputCollector<>(view, repairedDataInfo, merge, postLimitPartitions, isTrackingRepairedStatus()); } @SuppressWarnings("resource") // resultant iterators are closed by their callers InputCollector<UnfilteredPartitionIterator> iteratorsForRange(ColumnFamilyStore.ViewFragment view) { - BiFunction<List<UnfilteredPartitionIterator>, RepairedDataInfo, UnfilteredPartitionIterator> merge = - (unfilteredPartitionIterators, repairedDataInfo) -> - withRepairedDataInfo(UnfilteredPartitionIterators.merge(unfilteredPartitionIterators, UnfilteredPartitionIterators.MergeListener.NOOP), repairedDataInfo); + final BiFunction<List<UnfilteredPartitionIterator>, RepairedDataInfo, UnfilteredPartitionIterator> merge = + (unfilteredPartitionIterators, repairedDataInfo) -> { + UnfilteredPartitionIterator repaired = UnfilteredPartitionIterators.merge(unfilteredPartitionIterators, + NOOP); + return repairedDataInfo.withRepairedDataInfo(repaired); + }; - return new InputCollector<>(view, repairedDataInfo, merge, isTrackingRepairedStatus()); + // Uses identity function to provide additional partitions to be consumed after the command's + // DataLimits are satisfied. The input to the function will be the iterator of merged, repaired partitions + // which we'll keep reading until the RepairedDataInfo's internal counter is satisfied. 
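+ // (For single partition reads, iteratorsForPartition above instead supplies an empty post-limit
+ // iterator; any overread there is served by extending the current partition's row iterator.)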
+ return new InputCollector<>(view, repairedDataInfo, merge, Function.identity(), isTrackingRepairedStatus()); } /** @@ -988,12 +783,14 @@ public abstract class ReadCommand extends AbstractReadQuery private final boolean isTrackingRepairedStatus; Set<SSTableReader> repairedSSTables; BiFunction<List<T>, RepairedDataInfo, T> repairedMerger; + Function<T, UnfilteredPartitionIterator> postLimitAdditionalPartitions; List<T> repairedIters; List<T> unrepairedIters; InputCollector(ColumnFamilyStore.ViewFragment view, RepairedDataInfo repairedDataInfo, BiFunction<List<T>, RepairedDataInfo, T> repairedMerger, + Function<T, UnfilteredPartitionIterator> postLimitAdditionalPartitions, boolean isTrackingRepairedStatus) { this.repairedDataInfo = repairedDataInfo; @@ -1023,6 +820,7 @@ public abstract class ReadCommand extends AbstractReadQuery unrepairedIters = new ArrayList<>((view.sstables.size() - repairedSSTables.size()) + Iterables.size(view.memtables) + 1); } this.repairedMerger = repairedMerger; + this.postLimitAdditionalPartitions = postLimitAdditionalPartitions; } void addMemtableIterator(T iter) @@ -1038,15 +836,17 @@ public abstract class ReadCommand extends AbstractReadQuery unrepairedIters.add(iter); } + @SuppressWarnings("resource") // the returned iterators are closed by the caller List<T> finalizeIterators(ColumnFamilyStore cfs, int nowInSec, int oldestUnrepairedTombstone) { if (repairedIters.isEmpty()) return unrepairedIters; // merge the repaired data before returning, wrapping in a digest generator - RepairedDataPurger purger = new RepairedDataPurger(cfs, nowInSec, oldestUnrepairedTombstone); - repairedDataInfo.setPurger(purger); - unrepairedIters.add(repairedMerger.apply(repairedIters, repairedDataInfo)); + repairedDataInfo.prepare(cfs, nowInSec, oldestUnrepairedTombstone); + T repairedIter = repairedMerger.apply(repairedIters, repairedDataInfo); + repairedDataInfo.finalize(postLimitAdditionalPartitions.apply(repairedIter)); + unrepairedIters.add(repairedIter); return unrepairedIters; } diff --git a/src/java/org/apache/cassandra/db/RepairedDataInfo.java b/src/java/org/apache/cassandra/db/RepairedDataInfo.java new file mode 100644 index 0000000..be636d3 --- /dev/null +++ b/src/java/org/apache/cassandra/db/RepairedDataInfo.java @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db; + +import java.nio.ByteBuffer; +import java.util.function.LongPredicate; +import java.util.concurrent.TimeUnit; + +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.partitions.PurgeFunction; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.RangeTombstoneMarker; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.transform.MoreRows; +import org.apache.cassandra.db.transform.Transformation; +import org.apache.cassandra.metrics.TableMetrics; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.ByteBufferUtil; + +class RepairedDataInfo +{ + public static final RepairedDataInfo NULL_REPAIRED_DATA_INFO = new RepairedDataInfo(null) + { + boolean isConclusive(){ return true; } + ByteBuffer getDigest(){ return ByteBufferUtil.EMPTY_BYTE_BUFFER; } + }; + + // Keeps a digest of the partition currently being processed. Since we won't know + // whether a partition will be fully purged from a read result until it's been + // consumed, we buffer this per-partition digest and add it to the final digest + // when the partition is closed (if it wasn't fully purged). + private Digest perPartitionDigest; + private Digest perCommandDigest; + private boolean isConclusive = true; + private ByteBuffer calculatedDigest = null; + + // Doesn't actually purge from the underlying iterators, but excludes from the digest + // the purger can't be initialized until we've iterated all the sstables for the query + // as it requires the oldest repaired tombstone + private RepairedDataPurger purger; + private boolean isFullyPurged = true; + + // Supplies additional partitions from the repaired data set to be consumed when the limit of + // executing ReadCommand has been reached. This is to ensure that each replica attempts to + // read the same amount of repaired data, otherwise comparisons of the repaired data digests + // may be invalidated by varying amounts of repaired data being present on each replica. + // This can't be initialized until after the underlying repaired iterators have been merged. + private UnfilteredPartitionIterator postLimitPartitions = null; + private final DataLimits.Counter repairedCounter; + private UnfilteredRowIterator currentPartition; + private TableMetrics metrics; + + public RepairedDataInfo(DataLimits.Counter repairedCounter) + { + this.repairedCounter = repairedCounter; + } + + ByteBuffer getDigest() + { + if (calculatedDigest != null) + return calculatedDigest; + + calculatedDigest = perCommandDigest == null + ? 
ByteBufferUtil.EMPTY_BYTE_BUFFER + : ByteBuffer.wrap(perCommandDigest.digest()); + + return calculatedDigest; + } + + void prepare(ColumnFamilyStore cfs, int nowInSec, int oldestUnrepairedTombstone) + { + this.purger = new RepairedDataPurger(cfs, nowInSec, oldestUnrepairedTombstone); + this.metrics = cfs.metric; + } + + void finalize(UnfilteredPartitionIterator postLimitPartitions) + { + this.postLimitPartitions = postLimitPartitions; + } + + boolean isConclusive() + { + return isConclusive; + } + + void markInconclusive() + { + isConclusive = false; + } + + private void onNewPartition(UnfilteredRowIterator partition) + { + assert purger != null; + purger.setCurrentKey(partition.partitionKey()); + purger.setIsReverseOrder(partition.isReverseOrder()); + this.currentPartition = partition; + } + + private Digest getPerPartitionDigest() + { + if (perPartitionDigest == null) + perPartitionDigest = Digest.forRepairedDataTracking(); + + return perPartitionDigest; + } + + public UnfilteredPartitionIterator withRepairedDataInfo(final UnfilteredPartitionIterator iterator) + { + class WithTracking extends Transformation<UnfilteredRowIterator> + { + protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) + { + return withRepairedDataInfo(partition); + } + } + return Transformation.apply(iterator, new WithTracking()); + } + + public UnfilteredRowIterator withRepairedDataInfo(final UnfilteredRowIterator iterator) + { + class WithTracking extends Transformation<UnfilteredRowIterator> + { + protected DecoratedKey applyToPartitionKey(DecoratedKey key) + { + getPerPartitionDigest().update(key.getKey()); + return key; + } + + protected DeletionTime applyToDeletion(DeletionTime deletionTime) + { + if (repairedCounter.isDone()) + return deletionTime; + + assert purger != null; + DeletionTime purged = purger.applyToDeletion(deletionTime); + if (!purged.isLive()) + isFullyPurged = false; + purged.digest(getPerPartitionDigest()); + return deletionTime; + } + + protected RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) + { + if (repairedCounter.isDone()) + return marker; + + assert purger != null; + RangeTombstoneMarker purged = purger.applyToMarker(marker); + if (purged != null) + { + isFullyPurged = false; + purged.digest(getPerPartitionDigest()); + } + return marker; + } + + protected Row applyToStatic(Row row) + { + if (repairedCounter.isDone()) + return row; + + assert purger != null; + Row purged = purger.applyToRow(row); + if (!purged.isEmpty()) + { + isFullyPurged = false; + purged.digest(getPerPartitionDigest()); + } + return row; + } + + protected Row applyToRow(Row row) + { + if (repairedCounter.isDone()) + return row; + + assert purger != null; + Row purged = purger.applyToRow(row); + if (purged != null) + { + isFullyPurged = false; + purged.digest(getPerPartitionDigest()); + } + return row; + } + + protected void onPartitionClose() + { + if (perPartitionDigest != null) + { + // If the partition wasn't completely emptied by the purger, + // calculate the digest for the partition and use it to + // update the overall digest + if (!isFullyPurged) + { + if (perCommandDigest == null) + perCommandDigest = Digest.forRepairedDataTracking(); + + byte[] partitionDigest = perPartitionDigest.digest(); + perCommandDigest.update(partitionDigest, 0, partitionDigest.length); + } + + perPartitionDigest = null; + } + isFullyPurged = true; + } + } + + if (repairedCounter.isDone()) + return iterator; + + UnfilteredRowIterator tracked = 
repairedCounter.applyTo(Transformation.apply(iterator, new WithTracking())); + onNewPartition(tracked); + return tracked; + } + + public UnfilteredPartitionIterator extend(final UnfilteredPartitionIterator partitions, + final DataLimits.Counter limit) + { + class OverreadRepairedData extends Transformation<UnfilteredRowIterator> implements MoreRows<UnfilteredRowIterator> + { + + protected UnfilteredRowIterator applyToPartition(UnfilteredRowIterator partition) + { + return MoreRows.extend(partition, this, partition.columns()); + } + + public UnfilteredRowIterator moreContents() + { + // We don't need to do anything until the DataLimits + // of the read have been reached + if (!limit.isDone() || repairedCounter.isDone()) + return null; + + long countBeforeOverreads = repairedCounter.counted(); + long overreadStartTime = System.nanoTime(); + if (currentPartition != null) + consumePartition(currentPartition, repairedCounter); + + if (postLimitPartitions != null) + while (postLimitPartitions.hasNext() && !repairedCounter.isDone()) + consumePartition(postLimitPartitions.next(), repairedCounter); + + // we're not actually providing any more rows, just consuming the repaired data + long rows = repairedCounter.counted() - countBeforeOverreads; + long nanos = System.nanoTime() - overreadStartTime; + metrics.repairedDataTrackingOverreadRows.update(rows); + metrics.repairedDataTrackingOverreadTime.update(nanos, TimeUnit.NANOSECONDS); + Tracing.trace("Read {} additional rows of repaired data for tracking in {}µs", rows, TimeUnit.NANOSECONDS.toMicros(nanos)); + return null; + } + + private void consumePartition(UnfilteredRowIterator partition, DataLimits.Counter counter) + { + if (partition == null) + return; + + while (!counter.isDone() && partition.hasNext()) + partition.next(); + + partition.close(); + } + } + // If the read didn't touch any sstables prepare() hasn't been called and + // we can skip this transformation + if (metrics == null || repairedCounter.isDone()) + return partitions; + return Transformation.apply(partitions, new OverreadRepairedData()); + } + + /** + * Although PurgeFunction extends Transformation, this is never applied to an iterator. + * Instead, it is used by RepairedDataInfo during the generation of a repaired data + * digest to exclude data which will actually be purged later on in the read pipeline.
+ */ + private static class RepairedDataPurger extends PurgeFunction + { + RepairedDataPurger(ColumnFamilyStore cfs, + int nowInSec, + int oldestUnrepairedTombstone) + { + super(nowInSec, + cfs.gcBefore(nowInSec), + oldestUnrepairedTombstone, + cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones(), + cfs.metadata.get().enforceStrictLiveness()); + } + + protected LongPredicate getPurgeEvaluator() + { + return (time) -> true; + } + + void setCurrentKey(DecoratedKey key) + { + super.onNewPartition(key); + } + + void setIsReverseOrder(boolean isReverseOrder) + { + super.setReverseOrder(isReverseOrder); + } + + public DeletionTime applyToDeletion(DeletionTime deletionTime) + { + return super.applyToDeletion(deletionTime); + } + + public Row applyToRow(Row row) + { + return super.applyToRow(row); + } + + public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) + { + return super.applyToMarker(marker); + } + } +} diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java index ef3338e..9c45dc0 100644 --- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java +++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java @@ -142,6 +142,15 @@ public class KeyspaceMetrics */ public final Meter unconfirmedRepairedInconsistencies; + /** + * Tracks the amount overreading of repaired data replicas perform in order to produce digests + * at query time. For each query, on a full data read following an initial digest mismatch, the replicas + * may read extra repaired data, up to the DataLimit of the command, so that the coordinator can compare + * the repaired data on each replica. These are tracked on each replica. + */ + public final Histogram repairedDataTrackingOverreadRows; + public final Timer repairedDataTrackingOverreadTime; + public final MetricNameFactory factory; private Keyspace keyspace; @@ -305,6 +314,9 @@ public class KeyspaceMetrics confirmedRepairedInconsistencies = Metrics.meter(factory.createMetricName("RepairedDataInconsistenciesConfirmed")); unconfirmedRepairedInconsistencies = Metrics.meter(factory.createMetricName("RepairedDataInconsistenciesUnconfirmed")); + + repairedDataTrackingOverreadRows = Metrics.histogram(factory.createMetricName("RepairedOverreadRows"), false); + repairedDataTrackingOverreadTime = Metrics.timer(factory.createMetricName("RepairedOverreadTime")); } /** diff --git a/src/java/org/apache/cassandra/metrics/TableMetrics.java b/src/java/org/apache/cassandra/metrics/TableMetrics.java index 6095f50..775d87c 100644 --- a/src/java/org/apache/cassandra/metrics/TableMetrics.java +++ b/src/java/org/apache/cassandra/metrics/TableMetrics.java @@ -31,6 +31,11 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; + +import com.codahale.metrics.Timer; + import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Memtable; @@ -52,9 +57,6 @@ import com.codahale.metrics.Histogram; import com.codahale.metrics.Meter; import com.codahale.metrics.Metric; import com.codahale.metrics.RatioGauge; -import com.codahale.metrics.Timer; -import com.google.common.collect.Iterables; -import com.google.common.collect.Maps; /** * Metrics for {@link ColumnFamilyStore}. @@ -233,6 +235,13 @@ public class TableMetrics // be part of the repaired set whilst others do not. 
public final TableMeter unconfirmedRepairedInconsistencies; + // Tracks the amount overreading of repaired data replicas perform in order to produce digests + // at query time. For each query, on a full data read following an initial digest mismatch, the replicas + // may read extra repaired data, up to the DataLimit of the command, so that the coordinator can compare + // the repaired data on each replica. These are tracked on each replica. + public final TableHistogram repairedDataTrackingOverreadRows; + public final TableTimer repairedDataTrackingOverreadTime; + public final static LatencyMetrics globalReadLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Read"); public final static LatencyMetrics globalWriteLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Write"); public final static LatencyMetrics globalRangeLatency = new LatencyMetrics(globalFactory, globalAliasFactory, "Range"); @@ -946,6 +955,9 @@ public class TableMetrics confirmedRepairedInconsistencies = createTableMeter("RepairedDataInconsistenciesConfirmed", cfs.keyspace.metric.confirmedRepairedInconsistencies); unconfirmedRepairedInconsistencies = createTableMeter("RepairedDataInconsistenciesUnconfirmed", cfs.keyspace.metric.unconfirmedRepairedInconsistencies); + repairedDataTrackingOverreadRows = createTableHistogram("RepairedDataTrackingOverreadRows", cfs.keyspace.metric.repairedDataTrackingOverreadRows, false); + repairedDataTrackingOverreadTime = createTableTimer("RepairedDataTrackingOverreadTime", cfs.keyspace.metric.repairedDataTrackingOverreadTime); + unleveledSSTables = createTableGauge("UnleveledSSTables", cfs::getUnleveledSSTables, () -> { // global gauge int cnt = 0; diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java index 664c99d..4b382a1 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/RepairDigestTrackingTest.java @@ -25,6 +25,8 @@ import java.util.EnumSet; import java.util.Iterator; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; +import java.util.stream.Stream; import com.google.common.util.concurrent.Uninterruptibles; import org.junit.Assert; @@ -45,6 +47,9 @@ import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.SnapshotVerbHandler; import org.apache.cassandra.service.StorageProxy; +import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows; +import static org.junit.Assert.fail; + public class RepairDigestTrackingTest extends TestBaseImpl { private static final String TABLE = "tbl"; @@ -76,7 +81,7 @@ public class RepairDigestTrackingTest extends TestBaseImpl i, i, i); } cluster.forEach(i -> i.flush(KEYSPACE)); - cluster.forEach(i -> assertNotRepaired()); + cluster.forEach(i -> i.runOnInstance(assertNotRepaired())); // mark everything on node 2 repaired cluster.get(2).runOnInstance(markAllRepaired()); @@ -120,10 +125,10 @@ public class RepairDigestTrackingTest extends TestBaseImpl cluster.forEach(i -> i.flush(KEYSPACE)); // nothing is repaired yet - cluster.forEach(i -> assertNotRepaired()); + cluster.forEach(i -> i.runOnInstance(assertNotRepaired())); // mark everything repaired - cluster.forEach(i -> markAllRepaired()); - cluster.forEach(i -> assertRepaired()); + cluster.forEach(i -> i.runOnInstance(markAllRepaired())); + 
cluster.forEach(i -> i.runOnInstance(assertRepaired())); // now overwrite on node2 only to generate digest mismatches, but don't flush so the repaired dataset is not affected for (int i = 0; i < 10; i++) @@ -198,6 +203,162 @@ public class RepairDigestTrackingTest extends TestBaseImpl } } + @Test + public void testRepairedReadCountNormalizationWithInitialUnderread() throws Throwable + { + // Asserts that the amount of repaired data read for digest generation is consistent + // across replicas where one has to read less repaired data to satisfy the original + // limits of the read request. + try (Cluster cluster = init(Cluster.create(2))) + { + + cluster.get(1).runOnInstance(() -> { + StorageProxy.instance.enableRepairedDataTrackingForRangeReads(); + StorageProxy.instance.enableRepairedDataTrackingForPartitionReads(); + }); + + cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v1 INT, PRIMARY KEY (k,c)) " + + "WITH CLUSTERING ORDER BY (c DESC)"); + + // insert data on both nodes and flush + for (int i=0; i<20; i++) + { + cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (0, ?, ?) USING TIMESTAMP 0", + ConsistencyLevel.ALL, i, i); + cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) USING TIMESTAMP 1", + ConsistencyLevel.ALL, i, i); + } + cluster.forEach(c -> c.flush(KEYSPACE)); + // nothing is repaired yet + cluster.forEach(i -> i.runOnInstance(assertNotRepaired())); + // mark everything repaired + cluster.forEach(i -> i.runOnInstance(markAllRepaired())); + cluster.forEach(i -> i.runOnInstance(assertRepaired())); + + // Add some unrepaired data to both nodes + for (int i=20; i<30; i++) + { + cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) USING TIMESTAMP 1", + ConsistencyLevel.ALL, i, i); + } + // And some more unrepaired data to node2 only. This causes node2 to read less repaired data than node1 + // when satisfying the limits of the read. So node2 needs to overread more repaired data than node1 when + // calculating the repaired data digest. + cluster.get(2).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) 
USING TIMESTAMP 1", 30, 30); + + // Verify single partition read + long ccBefore = getConfirmedInconsistencies(cluster.get(1)); + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " WHERE k=1 LIMIT 20", ConsistencyLevel.ALL), + rows(1, 30, 11)); + long ccAfterPartitionRead = getConfirmedInconsistencies(cluster.get(1)); + + // Recreate a mismatch in unrepaired data and verify partition range read + cluster.get(2).executeInternal("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?)", 31, 31); + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " LIMIT 30", ConsistencyLevel.ALL), + rows(1, 31, 2)); + long ccAfterRangeRead = getConfirmedInconsistencies(cluster.get(1)); + + if (ccAfterPartitionRead != ccAfterRangeRead) + if (ccAfterPartitionRead != ccBefore) + fail("Both range and partition reads reported data inconsistencies but none were expected"); + else + fail("Reported inconsistency during range read but none were expected"); + else if (ccAfterPartitionRead != ccBefore) + fail("Reported inconsistency during partition read but none were expected"); + } + } + + @Test + public void testRepairedReadCountNormalizationWithInitialOverread() throws Throwable + { + // Asserts that the amount of repaired data read for digest generation is consistent + // across replicas where one has to read more repaired data to satisfy the original + // limits of the read request. + try (Cluster cluster = init(Cluster.create(2))) + { + + cluster.get(1).runOnInstance(() -> { + StorageProxy.instance.enableRepairedDataTrackingForRangeReads(); + StorageProxy.instance.enableRepairedDataTrackingForPartitionReads(); + }); + + cluster.schemaChange("CREATE TABLE " + KS_TABLE + " (k INT, c INT, v1 INT, PRIMARY KEY (k,c)) " + + "WITH CLUSTERING ORDER BY (c DESC)"); + + // insert data on both nodes and flush + for (int i=0; i<10; i++) + { + cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (0, ?, ?) USING TIMESTAMP 0", + ConsistencyLevel.ALL, i, i); + cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) USING TIMESTAMP 1", + ConsistencyLevel.ALL, i, i); + } + cluster.forEach(c -> c.flush(KEYSPACE)); + // nothing is repaired yet + cluster.forEach(i -> i.runOnInstance(assertNotRepaired())); + // mark everything repaired + cluster.forEach(i -> i.runOnInstance(markAllRepaired())); + cluster.forEach(i -> i.runOnInstance(assertRepaired())); + + // Add some unrepaired data to both nodes + for (int i=10; i<13; i++) + { + cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (0, ?, ?) USING TIMESTAMP 1", + ConsistencyLevel.ALL, i, i); + cluster.coordinator(1).execute("INSERT INTO " + KS_TABLE + " (k, c, v1) VALUES (1, ?, ?) USING TIMESTAMP 1", + ConsistencyLevel.ALL, i, i); + } + cluster.forEach(c -> c.flush(KEYSPACE)); + // And some row deletions on node2 only which cover data in the repaired set + // This will cause node2 to read more repaired data in satisfying the limit of the read request + // so it should overread less than node1 (in fact, it should not overread at all) in order to + // calculate the repaired data digest. 
+ for (int i=7; i<10; i++) + { + cluster.get(2).executeInternal("DELETE FROM " + KS_TABLE + " USING TIMESTAMP 2 WHERE k = 0 AND c = ?", i); + cluster.get(2).executeInternal("DELETE FROM " + KS_TABLE + " USING TIMESTAMP 2 WHERE k = 1 AND c = ?", i); + } + + // Verify single partition read + long ccBefore = getConfirmedInconsistencies(cluster.get(1)); + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " WHERE k=0 LIMIT 5", ConsistencyLevel.ALL), + rows(rows(0, 12, 10), rows(0, 6, 5))); + long ccAfterPartitionRead = getConfirmedInconsistencies(cluster.get(1)); + + // Verify partition range read + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KS_TABLE + " LIMIT 11", ConsistencyLevel.ALL), + rows(rows(1, 12, 10), rows(1, 6, 0), rows(0, 12, 12))); + long ccAfterRangeRead = getConfirmedInconsistencies(cluster.get(1)); + + if (ccAfterPartitionRead != ccAfterRangeRead) + if (ccAfterPartitionRead != ccBefore) + fail("Both range and partition reads reported data inconsistencies but none were expected"); + else + fail("Reported inconsistency during range read but none were expected"); + else if (ccAfterPartitionRead != ccBefore) + fail("Reported inconsistency during partition read but none were expected"); + } + } + + private Object[][] rows(Object[][] head, Object[][]...tail) + { + return Stream.concat(Stream.of(head), + Stream.of(tail).flatMap(Stream::of)) + .toArray(Object[][]::new); + } + + private Object[][] rows(int partitionKey, int start, int end) + { + if (start == end) + return new Object[][] { new Object[] { partitionKey, start, end } }; + + IntStream clusterings = start > end + ? IntStream.range(end -1, start).map(i -> start - i + end - 1) + : IntStream.range(start, end); + + return clusterings.mapToObj(i -> new Object[] {partitionKey, i, i}).toArray(Object[][]::new); + } + private IIsolatedExecutor.SerializableRunnable assertNotRepaired() { return () -> diff --git a/test/unit/org/apache/cassandra/db/ReadCommandTest.java b/test/unit/org/apache/cassandra/db/ReadCommandTest.java index e0215b7..0824168 100644 --- a/test/unit/org/apache/cassandra/db/ReadCommandTest.java +++ b/test/unit/org/apache/cassandra/db/ReadCommandTest.java @@ -23,6 +23,8 @@ import java.io.OutputStream; import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.*; +import java.util.function.Supplier; +import java.util.stream.IntStream; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; @@ -41,6 +43,8 @@ import org.apache.cassandra.db.filter.RowFilter; import org.apache.cassandra.db.marshal.AsciiType; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.marshal.CounterColumnType; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.ReversedType; import org.apache.cassandra.db.marshal.SetType; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.Row; @@ -61,6 +65,7 @@ import org.apache.cassandra.locator.EndpointsForToken; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.ReplicaUtils; import org.apache.cassandra.net.Message; +import org.apache.cassandra.metrics.ClearableHistogram; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.Verb; import org.apache.cassandra.repair.consistent.LocalSessionAccessor; @@ -94,6 +99,7 @@ public class ReadCommandTest private static final String CF6 = "Standard6"; private static final String CF7 = "Counter7"; private static final 
String CF8 = "Standard8"; + private static final String CF9 = "Standard9"; private static final InetAddressAndPort REPAIR_COORDINATOR; static { @@ -178,6 +184,12 @@ public class ReadCommandTest .addRegularColumn("b", AsciiType.instance) .addRegularColumn("c", SetType.getInstance(AsciiType.instance, true)); + TableMetadata.Builder metadata9 = + TableMetadata.builder(KEYSPACE, CF9) + .addPartitionKeyColumn("key", Int32Type.instance) + .addClusteringColumn("col", ReversedType.getInstance(Int32Type.instance)) + .addRegularColumn("a", AsciiType.instance); + SchemaLoader.prepareServer(); SchemaLoader.createKeyspace(KEYSPACE, KeyspaceParams.simple(1), @@ -188,7 +200,8 @@ public class ReadCommandTest metadata5, metadata6, metadata7, - metadata8); + metadata8, + metadata9); LocalSessionAccessor.startup(); } @@ -823,14 +836,113 @@ public class ReadCommandTest } } + @Test + public void testRepairedDataOverreadMetrics() + { + ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore(CF9); + cfs.truncateBlocking(); + cfs.disableAutoCompaction(); + cfs.metadata().withSwapped(cfs.metadata().params.unbuild() + .caching(CachingParams.CACHE_NOTHING) + .build()); + // Insert and repair + insert(cfs, IntStream.range(0, 10), () -> IntStream.range(0, 10)); + cfs.forceBlockingFlush(); + cfs.getLiveSSTables().forEach(sstable -> mutateRepaired(cfs, sstable, 111, null)); + // Insert and leave unrepaired + insert(cfs, IntStream.range(0, 10), () -> IntStream.range(10, 20)); + + // Single partition reads + int limit = 5; + ReadCommand cmd = Util.cmd(cfs, ByteBufferUtil.bytes(0)).withLimit(limit).build(); + assertEquals(0, getAndResetOverreadCount(cfs)); + + // No overreads if not tracking + readAndCheckRowCount(Collections.singletonList(Util.getOnlyPartition(cmd)), limit); + assertEquals(0, getAndResetOverreadCount(cfs)); + + // Overread up to (limit - 1) if tracking is enabled + cmd = cmd.copy(); + cmd.trackRepairedStatus(); + readAndCheckRowCount(Collections.singletonList(Util.getOnlyPartition(cmd)), limit); + // overread count is always < limit as the first read is counted during merging (and so is expected) + assertEquals(limit - 1, getAndResetOverreadCount(cfs)); + + // if limit already requires reading all repaired data, no overreads should be recorded + limit = 20; + cmd = Util.cmd(cfs, ByteBufferUtil.bytes(0)).withLimit(limit).build(); + readAndCheckRowCount(Collections.singletonList(Util.getOnlyPartition(cmd)), limit); + assertEquals(0, getAndResetOverreadCount(cfs)); + + // Range reads + limit = 5; + cmd = Util.cmd(cfs).withLimit(limit).build(); + assertEquals(0, getAndResetOverreadCount(cfs)); + // No overreads if not tracking + readAndCheckRowCount(Util.getAll(cmd), limit); + assertEquals(0, getAndResetOverreadCount(cfs)); + + // Overread up to (limit - 1) if tracking is enabled + cmd = cmd.copy(); + cmd.trackRepairedStatus(); + readAndCheckRowCount(Util.getAll(cmd), limit); + assertEquals(limit - 1, getAndResetOverreadCount(cfs)); + + // if limit already requires reading all repaired data, no overreads should be recorded + limit = 100; + cmd = Util.cmd(cfs).withLimit(limit).build(); + readAndCheckRowCount(Util.getAll(cmd), limit); + assertEquals(0, getAndResetOverreadCount(cfs)); + } + private void setGCGrace(ColumnFamilyStore cfs, int gcGrace) { TableParams newParams = cfs.metadata().params.unbuild().gcGraceSeconds(gcGrace).build(); KeyspaceMetadata keyspaceMetadata = Schema.instance.getKeyspaceMetadata(cfs.metadata().keyspace); Schema.instance.load( - keyspaceMetadata.withSwapped( - 
keyspaceMetadata.tables.withSwapped( - cfs.metadata().withSwapped(newParams)))); + keyspaceMetadata.withSwapped( + keyspaceMetadata.tables.withSwapped( + cfs.metadata().withSwapped(newParams)))); + } + + private long getAndResetOverreadCount(ColumnFamilyStore cfs) + { + // always clear the histogram after reading to make comparisons & asserts easier + long rows = cfs.metric.repairedDataTrackingOverreadRows.cf.getSnapshot().getMax(); + ((ClearableHistogram)cfs.metric.repairedDataTrackingOverreadRows.cf).clear(); + return rows; + } + + private void readAndCheckRowCount(Iterable<FilteredPartition> partitions, int expected) + { + int count = 0; + for (Partition partition : partitions) + { + assertFalse(partition.isEmpty()); + try (UnfilteredRowIterator iter = partition.unfilteredIterator()) + { + while (iter.hasNext()) + { + iter.next(); + count++; + } + } + } + assertEquals(expected, count); + } + + private void insert(ColumnFamilyStore cfs, IntStream partitionIds, Supplier<IntStream> rowIds) + { + partitionIds.mapToObj(ByteBufferUtil::bytes) + .forEach( pk -> + rowIds.get().forEach( c -> + new RowUpdateBuilder(cfs.metadata(), 0, pk) + .clustering(c) + .add("a", ByteBufferUtil.bytes("abcd")) + .build() + .apply() + + )); } private void assertDigestsDiffer(ByteBuffer b0, ByteBuffer b1) diff --git a/test/unit/org/apache/cassandra/db/RepairedDataInfoTest.java b/test/unit/org/apache/cassandra/db/RepairedDataInfoTest.java new file mode 100644 index 0000000..00a1f56 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/RepairedDataInfoTest.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Iterator; +import java.util.stream.IntStream; + +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.schema.MockSchema; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.partitions.AbstractUnfilteredPartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.db.rows.AbstractUnfilteredRowIterator; +import org.apache.cassandra.db.rows.BTreeRow; +import org.apache.cassandra.db.rows.BufferCell; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.db.rows.EncodingStats; +import org.apache.cassandra.db.rows.RangeTombstoneBoundMarker; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.Rows; +import org.apache.cassandra.db.rows.Unfiltered; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Hex; + +import static org.apache.cassandra.Util.clustering; +import static org.apache.cassandra.Util.dk; +import static org.apache.cassandra.utils.ByteBufferUtil.*; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +public class RepairedDataInfoTest +{ + private static ColumnFamilyStore cfs; + private static TableMetadata metadata; + private static ColumnMetadata valueMetadata; + private static ColumnMetadata staticMetadata; + + private final int nowInSec = FBUtilities.nowInSeconds(); + + @BeforeClass + public static void setUp() + { + DatabaseDescriptor.daemonInitialization(); + CommitLog.instance.start(); + MockSchema.cleanup(); + String ks = "repaired_data_info_test"; + cfs = MockSchema.newCFS(ks, metadata -> metadata.addStaticColumn("s", UTF8Type.instance)); + metadata = cfs.metadata(); + valueMetadata = metadata.regularColumns().getSimple(0); + staticMetadata = metadata.staticColumns().getSimple(0); + } + + @Test + public void withTrackingAppliesRepairedDataCounter() + { + DataLimits.Counter counter = DataLimits.cqlLimits(15).newCounter(nowInSec, false, false, false).onlyCount(); + RepairedDataInfo info = new RepairedDataInfo(counter); + info.prepare(cfs, nowInSec, Integer.MAX_VALUE); + UnfilteredRowIterator[] partitions = new UnfilteredRowIterator[3]; + for (int i=0; i<3; i++) + partitions[i] = partition(bytes(i), rows(0, 5, nowInSec)); + + UnfilteredPartitionIterator iter = partitions(partitions); + iter = info.withRepairedDataInfo(iter); + consume(iter); + + assertEquals(15, counter.counted()); + assertEquals(5, counter.countedInCurrentPartition()); + } + + @Test + public void digestOfSinglePartitionWithSingleRowAndEmptyStaticRow() + { + Digest manualDigest = Digest.forRepairedDataTracking(); + Row[] rows = rows(0, 1, nowInSec); + UnfilteredRowIterator partition = partition(bytes(0), rows); + addToDigest(manualDigest, + partition.partitionKey().getKey(), + partition.partitionLevelDeletion(), + Rows.EMPTY_STATIC_ROW, + rows); + byte[] fromRepairedInfo = consume(partition); + assertArrayEquals(manualDigest.digest(), 
fromRepairedInfo); + } + + @Test + public void digestOfSinglePartitionWithMultipleRowsAndEmptyStaticRow() + { + Digest manualDigest = Digest.forRepairedDataTracking(); + Row[] rows = rows(0, 5, nowInSec); + UnfilteredRowIterator partition = partition(bytes(0), rows); + addToDigest(manualDigest, + partition.partitionKey().getKey(), + partition.partitionLevelDeletion(), + Rows.EMPTY_STATIC_ROW, + rows); + byte[] fromRepairedInfo = consume(partition); + assertArrayEquals(manualDigest.digest(), fromRepairedInfo); + } + + @Test + public void digestOfSinglePartitionWithMultipleRowsAndTombstones() + { + Digest manualDigest = Digest.forRepairedDataTracking(); + Unfiltered[] unfiltereds = new Unfiltered[] + { + open(0), close(0), + row(1, 1, nowInSec), + open(2), close(4), + row(5, 7, nowInSec) + }; + UnfilteredRowIterator partition = partition(bytes(0), unfiltereds); + addToDigest(manualDigest, + partition.partitionKey().getKey(), + partition.partitionLevelDeletion(), + Rows.EMPTY_STATIC_ROW, + unfiltereds); + byte[] fromRepairedInfo = consume(partition); + assertArrayEquals(manualDigest.digest(), fromRepairedInfo); + } + + @Test + public void digestOfMultiplePartitionsWithMultipleRowsAndNonEmptyStaticRows() + { + Digest manualDigest = Digest.forRepairedDataTracking(); + Row staticRow = staticRow(nowInSec); + Row[] rows = rows(0, 5, nowInSec); + UnfilteredRowIterator[] partitionsArray = new UnfilteredRowIterator[5]; + for (int i=0; i<5; i++) + { + UnfilteredRowIterator partition = partitionWithStaticRow(bytes(i), staticRow, rows); + partitionsArray[i] = partition; + addToDigest(manualDigest, + partition.partitionKey().getKey(), + partition.partitionLevelDeletion(), + staticRow, + rows); + } + + UnfilteredPartitionIterator partitions = partitions(partitionsArray); + byte[] fromRepairedInfo = consume(partitions); + assertArrayEquals(manualDigest.digest(), fromRepairedInfo); + } + + private RepairedDataInfo info() + { + return new RepairedDataInfo(DataLimits.NONE.newCounter(nowInSec, false, false, false)); + } + + private Digest addToDigest(Digest aggregate, + ByteBuffer partitionKey, + DeletionTime deletion, + Row staticRow, + Unfiltered...unfiltereds) + { + Digest perPartitionDigest = Digest.forRepairedDataTracking(); + if (!staticRow.isEmpty()) + staticRow.digest(perPartitionDigest); + perPartitionDigest.update(partitionKey); + deletion.digest(perPartitionDigest); + for (Unfiltered unfiltered : unfiltereds) + unfiltered.digest(perPartitionDigest); + byte[] rowDigestBytes = perPartitionDigest.digest(); + aggregate.update(rowDigestBytes, 0, rowDigestBytes.length); + return aggregate; + } + + private byte[] consume(UnfilteredPartitionIterator partitions) + { + RepairedDataInfo info = info(); + info.prepare(cfs, nowInSec, Integer.MAX_VALUE); + partitions.forEachRemaining(partition -> + { + try (UnfilteredRowIterator iter = info.withRepairedDataInfo(partition)) + { + iter.forEachRemaining(u -> {}); + } + }); + return getArray(info.getDigest()); + } + + private byte[] consume(UnfilteredRowIterator partition) + { + RepairedDataInfo info = info(); + info.prepare(cfs, nowInSec, Integer.MAX_VALUE); + try (UnfilteredRowIterator iter = info.withRepairedDataInfo(partition)) + { + iter.forEachRemaining(u -> {}); + } + return getArray(info.getDigest()); + } + + public static Cell cell(ColumnMetadata def, Object value) + { + ByteBuffer bb = value instanceof ByteBuffer ? 
(ByteBuffer)value : ((AbstractType)def.type).decompose(value); + return new BufferCell(def, 1L, BufferCell.NO_TTL, BufferCell.NO_DELETION_TIME, bb, null); + } + + private Row staticRow(int nowInSec) + { + Row.Builder builder = BTreeRow.unsortedBuilder(); + builder.newRow(Clustering.STATIC_CLUSTERING); + builder.addCell(cell(staticMetadata, "static value")); + return builder.build(); + } + + private Row row(int clustering, int value, int nowInSec) + { + Row.Builder builder = BTreeRow.unsortedBuilder(); + builder.newRow(clustering(metadata.comparator, Integer.toString(clustering))); + builder.addCell(cell(valueMetadata, Integer.toString(value))); + return builder.build(); + } + + private Row[] rows(int clusteringStart, int clusteringEnd, int nowInSec) + { + return IntStream.range(clusteringStart, clusteringEnd) + .mapToObj(v -> row(v, v, nowInSec)) + .toArray(Row[]::new); + } + + private RangeTombstoneBoundMarker open(int start) + { + return new RangeTombstoneBoundMarker( + ClusteringBound.create(ClusteringBound.boundKind(true, true), + new ByteBuffer[] { Clustering.make(Int32Type.instance.decompose(start)).get(0)}), + new DeletionTime(FBUtilities.timestampMicros(), FBUtilities.nowInSeconds())); + } + + private RangeTombstoneBoundMarker close(int close) + { + return new RangeTombstoneBoundMarker( + ClusteringBound.create(ClusteringBound.boundKind(false, true), + new ByteBuffer[] { Clustering.make(Int32Type.instance.decompose(close)).get(0)}), + new DeletionTime(FBUtilities.timestampMicros(), FBUtilities.nowInSeconds())); + } + + private UnfilteredRowIterator partition(ByteBuffer pk, Unfiltered... unfiltereds) + { + return partitionWithStaticRow(pk, Rows.EMPTY_STATIC_ROW, unfiltereds); + } + + private UnfilteredRowIterator partitionWithStaticRow(ByteBuffer pk, Row staticRow, Unfiltered... unfiltereds) + { + Iterator<Unfiltered> unfilteredIterator = Arrays.asList(unfiltereds).iterator(); + return new AbstractUnfilteredRowIterator(metadata, dk(pk), DeletionTime.LIVE, metadata.regularAndStaticColumns(), staticRow, false, EncodingStats.NO_STATS) { + protected Unfiltered computeNext() + { + return unfilteredIterator.hasNext() ? unfilteredIterator.next() : endOfData(); + } + }; + } + + private static UnfilteredPartitionIterator partitions(UnfilteredRowIterator...partitions) + { + Iterator<UnfilteredRowIterator> partitionsIter = Arrays.asList(partitions).iterator(); + return new AbstractUnfilteredPartitionIterator() + { + public TableMetadata metadata() + { + return metadata; + } + + public boolean hasNext() + { + return partitionsIter.hasNext(); + } + + public UnfilteredRowIterator next() + { + return partitionsIter.next(); + } + }; + } +}
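To see the digest scheme of RepairedDataInfo in isolation, the following is a minimal standalone sketch, not the committed code. It assumes java.security.MessageDigest with MD5 purely as a stand-in for Digest.forRepairedDataTracking(), and byte arrays as stand-ins for serialized partition contents; an empty array models an unfiltered that the purger reduces to nothing. As in onPartitionClose(), each per-partition digest is buffered and folded into the per-command digest only if the partition wasn't fully purged.

import java.security.MessageDigest;
import java.util.List;

public class RepairedDigestSketch
{
    // Fold per-partition digests into a single per-command digest, skipping
    // partitions whose contents were entirely purged. Empty byte[] elements
    // stand in for unfiltereds that the purger removes.
    public static byte[] commandDigest(List<List<byte[]>> partitions) throws Exception
    {
        MessageDigest perCommand = MessageDigest.getInstance("MD5");
        for (List<byte[]> partition : partitions)
        {
            MessageDigest perPartition = MessageDigest.getInstance("MD5");
            boolean fullyPurged = true;
            for (byte[] unfiltered : partition)
            {
                if (unfiltered.length == 0)
                    continue; // purged; contributes nothing to the digest
                fullyPurged = false;
                perPartition.update(unfiltered);
            }
            if (!fullyPurged)
                perCommand.update(perPartition.digest());
        }
        return perCommand.digest();
    }
}

The buffering matters: if the digest were updated as rows streamed through, a replica that had already compacted away a fully purged partition would disagree with one still carrying its tombstones.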
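The normalization the patch introduces can likewise be modelled with plain collections. The sketch below is hypothetical (row values and helper names are invented, and row keys are assumed distinct between the two sources); it only illustrates the interplay of the two counters: the user-level limit stops the merged read, while the repaired counter keeps consuming the repaired source — the silent overread performed by RepairedDataInfo.extend() — so replicas with different unrepaired overlays still digest the same repaired rows.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class OverreadSketch
{
    // Returns the repaired rows a replica would digest for a read with the given
    // limit. 'repaired' and 'unrepaired' are each sorted, with distinct values.
    static List<Integer> digestedRepairedRows(List<Integer> repaired, List<Integer> unrepaired, int limit)
    {
        List<Integer> merged = new ArrayList<>(repaired);
        merged.addAll(unrepaired);
        merged.sort(null);

        List<Integer> digested = new ArrayList<>();
        Iterator<Integer> repairedSource = repaired.iterator();

        // 1. serve the client limit from the merged view; every repaired row that
        //    flows past is counted and digested (the repairedCounter in the patch)
        int served = 0;
        for (Integer row : merged)
        {
            if (served++ == limit)
                break;
            if (repaired.contains(row))
                digested.add(repairedSource.next()); // same row; kept in step
        }
        // 2. silently overread: drain the repaired source until it has also
        //    supplied 'limit' rows or is exhausted (RepairedDataInfo.extend)
        while (digested.size() < limit && repairedSource.hasNext())
            digested.add(repairedSource.next());

        return digested;
    }

    public static void main(String[] args)
    {
        List<Integer> repaired = List.of(10, 11, 12, 13, 14, 15, 16, 17, 18, 19);
        // replica A has no unrepaired overlay; replica B has newer unrepaired rows
        // that displace repaired rows from the first 'limit' results
        System.out.println(digestedRepairedRows(repaired, List.of(), 5));        // [10..14]
        System.out.println(digestedRepairedRows(repaired, List.of(1, 2, 3), 5)); // [10..14] as well
    }
}

Both replicas end up digesting the same repaired rows even though replica B returned different data to the client, which is the property the distributed tests above assert via getConfirmedInconsistencies().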
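For operators, the overread activity recorded by the new RepairedOverreadRows and RepairedOverreadTime keyspace metrics (and their RepairedDataTrackingOverread* table-level counterparts) should be visible over JMX like any other Cassandra metric. A minimal reader sketch, assuming the standard metric ObjectName pattern, the default JMX port 7199, and a hypothetical keyspace my_ks:

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class OverreadMetricsReader
{
    public static void main(String[] args) throws Exception
    {
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url))
        {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            // keyspace-level histogram of repaired rows overread per command
            ObjectName rows = new ObjectName(
                "org.apache.cassandra.metrics:type=Keyspace,keyspace=my_ks,name=RepairedOverreadRows");
            System.out.println("max rows overread: " + mbs.getAttribute(rows, "Max"));
            System.out.println("99th percentile:   " + mbs.getAttribute(rows, "99thPercentile"));
        }
    }
}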