Repository: cassandra
Updated Branches:
  refs/heads/10657 [created] 7dc4ae73f


Avoid skipped values when converting read iterator to mutation


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1fc86961
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1fc86961
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1fc86961

Branch: refs/heads/10657
Commit: 1fc869614304cfa1c1ac0ff39d988874a638be98
Parents: 9a27d3f
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Tue Dec 22 17:08:17 2015 +0100
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Tue Dec 22 17:08:17 2015 +0100

----------------------------------------------------------------------
 .../db/SinglePartitionReadCommand.java          |  2 +-
 .../cassandra/db/filter/ColumnFilter.java       |  9 ++++
 .../db/partitions/PartitionUpdate.java          | 26 +++++++++--
 .../org/apache/cassandra/db/rows/BTreeRow.java  | 15 ++++++
 .../cassandra/db/rows/ComplexColumnData.java    |  5 ++
 src/java/org/apache/cassandra/db/rows/Row.java  |  9 ++++
 .../apache/cassandra/db/rows/RowIterators.java  | 17 +++++++
 .../db/rows/UnfilteredRowIterators.java         | 17 +++++++
 .../db/rows/WithoutSkippedValuesFunction.java   | 48 ++++++++++++++++++++
 .../apache/cassandra/schema/SchemaKeyspace.java |  2 +-
 .../cassandra/streaming/StreamReceiveTask.java  |  3 +-
 .../cassandra/thrift/CassandraServer.java       |  4 +-
 12 files changed, 148 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc86961/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java 
