http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java b/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java new file mode 100644 index 0000000..d79ab06 --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/AbstractThreadUnsafePartition.java @@ -0,0 +1,393 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.partitions; + +import java.util.*; + +import com.google.common.collect.Lists; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.utils.SearchIterator; + +/** + * Abstract common class for all non-thread safe Partition implementations. + */ +public abstract class AbstractThreadUnsafePartition implements Partition, Iterable<Row> +{ + protected final CFMetaData metadata; + protected final DecoratedKey key; + + protected final PartitionColumns columns; + + protected final List<Row> rows; + + protected AbstractThreadUnsafePartition(CFMetaData metadata, + DecoratedKey key, + PartitionColumns columns, + List<Row> rows) + { + this.metadata = metadata; + this.key = key; + this.columns = columns; + this.rows = rows; + } + + public CFMetaData metadata() + { + return metadata; + } + + public DecoratedKey partitionKey() + { + return key; + } + + public DeletionTime partitionLevelDeletion() + { + return deletionInfo().getPartitionDeletion(); + } + + public PartitionColumns columns() + { + return columns; + } + + public abstract Row staticRow(); + + protected abstract boolean canHaveShadowedData(); + + /** + * The deletion info for the partition update. + * + * Note: do not cast the result to a {@code MutableDeletionInfo} to modify it! + * + * @return the deletion info for the partition update for use as read-only. + */ + public abstract DeletionInfo deletionInfo(); + + public int rowCount() + { + return rows.size(); + } + + public boolean isEmpty() + { + return deletionInfo().isLive() && rows.isEmpty() && staticRow().isEmpty(); + } + + @Override + public String toString() + { + StringBuilder sb = new StringBuilder(); + CFMetaData metadata = metadata(); + sb.append(String.format("Partition[%s.%s] key=%s columns=%s%s", + metadata().ksName, + metadata().cfName, + metadata().getKeyValidator().getString(partitionKey().getKey()), + columns(), + deletionInfo().isLive() ? 
"" : " " + deletionInfo())); + + if (staticRow() != Rows.EMPTY_STATIC_ROW) + sb.append("\n ").append(staticRow().toString(metadata, true)); + + // We use createRowIterator() directly instead of iterator() because that avoids + // sorting for PartitionUpdate (which inherit this method) and that is useful because + // 1) it can help with debugging and 2) we can't write after sorting but we want to + // be able to print an update while we build it (again for debugging) + for (Row row : this) + sb.append("\n ").append(row.toString(metadata, true)); + + return sb.toString(); + } + + public Row getRow(Clustering clustering) + { + Row row = searchIterator(ColumnFilter.selection(columns()), false).next(clustering); + // Note that for statics, this will never return null, this will return an empty row. However, + // it's more consistent for this method to return null if we don't really have a static row. + return row == null || (clustering == Clustering.STATIC_CLUSTERING && row.isEmpty()) ? null : row; + } + + /** + * Returns an iterator that iterators over the rows of this update in clustering order. + * + * @return an iterator over the rows of this partition. + */ + public Iterator<Row> iterator() + { + return rows.iterator(); + } + + public SearchIterator<Clustering, Row> searchIterator(final ColumnFilter columns, boolean reversed) + { + final RowSearcher searcher = reversed ? new ReverseRowSearcher() : new ForwardRowSearcher(); + return new SearchIterator<Clustering, Row>() + { + public boolean hasNext() + { + return !searcher.isDone(); + } + + public Row next(Clustering clustering) + { + if (clustering == Clustering.STATIC_CLUSTERING) + { + Row staticRow = staticRow(); + return staticRow.isEmpty() || columns.fetchedColumns().statics.isEmpty() + ? Rows.EMPTY_STATIC_ROW + : staticRow.filter(columns, partitionLevelDeletion(), true, metadata); + } + + Row row = searcher.search(clustering); + RangeTombstone rt = deletionInfo().rangeCovering(clustering); + + // A search iterator only return a row, so it doesn't allow to directly account for deletion that should apply to to row + // (the partition deletion or the deletion of a range tombstone that covers it). So if needs be, reuse the row deletion + // to carry the proper deletion on the row. + DeletionTime activeDeletion = partitionLevelDeletion(); + if (rt != null && rt.deletionTime().supersedes(activeDeletion)) + activeDeletion = rt.deletionTime(); + + if (row == null) + return activeDeletion.isLive() ? null : ArrayBackedRow.emptyDeletedRow(clustering, activeDeletion); + + return row.filter(columns, activeDeletion, true, metadata); + } + }; + } + + public UnfilteredRowIterator unfilteredIterator() + { + return unfilteredIterator(ColumnFilter.all(metadata()), Slices.ALL, false); + } + + public UnfilteredRowIterator unfilteredIterator(ColumnFilter columns, Slices slices, boolean reversed) + { + return slices.makeSliceIterator(sliceableUnfilteredIterator(columns, reversed)); + } + + protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator() + { + return sliceableUnfilteredIterator(ColumnFilter.all(metadata()), false); + } + + protected SliceableUnfilteredRowIterator sliceableUnfilteredIterator(ColumnFilter selection, boolean reversed) + { + return new SliceableIterator(this, selection, reversed); + } + + /** + * Simple binary search for a given row (in the rows list). 
+ * + * The return value has the exact same meaning that the one of Collections.binarySearch() but + * we don't use the later because we're searching for a 'Clustering' in an array of 'Row' (and while + * both are Clusterable, it's slightly faster to use the 'Clustering' comparison (see comment on + * ClusteringComparator.rowComparator())). + */ + private int binarySearch(Clustering clustering, int fromIndex, int toIndex) + { + ClusteringComparator comparator = metadata().comparator; + int low = fromIndex; + int mid = toIndex; + int high = mid - 1; + int result = -1; + while (low <= high) + { + mid = (low + high) >> 1; + if ((result = comparator.compare(clustering, rows.get(mid).clustering())) > 0) + low = mid + 1; + else if (result == 0) + return mid; + else + high = mid - 1; + } + return -mid - (result < 0 ? 1 : 2); + } + + private class SliceableIterator extends AbstractUnfilteredRowIterator implements SliceableUnfilteredRowIterator + { + private final ColumnFilter columns; + private RowSearcher searcher; + + private Iterator<Unfiltered> iterator; + + private SliceableIterator(AbstractThreadUnsafePartition partition, ColumnFilter columns, boolean isReverseOrder) + { + super(partition.metadata(), + partition.partitionKey(), + partition.partitionLevelDeletion(), + columns.fetchedColumns(), + partition.staticRow().isEmpty() ? Rows.EMPTY_STATIC_ROW : partition.staticRow().filter(columns, partition.partitionLevelDeletion(), false, partition.metadata()), + isReverseOrder, + partition.stats()); + this.columns = columns; + } + + protected Unfiltered computeNext() + { + if (iterator == null) + iterator = merge(isReverseOrder ? Lists.reverse(rows).iterator(): iterator(), deletionInfo().rangeIterator(isReverseOrder())); + + return iterator.hasNext() ? iterator.next() : endOfData(); + } + + public Iterator<Unfiltered> slice(Slice slice) + { + if (searcher == null) + searcher = isReverseOrder() ? new ReverseRowSearcher() : new ForwardRowSearcher(); + return merge(searcher.slice(slice), deletionInfo().rangeIterator(slice, isReverseOrder())); + } + + private Iterator<Unfiltered> merge(Iterator<Row> rows, Iterator<RangeTombstone> ranges) + { + return new RowAndDeletionMergeIterator(metadata, + partitionKey, + partitionLevelDeletion, + columns, + staticRow(), + isReverseOrder(), + stats(), + rows, + ranges, + canHaveShadowedData()); + } + } + + /** + * Utility class to search for rows or slice of rows in order. + */ + private abstract class RowSearcher + { + public abstract boolean isDone(); + + public abstract Row search(Clustering name); + + public abstract Iterator<Row> slice(Slice slice); + + protected int search(Clustering clustering, int from, int to) + { + return binarySearch(clustering, from, to); + } + + protected int search(Slice.Bound bound, int from, int to) + { + return Collections.binarySearch(rows.subList(from, to), bound, metadata.comparator); + } + } + + private class ForwardRowSearcher extends RowSearcher + { + private int nextIdx = 0; + + public boolean isDone() + { + return nextIdx >= rows.size(); + } + + public Row search(Clustering name) + { + if (isDone()) + return null; + + int idx = search(name, nextIdx, rows.size()); + if (idx < 0) + { + nextIdx = -idx - 1; + return null; + } + else + { + nextIdx = idx + 1; + return rows.get(idx); + } + } + + public Iterator<Row> slice(Slice slice) + { + // Note that because a Slice.Bound can never sort equally to a Clustering, we know none of the search will + // be a match, so we save from testing for it. 
+ + final int start = -search(slice.start(), nextIdx, rows.size()) - 1; // First index to include + if (start >= rows.size()) + return Collections.emptyIterator(); + + final int end = -search(slice.end(), start, rows.size()) - 1; // First index to exclude + + // Remember the end to speed up potential further slice search + nextIdx = end; + + if (start >= end) + return Collections.emptyIterator(); + + return rows.subList(start, end).iterator(); + } + } + + private class ReverseRowSearcher extends RowSearcher + { + private int nextIdx = rows.size() - 1; + + public boolean isDone() + { + return nextIdx < 0; + } + + public Row search(Clustering name) + { + if (isDone()) + return null; + + int idx = search(name, 0, nextIdx); + if (idx < 0) + { + // The insertion point is the first element greater than name, so we want start from the previous one next time + nextIdx = -idx - 2; + return null; + } + else + { + nextIdx = idx - 1; + return rows.get(idx); + } + } + + public Iterator<Row> slice(Slice slice) + { + // Note that because a Slice.Bound can never sort equally to a Clustering, we know none of the search will + // be a match, so we save from testing for it. + + // The insertion point is the first element greater than slice.end(), so we want the previous index + final int start = -search(slice.end(), 0, nextIdx + 1) - 2; // First index to include + if (start < 0) + return Collections.emptyIterator(); + + final int end = -search(slice.start(), 0, start + 1) - 2; // First index to exclude + + // Remember the end to speed up potential further slice search + nextIdx = end; + + if (start < end) + return Collections.emptyIterator(); + + return Lists.reverse(rows.subList(end+1, start+1)).iterator(); + } + } +}
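[Editor's aside] The two RowSearcher implementations above lean entirely on the Collections.binarySearch() return convention: a miss yields -(insertionPoint) - 1, so -result - 1 recovers the insertion point. Because a Slice.Bound never compares equal to a Clustering, the bound searches in the patch always miss, which the forward slice exploits directly. A minimal, self-contained sketch of that decoding, using plain JDK lists in place of the patch's Row/Clustering types (all names here are illustrative, not from the patch):

import java.util.*;

public class InsertionPointDemo
{
    public static void main(String[] args)
    {
        // Stand-in for the sorted 'rows' list; 25 and 35 play the slice bounds.
        List<Integer> rows = Arrays.asList(10, 20, 30, 40);

        // A miss returns -(insertionPoint) - 1, so -result - 1 recovers the
        // insertion point: the index of the first element greater than the probe.
        int start = -Collections.binarySearch(rows, 25) - 1; // first index to include -> 2
        int end   = -Collections.binarySearch(rows, 35) - 1; // first index to exclude -> 3

        // Same guards as ForwardRowSearcher.slice(): empty result when the
        // slice starts past the end of the list or contains no rows.
        if (start >= rows.size() || start >= end)
        {
            System.out.println("empty slice");
            return;
        }

        System.out.println(rows.subList(start, end)); // prints [30]
    }
}

The reverse searcher applies the same decoding but subtracts 2 instead of 1, since on a miss it wants the index just before the insertion point.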
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/AlteringUnfilteredPartitionIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/AlteringUnfilteredPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/AlteringUnfilteredPartitionIterator.java new file mode 100644 index 0000000..f7d7222 --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/AlteringUnfilteredPartitionIterator.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.partitions; + +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.rows.*; + +/** + * A partition iterator that allows to filter/modify the unfiltered from the + * underlying iterators. + */ +public abstract class AlteringUnfilteredPartitionIterator extends WrappingUnfilteredPartitionIterator +{ + protected AlteringUnfilteredPartitionIterator(UnfilteredPartitionIterator wrapped) + { + super(wrapped); + } + + protected Row computeNextStatic(DecoratedKey partitionKey, Row row) + { + return row; + } + + protected Row computeNext(DecoratedKey partitionKey, Row row) + { + return row; + } + + protected RangeTombstoneMarker computeNext(DecoratedKey partitionKey, RangeTombstoneMarker marker) + { + return marker; + } + + @Override + protected UnfilteredRowIterator computeNext(UnfilteredRowIterator iter) + { + final DecoratedKey partitionKey = iter.partitionKey(); + return new AlteringUnfilteredRowIterator(iter) + { + protected Row computeNextStatic(Row row) + { + return AlteringUnfilteredPartitionIterator.this.computeNextStatic(partitionKey, row); + } + + protected Row computeNext(Row row) + { + return AlteringUnfilteredPartitionIterator.this.computeNext(partitionKey, row); + } + + protected RangeTombstoneMarker computeNext(RangeTombstoneMarker marker) + { + return AlteringUnfilteredPartitionIterator.this.computeNext(partitionKey, marker); + } + }; + } +} + http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java b/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java index bec8056..f39245b 100644 --- a/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java +++ b/src/java/org/apache/cassandra/db/partitions/ArrayBackedCachedPartition.java @@ -18,11 +18,11 @@ package org.apache.cassandra.db.partitions; import java.io.IOException; -import java.nio.ByteBuffer; +import java.util.*; import 
org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.io.ISerializer; import org.apache.cassandra.io.util.DataInputPlus; @@ -33,24 +33,31 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements { private final int createdAtInSec; - // Note that those fields are really immutable, but we can't easily pass their values to - // the ctor so they are not final. - private int cachedLiveRows; - private int rowsWithNonExpiringCells; + private final int cachedLiveRows; + private final int rowsWithNonExpiringCells; - private int nonTombstoneCellCount; - private int nonExpiringLiveCells; + private final int nonTombstoneCellCount; + private final int nonExpiringLiveCells; private ArrayBackedCachedPartition(CFMetaData metadata, DecoratedKey partitionKey, - DeletionTime deletionTime, PartitionColumns columns, - int initialRowCapacity, - boolean sortable, - int createdAtInSec) + Row staticRow, + List<Row> rows, + DeletionInfo deletionInfo, + RowStats stats, + int createdAtInSec, + int cachedLiveRows, + int rowsWithNonExpiringCells, + int nonTombstoneCellCount, + int nonExpiringLiveCells) { - super(metadata, partitionKey, deletionTime, columns, initialRowCapacity, sortable); + super(metadata, partitionKey, columns, staticRow, rows, deletionInfo, stats); this.createdAtInSec = createdAtInSec; + this.cachedLiveRows = cachedLiveRows; + this.rowsWithNonExpiringCells = rowsWithNonExpiringCells; + this.nonTombstoneCellCount = nonTombstoneCellCount; + this.nonExpiringLiveCells = nonExpiringLiveCells; } /** @@ -65,7 +72,7 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements */ public static ArrayBackedCachedPartition create(UnfilteredRowIterator iterator, int nowInSec) { - return create(iterator, 4, nowInSec); + return create(iterator, 16, nowInSec); } /** @@ -82,30 +89,76 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements */ public static ArrayBackedCachedPartition create(UnfilteredRowIterator iterator, int initialRowCapacity, int nowInSec) { - ArrayBackedCachedPartition partition = new ArrayBackedCachedPartition(iterator.metadata(), - iterator.partitionKey(), - iterator.partitionLevelDeletion(), - iterator.columns(), - initialRowCapacity, - iterator.isReverseOrder(), - nowInSec); + CFMetaData metadata = iterator.metadata(); + boolean reversed = iterator.isReverseOrder(); - partition.staticRow = iterator.staticRow().takeAlias(); + List<Row> rows = new ArrayList<>(initialRowCapacity); + MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(iterator.partitionLevelDeletion(), metadata.comparator, reversed); - Writer writer = partition.new Writer(nowInSec); - RangeTombstoneCollector markerCollector = partition.new RangeTombstoneCollector(iterator.isReverseOrder()); + int cachedLiveRows = 0; + int rowsWithNonExpiringCells = 0; - copyAll(iterator, writer, markerCollector, partition); + int nonTombstoneCellCount = 0; + int nonExpiringLiveCells = 0; - return partition; + while (iterator.hasNext()) + { + Unfiltered unfiltered = iterator.next(); + if (unfiltered.kind() == Unfiltered.Kind.ROW) + { + Row row = (Row)unfiltered; + rows.add(row); + + // Collect stats + if (row.hasLiveData(nowInSec)) + ++cachedLiveRows; + + boolean hasNonExpiringCell = false; + for (Cell cell : row.cells()) + { + if (!cell.isTombstone()) + { + 
++nonTombstoneCellCount; + if (!cell.isExpiring()) + { + hasNonExpiringCell = true; + ++nonExpiringLiveCells; + } + } + } + + if (hasNonExpiringCell) + ++rowsWithNonExpiringCells; + } + else + { + deletionBuilder.add((RangeTombstoneMarker)unfiltered); + } + } + + if (reversed) + Collections.reverse(rows); + + return new ArrayBackedCachedPartition(metadata, + iterator.partitionKey(), + iterator.columns(), + iterator.staticRow(), + rows, + deletionBuilder.build(), + iterator.stats(), + nowInSec, + cachedLiveRows, + rowsWithNonExpiringCells, + nonTombstoneCellCount, + nonExpiringLiveCells); } public Row lastRow() { - if (rows == 0) + if (rows.isEmpty()) return null; - return new InternalReusableRow().setTo(rows - 1); + return rows.get(rows.size() - 1); } /** @@ -146,62 +199,6 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements return nonExpiringLiveCells; } - // Writers that collect the values for 'cachedLiveRows', 'rowsWithNonExpiringCells', 'nonTombstoneCellCount' - // and 'nonExpiringLiveCells'. - protected class Writer extends AbstractPartitionData.Writer - { - private final int nowInSec; - - private boolean hasLiveData; - private boolean hasNonExpiringCell; - - protected Writer(int nowInSec) - { - super(true); - this.nowInSec = nowInSec; - } - - @Override - public void writePartitionKeyLivenessInfo(LivenessInfo info) - { - super.writePartitionKeyLivenessInfo(info); - if (info.isLive(nowInSec)) - hasLiveData = true; - } - - @Override - public void writeCell(ColumnDefinition column, boolean isCounter, ByteBuffer value, LivenessInfo info, CellPath path) - { - super.writeCell(column, isCounter, value, info, path); - - if (info.isLive(nowInSec)) - { - hasLiveData = true; - if (!info.hasTTL()) - { - hasNonExpiringCell = true; - ++ArrayBackedCachedPartition.this.nonExpiringLiveCells; - } - } - - if (!info.hasLocalDeletionTime() || info.hasTTL()) - ++ArrayBackedCachedPartition.this.nonTombstoneCellCount; - } - - @Override - public void endOfRow() - { - super.endOfRow(); - if (hasLiveData) - ++ArrayBackedCachedPartition.this.cachedLiveRows; - if (hasNonExpiringCell) - ++ArrayBackedCachedPartition.this.rowsWithNonExpiringCells; - - hasLiveData = false; - hasNonExpiringCell = false; - } - } - static class Serializer implements ISerializer<CachedPartition> { public void serialize(CachedPartition partition, DataOutputPlus out) throws IOException @@ -210,9 +207,13 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements ArrayBackedCachedPartition p = (ArrayBackedCachedPartition)partition; out.writeInt(p.createdAtInSec); + out.writeInt(p.cachedLiveRows); + out.writeInt(p.rowsWithNonExpiringCells); + out.writeInt(p.nonTombstoneCellCount); + out.writeInt(p.nonExpiringLiveCells); try (UnfilteredRowIterator iter = p.sliceableUnfilteredIterator()) { - UnfilteredRowIteratorSerializer.serializer.serialize(iter, out, MessagingService.current_version, p.rows); + UnfilteredRowIteratorSerializer.serializer.serialize(iter, out, MessagingService.current_version, p.rowCount()); } } @@ -226,18 +227,42 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements // is slightly faster. 
int createdAtInSec = in.readInt(); + int cachedLiveRows = in.readInt(); + int rowsWithNonExpiringCells = in.readInt(); + int nonTombstoneCellCount = in.readInt(); + int nonExpiringLiveCells = in.readInt(); + + UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, MessagingService.current_version, SerializationHelper.Flag.LOCAL); + assert !header.isReversed && header.rowEstimate >= 0; - UnfilteredRowIteratorSerializer.Header h = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, MessagingService.current_version, SerializationHelper.Flag.LOCAL); - assert !h.isReversed && h.rowEstimate >= 0; + MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(header.partitionDeletion, header.metadata.comparator, false); + List<Row> rows = new ArrayList<>(header.rowEstimate); - ArrayBackedCachedPartition partition = new ArrayBackedCachedPartition(h.metadata, h.key, h.partitionDeletion, h.sHeader.columns(), h.rowEstimate, false, createdAtInSec); - partition.staticRow = h.staticRow; + try (UnfilteredRowIterator partition = UnfilteredRowIteratorSerializer.serializer.deserialize(in, MessagingService.current_version, SerializationHelper.Flag.LOCAL, header)) + { + while (partition.hasNext()) + { + Unfiltered unfiltered = partition.next(); + if (unfiltered.kind() == Unfiltered.Kind.ROW) + rows.add((Row)unfiltered); + else + deletionBuilder.add((RangeTombstoneMarker)unfiltered); + } + } - Writer writer = partition.new Writer(createdAtInSec); - RangeTombstoneMarker.Writer markerWriter = partition.new RangeTombstoneCollector(false); + return new ArrayBackedCachedPartition(header.metadata, + header.key, + header.sHeader.columns(), + header.staticRow, + rows, + deletionBuilder.build(), + header.sHeader.stats(), + createdAtInSec, + cachedLiveRows, + rowsWithNonExpiringCells, + nonTombstoneCellCount, + nonExpiringLiveCells); - UnfilteredRowIteratorSerializer.serializer.deserialize(in, new SerializationHelper(MessagingService.current_version, SerializationHelper.Flag.LOCAL), h.sHeader, writer, markerWriter); - return partition; } public long serializedSize(CachedPartition partition) @@ -248,7 +273,11 @@ public class ArrayBackedCachedPartition extends ArrayBackedPartition implements try (UnfilteredRowIterator iter = p.sliceableUnfilteredIterator()) { return TypeSizes.sizeof(p.createdAtInSec) - + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, MessagingService.current_version, p.rows); + + TypeSizes.sizeof(p.cachedLiveRows) + + TypeSizes.sizeof(p.rowsWithNonExpiringCells) + + TypeSizes.sizeof(p.nonTombstoneCellCount) + + TypeSizes.sizeof(p.nonExpiringLiveCells) + + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, MessagingService.current_version, p.rowCount()); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java b/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java index d7f3a88..4485117 100644 --- a/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java +++ b/src/java/org/apache/cassandra/db/partitions/ArrayBackedPartition.java @@ -17,28 +17,30 @@ */ package org.apache.cassandra.db.partitions; -import java.io.DataInput; -import java.io.IOException; -import java.nio.ByteBuffer; +import java.util.*; import 
org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; -import org.apache.cassandra.io.ISerializer; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.net.MessagingService; -public class ArrayBackedPartition extends AbstractPartitionData +public class ArrayBackedPartition extends AbstractThreadUnsafePartition { + private final Row staticRow; + private final DeletionInfo deletionInfo; + private final RowStats stats; + protected ArrayBackedPartition(CFMetaData metadata, DecoratedKey partitionKey, - DeletionTime deletionTime, PartitionColumns columns, - int initialRowCapacity, - boolean sortable) + Row staticRow, + List<Row> rows, + DeletionInfo deletionInfo, + RowStats stats) { - super(metadata, partitionKey, deletionTime, columns, initialRowCapacity, sortable); + super(metadata, partitionKey, columns, rows); + this.staticRow = staticRow; + this.deletionInfo = deletionInfo; + this.stats = stats; } /** @@ -52,7 +54,7 @@ public class ArrayBackedPartition extends AbstractPartitionData */ public static ArrayBackedPartition create(UnfilteredRowIterator iterator) { - return create(iterator, 4); + return create(iterator, 16); } /** @@ -68,37 +70,45 @@ public class ArrayBackedPartition extends AbstractPartitionData */ public static ArrayBackedPartition create(UnfilteredRowIterator iterator, int initialRowCapacity) { - ArrayBackedPartition partition = new ArrayBackedPartition(iterator.metadata(), - iterator.partitionKey(), - iterator.partitionLevelDeletion(), - iterator.columns(), - initialRowCapacity, - iterator.isReverseOrder()); - - partition.staticRow = iterator.staticRow().takeAlias(); - - Writer writer = partition.new Writer(true); - RangeTombstoneCollector markerCollector = partition.new RangeTombstoneCollector(iterator.isReverseOrder()); + CFMetaData metadata = iterator.metadata(); + boolean reversed = iterator.isReverseOrder(); - copyAll(iterator, writer, markerCollector, partition); - - return partition; - } + List<Row> rows = new ArrayList<>(initialRowCapacity); + MutableDeletionInfo.Builder deletionBuilder = MutableDeletionInfo.builder(iterator.partitionLevelDeletion(), metadata.comparator, reversed); - protected static void copyAll(UnfilteredRowIterator iterator, Writer writer, RangeTombstoneCollector markerCollector, ArrayBackedPartition partition) - { while (iterator.hasNext()) { Unfiltered unfiltered = iterator.next(); if (unfiltered.kind() == Unfiltered.Kind.ROW) - ((Row) unfiltered).copyTo(writer); + rows.add((Row)unfiltered); else - ((RangeTombstoneMarker) unfiltered).copyTo(markerCollector); + deletionBuilder.add((RangeTombstoneMarker)unfiltered); } - // A Partition (or more precisely AbstractPartitionData) always assumes that its data is in clustering - // order. So if we've just added them in reverse clustering order, reverse them. 
- if (iterator.isReverseOrder()) - partition.reverse(); + if (reversed) + Collections.reverse(rows); + + return new ArrayBackedPartition(metadata, iterator.partitionKey(), iterator.columns(), iterator.staticRow(), rows, deletionBuilder.build(), iterator.stats()); + } + + protected boolean canHaveShadowedData() + { + // We only create instances from UnfilteredRowIterator that don't have shadowed data + return false; + } + + public Row staticRow() + { + return staticRow; + } + + public DeletionInfo deletionInfo() + { + return deletionInfo; + } + + public RowStats stats() + { + return stats; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java index 6a888a6..1361422 100644 --- a/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java +++ b/src/java/org/apache/cassandra/db/partitions/AtomicBTreePartition.java @@ -26,11 +26,8 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.AbstractIterator; -import com.google.common.collect.Iterators; import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.filter.ColumnFilter; @@ -89,10 +86,7 @@ public class AtomicBTreePartition implements Partition private static final AtomicIntegerFieldUpdater<AtomicBTreePartition> wasteTrackerUpdater = AtomicIntegerFieldUpdater.newUpdater(AtomicBTreePartition.class, "wasteTracker"); - private static final DeletionInfo LIVE = DeletionInfo.live(); - // This is a small optimization: DeletionInfo is mutable, but we know that we will always copy it in that class, - // so we can safely alias one DeletionInfo.live() reference and avoid some allocations. - private static final Holder EMPTY = new Holder(BTree.empty(), LIVE, null, RowStats.NO_STATS); + private static final Holder EMPTY = new Holder(BTree.empty(), DeletionInfo.LIVE, Rows.EMPTY_STATIC_ROW, RowStats.NO_STATS); private final CFMetaData metadata; private final DecoratedKey partitionKey; @@ -154,146 +148,56 @@ public class AtomicBTreePartition implements Partition return row == null || (clustering == Clustering.STATIC_CLUSTERING && row.isEmpty()) ? null : row; } + private Row staticRow(Holder current, ColumnFilter columns, boolean setActiveDeletionToRow) + { + DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion(); + if (columns.fetchedColumns().statics.isEmpty() || (current.staticRow.isEmpty() && partitionDeletion.isLive())) + return Rows.EMPTY_STATIC_ROW; + + Row row = current.staticRow.filter(columns, partitionDeletion, setActiveDeletionToRow, metadata); + return row == null ? 
Rows.EMPTY_STATIC_ROW : row;
+    }
+
     public SearchIterator<Clustering, Row> searchIterator(final ColumnFilter columns, final boolean reversed)
     {
         // TODO: we could optimize comparison for "NativeRow" à la #6755
         final Holder current = ref;
         return new SearchIterator<Clustering, Row>()
         {
-            private final SearchIterator<Clustering, MemtableRowData> rawIter = new BTreeSearchIterator<>(current.tree, metadata.comparator, !reversed);
-            private final MemtableRowData.ReusableRow row = allocator.newReusableRow();
-            private final ReusableFilteringRow filter = new ReusableFilteringRow(columns.fetchedColumns().regulars, columns);
-            private final long partitionDeletion = current.deletionInfo.getPartitionDeletion().markedForDeleteAt();
+            private final SearchIterator<Clustering, Row> rawIter = new BTreeSearchIterator<>(current.tree, metadata.comparator, !reversed);
+            private final DeletionTime partitionDeletion = current.deletionInfo.getPartitionDeletion();

             public boolean hasNext()
             {
                 return rawIter.hasNext();
             }

-            public Row next(Clustering key)
+            public Row next(Clustering clustering)
             {
-                if (key == Clustering.STATIC_CLUSTERING)
-                    return makeStatic(columns, current, allocator);
+                if (clustering == Clustering.STATIC_CLUSTERING)
+                    return staticRow(current, columns, true);

-                MemtableRowData data = rawIter.next(key);
-                // We also need to find if there is a range tombstone covering this key
-                RangeTombstone rt = current.deletionInfo.rangeCovering(key);
+                Row row = rawIter.next(clustering);
+                RangeTombstone rt = current.deletionInfo.rangeCovering(clustering);

-                if (data == null)
-                {
-                    // If we have a range tombstone but not data, "fake" the RT by return a row deletion
-                    // corresponding to the tombstone.
-                    if (rt != null && rt.deletionTime().markedForDeleteAt() > partitionDeletion)
-                        return filter.setRowDeletion(rt.deletionTime()).setTo(emptyDeletedRow(key, rt.deletionTime()));
-                    return null;
-                }
+                // A search iterator only returns a row, so it doesn't allow us to directly account for a deletion
+                // that should apply to the row (the partition deletion or the deletion of a range tombstone that
+                // covers it). So if need be, reuse the row deletion to carry the proper deletion on the row.
+                DeletionTime activeDeletion = partitionDeletion;
+                if (rt != null && rt.deletionTime().supersedes(activeDeletion))
+                    activeDeletion = rt.deletionTime();

-                row.setTo(data);
+                if (row == null)
+                    return activeDeletion.isLive() ? null : ArrayBackedRow.emptyDeletedRow(clustering, activeDeletion);

-                filter.setRowDeletion(null);
-                if (rt == null || rt.deletionTime().markedForDeleteAt() < partitionDeletion)
-                {
-                    filter.setDeletionTimestamp(partitionDeletion);
-                }
-                else
-                {
-                    filter.setDeletionTimestamp(rt.deletionTime().markedForDeleteAt());
-                    // If we have a range tombstone covering that row and it's bigger than the row deletion itself, then
-                    // we replace the row deletion by the tombstone deletion as a way to return the tombstone.
- if (rt.deletionTime().supersedes(row.deletion())) - filter.setRowDeletion(rt.deletionTime()); - } - - return filter.setTo(row); - } - }; - } - - private static Row emptyDeletedRow(Clustering clustering, DeletionTime deletion) - { - return new AbstractRow() - { - public Columns columns() - { - return Columns.NONE; - } - - public LivenessInfo primaryKeyLivenessInfo() - { - return LivenessInfo.NONE; - } - - public DeletionTime deletion() - { - return deletion; - } - - public boolean isEmpty() - { - return true; - } - - public boolean hasComplexDeletion() - { - return false; - } - - public Clustering clustering() - { - return clustering; - } - - public Cell getCell(ColumnDefinition c) - { - return null; - } - - public Cell getCell(ColumnDefinition c, CellPath path) - { - return null; - } - - public Iterator<Cell> getCells(ColumnDefinition c) - { - return null; - } - - public DeletionTime getDeletion(ColumnDefinition c) - { - return DeletionTime.LIVE; - } - - public Iterator<Cell> iterator() - { - return Iterators.<Cell>emptyIterator(); - } - - public SearchIterator<ColumnDefinition, ColumnData> searchIterator() - { - return new SearchIterator<ColumnDefinition, ColumnData>() - { - public boolean hasNext() - { - return false; - } - - public ColumnData next(ColumnDefinition column) - { - return null; - } - }; - } - - public Row takeAlias() - { - return this; + return row.filter(columns, activeDeletion, true, metadata); } }; } public UnfilteredRowIterator unfilteredIterator() { - return unfilteredIterator(ColumnFilter.selection(columns()), Slices.ALL, false); + return unfilteredIterator(ColumnFilter.all(metadata()), Slices.ALL, false); } public UnfilteredRowIterator unfilteredIterator(ColumnFilter selection, Slices slices, boolean reversed) @@ -309,7 +213,7 @@ public class AtomicBTreePartition implements Partition partitionKey, partitionDeletion, selection.fetchedColumns(), - makeStatic(selection, current, allocator), + staticRow(current, selection, false), reversed, current.stats) { @@ -320,189 +224,51 @@ public class AtomicBTreePartition implements Partition }; } + Holder current = ref; + Row staticRow = staticRow(current, selection, false); return slices.size() == 1 - ? new SingleSliceIterator(metadata, partitionKey, ref, selection, slices.get(0), reversed, allocator) - : new SlicesIterator(metadata, partitionKey, ref, selection, slices, reversed, allocator); + ? sliceIterator(selection, slices.get(0), reversed, current, staticRow) + : new SlicesIterator(metadata, partitionKey, selection, slices, reversed, current, staticRow); } - private static Row makeStatic(ColumnFilter selection, Holder holder, MemtableAllocator allocator) + private UnfilteredRowIterator sliceIterator(ColumnFilter selection, Slice slice, boolean reversed, Holder current, Row staticRow) { - Columns statics = selection.fetchedColumns().statics; - if (statics.isEmpty() || holder.staticRow == null) - return Rows.EMPTY_STATIC_ROW; - - return new ReusableFilteringRow(statics, selection) - .setDeletionTimestamp(holder.deletionInfo.getPartitionDeletion().markedForDeleteAt()) - .setTo(allocator.newReusableRow().setTo(holder.staticRow)); + Slice.Bound start = slice.start() == Slice.Bound.BOTTOM ? null : slice.start(); + Slice.Bound end = slice.end() == Slice.Bound.TOP ? 
null : slice.end(); + Iterator<Row> rowIter = BTree.slice(current.tree, metadata.comparator, start, true, end, true, !reversed); + + return new RowAndDeletionMergeIterator(metadata, + partitionKey, + current.deletionInfo.getPartitionDeletion(), + selection, + staticRow, + reversed, + current.stats, + rowIter, + current.deletionInfo.rangeIterator(slice, reversed), + true); } - private static class ReusableFilteringRow extends FilteringRow + public class SlicesIterator extends AbstractUnfilteredRowIterator { - private final Columns columns; - private final ColumnFilter selection; - private ColumnFilter.Tester tester; - private long deletionTimestamp; - - // Used by searchIterator in case the row is covered by a tombstone. - private DeletionTime rowDeletion; - - public ReusableFilteringRow(Columns columns, ColumnFilter selection) - { - this.columns = columns; - this.selection = selection; - } - - public ReusableFilteringRow setDeletionTimestamp(long timestamp) - { - this.deletionTimestamp = timestamp; - return this; - } - - public ReusableFilteringRow setRowDeletion(DeletionTime rowDeletion) - { - this.rowDeletion = rowDeletion; - return this; - } - - @Override - public DeletionTime deletion() - { - return rowDeletion == null ? super.deletion() : rowDeletion; - } - - @Override - protected boolean include(LivenessInfo info) - { - return info.timestamp() > deletionTimestamp; - } - - @Override - protected boolean include(ColumnDefinition def) - { - return columns.contains(def); - } - - @Override - protected boolean include(DeletionTime dt) - { - return dt.markedForDeleteAt() > deletionTimestamp; - } - - @Override - protected boolean include(ColumnDefinition c, DeletionTime dt) - { - return dt.markedForDeleteAt() > deletionTimestamp; - } - - @Override - protected boolean include(Cell cell) - { - return selection.includes(cell); - } - } - - private static class SingleSliceIterator extends AbstractUnfilteredRowIterator - { - private final Iterator<Unfiltered> iterator; - private final ReusableFilteringRow row; - - private SingleSliceIterator(CFMetaData metadata, - DecoratedKey key, - Holder holder, - ColumnFilter selection, - Slice slice, - boolean isReversed, - MemtableAllocator allocator) - { - super(metadata, - key, - holder.deletionInfo.getPartitionDeletion(), - selection.fetchedColumns(), - makeStatic(selection, holder, allocator), - isReversed, - holder.stats); - - Iterator<Row> rowIter = rowIter(metadata, - holder, - slice, - !isReversed, - allocator); - - this.iterator = new RowAndTombstoneMergeIterator(metadata.comparator, isReversed) - .setTo(rowIter, holder.deletionInfo.rangeIterator(slice, isReversed)); - - this.row = new ReusableFilteringRow(selection.fetchedColumns().regulars, selection) - .setDeletionTimestamp(partitionLevelDeletion.markedForDeleteAt()); - } - - private Iterator<Row> rowIter(CFMetaData metadata, - Holder holder, - Slice slice, - boolean forwards, - final MemtableAllocator allocator) - { - Slice.Bound start = slice.start() == Slice.Bound.BOTTOM ? null : slice.start(); - Slice.Bound end = slice.end() == Slice.Bound.TOP ? null : slice.end(); - final Iterator<MemtableRowData> dataIter = BTree.slice(holder.tree, metadata.comparator, start, true, end, true, forwards); - return new AbstractIterator<Row>() - { - private final MemtableRowData.ReusableRow row = allocator.newReusableRow(); - - protected Row computeNext() - { - return dataIter.hasNext() ? 
row.setTo(dataIter.next()) : endOfData(); - } - }; - } - - protected Unfiltered computeNext() - { - while (iterator.hasNext()) - { - Unfiltered next = iterator.next(); - if (next.kind() == Unfiltered.Kind.ROW) - { - row.setTo((Row)next); - if (!row.isEmpty()) - return row; - } - else - { - RangeTombstoneMarker marker = (RangeTombstoneMarker)next; - - long deletion = partitionLevelDeletion().markedForDeleteAt(); - if (marker.isOpen(isReverseOrder())) - deletion = Math.max(deletion, marker.openDeletionTime(isReverseOrder()).markedForDeleteAt()); - row.setDeletionTimestamp(deletion); - return marker; - } - } - return endOfData(); - } - } - - public static class SlicesIterator extends AbstractUnfilteredRowIterator - { - private final Holder holder; - private final MemtableAllocator allocator; + private final Holder current; private final ColumnFilter selection; private final Slices slices; private int idx; - private UnfilteredRowIterator currentSlice; + private Iterator<Unfiltered> currentSlice; private SlicesIterator(CFMetaData metadata, DecoratedKey key, - Holder holder, ColumnFilter selection, Slices slices, boolean isReversed, - MemtableAllocator allocator) + Holder holder, + Row staticRow) { - super(metadata, key, holder.deletionInfo.getPartitionDeletion(), selection.fetchedColumns(), makeStatic(selection, holder, allocator), isReversed, holder.stats); - this.holder = holder; + super(metadata, key, holder.deletionInfo.getPartitionDeletion(), selection.fetchedColumns(), staticRow, isReversed, holder.stats); + this.current = holder; this.selection = selection; - this.allocator = allocator; this.slices = slices; } @@ -516,13 +282,7 @@ public class AtomicBTreePartition implements Partition return endOfData(); int sliceIdx = isReverseOrder ? slices.size() - idx - 1 : idx; - currentSlice = new SingleSliceIterator(metadata, - partitionKey, - holder, - selection, - slices.get(sliceIdx), - isReverseOrder, - allocator); + currentSlice = sliceIterator(selection, slices.get(sliceIdx), isReverseOrder, current, Rows.EMPTY_STATIC_ROW); idx++; } @@ -565,7 +325,7 @@ public class AtomicBTreePartition implements Partition if (inputDeletionInfoCopy == null) inputDeletionInfoCopy = update.deletionInfo().copy(HeapAllocator.instance); - deletionInfo = current.deletionInfo.copy().add(inputDeletionInfoCopy); + deletionInfo = current.deletionInfo.mutableCopy().add(inputDeletionInfoCopy); updater.allocated(deletionInfo.unsharedHeapSize() - current.deletionInfo.unsharedHeapSize()); } else @@ -574,9 +334,9 @@ public class AtomicBTreePartition implements Partition } Row newStatic = update.staticRow(); - MemtableRowData staticRow = newStatic == Rows.EMPTY_STATIC_ROW - ? current.staticRow - : (current.staticRow == null ? updater.apply(newStatic) : updater.apply(current.staticRow, newStatic)); + Row staticRow = newStatic.isEmpty() + ? current.staticRow + : (current.staticRow.isEmpty() ? 
updater.apply(newStatic) : updater.apply(current.staticRow, newStatic)); Object[] tree = BTree.update(current.tree, update.metadata().comparator, update, update.rowCount(), updater); RowStats newStats = current.stats.mergeWith(update.stats()); @@ -661,10 +421,10 @@ public class AtomicBTreePartition implements Partition final DeletionInfo deletionInfo; // the btree of rows final Object[] tree; - final MemtableRowData staticRow; + final Row staticRow; final RowStats stats; - Holder(Object[] tree, DeletionInfo deletionInfo, MemtableRowData staticRow, RowStats stats) + Holder(Object[] tree, DeletionInfo deletionInfo, Row staticRow, RowStats stats) { this.tree = tree; this.deletionInfo = deletionInfo; @@ -679,7 +439,7 @@ public class AtomicBTreePartition implements Partition } // the function we provide to the btree utilities to perform any column replacements - private static final class RowUpdater implements UpdateFunction<Row, MemtableRowData> + private static final class RowUpdater implements UpdateFunction<Row, Row> { final AtomicBTreePartition updating; final MemtableAllocator allocator; @@ -687,13 +447,13 @@ public class AtomicBTreePartition implements Partition final Updater indexer; final int nowInSec; Holder ref; + Row.Builder regularBuilder; long dataSize; long heapSize; long colUpdateTimeDelta = Long.MAX_VALUE; - final MemtableRowData.ReusableRow row; final MemtableAllocator.DataReclaimer reclaimer; - final MemtableAllocator.RowAllocator rowAllocator; - List<MemtableRowData> inserted; // TODO: replace with walk of aborted BTree + List<Row> inserted; // TODO: replace with walk of aborted BTree + private RowUpdater(AtomicBTreePartition updating, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer) { @@ -702,18 +462,25 @@ public class AtomicBTreePartition implements Partition this.writeOp = writeOp; this.indexer = indexer; this.nowInSec = FBUtilities.nowInSeconds(); - this.row = allocator.newReusableRow(); this.reclaimer = allocator.reclaimer(); - this.rowAllocator = allocator.newRowAllocator(updating.metadata(), writeOp); } - public MemtableRowData apply(Row insert) + private Row.Builder builder(Clustering clustering) { - rowAllocator.allocateNewRow(insert.clustering().size(), insert.columns(), insert.isStatic()); - insert.copyTo(rowAllocator); - MemtableRowData data = rowAllocator.allocatedRowData(); + boolean isStatic = clustering == Clustering.STATIC_CLUSTERING; + // We know we only insert/update one static per PartitionUpdate, so no point in saving the builder + if (isStatic) + return allocator.rowBuilder(updating.metadata(), writeOp, true); + + if (regularBuilder == null) + regularBuilder = allocator.rowBuilder(updating.metadata(), writeOp, false); + return regularBuilder; + } - insertIntoIndexes(insert); + public Row apply(Row insert) + { + Row data = Rows.copy(insert, builder(insert.clustering())).build(); + insertIntoIndexes(data); this.dataSize += data.dataSize(); this.heapSize += data.unsharedHeapSizeExcludingData(); @@ -723,14 +490,14 @@ public class AtomicBTreePartition implements Partition return data; } - public MemtableRowData apply(MemtableRowData existing, Row update) + public Row apply(Row existing, Row update) { Columns mergedColumns = existing.columns().mergeTo(update.columns()); - rowAllocator.allocateNewRow(update.clustering().size(), mergedColumns, update.isStatic()); - colUpdateTimeDelta = Math.min(colUpdateTimeDelta, Rows.merge(row.setTo(existing), update, mergedColumns, rowAllocator, nowInSec, indexer)); + Row.Builder builder = 
builder(existing.clustering()); + colUpdateTimeDelta = Math.min(colUpdateTimeDelta, Rows.merge(existing, update, mergedColumns, builder, nowInSec, indexer)); - MemtableRowData reconciled = rowAllocator.allocatedRowData(); + Row reconciled = builder.build(); dataSize += reconciled.dataSize() - existing.dataSize(); heapSize += reconciled.unsharedHeapSizeExcludingData() - existing.unsharedHeapSizeExcludingData(); @@ -749,7 +516,7 @@ public class AtomicBTreePartition implements Partition maybeIndexPrimaryKeyColumns(toInsert); Clustering clustering = toInsert.clustering(); - for (Cell cell : toInsert) + for (Cell cell : toInsert.cells()) indexer.insert(clustering, cell); } @@ -761,15 +528,15 @@ public class AtomicBTreePartition implements Partition long timestamp = row.primaryKeyLivenessInfo().timestamp(); int ttl = row.primaryKeyLivenessInfo().ttl(); - for (Cell cell : row) + for (Cell cell : row.cells()) { - long cellTimestamp = cell.livenessInfo().timestamp(); + long cellTimestamp = cell.timestamp(); if (cell.isLive(nowInSec)) { if (cellTimestamp > timestamp) { timestamp = cellTimestamp; - ttl = cell.livenessInfo().ttl(); + ttl = cell.ttl(); } } } @@ -783,19 +550,19 @@ public class AtomicBTreePartition implements Partition this.heapSize = 0; if (inserted != null) { - for (MemtableRowData row : inserted) + for (Row row : inserted) abort(row); inserted.clear(); } reclaimer.cancel(); } - protected void abort(MemtableRowData abort) + protected void abort(Row abort) { reclaimer.reclaimImmediately(abort); } - protected void discard(MemtableRowData discard) + protected void discard(Row discard) { reclaimer.reclaim(discard); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java b/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java index acaef5d..e5d1e75 100644 --- a/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java +++ b/src/java/org/apache/cassandra/db/partitions/CountingUnfilteredRowIterator.java @@ -49,10 +49,10 @@ public class CountingUnfilteredRowIterator extends WrappingUnfilteredRowIterator @Override public Unfiltered next() { - Unfiltered unfiltered = super.next(); - if (unfiltered.kind() == Unfiltered.Kind.ROW) - counter.newRow((Row) unfiltered); - return unfiltered; + Unfiltered next = super.next(); + if (next.isRow()) + counter.newRow((Row)next); + return next; } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java b/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java index 813654d..1cac274 100644 --- a/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java +++ b/src/java/org/apache/cassandra/db/partitions/FilteredPartition.java @@ -17,21 +17,24 @@ */ package org.apache.cassandra.db.partitions; -import java.util.Iterator; +import java.util.*; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; import org.apache.cassandra.db.rows.*; -public class FilteredPartition extends AbstractPartitionData implements Iterable<Row> +public class FilteredPartition extends AbstractThreadUnsafePartition { + private 
final Row staticRow; + private FilteredPartition(CFMetaData metadata, DecoratedKey partitionKey, PartitionColumns columns, - int initialRowCapacity, - boolean sortable) + Row staticRow, + List<Row> rows) { - super(metadata, partitionKey, DeletionTime.LIVE, columns, initialRowCapacity, sortable); + super(metadata, partitionKey, columns, rows); + this.staticRow = staticRow; } /** @@ -42,25 +45,43 @@ public class FilteredPartition extends AbstractPartitionData implements Iterable */ public static FilteredPartition create(RowIterator iterator) { - FilteredPartition partition = new FilteredPartition(iterator.metadata(), - iterator.partitionKey(), - iterator.columns(), - 4, - iterator.isReverseOrder()); - - partition.staticRow = iterator.staticRow().takeAlias(); + CFMetaData metadata = iterator.metadata(); + boolean reversed = iterator.isReverseOrder(); - Writer writer = partition.new Writer(true); + List<Row> rows = new ArrayList<>(); while (iterator.hasNext()) - iterator.next().copyTo(writer); + { + Unfiltered unfiltered = iterator.next(); + if (unfiltered.isRow()) + rows.add((Row)unfiltered); + } + + if (reversed) + Collections.reverse(rows); + + return new FilteredPartition(metadata, iterator.partitionKey(), iterator.columns(), iterator.staticRow(), rows); + } + + protected boolean canHaveShadowedData() + { + // We only create instances from RowIterator that don't have shadowed data (nor deletion info really) + return false; + } - // A Partition (or more precisely AbstractPartitionData) always assumes that its data is in clustering - // order. So if we've just added them in reverse clustering order, reverse them. - if (iterator.isReverseOrder()) - partition.reverse(); + public Row staticRow() + { + return staticRow; + } - return partition; + public DeletionInfo deletionInfo() + { + return DeletionInfo.LIVE; + } + + public RowStats stats() + { + return RowStats.NO_STATS; } public RowIterator rowIterator() @@ -90,7 +111,7 @@ public class FilteredPartition extends AbstractPartitionData implements Iterable public Row staticRow() { - return staticRow == null ? 
Rows.EMPTY_STATIC_ROW : staticRow; + return FilteredPartition.this.staticRow(); } public boolean hasNext() @@ -117,26 +138,20 @@ public class FilteredPartition extends AbstractPartitionData implements Iterable @Override public String toString() { - try (RowIterator iterator = rowIterator()) - { - StringBuilder sb = new StringBuilder(); - CFMetaData metadata = iterator.metadata(); - PartitionColumns columns = iterator.columns(); + StringBuilder sb = new StringBuilder(); - sb.append(String.format("[%s.%s] key=%s columns=%s reversed=%b", - metadata.ksName, - metadata.cfName, - metadata.getKeyValidator().getString(iterator.partitionKey().getKey()), - columns, - iterator.isReverseOrder())); + sb.append(String.format("[%s.%s] key=%s columns=%s", + metadata.ksName, + metadata.cfName, + metadata.getKeyValidator().getString(partitionKey().getKey()), + columns)); - if (iterator.staticRow() != Rows.EMPTY_STATIC_ROW) - sb.append("\n ").append(iterator.staticRow().toString(metadata)); + if (staticRow() != Rows.EMPTY_STATIC_ROW) + sb.append("\n ").append(staticRow().toString(metadata)); - while (iterator.hasNext()) - sb.append("\n ").append(iterator.next().toString(metadata)); + for (Row row : this) + sb.append("\n ").append(row.toString(metadata)); - return sb.toString(); - } + return sb.toString(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/FilteringPartitionIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/FilteringPartitionIterator.java b/src/java/org/apache/cassandra/db/partitions/FilteringPartitionIterator.java deleted file mode 100644 index c40109b..0000000 --- a/src/java/org/apache/cassandra/db/partitions/FilteringPartitionIterator.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.partitions; - -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.rows.*; - -/** - * Abstract class to make it easier to write iterators that filter some - * parts of another iterator (used for purging tombstones and removing dropped columns). - */ -public abstract class FilteringPartitionIterator extends WrappingUnfilteredPartitionIterator -{ - private UnfilteredRowIterator next; - - protected FilteringPartitionIterator(UnfilteredPartitionIterator iter) - { - super(iter); - } - - // The filter to use for filtering row contents. Is null by default to mean no particular filtering - // but can be overriden by subclasses. Please see FilteringAtomIterator for details on how this is used. - protected FilteringRow makeRowFilter() - { - return null; - } - - // Whether or not we should bother filtering the provided rows iterator. 
This - // exists mainly for preformance - protected boolean shouldFilter(UnfilteredRowIterator iterator) - { - return true; - } - - protected boolean includeRangeTombstoneMarker(RangeTombstoneMarker marker) - { - return true; - } - - protected boolean includePartitionDeletion(DeletionTime dt) - { - return true; - } - - // Allows to modify the range tombstone returned. This is called *after* includeRangeTombstoneMarker has been called. - protected RangeTombstoneMarker filterRangeTombstoneMarker(RangeTombstoneMarker marker, boolean reversed) - { - return marker; - } - - // Called when a particular partition is skipped due to being empty post filtering - protected void onEmpty(DecoratedKey key) - { - } - - public boolean hasNext() - { - while (next == null && super.hasNext()) - { - UnfilteredRowIterator iterator = super.next(); - if (shouldFilter(iterator)) - { - next = new FilteringIterator(iterator); - if (!isForThrift() && next.isEmpty()) - { - onEmpty(iterator.partitionKey()); - iterator.close(); - next = null; - } - } - else - { - next = iterator; - } - } - return next != null; - } - - public UnfilteredRowIterator next() - { - UnfilteredRowIterator toReturn = next; - next = null; - return toReturn; - } - - @Override - public void close() - { - try - { - super.close(); - } - finally - { - if (next != null) - next.close(); - } - } - - private class FilteringIterator extends FilteringRowIterator - { - private FilteringIterator(UnfilteredRowIterator iterator) - { - super(iterator); - } - - @Override - protected FilteringRow makeRowFilter() - { - return FilteringPartitionIterator.this.makeRowFilter(); - } - - @Override - protected boolean includeRangeTombstoneMarker(RangeTombstoneMarker marker) - { - return FilteringPartitionIterator.this.includeRangeTombstoneMarker(marker); - } - - @Override - protected RangeTombstoneMarker filterRangeTombstoneMarker(RangeTombstoneMarker marker, boolean reversed) - { - return FilteringPartitionIterator.this.filterRangeTombstoneMarker(marker, reversed); - } - - @Override - protected boolean includePartitionDeletion(DeletionTime dt) - { - return FilteringPartitionIterator.this.includePartitionDeletion(dt); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/partitions/PartitionStatisticsCollector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionStatisticsCollector.java b/src/java/org/apache/cassandra/db/partitions/PartitionStatisticsCollector.java new file mode 100644 index 0000000..510b9c8 --- /dev/null +++ b/src/java/org/apache/cassandra/db/partitions/PartitionStatisticsCollector.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.cassandra.db.partitions;
+
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.rows.Cell;
+
+public interface PartitionStatisticsCollector
+{
+    public void update(LivenessInfo info);
+    public void update(DeletionTime deletionTime);
+    public void update(Cell cell);
+    public void updateColumnSetPerRow(long columnSetInRow);
+    public void updateHasLegacyCounterShards(boolean hasLegacyCounterShards);
+}
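[Editor's aside] With the Writer callbacks removed, the four cache counters in ArrayBackedCachedPartition.create() above are now gathered in a single pass while draining the row iterator, then handed to the constructor as final fields. A self-contained sketch of that counting pass, using a hypothetical CellStub in place of the real Cell API (the isTombstone/isExpiring flags stand in for the predicates the patch calls):

import java.util.*;

public class CacheStatsSketch
{
    // Illustrative stand-in only; the real code inspects Cassandra Cell objects.
    static final class CellStub
    {
        final boolean isTombstone;
        final boolean isExpiring;
        CellStub(boolean isTombstone, boolean isExpiring)
        {
            this.isTombstone = isTombstone;
            this.isExpiring = isExpiring;
        }
    }

    public static void main(String[] args)
    {
        // Two "rows": one with a live non-expiring cell and an expiring cell,
        // one holding only a tombstone.
        List<List<CellStub>> rows = Arrays.asList(
            Arrays.asList(new CellStub(false, false), new CellStub(false, true)),
            Arrays.asList(new CellStub(true, false)));

        int nonTombstoneCellCount = 0;
        int nonExpiringLiveCells = 0;
        int rowsWithNonExpiringCells = 0;

        for (List<CellStub> row : rows)
        {
            boolean hasNonExpiringCell = false;
            for (CellStub cell : row)
            {
                if (!cell.isTombstone)
                {
                    ++nonTombstoneCellCount;
                    if (!cell.isExpiring)
                    {
                        hasNonExpiringCell = true;
                        ++nonExpiringLiveCells;
                    }
                }
            }
            if (hasNonExpiringCell)
                ++rowsWithNonExpiringCells;
        }

        // prints: 2 1 1
        System.out.println(nonTombstoneCellCount + " " + nonExpiringLiveCells + " " + rowsWithNonExpiringCells);
    }
}

Computing the counters up front is what lets the patch make them final and drop the mutable per-Writer state the old code threaded through serialization.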