Merge branch cassandra-2.2 into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bb56193a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bb56193a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bb56193a Branch: refs/heads/cassandra-3.8 Commit: bb56193a3f2ac865efb18c9d6944f2927f667771 Parents: 6528fbf 9583b6b Author: Benjamin Lerer <b.le...@gmail.com> Authored: Tue Aug 16 15:29:42 2016 +0200 Committer: Benjamin Lerer <b.le...@gmail.com> Committed: Tue Aug 16 15:34:35 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + NEWS.txt | 10 + .../cassandra/db/compaction/Scrubber.java | 169 ++++- .../cassandra/db/marshal/ReversedType.java | 10 - .../db/partitions/AbstractBTreePartition.java | 7 +- .../db/partitions/ImmutableBTreePartition.java | 34 +- .../db/rows/UnfilteredRowIterators.java | 28 + .../org/apache/cassandra/cql3/CQLTester.java | 18 + .../validation/entities/SecondaryIndexTest.java | 131 ++++ .../cql3/validation/operations/DeleteTest.java | 113 +++ .../operations/SelectOrderByTest.java | 719 ++++++++++--------- .../cql3/validation/operations/SelectTest.java | 357 ++++++++- .../cassandra/db/marshal/ReversedTypeTest.java | 4 +- 13 files changed, 1210 insertions(+), 391 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb56193a/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index a202755,c421398..7605952 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -49,9 -14,12 +49,10 @@@ Merged from 2.2 * Fixed cqlshlib.test.remove_test_db (CASSANDRA-12214) * Synchronize ThriftServer::stop() (CASSANDRA-12105) * Use dedicated thread for JMX notifications (CASSANDRA-12146) - * NPE when trying to remove purgable tombstones from result (CASSANDRA-12143) * Improve streaming synchronization and fault tolerance (CASSANDRA-11414) * 
MemoryUtil.getShort() should return an unsigned short also for architectures not supporting unaligned memory accesses (CASSANDRA-11973) - * Don't write shadowed range tombstone (CASSANDRA-12030) Merged from 2.1: + * Fix queries with empty ByteBuffer values in clustering column restrictions (CASSANDRA-12127) * Disable passing control to post-flush after flush failure to prevent data loss (CASSANDRA-11828) * Allow STCS-in-L0 compactions to reduce scope with LCS (CASSANDRA-12040) * cannot use cql since upgrading python to 2.7.11+ (CASSANDRA-11850) http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb56193a/NEWS.txt ---------------------------------------------------------------------- diff --cc NEWS.txt index 6b8dfd3,0a3ab36..0bd3920 --- a/NEWS.txt +++ b/NEWS.txt @@@ -13,55 -13,17 +13,65 @@@ restore snapshots created with the prev 'sstableloader' tool. You can upgrade the file format of your snapshots using the provided 'sstableupgrade' tool. -2.2.8 ++3.0.9 + ===== + + Upgrading + --------- - - The ReversedType behaviour has been corrected for clustering columns of - BYTES type containing empty value. Scrub should be run on the existing - SSTables containing a descending clustering column of BYTES type to correct - their ordering. See CASSANDRA-12127 for more details. ++ - The ReversedType behaviour has been corrected for clustering columns of ++ BYTES type containing empty value. Scrub should be run on the existing ++ SSTables containing a descending clustering column of BYTES type to correct ++ their ordering. See CASSANDRA-12127 for more details. + -2.2.7 +3.0.8 +===== + +Upgrading +--------- + - Ec2MultiRegionSnitch will no longer automatically set broadcast_rpc_address + to the public instance IP if this property is defined on cassandra.yaml. + +3.0.7 +===== + +Upgrading +--------- + - A maximum size for SSTables values has been introduced, to prevent out of memory + exceptions when reading corrupt SSTables. 
This maximum size can be set via + max_value_size_in_mb in cassandra.yaml. The default is 256MB, which matches the default + value of native_transport_max_frame_size_in_mb. SSTables will be considered corrupt if + they contain values whose size exceeds this limit. See CASSANDRA-9530 for more details. + +Deprecation +----------- + - DateTieredCompactionStrategy has been deprecated - new tables should use + TimeWindowCompactionStrategy. Note that migrating an existing DTCS-table to TWCS might + cause increased compaction load for a while after the migration so make sure you run + tests before migrating. Read CASSANDRA-9666 for background on this. + +New features +------------ + - TimeWindowCompactionStrategy has been added. This has proven to be a better approach + to time series compaction and new tables should use this instead of DTCS. See + CASSANDRA-9666 for details. + +3.0.6 +===== + +New features +------------ + - JSON timestamps are now in UTC and contain the timezone information, see + CASSANDRA-11137 for more details. + +3.0.5 +===== + +Upgrading +--------- + - Nothing specific to this release, but please see previous versions upgrading section, + especially if you are upgrading from 2.2. 
+ +3.0.4 ===== New features http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb56193a/src/java/org/apache/cassandra/db/compaction/Scrubber.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java index 539c4c7,99ee62e..c010891 --- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java +++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java @@@ -23,11 -23,11 +23,12 @@@ import java.util.* import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Throwables; -import com.google.common.collect.AbstractIterator; ++import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; -import org.apache.cassandra.db.columniterator.OnDiskAtomIterator; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.io.sstable.*; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableWriter; @@@ -327,6 -312,32 +302,38 @@@ public class Scrubber implements Closea } } + @SuppressWarnings("resource") + private boolean tryAppend(DecoratedKey prevKey, DecoratedKey key, SSTableRewriter writer) + { - // OrderCheckerIterator will check, at iteration time, that the cells are in the proper order. If it detects - // that one cell is out of order, it will stop returning them. The remaining cells will be sorted and added - // to the outOfOrderRows that will be later written to a new SSTable. - OrderCheckerIterator atoms = new OrderCheckerIterator(new SSTableIdentityIterator(sstable, dataFile, key, checkData), - cfs.metadata.comparator.onDiskAtomComparator()); - if (prevKey != null && prevKey.compareTo(key) > 0) ++ // OrderCheckerIterator will check, at iteration time, that the rows are in the proper order. 
If it detects ++ // that one row is out of order, it will stop returning them. The remaining rows will be sorted and added ++ // to the outOfOrder set that will be later written to a new SSTable. ++ OrderCheckerIterator sstableIterator = new OrderCheckerIterator(new RowMergingSSTableIterator(sstable, dataFile, key), ++ cfs.metadata.comparator); ++ ++ try (UnfilteredRowIterator iterator = withValidation(sstableIterator, dataFile.getPath())) + { - saveOutOfOrderRow(prevKey, key, atoms); - return false; - } ++ if (prevKey != null && prevKey.compareTo(key) > 0) ++ { ++ saveOutOfOrderRow(prevKey, key, iterator); ++ return false; ++ } + - AbstractCompactedRow compactedRow = new LazilyCompactedRow(controller, Collections.singletonList(atoms)); - if (writer.tryAppend(compactedRow) == null) - emptyRows++; - else - goodRows++; ++ if (writer.tryAppend(iterator) == null) ++ emptyRows++; ++ else ++ goodRows++; ++ } + - if (atoms.hasOutOfOrderCells()) - saveOutOfOrderRow(key, atoms); ++ if (sstableIterator.hasRowsOutOfOrder()) ++ { ++ outputHandler.warn(String.format("Out of order rows found in partition: %s", key)); ++ outOfOrder.add(sstableIterator.getRowsOutOfOrder()); ++ } + + return true; + } + private void updateIndexKey() { currentIndexKey = nextIndexKey; @@@ -469,44 -523,88 +476,146 @@@ } /** + * During 2.x migration, under some circumstances rows might have gotten duplicated. + * Merging iterator merges rows with same clustering. + * + * For more details, refer to CASSANDRA-12144. 
+ */ + private static class RowMergingSSTableIterator extends SSTableIdentityIterator + { + RowMergingSSTableIterator(SSTableReader sstable, RandomAccessReader file, DecoratedKey key) + { + super(sstable, file, key); + } + + @Override + protected Unfiltered doCompute() + { + if (!iterator.hasNext()) + return endOfData(); + + Unfiltered next = iterator.next(); + if (!next.isRow()) + return next; + + while (iterator.hasNext()) + { + Unfiltered peek = iterator.peek(); + // If there was a duplicate row, merge it. + if (next.clustering().equals(peek.clustering()) && peek.isRow()) + { + iterator.next(); // Make sure that the peeked item was consumed. + next = Rows.merge((Row) next, (Row) peek, FBUtilities.nowInSeconds()); + } + else + { + break; + } + } + + return next; + } + } ++ ++ /** + * In some case like CASSANDRA-12127 the cells might have been stored in the wrong order. This decorator check the + * cells order and collect the out of order cells to correct the problem. + */ - private static final class OrderCheckerIterator extends AbstractIterator<OnDiskAtom> implements OnDiskAtomIterator ++ private static final class OrderCheckerIterator extends AbstractIterator<Unfiltered> implements UnfilteredRowIterator + { + /** + * The decorated iterator. + */ - private final OnDiskAtomIterator iterator; ++ private final UnfilteredRowIterator iterator; + - /** - * The atom comparator. - */ - private final Comparator<OnDiskAtom> comparator; ++ private final ClusteringComparator comparator; + - /** - * The Column family containing the cells which are out of order. - */ - private ColumnFamily outOfOrderCells; ++ private Unfiltered previous; + + /** - * The previous atom returned ++ * The partition containing the rows which are out of order. 
+ */ - private OnDiskAtom previous; ++ private Partition rowsOutOfOrder; + - public OrderCheckerIterator(OnDiskAtomIterator iterator, Comparator<OnDiskAtom> comparator) ++ public OrderCheckerIterator(UnfilteredRowIterator iterator, ClusteringComparator comparator) + { + this.iterator = iterator; + this.comparator = comparator; + } + - public ColumnFamily getColumnFamily() ++ public CFMetaData metadata() + { - return iterator.getColumnFamily(); ++ return iterator.metadata(); + } + - public DecoratedKey getKey() ++ public boolean isReverseOrder() + { - return iterator.getKey(); ++ return iterator.isReverseOrder(); + } + - public void close() throws IOException ++ public PartitionColumns columns() + { - iterator.close(); ++ return iterator.columns(); ++ } ++ ++ public DecoratedKey partitionKey() ++ { ++ return iterator.partitionKey(); ++ } ++ ++ public Row staticRow() ++ { ++ return iterator.staticRow(); + } + + @Override - protected OnDiskAtom computeNext() ++ public boolean isEmpty() ++ { ++ return iterator.isEmpty(); ++ } ++ ++ public void close() ++ { ++ iterator.close(); ++ } ++ ++ public DeletionTime partitionLevelDeletion() ++ { ++ return iterator.partitionLevelDeletion(); ++ } ++ ++ public EncodingStats stats() ++ { ++ return iterator.stats(); ++ } ++ ++ public boolean hasRowsOutOfOrder() ++ { ++ return rowsOutOfOrder != null; ++ } ++ ++ public Partition getRowsOutOfOrder() ++ { ++ return rowsOutOfOrder; ++ } ++ ++ protected Unfiltered computeNext() + { + if (!iterator.hasNext()) + return endOfData(); + - OnDiskAtom next = iterator.next(); ++ Unfiltered next = iterator.next(); + - // If we detect that some cells are out of order we will store and sort the remaining once to insert them ++ // If we detect that some rows are out of order we will store and sort the remaining ones to insert them + // in a separate SSTable. 
+ if (previous != null && comparator.compare(next, previous) < 0) + { - outOfOrderCells = collectOutOfOrderCells(next, iterator); ++ rowsOutOfOrder = ImmutableBTreePartition.create(UnfilteredRowIterators.concat(next, iterator), false); + return endOfData(); + } + previous = next; + return next; + } + - public boolean hasOutOfOrderCells() - { - return outOfOrderCells != null; - } - - public ColumnFamily getOutOfOrderCells() - { - return outOfOrderCells; - } - - private static ColumnFamily collectOutOfOrderCells(OnDiskAtom atom, OnDiskAtomIterator iterator) - { - ColumnFamily cf = iterator.getColumnFamily().cloneMeShallow(ArrayBackedSortedColumns.factory, false); - cf.addAtom(atom); - while (iterator.hasNext()) - cf.addAtom(iterator.next()); - return cf; - } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb56193a/src/java/org/apache/cassandra/db/marshal/ReversedType.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/marshal/ReversedType.java index 02320c7,19bee5f..82a1895 --- a/src/java/org/apache/cassandra/db/marshal/ReversedType.java +++ b/src/java/org/apache/cassandra/db/marshal/ReversedType.java @@@ -66,18 -65,8 +66,8 @@@ public class ReversedType<T> extends Ab return baseType.isEmptyValueMeaningless(); } - public int compare(ByteBuffer o1, ByteBuffer o2) + public int compareCustom(ByteBuffer o1, ByteBuffer o2) { - // An empty byte buffer is always smaller - if (o1.remaining() == 0) - { - return o2.remaining() == 0 ? 
0 : -1; - } - if (o2.remaining() == 0) - { - return 1; - } - return baseType.compare(o2, o1); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb56193a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java index 1fa3324,0000000..c63acc2 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java +++ b/src/java/org/apache/cassandra/db/partitions/AbstractBTreePartition.java @@@ -1,408 -1,0 +1,413 @@@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ +package org.apache.cassandra.db.partitions; + +import java.util.Iterator; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.utils.SearchIterator; +import org.apache.cassandra.utils.btree.BTree; +import org.apache.cassandra.utils.btree.BTreeSearchIterator; + +import static org.apache.cassandra.utils.btree.BTree.Dir.desc; + +public abstract class AbstractBTreePartition implements Partition, Iterable<Row> +{ + protected static final Holder EMPTY = new Holder(PartitionColumns.NONE, BTree.empty(), DeletionInfo.LIVE, Rows.EMPTY_STATIC_ROW, EncodingStats.NO_STATS); + + protected final CFMetaData metadata; + protected final DecoratedKey partitionKey; + + protected abstract Holder holder(); + protected abstract boolean canHaveShadowedData(); + + protected AbstractBTreePartition(CFMetaData metadata, DecoratedKey partitionKey) + { + this.metadata = metadata; + this.partitionKey = partitionKey; + } + + protected static final class Holder + { + final PartitionColumns columns; + final DeletionInfo deletionInfo; + // the btree of rows + final Object[] tree; + final Row staticRow; + final EncodingStats stats; + + Holder(PartitionColumns columns, Object[] tree, DeletionInfo deletionInfo, Row staticRow, EncodingStats stats) + { + this.columns = columns; + this.tree = tree; + this.deletionInfo = deletionInfo; + this.staticRow = staticRow == null ? 
Rows.EMPTY_STATIC_ROW : staticRow; + this.stats = stats; + } + } + + public DeletionInfo deletionInfo() + { + return holder().deletionInfo; + } + + public Row staticRow() + { + return holder().staticRow; + } + + public boolean isEmpty() + { + Holder holder = holder(); + return holder.deletionInfo.isLive() && BTree.isEmpty(holder.tree) && holder.staticRow.isEmpty(); + } + + public boolean hasRows() + { + Holder holder = holder(); + return !BTree.isEmpty(holder.tree); + } + + public CFMetaData metadata() + { + return metadata; + } + + public DecoratedKey partitionKey() + { + return partitionKey; + } + + public DeletionTime partitionLevelDeletion() + { + return holder().deletionInfo.getPartitionDeletion(); + } + + public PartitionColumns columns() + { + return holder().columns; + } + + public EncodingStats stats() + { + return holder().stats; + } + + public Row getRow(Clustering clustering) + { + Row row = searchIterator(ColumnFilter.selection(columns()), false).next(clustering); + // Note that for statics, this will never return null, this will return an empty row. However, + // it's more consistent for this method to return null if we don't really have a static row. + return row == null || (clustering == Clustering.STATIC_CLUSTERING && row.isEmpty()) ? null : row; + } + + private Row staticRow(Holder current, ColumnFilter columns, boolean setActiveDeletionToRow) + { + DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion(); + if (columns.fetchedColumns().statics.isEmpty() || (current.staticRow.isEmpty() && partitionDeletion.isLive())) + return Rows.EMPTY_STATIC_ROW; + + Row row = current.staticRow.filter(columns, partitionDeletion, setActiveDeletionToRow, metadata); + return row == null ? 
Rows.EMPTY_STATIC_ROW : row; + } + + public SearchIterator<Clustering, Row> searchIterator(final ColumnFilter columns, final boolean reversed) + { + // TODO: we could optimize comparison for "NativeRow" à la #6755 + final Holder current = holder(); + return new SearchIterator<Clustering, Row>() + { + private final SearchIterator<Clustering, Row> rawIter = new BTreeSearchIterator<>(current.tree, metadata.comparator, desc(reversed)); + private final DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion(); + + public boolean hasNext() + { + return rawIter.hasNext(); + } + + public Row next(Clustering clustering) + { + if (clustering == Clustering.STATIC_CLUSTERING) + return staticRow(current, columns, true); + + Row row = rawIter.next(clustering); + RangeTombstone rt = current.deletionInfo.rangeCovering(clustering); + + // A search iterator only return a row, so it doesn't allow to directly account for deletion that should apply to the row + // (the partition deletion or the deletion of a range tombstone that covers it). So if needs be, reuse the row deletion + // to carry the proper deletion on the row. + DeletionTime activeDeletion = partitionDeletion; + if (rt != null && rt.deletionTime().supersedes(activeDeletion)) + activeDeletion = rt.deletionTime(); + + if (row == null) + return activeDeletion.isLive() ? 
null : BTreeRow.emptyDeletedRow(clustering, Row.Deletion.regular(activeDeletion)); + + return row.filter(columns, activeDeletion, true, metadata); + } + }; + } + + public UnfilteredRowIterator unfilteredIterator() + { + return unfilteredIterator(ColumnFilter.all(metadata()), Slices.ALL, false); + } + + public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, Slices slices, boolean reversed) + { + return unfilteredIterator(holder(), selection, slices, reversed); + } + + public UnfilteredRowIterator unfilteredIterator(Holder current, ColumnFilter selection, Slices slices, boolean reversed) + { + Row staticRow = staticRow(current, selection, false); + if (slices.size() == 0) + { + DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion(); + return UnfilteredRowIterators.noRowsIterator(metadata, partitionKey, staticRow, partitionDeletion, reversed); + } + + return slices.size() == 1 + ? sliceIterator(selection, slices.get(0), reversed, current, staticRow) + : new SlicesIterator(selection, slices, reversed, current, staticRow); + } + + private UnfilteredRowIterator sliceIterator(ColumnFilter selection, Slice slice, boolean reversed, Holder current, Row staticRow) + { + Slice.Bound start = slice.start() == Slice.Bound.BOTTOM ? null : slice.start(); + Slice.Bound end = slice.end() == Slice.Bound.TOP ? 
null : slice.end(); + Iterator<Row> rowIter = BTree.slice(current.tree, metadata.comparator, start, true, end, true, desc(reversed)); + Iterator<RangeTombstone> deleteIter = current.deletionInfo.rangeIterator(slice, reversed); + + return merge(rowIter, deleteIter, selection, reversed, current, staticRow); + } + + private RowAndDeletionMergeIterator merge(Iterator<Row> rowIter, Iterator<RangeTombstone> deleteIter, + ColumnFilter selection, boolean reversed, Holder current, Row staticRow) + { + return new RowAndDeletionMergeIterator(metadata, partitionKey, current.deletionInfo.getPartitionDeletion(), + selection, staticRow, reversed, current.stats, + rowIter, deleteIter, + canHaveShadowedData()); + } + + private abstract class AbstractIterator extends AbstractUnfilteredRowIterator + { + final Holder current; + final ColumnFilter selection; + + private AbstractIterator(ColumnFilter selection, boolean isReversed) + { + this(AbstractBTreePartition.this.holder(), selection, isReversed); + } + + private AbstractIterator(Holder current, ColumnFilter selection, boolean isReversed) + { + this(current, + AbstractBTreePartition.this.staticRow(current, selection, false), + selection, isReversed); + } + + private AbstractIterator(Holder current, Row staticRow, ColumnFilter selection, boolean isReversed) + { + super(AbstractBTreePartition.this.metadata, + AbstractBTreePartition.this.partitionKey, + current.deletionInfo.getPartitionDeletion(), + selection.fetchedColumns(), // non-selected columns will be filtered in subclasses by RowAndDeletionMergeIterator + // it would also be more precise to return the intersection of the selection and current.columns, + // but its probably not worth spending time on computing that. 
+ staticRow, + isReversed, + current.stats); + this.current = current; + this.selection = selection; + } + } + + public class SlicesIterator extends AbstractIterator + { + private final Slices slices; + + private int idx; + private Iterator<Unfiltered> currentSlice; + + private SlicesIterator(ColumnFilter selection, + Slices slices, + boolean isReversed, + Holder current, + Row staticRow) + { + super(current, staticRow, selection, isReversed); + this.slices = slices; + } + + protected Unfiltered computeNext() + { + while (true) + { + if (currentSlice == null) + { + if (idx >= slices.size()) + return endOfData(); + + int sliceIdx = isReverseOrder ? slices.size() - idx - 1 : idx; + currentSlice = sliceIterator(selection, slices.get(sliceIdx), isReverseOrder, current, Rows.EMPTY_STATIC_ROW); + idx++; + } + + if (currentSlice.hasNext()) + return currentSlice.next(); + + currentSlice = null; + } + } + } + + public class SliceableIterator extends AbstractIterator implements SliceableUnfilteredRowIterator + { + private Iterator<Unfiltered> iterator; + + protected SliceableIterator(ColumnFilter selection, boolean isReversed) + { + super(selection, isReversed); + } + + protected Unfiltered computeNext() + { + if (iterator == null) + iterator = unfilteredIterator(selection, Slices.ALL, isReverseOrder); + if (!iterator.hasNext()) + return endOfData(); + return iterator.next(); + } + + public Iterator<Unfiltered> slice(Slice slice) + { + return sliceIterator(selection, slice, isReverseOrder, current, staticRow); + } + } + + public SliceableUnfilteredRowIterator sliceableUnfilteredIterator(ColumnFilter columns, boolean reversed) + { + return new SliceableIterator(columns, reversed); + } + + protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator() + { + return sliceableUnfilteredIterator(ColumnFilter.all(metadata), false); + } + + protected static Holder build(UnfilteredRowIterator iterator, int initialRowCapacity) + { ++ return build(iterator, initialRowCapacity, 
true); ++ } ++ ++ protected static Holder build(UnfilteredRowIterator iterator, int initialRowCapacity, boolean ordered) ++ { + CFMetaData metadata = iterator.metadata(); + PartitionColumns columns = iterator.columns(); + boolean reversed = iterator.isReverseOrder(); + + BTree.Builder<Row> builder = BTree.builder(metadata.comparator, initialRowCapacity); - builder.auto(false); ++ builder.auto(!ordered); + MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(iterator.partitionLevelDeletion(), metadata.comparator, reversed); + + while (iterator.hasNext()) + { + Unfiltered unfiltered = iterator.next(); + if (unfiltered.kind() == Unfiltered.Kind.ROW) + builder.add((Row)unfiltered); + else + deletionBuilder.add((RangeTombstoneMarker)unfiltered); + } + + if (reversed) + builder.reverse(); + + return new Holder(columns, builder.build(), deletionBuilder.build(), iterator.staticRow(), iterator.stats()); + } + + // Note that when building with a RowIterator, deletion will generally be LIVE, but we allow to pass it nonetheless because PartitionUpdate + // passes a MutableDeletionInfo that it mutates later. + protected static Holder build(RowIterator rows, DeletionInfo deletion, boolean buildEncodingStats, int initialRowCapacity) + { + CFMetaData metadata = rows.metadata(); + PartitionColumns columns = rows.columns(); + boolean reversed = rows.isReverseOrder(); + + BTree.Builder<Row> builder = BTree.builder(metadata.comparator, initialRowCapacity); + builder.auto(false); + while (rows.hasNext()) + { + Row row = rows.next(); + builder.add(row); + } + + if (reversed) + builder.reverse(); + + Row staticRow = rows.staticRow(); + Object[] tree = builder.build(); + EncodingStats stats = buildEncodingStats ? 
EncodingStats.Collector.collect(staticRow, BTree.iterator(tree), deletion) + : EncodingStats.NO_STATS; + return new Holder(columns, tree, deletion, staticRow, stats); + } + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder(); + + sb.append(String.format("[%s.%s] key=%s columns=%s", + metadata.ksName, + metadata.cfName, + metadata.getKeyValidator().getString(partitionKey().getKey()), + columns())); + + if (staticRow() != Rows.EMPTY_STATIC_ROW) + sb.append("\n ").append(staticRow().toString(metadata)); + + for (Row row : this) + sb.append("\n ").append(row.toString(metadata)); + + return sb.toString(); + } + + public int rowCount() + { + return BTree.size(holder().tree); + } + + public Iterator<Row> iterator() + { + return BTree.<Row>iterator(holder().tree); + } + + public Row lastRow() + { + Object[] tree = holder().tree; + if (BTree.isEmpty(tree)) + return null; + + return BTree.findByIndex(tree, BTree.size(tree) - 1); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb56193a/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java index 9af4bad,0000000..8d96f1e mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java +++ b/src/java/org/apache/cassandra/db/partitions/ImmutableBTreePartition.java @@@ -1,91 -1,0 +1,123 @@@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. 
You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.cassandra.db.partitions; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionInfo; +import org.apache.cassandra.db.PartitionColumns; +import org.apache.cassandra.db.rows.*; + +public class ImmutableBTreePartition extends AbstractBTreePartition +{ + + protected final Holder holder; + + public ImmutableBTreePartition(CFMetaData metadata, + DecoratedKey partitionKey, + PartitionColumns columns, + Row staticRow, + Object[] tree, + DeletionInfo deletionInfo, + EncodingStats stats) + { + super(metadata, partitionKey); + this.holder = new Holder(columns, tree, deletionInfo, staticRow, stats); + } + + protected ImmutableBTreePartition(CFMetaData metadata, + DecoratedKey partitionKey, + Holder holder) + { + super(metadata, partitionKey); + this.holder = holder; + } + + /** + * Creates an {@code ImmutableBTreePartition} holding all the data of the provided iterator. + * + * Warning: Note that this method does not close the provided iterator and it is + * up to the caller to do so. + * + * @param iterator the iterator to gather in memory. + * @return the created partition. + */ + public static ImmutableBTreePartition create(UnfilteredRowIterator iterator) + { + return create(iterator, 16); + } + + /** + * Creates an {@code ImmutableBTreePartition} holding all the data of the provided iterator. + * + * Warning: Note that this method does not close the provided iterator and it is + * up to the caller to do so. 
+ * + * @param iterator the iterator to gather in memory. ++ * @param ordered {@code true} if the iterator will return the rows in order, {@code false} otherwise. ++ * @return the created partition. ++ */ ++ public static ImmutableBTreePartition create(UnfilteredRowIterator iterator, boolean ordered) ++ { ++ return create(iterator, 16, ordered); ++ } ++ ++ /** ++ * Creates an {@code ImmutableBTreePartition} holding all the data of the provided iterator. ++ * ++ * Warning: Note that this method does not close the provided iterator and it is ++ * up to the caller to do so. ++ * ++ * @param iterator the iterator to gather in memory. + * @param initialRowCapacity sizing hint (in rows) to use for the created partition. It should ideally + * correspond or be a good estimation of the number or rows in {@code iterator}. + * @return the created partition. + */ + public static ImmutableBTreePartition create(UnfilteredRowIterator iterator, int initialRowCapacity) + { - return new ImmutableBTreePartition(iterator.metadata(), iterator.partitionKey(), build(iterator, initialRowCapacity)); ++ return create(iterator, initialRowCapacity, true); ++ } ++ ++ /** ++ * Creates an {@code ImmutableBTreePartition} holding all the data of the provided iterator. ++ * ++ * Warning: Note that this method does not close the provided iterator and it is ++ * up to the caller to do so. ++ * ++ * @param iterator the iterator to gather in memory. ++ * @param initialRowCapacity sizing hint (in rows) to use for the created partition. It should ideally ++ * correspond or be a good estimation of the number or rows in {@code iterator}. ++ * @param ordered {@code true} if the iterator will return the rows in order, {@code false} otherwise. ++ * @return the created partition. 
++ */ ++ public static ImmutableBTreePartition create(UnfilteredRowIterator iterator, int initialRowCapacity, boolean ordered) ++ { ++ return new ImmutableBTreePartition(iterator.metadata(), iterator.partitionKey(), build(iterator, initialRowCapacity, ordered)); + } + + protected Holder holder() + { + return holder; + } + + protected boolean canHaveShadowedData() + { + return false; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb56193a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java index 3218ff2,0000000..43653a9 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java @@@ -1,558 -1,0 +1,586 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.db.rows; + +import java.util.*; +import java.security.MessageDigest; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.transform.FilteredRows; +import org.apache.cassandra.db.transform.MoreRows; +import org.apache.cassandra.db.transform.Transformation; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.io.sstable.CorruptSSTableException; +import org.apache.cassandra.serializers.MarshalException; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.IMergeIterator; +import org.apache.cassandra.utils.MergeIterator; +import org.apache.cassandra.utils.memory.AbstractAllocator; + +/** + * Static methods to work with atom iterators. + */ +public abstract class UnfilteredRowIterators +{ + private static final Logger logger = LoggerFactory.getLogger(UnfilteredRowIterators.class); + + private UnfilteredRowIterators() {} + + /** + * Interface for a listener interested in the result of merging multiple versions of a given row. + * <p> + * Implementors of this interface are given enough information that they can easily reconstruct the difference + * between the merged result and each individual input. This is used when reconciling results on replias for + * instance to figure out what to send as read-repair to each source. + */ + public interface MergeListener + { + /** + * Called once for the merged partition. + * + * @param mergedDeletion the partition level deletion for the merged partition. Implementors can test if the + * merged partition actually has a partition level deletion or not by calling {@code mergedDeletion.isLive()}. + * @param versions the partition level deletion for the sources of the merge. Elements of the array will never + * be null, but be "live". 
+ **/ + public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions); + + /** + * Called once for every row participating in the merge. + * <p> + * Note that this is called for every clustering where at least one of the source merged has a row. In + * particular, this may be called in cases where there is no row in the merged output (if a source has a row + * that is shadowed by another source range tombstone or partition level deletion). + * + * @param merged the result of the merge. This cannot be {@code null} but can be empty, in which case this is a + * placeholder for when at least one source has a row, but that row is shadowed in the merged output. + * @param versions for each source, the row in that source corresponding to {@code merged}. This can be + * {@code null} for some sources if the source has not such row. + */ + public void onMergedRows(Row merged, Row[] versions); + + /** + * Called once for every range tombstone marker participating in the merge. + * <p> + * Note that this is called for every "clustering position" where at least one of the source merged has a range + * tombstone marker. + * + * @param merged the marker in the merged output. This can be {@code null} if there is no such marker, which + * means that at least one source has a marker in {@code versions} but the merged out has nothing corresponding + * (this basically mean the merged output has a currently open deletion that shadows whatever marker the source + * had). + * @param versions the marker for each source merged. This can be {@code null} for some source if that source + * has not such marker. + */ + public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions); + + public void close(); + } + + /** + * Returns a iterator that only returns rows with only live content. 
+ * + * This is mainly used in the CQL layer when we know we don't care about deletion + * infos (and since an UnfilteredRowIterator cannot shadow it's own data, we know everyting + * returned isn't shadowed by a tombstone). + */ + public static RowIterator filter(UnfilteredRowIterator iter, int nowInSec) + { + return FilteredRows.filter(iter, nowInSec); + } + + /** + * Returns an iterator that is the result of merging other iterators. + */ + public static UnfilteredRowIterator merge(List<UnfilteredRowIterator> iterators, int nowInSec) + { + assert !iterators.isEmpty(); + if (iterators.size() == 1) + return iterators.get(0); + + return UnfilteredRowMergeIterator.create(iterators, nowInSec, null); + } + + /** + * Returns an iterator that is the result of merging other iterators, and (optionally) using + * specific MergeListener. + * + * Note that this method assumes that there is at least 2 iterators to merge. + */ + public static UnfilteredRowIterator merge(List<UnfilteredRowIterator> iterators, int nowInSec, MergeListener mergeListener) + { + return UnfilteredRowMergeIterator.create(iterators, nowInSec, mergeListener); + } + + /** + * Returns an empty unfiltered iterator for a given partition. + */ + public static UnfilteredRowIterator noRowsIterator(final CFMetaData cfm, final DecoratedKey partitionKey, final Row staticRow, final DeletionTime partitionDeletion, final boolean isReverseOrder) + { + return EmptyIterators.unfilteredRow(cfm, partitionKey, isReverseOrder, staticRow, partitionDeletion); + } + + /** + * Digests the partition represented by the provided iterator. + * + * @param command the command that has yield {@code iterator}. This can be null if {@code version >= MessagingService.VERSION_30} + * as this is only used when producing digest to be sent to legacy nodes. + * @param iterator the iterator to digest. + * @param digest the {@code MessageDigest} to use for the digest. + * @param version the messaging protocol to use when producing the digest. 
+ */ + public static void digest(ReadCommand command, UnfilteredRowIterator iterator, MessageDigest digest, int version) + { + if (version < MessagingService.VERSION_30) + { + LegacyLayout.fromUnfilteredRowIterator(command, iterator).digest(iterator.metadata(), digest); + return; + } + + digest.update(iterator.partitionKey().getKey().duplicate()); + iterator.partitionLevelDeletion().digest(digest); + iterator.columns().regulars.digest(digest); + // When serializing an iterator, we skip the static columns if the iterator has not static row, even if the + // columns() object itself has some (the columns() is a superset of what the iterator actually contains, and + // will correspond to the queried columns pre-serialization). So we must avoid taking the satic column names + // into account if there is no static row or we'd have a digest mismatch between depending on whether the digest + // is computed on an iterator that has been serialized or not (see CASSANDRA-12090) + // TODO: in practice we could completely skip digesting the columns since they are more informative of what the + // iterator may contain, and digesting the actual content is enough. And in fact, that would be more correct + // (since again, the columns could be different without the information represented by the iterator being + // different), but removing them entirely is stricly speaking a breaking change (it would create mismatches on + // upgrade) so we can only do on the next protocol version bump. + if (iterator.staticRow() != Rows.EMPTY_STATIC_ROW) + iterator.columns().statics.digest(digest); + FBUtilities.updateWithBoolean(digest, iterator.isReverseOrder()); + iterator.staticRow().digest(digest); + + while (iterator.hasNext()) + { + Unfiltered unfiltered = iterator.next(); + unfiltered.digest(digest); + } + } + + /** + * Returns an iterator that concatenate two atom iterators. 
+ * This method assumes that both iterator are from the same partition and that the atom from + * {@code iter2} come after the ones of {@code iter1} (that is, that concatenating the iterator + * make sense). + */ + public static UnfilteredRowIterator concat(final UnfilteredRowIterator iter1, final UnfilteredRowIterator iter2) + { + assert iter1.metadata().cfId.equals(iter2.metadata().cfId) + && iter1.partitionKey().equals(iter2.partitionKey()) + && iter1.partitionLevelDeletion().equals(iter2.partitionLevelDeletion()) + && iter1.isReverseOrder() == iter2.isReverseOrder() + && iter1.columns().equals(iter2.columns()) + && iter1.staticRow().equals(iter2.staticRow()); + + class Extend implements MoreRows<UnfilteredRowIterator> + { + boolean returned = false; + public UnfilteredRowIterator moreContents() + { + if (returned) + return null; + returned = true; + return iter2; + } + } + + return MoreRows.extend(iter1, new Extend()); + } + ++ /** ++ * Returns an iterator that concatenate the specified atom with the iterator. ++ */ ++ public static UnfilteredRowIterator concat(final Unfiltered first, final UnfilteredRowIterator rest) ++ { ++ return new WrappingUnfilteredRowIterator(rest) ++ { ++ private boolean hasReturnedFirst; ++ ++ @Override ++ public boolean hasNext() ++ { ++ return hasReturnedFirst ? 
super.hasNext() : true; ++ } ++ ++ @Override ++ public Unfiltered next() ++ { ++ if (!hasReturnedFirst) ++ { ++ hasReturnedFirst = true; ++ return first; ++ } ++ return super.next(); ++ } ++ }; ++ } ++ + public static UnfilteredRowIterator cloningIterator(UnfilteredRowIterator iterator, final AbstractAllocator allocator) + { + class Cloner extends Transformation + { + private final Row.Builder builder = allocator.cloningBTreeRowBuilder(); + + public Row applyToStatic(Row row) + { + return Rows.copy(row, builder).build(); + } + + @Override + public Row applyToRow(Row row) + { + return Rows.copy(row, builder).build(); + } + + @Override + public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) + { + return marker.copy(allocator); + } + } + return Transformation.apply(iterator, new Cloner()); + } + + /** + * Validate that the data of the provided iterator is valid, that is that the values + * it contains are valid for the type they represent, and more generally that the + * infos stored are sensible. + * + * This is mainly used by scrubber to detect problems in sstables. + * + * @param iterator the partition to check. + * @param filename the name of the file the data is comming from. + * @return an iterator that returns the same data than {@code iterator} but that + * checks said data and throws a {@code CorruptedSSTableException} if it detects + * invalid data. 
+ */ + public static UnfilteredRowIterator withValidation(UnfilteredRowIterator iterator, final String filename) + { + class Validator extends Transformation + { + @Override + public Row applyToStatic(Row row) + { + validate(row); + return row; + } + + @Override + public Row applyToRow(Row row) + { + validate(row); + return row; + } + + @Override + public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) + { + validate(marker); + return marker; + } + + private void validate(Unfiltered unfiltered) + { + try + { + unfiltered.validateData(iterator.metadata()); + } + catch (MarshalException me) + { + throw new CorruptSSTableException(me, filename); + } + } + } + return Transformation.apply(iterator, new Validator()); + } + + /** + * Wraps the provided iterator so it logs the returned atoms for debugging purposes. + * <p> + * Note that this is only meant for debugging as this can log a very large amount of + * logging at INFO. + */ + public static UnfilteredRowIterator loggingIterator(UnfilteredRowIterator iterator, final String id, final boolean fullDetails) + { + CFMetaData metadata = iterator.metadata(); + logger.info("[{}] Logging iterator on {}.{}, partition key={}, reversed={}, deletion={}", + id, + metadata.ksName, + metadata.cfName, + metadata.getKeyValidator().getString(iterator.partitionKey().getKey()), + iterator.isReverseOrder(), + iterator.partitionLevelDeletion().markedForDeleteAt()); + + class Logger extends Transformation + { + @Override + public Row applyToStatic(Row row) + { + if (!row.isEmpty()) + logger.info("[{}] {}", id, row.toString(metadata, fullDetails)); + return row; + } + + @Override + public Row applyToRow(Row row) + { + logger.info("[{}] {}", id, row.toString(metadata, fullDetails)); + return row; + } + + @Override + public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker) + { + logger.info("[{}] {}", id, marker.toString(metadata)); + return marker; + } + } + return Transformation.apply(iterator, new Logger()); + 
} + + /** + * A wrapper over MergeIterator to implement the UnfilteredRowIterator interface. + */ + private static class UnfilteredRowMergeIterator extends AbstractUnfilteredRowIterator + { + private final IMergeIterator<Unfiltered, Unfiltered> mergeIterator; + private final MergeListener listener; + + private UnfilteredRowMergeIterator(CFMetaData metadata, + List<UnfilteredRowIterator> iterators, + PartitionColumns columns, + DeletionTime partitionDeletion, + int nowInSec, + boolean reversed, + MergeListener listener) + { + super(metadata, + iterators.get(0).partitionKey(), + partitionDeletion, + columns, + mergeStaticRows(iterators, columns.statics, nowInSec, listener, partitionDeletion), + reversed, + mergeStats(iterators)); + + this.mergeIterator = MergeIterator.get(iterators, + reversed ? metadata.comparator.reversed() : metadata.comparator, + new MergeReducer(iterators.size(), reversed, nowInSec, listener)); + this.listener = listener; + } + + private static UnfilteredRowMergeIterator create(List<UnfilteredRowIterator> iterators, int nowInSec, MergeListener listener) + { + try + { + checkForInvalidInput(iterators); + return new UnfilteredRowMergeIterator(iterators.get(0).metadata(), + iterators, + collectColumns(iterators), + collectPartitionLevelDeletion(iterators, listener), + nowInSec, + iterators.get(0).isReverseOrder(), + listener); + } + catch (RuntimeException | Error e) + { + try + { + FBUtilities.closeAll(iterators); + } + catch (Exception suppressed) + { + e.addSuppressed(suppressed); + } + throw e; + } + } + + @SuppressWarnings("resource") // We're not really creating any resource here + private static void checkForInvalidInput(List<UnfilteredRowIterator> iterators) + { + if (iterators.isEmpty()) + return; + + UnfilteredRowIterator first = iterators.get(0); + for (int i = 1; i < iterators.size(); i++) + { + UnfilteredRowIterator iter = iterators.get(i); + assert first.metadata().cfId.equals(iter.metadata().cfId); + assert 
first.partitionKey().equals(iter.partitionKey()); + assert first.isReverseOrder() == iter.isReverseOrder(); + } + } + + @SuppressWarnings("resource") // We're not really creating any resource here + private static DeletionTime collectPartitionLevelDeletion(List<UnfilteredRowIterator> iterators, MergeListener listener) + { + DeletionTime[] versions = listener == null ? null : new DeletionTime[iterators.size()]; + + DeletionTime delTime = DeletionTime.LIVE; + for (int i = 0; i < iterators.size(); i++) + { + UnfilteredRowIterator iter = iterators.get(i); + DeletionTime iterDeletion = iter.partitionLevelDeletion(); + if (listener != null) + versions[i] = iterDeletion; + if (!delTime.supersedes(iterDeletion)) + delTime = iterDeletion; + } + if (listener != null) + listener.onMergedPartitionLevelDeletion(delTime, versions); + return delTime; + } + + private static Row mergeStaticRows(List<UnfilteredRowIterator> iterators, + Columns columns, + int nowInSec, + MergeListener listener, + DeletionTime partitionDeletion) + { + if (columns.isEmpty()) + return Rows.EMPTY_STATIC_ROW; + + if (iterators.stream().allMatch(iter -> iter.staticRow().isEmpty())) + return Rows.EMPTY_STATIC_ROW; + + Row.Merger merger = new Row.Merger(iterators.size(), nowInSec, columns.hasComplex()); + for (int i = 0; i < iterators.size(); i++) + merger.add(i, iterators.get(i).staticRow()); + + Row merged = merger.merge(partitionDeletion); + if (merged == null) + merged = Rows.EMPTY_STATIC_ROW; + if (listener != null) + listener.onMergedRows(merged, merger.mergedRows()); + return merged; + } + + private static PartitionColumns collectColumns(List<UnfilteredRowIterator> iterators) + { + PartitionColumns first = iterators.get(0).columns(); + Columns statics = first.statics; + Columns regulars = first.regulars; + for (int i = 1; i < iterators.size(); i++) + { + PartitionColumns cols = iterators.get(i).columns(); + statics = statics.mergeTo(cols.statics); + regulars = regulars.mergeTo(cols.regulars); + } + 
return statics == first.statics && regulars == first.regulars + ? first + : new PartitionColumns(statics, regulars); + } + + private static EncodingStats mergeStats(List<UnfilteredRowIterator> iterators) + { + EncodingStats stats = EncodingStats.NO_STATS; + for (UnfilteredRowIterator iter : iterators) + stats = stats.mergeWith(iter.stats()); + return stats; + } + + protected Unfiltered computeNext() + { + while (mergeIterator.hasNext()) + { + Unfiltered merged = mergeIterator.next(); + if (merged != null) + return merged; + } + return endOfData(); + } + + public void close() + { + // This will close the input iterators + FileUtils.closeQuietly(mergeIterator); + + if (listener != null) + listener.close(); + } + + private class MergeReducer extends MergeIterator.Reducer<Unfiltered, Unfiltered> + { + private final MergeListener listener; + + private Unfiltered.Kind nextKind; + + private final Row.Merger rowMerger; + private final RangeTombstoneMarker.Merger markerMerger; + + private MergeReducer(int size, boolean reversed, int nowInSec, MergeListener listener) + { + this.rowMerger = new Row.Merger(size, nowInSec, columns().regulars.hasComplex()); + this.markerMerger = new RangeTombstoneMarker.Merger(size, partitionLevelDeletion(), reversed); + this.listener = listener; + } + + @Override + public boolean trivialReduceIsTrivial() + { + // If we have a listener, we must signal it even when we have a single version + return listener == null; + } + + public void reduce(int idx, Unfiltered current) + { + nextKind = current.kind(); + if (nextKind == Unfiltered.Kind.ROW) + rowMerger.add(idx, (Row)current); + else + markerMerger.add(idx, (RangeTombstoneMarker)current); + } + + protected Unfiltered getReduced() + { + if (nextKind == Unfiltered.Kind.ROW) + { + Row merged = rowMerger.merge(markerMerger.activeDeletion()); + if (listener != null) + listener.onMergedRows(merged == null ? 
BTreeRow.emptyRow(rowMerger.mergedClustering()) : merged, rowMerger.mergedRows()); + return merged; + } + else + { + RangeTombstoneMarker merged = markerMerger.merge(); + if (listener != null) + listener.onMergedRangeTombstoneMarkers(merged, markerMerger.mergedMarkers()); + return merged; + } + } + + protected void onKeyChange() + { + if (nextKind == Unfiltered.Kind.ROW) + rowMerger.clear(); + else + markerMerger.clear(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb56193a/test/unit/org/apache/cassandra/cql3/CQLTester.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/cql3/CQLTester.java index e3dc220,98b8e23..c9c4631 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@@ -366,21 -278,20 +366,39 @@@ public abstract class CQLTeste return list.isEmpty() ? Collections.<String>emptyList() : new ArrayList<>(list); } + public ColumnFamilyStore getCurrentColumnFamilyStore() + { + String currentTable = currentTable(); + return currentTable == null + ? null + : Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable); + } + public void flush() { - try - { - String currentTable = currentTable(); - if (currentTable != null) - Keyspace.open(KEYSPACE).getColumnFamilyStore(currentTable).forceFlush().get(); - } - catch (InterruptedException | ExecutionException e) - { - throw new RuntimeException(e); - } + ColumnFamilyStore store = getCurrentColumnFamilyStore(); + if (store != null) + store.forceBlockingFlush(); + } + ++ @FunctionalInterface ++ public interface CheckedFunction { ++ void apply() throws Throwable; ++ } ++ ++ /** ++ * Runs the given function before and after a flush of sstables. This is useful for checking that behavior is ++ * the same whether data is in memtables or sstables. 
++ * @param runnable ++ * @throws Throwable ++ */ ++ public void beforeAndAfterFlush(CheckedFunction runnable) throws Throwable ++ { ++ runnable.apply(); ++ flush(); ++ runnable.apply(); + } + public void compact() { try http://git-wip-us.apache.org/repos/asf/cassandra/blob/bb56193a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java index f9802d7,11d2462..0cf13bd --- a/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/entities/SecondaryIndexTest.java @@@ -17,47 -17,38 +17,49 @@@ */ package org.apache.cassandra.cql3.validation.entities; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Set; -import java.util.UUID; +import java.util.*; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import org.apache.commons.lang3.StringUtils; +import org.junit.Test; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.CQLTester; -import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.restrictions.StatementRestrictions; +import org.apache.cassandra.cql3.statements.IndexTarget; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.IndexExpression; -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.db.index.IndexNotAvailableException; -import 
org.apache.cassandra.db.index.PerRowSecondaryIndex; -import org.apache.cassandra.db.index.SecondaryIndexSearcher; -import org.apache.cassandra.db.index.composites.CompositesSearcher; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.SyntaxException; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.concurrent.OpOrder.Group; -import org.apache.commons.lang3.StringUtils; -import org.junit.Test; - +import org.apache.cassandra.index.IndexNotAvailableException; +import org.apache.cassandra.index.SecondaryIndexManager; +import org.apache.cassandra.index.StubIndex; +import org.apache.cassandra.index.internal.CustomCassandraIndex; +import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.transport.messages.ResultMessage; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.MD5Digest; +import org.apache.cassandra.utils.Pair; + +import static org.apache.cassandra.Util.throwAssert; + import static org.apache.cassandra.utils.ByteBufferUtil.EMPTY_BYTE_BUFFER; + import static org.apache.cassandra.utils.ByteBufferUtil.bytes; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class SecondaryIndexTest extends CQLTester { @@@ -949,77 -684,190 +951,206 @@@ } @Test + public void droppingIndexInvalidatesPreparedStatements() throws Throwable + { + createTable("CREATE TABLE %s (a int, b int, c int, PRIMARY KEY ((a), b))"); + 
createIndex("CREATE INDEX c_idx ON %s(c)"); + MD5Digest cqlId = prepareStatement("SELECT * FROM %s.%s WHERE c=?", false).statementId; + Integer thriftId = prepareStatement("SELECT * FROM %s.%s WHERE c=?", true).toThriftPreparedResult().getItemId(); + + assertNotNull(QueryProcessor.instance.getPrepared(cqlId)); + assertNotNull(QueryProcessor.instance.getPreparedForThrift(thriftId)); + + dropIndex("DROP INDEX %s.c_idx"); + + assertNull(QueryProcessor.instance.getPrepared(cqlId)); + assertNull(QueryProcessor.instance.getPreparedForThrift(thriftId)); + } + + // See CASSANDRA-11021 + @Test + public void testIndexesOnNonStaticColumnsWhereSchemaIncludesStaticColumns() throws Throwable + { + createTable("CREATE TABLE %s (a int, b int, c int static, d int, PRIMARY KEY (a, b))"); + createIndex("CREATE INDEX b_idx on %s(b)"); + createIndex("CREATE INDEX d_idx on %s(d)"); + + execute("INSERT INTO %s (a, b, c ,d) VALUES (0, 0, 0, 0)"); + execute("INSERT INTO %s (a, b, c, d) VALUES (1, 1, 1, 1)"); + assertRows(execute("SELECT * FROM %s WHERE b = 0"), row(0, 0, 0, 0)); + assertRows(execute("SELECT * FROM %s WHERE d = 1"), row(1, 1, 1, 1)); + + execute("UPDATE %s SET c = 2 WHERE a = 0"); + execute("UPDATE %s SET c = 3, d = 4 WHERE a = 1 AND b = 1"); + assertRows(execute("SELECT * FROM %s WHERE b = 0"), row(0, 0, 2, 0)); + assertRows(execute("SELECT * FROM %s WHERE d = 4"), row(1, 1, 3, 4)); + + execute("DELETE FROM %s WHERE a = 0"); + execute("DELETE FROM %s WHERE a = 1 AND b = 1"); + assertEmpty(execute("SELECT * FROM %s WHERE b = 0")); + assertEmpty(execute("SELECT * FROM %s WHERE d = 3")); + } + ++ @Test + public void testWithEmptyRestrictionValueAndSecondaryIndex() throws Throwable + { + createTable("CREATE TABLE %s (pk blob, c blob, v blob, PRIMARY KEY ((pk), c))"); + createIndex("CREATE INDEX on %s(c)"); + createIndex("CREATE INDEX on %s(v)"); + + execute("INSERT INTO %s (pk, c, v) VALUES (?, ?, ?)", bytes("foo123"), bytes("1"), bytes("1")); + execute("INSERT INTO %s (pk, c, 
v) VALUES (?, ?, ?)", bytes("foo123"), bytes("2"), bytes("1")); + - for (boolean flush : new boolean[]{false, true}) - { - if (flush) - flush(); - ++ beforeAndAfterFlush(() -> { + // Test clustering columns restrictions + assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c = textAsBlob('');")); + + assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) = (textAsBlob(''));")); + + assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c IN (textAsBlob(''), textAsBlob('1'));"), + row(bytes("foo123"), bytes("1"), bytes("1"))); + + assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) IN ((textAsBlob('')), (textAsBlob('1')));"), + row(bytes("foo123"), bytes("1"), bytes("1"))); + + assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c > textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;"), + row(bytes("foo123"), bytes("1"), bytes("1")), + row(bytes("foo123"), bytes("2"), bytes("1"))); + + assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c >= textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;"), + row(bytes("foo123"), bytes("1"), bytes("1")), + row(bytes("foo123"), bytes("2"), bytes("1"))); + + assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) >= (textAsBlob('')) AND v = textAsBlob('1') ALLOW FILTERING;"), + row(bytes("foo123"), bytes("1"), bytes("1")), + row(bytes("foo123"), bytes("2"), bytes("1"))); + + assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c <= textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;")); + + assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) <= (textAsBlob('')) AND v = textAsBlob('1') ALLOW FILTERING;")); + + assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) < (textAsBlob('')) AND v = textAsBlob('1') ALLOW FILTERING;")); + + assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') 
AND c < textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;")); + + assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c > textAsBlob('') AND c < textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;")); + + assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) > (textAsBlob('')) AND (c) < (textAsBlob('')) AND v = textAsBlob('1') ALLOW FILTERING;")); - } ++ }); + + execute("INSERT INTO %s (pk, c, v) VALUES (?, ?, ?)", + bytes("foo123"), EMPTY_BYTE_BUFFER, bytes("1")); + - for (boolean flush : new boolean[]{false, true}) - { - if (flush) - flush(); ++ beforeAndAfterFlush(() -> { + + assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c = textAsBlob('');"), + row(bytes("foo123"), EMPTY_BYTE_BUFFER, bytes("1"))); + + assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) = (textAsBlob(''));"), + row(bytes("foo123"), EMPTY_BYTE_BUFFER, bytes("1"))); + + assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c IN (textAsBlob(''), textAsBlob('1'));"), + row(bytes("foo123"), EMPTY_BYTE_BUFFER, bytes("1")), + row(bytes("foo123"), bytes("1"), bytes("1"))); + + assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) IN ((textAsBlob('')), (textAsBlob('1')));"), + row(bytes("foo123"), EMPTY_BYTE_BUFFER, bytes("1")), + row(bytes("foo123"), bytes("1"), bytes("1"))); + + assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c > textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;"), + row(bytes("foo123"), bytes("1"), bytes("1")), + row(bytes("foo123"), bytes("2"), bytes("1"))); + + assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c >= textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;"), + row(bytes("foo123"), EMPTY_BYTE_BUFFER, bytes("1")), + row(bytes("foo123"), bytes("1"), bytes("1")), + row(bytes("foo123"), bytes("2"), bytes("1"))); + + assertRows(execute("SELECT * FROM %s WHERE pk = 
textAsBlob('foo123') AND (c) >= (textAsBlob('')) AND v = textAsBlob('1') ALLOW FILTERING;"), + row(bytes("foo123"), EMPTY_BYTE_BUFFER, bytes("1")), + row(bytes("foo123"), bytes("1"), bytes("1")), + row(bytes("foo123"), bytes("2"), bytes("1"))); + + assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c <= textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;"), + row(bytes("foo123"), EMPTY_BYTE_BUFFER, bytes("1"))); + + assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) <= (textAsBlob('')) AND v = textAsBlob('1') ALLOW FILTERING;"), + row(bytes("foo123"), EMPTY_BYTE_BUFFER, bytes("1"))); + + assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c < textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;")); + + assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) < (textAsBlob('')) AND v = textAsBlob('1') ALLOW FILTERING;")); + + assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND c >= textAsBlob('') AND c < textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;")); + + assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND (c) >= (textAsBlob('')) AND c < textAsBlob('') AND v = textAsBlob('1') ALLOW FILTERING;")); + + // Test restrictions on non-primary key value + assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND v = textAsBlob('');")); - } ++ }); + + execute("INSERT INTO %s (pk, c, v) VALUES (?, ?, ?)", + bytes("foo123"), bytes("3"), EMPTY_BYTE_BUFFER); + - for (boolean flush : new boolean[]{false, true}) - { - if (flush) - flush(); ++ beforeAndAfterFlush(() -> { + + assertRows(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND v = textAsBlob('');"), + row(bytes("foo123"), bytes("3"), EMPTY_BYTE_BUFFER)); - } ++ }); + } + + @Test + public void testEmptyRestrictionValueWithSecondaryIndexAndCompactTables() throws Throwable + { + createTable("CREATE TABLE %s (pk blob, c blob, v blob, PRIMARY KEY 
((pk), c)) WITH COMPACT STORAGE"); - assertInvalidMessage("Secondary indexes are not supported on PRIMARY KEY columns in COMPACT STORAGE tables", ++ assertInvalidMessage("Secondary indexes are not supported on COMPACT STORAGE tables that have clustering columns", + "CREATE INDEX on %s(c)"); + + createTable("CREATE TABLE %s (pk blob PRIMARY KEY, v blob) WITH COMPACT STORAGE"); + createIndex("CREATE INDEX on %s(v)"); + + execute("INSERT INTO %s (pk, v) VALUES (?, ?)", bytes("foo123"), bytes("1")); + + // Test restrictions on non-primary key value + assertEmpty(execute("SELECT * FROM %s WHERE pk = textAsBlob('foo123') AND v = textAsBlob('');")); + + execute("INSERT INTO %s (pk, v) VALUES (?, ?)", bytes("foo124"), EMPTY_BYTE_BUFFER); + + assertRows(execute("SELECT * FROM %s WHERE v = textAsBlob('');"), + row(bytes("foo124"), EMPTY_BYTE_BUFFER)); + } + - /** - * Custom index used to test the behavior of the system when the index is not ready. - * As Custom indices cannot by <code>PerColumnSecondaryIndex</code> we use a <code>PerRowSecondaryIndex</code> - * to avoid the check but return a <code>CompositesSearcher</code>. 
- */ - public static class IndexBlockingOnInitialization extends PerRowSecondaryIndex + private ResultMessage.Prepared prepareStatement(String cql, boolean forThrift) { - private volatile CountDownLatch latch = new CountDownLatch(1); - - @Override - public void index(ByteBuffer rowKey, ColumnFamily cf) - { - try - { - latch.await(); - } - catch (InterruptedException e) - { - Thread.interrupted(); - } - } - - @Override - public void delete(DecoratedKey key, Group opGroup) - { - } + return QueryProcessor.prepare(String.format(cql, KEYSPACE, currentTable()), + ClientState.forInternalCalls(), + forThrift); + } - @Override - public void init() - { - } + private void validateCell(Cell cell, ColumnDefinition def, ByteBuffer val, long timestamp) + { + assertNotNull(cell); + assertEquals(0, def.type.compare(cell.value(), val)); + assertEquals(timestamp, cell.timestamp()); + } - @Override - public void reload() - { - } + private static void assertColumnValue(int expected, String name, Row row, CFMetaData cfm) + { + ColumnDefinition col = cfm.getColumnDefinition(new ColumnIdentifier(name, true)); + AbstractType<?> type = col.type; + assertEquals(expected, type.compose(row.getCell(col).value())); + } - @Override - public void validateOptions() throws ConfigurationException - { - } + /** + * <code>CassandraIndex</code> that blocks during the initialization. + */ + public static class IndexBlockingOnInitialization extends CustomCassandraIndex + { + private final CountDownLatch latch = new CountDownLatch(1); - @Override - public String getIndexName() + public IndexBlockingOnInitialization(ColumnFamilyStore baseCfs, IndexMetadata indexDef) { - return "testIndex"; + super(baseCfs, indexDef); } @Override