b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index 4c87d10..a1de3d6 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -746,7 +746,7 @@ public class SinglePartitionReadCommand extends ReadCommand
 
             try (UnfilteredRowIterator iter = 
result.unfilteredIterator(columnFilter(), Slices.ALL, false))
             {
-                final Mutation mutation = new 
Mutation(PartitionUpdate.fromIterator(iter));
+                final Mutation mutation = new 
Mutation(PartitionUpdate.fromIterator(iter, columnFilter()));
                 StageManager.getStage(Stage.MUTATION).execute(new Runnable()
                 {
                     public void run()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc86961/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java 
b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index e22c154..9ad4b53 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@ -111,6 +111,15 @@ public class ColumnFilter
     }
 
     /**
+     * Whether or not the {@code canSkipValue()} methods of this filter may return
+     * {@code true} for some column/cell. When this returns {@code false}, no value is ever skipped.
+     */
+    public boolean skipSomeValues()
+    {
+        return isFetchAll && (selection != null || subSelections != null);
+    }
+
+    /**
      * Whether the provided column is selected by this selection.
      */
     public boolean includes(ColumnDefinition column)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc86961/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java 
b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 52f8f67..d32959a 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -193,18 +193,36 @@ public class PartitionUpdate extends 
AbstractBTreePartition
     /**
      * Turns the given iterator into an update.
      *
+     * @param iterator the iterator to turn into an update.
+     * @param filter the column filter used when querying {@code iterator}. 
This is used to make
+     * sure we don't include data for which the value has been skipped while 
reading (as we would
+     * then be writing something incorrect). Callers whose data does not come from a read pass {@code ColumnFilter.all(metadata)}.
+     *
      * Warning: this method does not close the provided iterator, it is up to
      * the caller to close it.
      */
-    public static PartitionUpdate fromIterator(UnfilteredRowIterator iterator)
+    public static PartitionUpdate fromIterator(UnfilteredRowIterator iterator, 
ColumnFilter filter)
     {
+        iterator = UnfilteredRowIterators.withoutSkippedValues(iterator, 
filter); // drop cells whose value the read skipped, before building the update
         Holder holder = build(iterator, 16);
         MutableDeletionInfo deletionInfo = (MutableDeletionInfo) 
holder.deletionInfo;
         return new PartitionUpdate(iterator.metadata(), 
iterator.partitionKey(), holder, deletionInfo, false);
     }
 
-    public static PartitionUpdate fromIterator(RowIterator iterator)
+    /**
+     * Turns the given iterator into an update.
+     *
+     * @param iterator the iterator to turn into updates.
+     * @param filter the column filter used when querying {@code iterator}. 
This is used to make
+     * sure we don't include data for which the value has been skipped while 
reading (as we would
+     * then be writing something incorrect).
+     *
+     * Warning: this method does not close the provided iterator, it is up to
+     * the caller to close it.
+     */
+    public static PartitionUpdate fromIterator(RowIterator iterator, 
ColumnFilter filter)
     {
+        iterator = RowIterators.withoutSkippedValues(iterator, filter);
         MutableDeletionInfo deletionInfo = MutableDeletionInfo.live();
         Holder holder = build(iterator, deletionInfo, true, 16);
         return new PartitionUpdate(iterator.metadata(), 
iterator.partitionKey(), holder, deletionInfo, false);
@@ -296,7 +314,7 @@ public class PartitionUpdate extends AbstractBTreePartition
 
         int nowInSecs = FBUtilities.nowInSeconds();
         List<UnfilteredRowIterator> asIterators = Lists.transform(updates, 
AbstractBTreePartition::unfilteredIterator);
-        return fromIterator(UnfilteredRowIterators.merge(asIterators, 
nowInSecs));
+        return fromIterator(UnfilteredRowIterators.merge(asIterators, 
nowInSecs), ColumnFilter.all(updates.get(0).metadata()));
     }
 
     /**
@@ -668,7 +686,7 @@ public class PartitionUpdate extends AbstractBTreePartition
             try (UnfilteredRowIterator iterator = 
LegacyLayout.deserializeLegacyPartition(in, version, flag, key))
             {
                 assert iterator != null; // This is only used in mutation, and 
mutation have never allowed "null" column families
-                return PartitionUpdate.fromIterator(iterator);
+                return PartitionUpdate.fromIterator(iterator, 
ColumnFilter.all(iterator.metadata()));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc86961/src/java/org/apache/cassandra/db/rows/BTreeRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/BTreeRow.java 
b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
index 4bd11da..a91ea9d 100644
--- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java
+++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java
@@ -271,6 +271,21 @@ public class BTreeRow extends AbstractRow
         });
     }
 
+    public Row withoutSkippedValues(ColumnFilter filter)
+    {
+        if (!filter.skipSomeValues()) // the filter never skips a value: nothing to remove, reuse this row as-is
+            return this;
+        // NOTE(review): transformAndFilter may return null if nothing is left in the row - confirm every caller (static rows in particular) copes with that.
+        return transformAndFilter(primaryKeyLivenessInfo, deletion, (cd) -> {
+            // Complex columns are filtered cell by cell; a simple column whose value is skipped maps to null.
+            ColumnDefinition column = cd.column();
+            if (column.isComplex())
+                return ((ComplexColumnData)cd).withoutSkippedValues(filter);
+
+            return filter.canSkipValue(column) ? null : cd;
+        });
+    }
+
     public boolean hasComplex()
     {
         // We start by the end cause we know complex columns sort after the 
simple ones

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc86961/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java 
b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
index fab529b..bf2b39c 100644
--- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
+++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
@@ -165,6 +165,11 @@ public class ComplexColumnData extends ColumnData 
implements Iterable<Cell>
         return transformAndFilter(newDeletion, (cell) -> cell.purge(purger, 
nowInSec));
     }
 
+    public ComplexColumnData withoutSkippedValues(ColumnFilter filter)
+    {
+        return transformAndFilter(complexDeletion, (cell) -> 
filter.canSkipValue(column, cell.path()) ? null : cell); // complexDeletion is kept; each cell whose value is skipped maps to null
+    }
+
     private ComplexColumnData transformAndFilter(DeletionTime newDeletion, 
Function<? super Cell, ? extends Cell> function)
     {
         Object[] transformed = BTree.transformAndFilter(cells, function);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc86961/src/java/org/apache/cassandra/db/rows/Row.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/Row.java 
b/src/java/org/apache/cassandra/db/rows/Row.java
index 8a67e9b..46c715a 100644
--- a/src/java/org/apache/cassandra/db/rows/Row.java
+++ b/src/java/org/apache/cassandra/db/rows/Row.java
@@ -205,6 +205,15 @@ public interface Row extends Unfiltered, 
Collection<ColumnData>
     public Row purge(DeletionPurger purger, int nowInSec);
 
     /**
+     * Returns a version of this row that doesn't include any cell for which the 
value is skipped in {@code filter} (implementations may return this very row when nothing can be skipped).
+     *
+     * @param filter the {@code ColumnFilter} to use when deciding which 
values are skipped. This should be the filter
+     * that was used when querying the row on which this method is called.
+     * @return the row but without any cell having its value skipped by {@code 
filter}. NOTE(review): BTreeRow's implementation may return {@code null} when nothing is left - confirm and document.
+     */
+    public Row withoutSkippedValues(ColumnFilter filter);
+
+    /**
      * Returns a copy of this row where all counter cells have they "local" 
shard marked for clearing.
      */
     public Row markCounterLocalToBeCleared();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc86961/src/java/org/apache/cassandra/db/rows/RowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/RowIterators.java 
b/src/java/org/apache/cassandra/db/rows/RowIterators.java
index 551edb8..78d34d1 100644
--- a/src/java/org/apache/cassandra/db/rows/RowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/RowIterators.java
@@ -23,6 +23,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.utils.FBUtilities;
 
@@ -49,6 +50,22 @@ public abstract class RowIterators
     }
 
     /**
+     * Filters the provided iterator to exclude cells whose value is skipped by 
the provided filter.
+     *
+     * @param iterator the iterator to filter.
+     * @param filter the {@code ColumnFilter} to use when deciding which 
values are skipped. This should be the filter
+     * that was used when querying {@code iterator}.
+     * @return the filtered iterator, or {@code iterator} itself if {@code filter} never skips a value.
+     */
+    public static RowIterator withoutSkippedValues(RowIterator iterator, 
ColumnFilter filter)
+    {
+        if (!filter.skipSomeValues())
+            return iterator;
+
+        return Transformation.apply(iterator, new 
WithoutSkippedValuesFunction(filter));
+    }
+
+    /**
      * Wraps the provided iterator so it logs the returned rows for debugging 
purposes.
      * <p>
      * Note that this is only meant for debugging as this can log a very large 
amount of

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc86961/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java 
b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
index ea929d7..3b378ac 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java
@@ -25,6 +25,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.transform.FilteredRows;
 import org.apache.cassandra.db.transform.MoreRows;
 import org.apache.cassandra.db.transform.Transformation;
@@ -128,6 +129,22 @@ public abstract class UnfilteredRowIterators
     }
 
     /**
+     * Filters the provided iterator to exclude cells whose value is skipped by 
the provided filter.
+     *
+     * @param iterator the iterator to filter.
+     * @param filter the {@code ColumnFilter} to use when deciding which 
values are skipped. This should be the filter
+     * that was used when querying {@code iterator}.
+     * @return the filtered iterator, or {@code iterator} itself if {@code filter} never skips a value.
+     */
+    public static UnfilteredRowIterator 
withoutSkippedValues(UnfilteredRowIterator iterator, ColumnFilter filter)
+    {
+        if (!filter.skipSomeValues())
+            return iterator;
+
+        return Transformation.apply(iterator, new 
WithoutSkippedValuesFunction(filter));
+    }
+
+    /**
      * Returns an iterator that concatenate two atom iterators.
      * This method assumes that both iterator are from the same partition and 
that the atom from
      * {@code iter2} come after the ones of {@code iter1} (that is, that 
concatenating the iterator

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc86961/src/java/org/apache/cassandra/db/rows/WithoutSkippedValuesFunction.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/db/rows/WithoutSkippedValuesFunction.java 
b/src/java/org/apache/cassandra/db/rows/WithoutSkippedValuesFunction.java
new file mode 100644
index 0000000..6ace793
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/rows/WithoutSkippedValuesFunction.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.rows;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.transform.Transformation;
+
+/**
+ * Transformation that removes, from every row of an iterator (the static row included), the
+ * cells whose value is skipped by the provided {@code ColumnFilter}. See
+ * {@link UnfilteredRowIterators#withoutSkippedValues} for more details.
+ */
+public class WithoutSkippedValuesFunction<I extends BaseRowIterator<?>> extends Transformation<I>
+{
+    private final ColumnFilter filter;
+
+    /**
+     * @param filter the filter that was used when querying the iterator this function is applied to.
+     */
+    public WithoutSkippedValuesFunction(ColumnFilter filter)
+    {
+        this.filter = filter;
+    }
+
+    @Override
+    protected Row applyToStatic(Row row)
+    {
+        // Removing cells can leave nothing in the row, in which case withoutSkippedValues() may
+        // return null (BTreeRow goes through transformAndFilter). A static row must never be null,
+        // an absent one being Rows.EMPTY_STATIC_ROW, so substitute the empty static row then.
+        Row filtered = row.withoutSkippedValues(filter);
+        return filtered == null ? Rows.EMPTY_STATIC_ROW : filtered;
+    }
+
+    @Override
+    protected Row applyToRow(Row row)
+    {
+        // NOTE(review): a null result presumably filters the row out of the iterator - confirm in Transformation.
+        return row.withoutSkippedValues(filter);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc86961/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java 
b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index a28423d..d0b1256 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -364,7 +364,7 @@ public final class SchemaKeyspace
                         mutationMap.put(key, mutation);
                     }
 
-                    mutation.add(PartitionUpdate.fromIterator(partition));
+                    mutation.add(PartitionUpdate.fromIterator(partition, 
cmd.columnFilter()));
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc86961/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java 
b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 92a14d1..0e8c9e9 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -38,6 +38,7 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.filter.ColumnFilter;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
@@ -164,7 +165,7 @@ public class StreamReceiveTask extends StreamTask
                                     try (UnfilteredRowIterator rowIterator = 
scanner.next())
                                     {
                                         //Apply unsafe (we will flush below 
before transaction is done)
-                                        new 
Mutation(PartitionUpdate.fromIterator(rowIterator)).applyUnsafe();
+                                        new 
Mutation(PartitionUpdate.fromIterator(rowIterator, 
ColumnFilter.all(cfs.metadata))).applyUnsafe();
                                     }
                                 }
                             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fc86961/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java 
