Repository: cassandra Updated Branches: refs/heads/10657 [created] 7dc4ae73f
Avoid skipped values when converting read iterator to mutation Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1fc86961 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1fc86961 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1fc86961 Branch: refs/heads/10657 Commit: 1fc869614304cfa1c1ac0ff39d988874a638be98 Parents: 9a27d3f Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Tue Dec 22 17:08:17 2015 +0100 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Tue Dec 22 17:08:17 2015 +0100 ---------------------------------------------------------------------- .../db/SinglePartitionReadCommand.java | 2 +- .../cassandra/db/filter/ColumnFilter.java | 9 ++++ .../db/partitions/PartitionUpdate.java | 26 +++++++++-- .../org/apache/cassandra/db/rows/BTreeRow.java | 15 ++++++ .../cassandra/db/rows/ComplexColumnData.java | 5 ++ src/java/org/apache/cassandra/db/rows/Row.java | 9 ++++ .../apache/cassandra/db/rows/RowIterators.java | 17 +++++++ .../db/rows/UnfilteredRowIterators.java | 17 +++++++ .../db/rows/WithoutSkippedValuesFunction.java | 48 ++++++++++++++++++++ .../apache/cassandra/schema/SchemaKeyspace.java | 2 +- .../cassandra/streaming/StreamReceiveTask.java | 3 +- .../cassandra/thrift/CassandraServer.java | 4 +- 12 files changed, 148 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc86961/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 4c87d10..a1de3d6 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -746,7 +746,7 
@@ public class SinglePartitionReadCommand extends ReadCommand try (UnfilteredRowIterator iter = result.unfilteredIterator(columnFilter(), Slices.ALL, false)) { - final Mutation mutation = new Mutation(PartitionUpdate.fromIterator(iter)); + final Mutation mutation = new Mutation(PartitionUpdate.fromIterator(iter, columnFilter())); StageManager.getStage(Stage.MUTATION).execute(new Runnable() { public void run() http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc86961/src/java/org/apache/cassandra/db/filter/ColumnFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java index e22c154..9ad4b53 100644 --- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java @@ -111,6 +111,15 @@ public class ColumnFilter } /** + * Whether or not the {@code canSkipValue()} methods of this filter may return + * {@code true} for some column/cell. + */ + public boolean skipSomeValues() + { + return isFetchAll && (selection != null || subSelections != null); + } + + /** * Whether the provided column is selected by this selection. */ public boolean includes(ColumnDefinition column) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc86961/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java index 52f8f67..d32959a 100644 --- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java +++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java @@ -193,18 +193,36 @@ public class PartitionUpdate extends AbstractBTreePartition /** * Turns the given iterator into an update. 
* + * @param iterator the iterator to turn into updates. + * @param filter the column filter used when querying {@code iterator}. This is used to make + * sure we don't include data for which the value has been skipped while reading (as we would + * then be writing something incorrect). + * * Warning: this method does not close the provided iterator, it is up to * the caller to close it. */ - public static PartitionUpdate fromIterator(UnfilteredRowIterator iterator) + public static PartitionUpdate fromIterator(UnfilteredRowIterator iterator, ColumnFilter filter) { + iterator = UnfilteredRowIterators.withoutSkippedValues(iterator, filter); Holder holder = build(iterator, 16); MutableDeletionInfo deletionInfo = (MutableDeletionInfo) holder.deletionInfo; return new PartitionUpdate(iterator.metadata(), iterator.partitionKey(), holder, deletionInfo, false); } - public static PartitionUpdate fromIterator(RowIterator iterator) + /** + * Turns the given iterator into an update. + * + * @param iterator the iterator to turn into updates. + * @param filter the column filter used when querying {@code iterator}. This is used to make + * sure we don't include data for which the value has been skipped while reading (as we would + * then be writing something incorrect). + * + * Warning: this method does not close the provided iterator, it is up to + * the caller to close it. 
+ */ + public static PartitionUpdate fromIterator(RowIterator iterator, ColumnFilter filter) { + iterator = RowIterators.withoutSkippedValues(iterator, filter); MutableDeletionInfo deletionInfo = MutableDeletionInfo.live(); Holder holder = build(iterator, deletionInfo, true, 16); return new PartitionUpdate(iterator.metadata(), iterator.partitionKey(), holder, deletionInfo, false); @@ -296,7 +314,7 @@ public class PartitionUpdate extends AbstractBTreePartition int nowInSecs = FBUtilities.nowInSeconds(); List<UnfilteredRowIterator> asIterators = Lists.transform(updates, AbstractBTreePartition::unfilteredIterator); - return fromIterator(UnfilteredRowIterators.merge(asIterators, nowInSecs)); + return fromIterator(UnfilteredRowIterators.merge(asIterators, nowInSecs), ColumnFilter.all(updates.get(0).metadata())); } /** @@ -668,7 +686,7 @@ public class PartitionUpdate extends AbstractBTreePartition try (UnfilteredRowIterator iterator = LegacyLayout.deserializeLegacyPartition(in, version, flag, key)) { assert iterator != null; // This is only used in mutation, and mutation have never allowed "null" column families - return PartitionUpdate.fromIterator(iterator); + return PartitionUpdate.fromIterator(iterator, ColumnFilter.all(iterator.metadata())); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc86961/src/java/org/apache/cassandra/db/rows/BTreeRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java b/src/java/org/apache/cassandra/db/rows/BTreeRow.java index 4bd11da..a91ea9d 100644 --- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java +++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java @@ -271,6 +271,21 @@ public class BTreeRow extends AbstractRow }); } + public Row withoutSkippedValues(ColumnFilter filter) + { + if (!filter.skipSomeValues()) + return this; + + return transformAndFilter(primaryKeyLivenessInfo, deletion, (cd) -> { + + ColumnDefinition column = 
cd.column(); + if (column.isComplex()) + return ((ComplexColumnData)cd).withoutSkippedValues(filter); + + return filter.canSkipValue(column) ? null : cd; + }); + } + public boolean hasComplex() { // We start by the end cause we know complex columns sort after the simple ones http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc86961/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java index fab529b..bf2b39c 100644 --- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java +++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java @@ -165,6 +165,11 @@ public class ComplexColumnData extends ColumnData implements Iterable<Cell> return transformAndFilter(newDeletion, (cell) -> cell.purge(purger, nowInSec)); } + public ComplexColumnData withoutSkippedValues(ColumnFilter filter) + { + return transformAndFilter(complexDeletion, (cell) -> filter.canSkipValue(column, cell.path()) ? null : cell); + } + private ComplexColumnData transformAndFilter(DeletionTime newDeletion, Function<? super Cell, ? 
extends Cell> function) { Object[] transformed = BTree.transformAndFilter(cells, function); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc86961/src/java/org/apache/cassandra/db/rows/Row.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Row.java b/src/java/org/apache/cassandra/db/rows/Row.java index 8a67e9b..46c715a 100644 --- a/src/java/org/apache/cassandra/db/rows/Row.java +++ b/src/java/org/apache/cassandra/db/rows/Row.java @@ -205,6 +205,15 @@ public interface Row extends Unfiltered, Collection<ColumnData> public Row purge(DeletionPurger purger, int nowInSec); /** + * Returns a copy of this row that doesn't include any cell for which the value is skipped in {@code filter}. + * + * @param filter the {@code ColumnFilter} to use when deciding which values are skipped. This should be the filter + * that was used when querying the row on which this method is called. + * @return the row but without any cell having its value skipped by {@code filter}. + */ + public Row withoutSkippedValues(ColumnFilter filter); + + /** * Returns a copy of this row where all counter cells have they "local" shard marked for clearing. 
*/ public Row markCounterLocalToBeCleared(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc86961/src/java/org/apache/cassandra/db/rows/RowIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/RowIterators.java b/src/java/org/apache/cassandra/db/rows/RowIterators.java index 551edb8..78d34d1 100644 --- a/src/java/org/apache/cassandra/db/rows/RowIterators.java +++ b/src/java/org/apache/cassandra/db/rows/RowIterators.java @@ -23,6 +23,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.transform.Transformation; import org.apache.cassandra.utils.FBUtilities; @@ -49,6 +50,22 @@ public abstract class RowIterators } /** + * Filter the provided iterator to exclude cells whose value is skipped by the provided filter. + * + * @param iterator the iterator to filter. + * @param filter the {@code ColumnFilter} to use when deciding which values are skipped. This should be the filter + * that was used when querying {@code iterator}. + * @return the filtered iterator. + */ + public static RowIterator withoutSkippedValues(RowIterator iterator, ColumnFilter filter) + { + if (!filter.skipSomeValues()) + return iterator; + + return Transformation.apply(iterator, new WithoutSkippedValuesFunction(filter)); + } + + /** * Wraps the provided iterator so it logs the returned rows for debugging purposes. 
* <p> * Note that this is only meant for debugging as this can log a very large amount of http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc86961/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java index ea929d7..3b378ac 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java @@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.transform.FilteredRows; import org.apache.cassandra.db.transform.MoreRows; import org.apache.cassandra.db.transform.Transformation; @@ -128,6 +129,22 @@ public abstract class UnfilteredRowIterators } /** + * Filter the provided iterator to exclude cells whose value is skipped by the provided filter. + * + * @param iterator the iterator to filter. + * @param filter the {@code ColumnFilter} to use when deciding which values are skipped. This should be the filter + * that was used when querying {@code iterator}. + * @return the filtered iterator. + */ + public static UnfilteredRowIterator withoutSkippedValues(UnfilteredRowIterator iterator, ColumnFilter filter) + { + if (!filter.skipSomeValues()) + return iterator; + + return Transformation.apply(iterator, new WithoutSkippedValuesFunction(filter)); + } + + /** * Returns an iterator that concatenate two atom iterators. 
* This method assumes that both iterator are from the same partition and that the atom from * {@code iter2} come after the ones of {@code iter1} (that is, that concatenating the iterator http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc86961/src/java/org/apache/cassandra/db/rows/WithoutSkippedValuesFunction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/WithoutSkippedValuesFunction.java b/src/java/org/apache/cassandra/db/rows/WithoutSkippedValuesFunction.java new file mode 100644 index 0000000..6ace793 --- /dev/null +++ b/src/java/org/apache/cassandra/db/rows/WithoutSkippedValuesFunction.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.rows; + +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.transform.Transformation; + +/** + * Function to skip cells (from an iterator) whose value is skipped by the + * provided {@code ColumnFilter}. See {@link UnfilteredRowIterators#withoutSkippedValues} for more details. 
+ */ +public class WithoutSkippedValuesFunction<I extends BaseRowIterator<?>> extends Transformation<I> +{ + private final ColumnFilter filter; + + public WithoutSkippedValuesFunction(ColumnFilter filter) + { + this.filter = filter; + } + + @Override + protected Row applyToStatic(Row row) + { + return row.withoutSkippedValues(filter); + } + + @Override + protected Row applyToRow(Row row) + { + return row.withoutSkippedValues(filter); + } +}; http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc86961/src/java/org/apache/cassandra/schema/SchemaKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java index a28423d..d0b1256 100644 --- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java +++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java @@ -364,7 +364,7 @@ public final class SchemaKeyspace mutationMap.put(key, mutation); } - mutation.add(PartitionUpdate.fromIterator(partition)); + mutation.add(PartitionUpdate.fromIterator(partition, cmd.columnFilter())); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc86961/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java index 92a14d1..0e8c9e9 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java @@ -38,6 +38,7 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import 
org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.rows.UnfilteredRowIterator; @@ -164,7 +165,7 @@ public class StreamReceiveTask extends StreamTask try (UnfilteredRowIterator rowIterator = scanner.next()) { //Apply unsafe (we will flush below before transaction is done) - new Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe(); + new Mutation(PartitionUpdate.fromIterator(rowIterator, ColumnFilter.all(cfs.metadata))).applyUnsafe(); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc86961/src/java/org/apache/cassandra/thrift/CassandraServer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java index ee86f9d..61d9b5f 100644 --- a/src/java/org/apache/cassandra/thrift/CassandraServer.java +++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java @@ -941,7 +941,7 @@ public class CassandraServer implements Cassandra.Iface DecoratedKey dk = metadata.decorateKey(key); int nowInSec = FBUtilities.nowInSeconds(); - PartitionUpdate partitionUpdates = PartitionUpdate.fromIterator(LegacyLayout.toRowIterator(metadata, dk, toLegacyCells(metadata, updates, nowInSec).iterator(), nowInSec)); + PartitionUpdate partitionUpdates = PartitionUpdate.fromIterator(LegacyLayout.toRowIterator(metadata, dk, toLegacyCells(metadata, updates, nowInSec).iterator(), nowInSec), ColumnFilter.all(metadata)); // Indexed column values cannot be larger than 64K. 
See CASSANDRA-3057/4240 for more details Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).indexManager.validate(partitionUpdates); @@ -1143,7 +1143,7 @@ public class CassandraServer implements Cassandra.Iface sortAndMerge(metadata, cells, nowInSec); DecoratedKey dk = metadata.decorateKey(key); - PartitionUpdate update = PartitionUpdate.fromIterator(LegacyLayout.toUnfilteredRowIterator(metadata, dk, delInfo, cells.iterator())); + PartitionUpdate update = PartitionUpdate.fromIterator(LegacyLayout.toUnfilteredRowIterator(metadata, dk, delInfo, cells.iterator()), ColumnFilter.all(metadata)); // Indexed column values cannot be larger than 64K. See CASSANDRA-3057/4240 for more details Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).indexManager.validate(update);