b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index ee86f9d..61d9b5f 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -941,7 +941,7 @@ public class CassandraServer implements Cassandra.Iface
             DecoratedKey dk = metadata.decorateKey(key);
             int nowInSec = FBUtilities.nowInSeconds();
 
-            PartitionUpdate partitionUpdates = 
PartitionUpdate.fromIterator(LegacyLayout.toRowIterator(metadata, dk, 
toLegacyCells(metadata, updates, nowInSec).iterator(), nowInSec));
+            PartitionUpdate partitionUpdates = 
PartitionUpdate.fromIterator(LegacyLayout.toRowIterator(metadata, dk, 
toLegacyCells(metadata, updates, nowInSec).iterator(), nowInSec), 
ColumnFilter.all(metadata));
             // Indexed column values cannot be larger than 64K.  See 
CASSANDRA-3057/4240 for more details
             
Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).indexManager.validate(partitionUpdates);
 
@@ -1143,7 +1143,7 @@ public class CassandraServer implements Cassandra.Iface
 
                 sortAndMerge(metadata, cells, nowInSec);
                 DecoratedKey dk = metadata.decorateKey(key);
-                PartitionUpdate update = 
PartitionUpdate.fromIterator(LegacyLayout.toUnfilteredRowIterator(metadata, dk, 
delInfo, cells.iterator()));
+                PartitionUpdate update = 
PartitionUpdate.fromIterator(LegacyLayout.toUnfilteredRowIterator(metadata, dk, 
delInfo, cells.iterator()), ColumnFilter.all(metadata));
 
                 // Indexed column values cannot be larger than 64K.  See 
CASSANDRA-3057/4240 for more details
                 
Keyspace.open(metadata.ksName).getColumnFamilyStore(metadata.cfName).indexManager.validate(update);

Reply via email to