Merge branch 'cassandra-2.0' into cassandra-2.1 Conflicts: CHANGES.txt src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java src/java/org/apache/cassandra/db/ColumnFamilyStore.java test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/06cd494c Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/06cd494c Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/06cd494c Branch: refs/heads/cassandra-2.1 Commit: 06cd494c1496ec96886ed41ff3207847631986c9 Parents: 0c2eaa9 cc5fb19 Author: Aleksey Yeschenko <alek...@apache.org> Authored: Thu Jan 22 03:17:55 2015 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Thu Jan 22 03:17:55 2015 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/ArrayBackedSortedColumns.java | 96 +++++++++++++++++-- .../apache/cassandra/db/AtomicBTreeColumns.java | 5 + .../org/apache/cassandra/db/ColumnFamily.java | 6 ++ .../apache/cassandra/db/ColumnFamilyStore.java | 7 +- .../cassandra/utils/BatchRemoveIterator.java | 32 +++++++ .../db/ArrayBackedSortedColumnsTest.java | 99 +++++++++++++++++++- 7 files changed, 236 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/06cd494c/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index 9cd8189,0d08cce..a94ca04 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,70 -1,9 +1,71 @@@ -2.0.13: - * Round up time deltas lower than 1ms in BulkLoader (CASSANDRA-8645) +2.1.3 + * Duplicate rows returned when in clause has repeated values (CASSANDRA-6707) + * Add tooling to detect hot partitions (CASSANDRA-7974) + * Fix cassandra-stress user-mode truncation of partition generation (CASSANDRA-8608) + * Only stream from unrepaired sstables during inc repair (CASSANDRA-8267) + * Don't allow starting multiple inc repairs on the same sstables (CASSANDRA-8316) + * Invalidate prepared BATCH statements when related tables + or keyspaces are dropped (CASSANDRA-8652) + * Fix missing results in secondary index queries on collections + with ALLOW FILTERING (CASSANDRA-8421) + * Expose EstimatedHistogram metrics for range slices (CASSANDRA-8627) + * (cqlsh) Escape clqshrc passwords properly (CASSANDRA-8618) + * Fix NPE when passing wrong argument in ALTER TABLE statement (CASSANDRA-8355) + * Pig: Refactor and deprecate CqlStorage (CASSANDRA-8599) + * Don't reuse the same cleanup strategy for all sstables (CASSANDRA-8537) + * Fix case-sensitivity of index name on CREATE and DROP INDEX + statements (CASSANDRA-8365) + * Better detection/logging for corruption in compressed sstables (CASSANDRA-8192) + * Use the correct repairedAt value when closing writer (CASSANDRA-8570) + * (cqlsh) Handle a schema mismatch being detected on startup (CASSANDRA-8512) + * Properly calculate expected write size during compaction (CASSANDRA-8532) + * Invalidate affected prepared statements when a table's columns + are altered (CASSANDRA-7910) + * Stress - user defined writes should populate sequentally (CASSANDRA-8524) + * Fix regression in SSTableRewriter causing some rows to become unreadable + during compaction (CASSANDRA-8429) + * Run major compactions for repaired/unrepaired in parallel (CASSANDRA-8510) + * (cqlsh) Fix compression options in DESCRIBE TABLE output when compression + is disabled (CASSANDRA-8288) + * (cqlsh) Fix DESCRIBE output after keyspaces are altered (CASSANDRA-7623) + * Make sure we set lastCompactedKey correctly (CASSANDRA-8463) + * (cqlsh) Fix output of CONSISTENCY command (CASSANDRA-8507) + * (cqlsh) Fixed the handling of LIST statements (CASSANDRA-8370) + * Make sstablescrub check leveled manifest again (CASSANDRA-8432) + * Check first/last keys in sstable when giving out positions (CASSANDRA-8458) + * Disable mmap on Windows (CASSANDRA-6993) + * Add missing ConsistencyLevels to cassandra-stress (CASSANDRA-8253) + * Add auth support to cassandra-stress (CASSANDRA-7985) + * Fix ArrayIndexOutOfBoundsException when generating error message + for some CQL syntax errors (CASSANDRA-8455) + * Scale memtable slab allocation logarithmically (CASSANDRA-7882) + * cassandra-stress simultaneous inserts over same seed (CASSANDRA-7964) + * Reduce cassandra-stress sampling memory requirements (CASSANDRA-7926) + * Ensure memtable flush cannot expire commit log entries from its future (CASSANDRA-8383) + * Make read "defrag" async to reclaim memtables (CASSANDRA-8459) + * Remove tmplink files for offline compactions (CASSANDRA-8321) + * Reduce maxHintsInProgress (CASSANDRA-8415) + * BTree updates may call provided update function twice (CASSANDRA-8018) + * Release sstable references after anticompaction (CASSANDRA-8386) + * Handle abort() in SSTableRewriter properly (CASSANDRA-8320) + * Fix high size calculations for prepared statements (CASSANDRA-8231) + * Centralize shared executors (CASSANDRA-8055) + * Fix filtering for CONTAINS (KEY) relations on frozen collection + clustering columns when the query is restricted to a single + partition (CASSANDRA-8203) + * Do more aggressive entire-sstable TTL expiry checks (CASSANDRA-8243) + * Add more log info if readMeter is null (CASSANDRA-8238) + * add check of the system wall clock time at startup (CASSANDRA-8305) + * Support for frozen collections (CASSANDRA-7859) + * Fix overflow on histogram computation (CASSANDRA-8028) + * Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801) + * Fix incremental repair not remove parent session on remote (CASSANDRA-8291) + * Improve JBOD disk utilization (CASSANDRA-7386) + * Log failed host when preparing incremental repair (CASSANDRA-8228) + * Force config client mode in CQLSSTableWriter (CASSANDRA-8281) +Merged from 2.0: + * Add batch remove iterator to ABSC (CASSANDRA-8414) - - -2.0.12: + * Round up time deltas lower than 1ms in BulkLoader (CASSANDRA-8645) * Use more efficient slice size for querying internal secondary index tables (CASSANDRA-8550) * Fix potentially returning deleted rows with range tombstone (CASSANDRA-8558) http://git-wip-us.apache.org/repos/asf/cassandra/blob/06cd494c/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java index b5ed8d2,8d553be..64752e3 --- a/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java +++ b/src/java/org/apache/cassandra/db/ArrayBackedSortedColumns.java @@@ -17,42 -17,31 +17,38 @@@ */ package org.apache.cassandra.db; - import java.util.AbstractCollection; - import java.util.Arrays; - import java.util.Collection; - import java.util.Comparator; - import java.util.Iterator; -import java.nio.ByteBuffer; + import java.util.*; import com.google.common.base.Function; import com.google.common.collect.AbstractIterator; import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; - import net.nicoulaj.compilecommand.annotations.Inline; import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.composites.CellName; +import org.apache.cassandra.db.composites.Composite; import org.apache.cassandra.db.filter.ColumnSlice; -import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.utils.Allocator; + import org.apache.cassandra.utils.BatchRemoveIterator; +import org.apache.cassandra.utils.memory.AbstractAllocator; /** - * A ColumnFamily backed by an ArrayList. + * A ColumnFamily backed by an array. * This implementation is not synchronized and should only be used when * thread-safety is not required. This implementation makes sense when the - * main operations performed are iterating over the map and adding columns + * main operations performed are iterating over the cells and adding cells * (especially if insertion is in sorted order). */ -public class ArrayBackedSortedColumns extends AbstractThreadUnsafeSortedColumns +public class ArrayBackedSortedColumns extends ColumnFamily { + private static final Cell[] EMPTY_ARRAY = new Cell[0]; + private static final int MINIMAL_CAPACITY = 10; + private final boolean reversed; - private final ArrayList<Column> columns; + + private DeletionInfo deletionInfo; + private Cell[] cells; + private int size; + private int sortedSize; + private volatile boolean isSorted; public static final ColumnFamily.Factory<ArrayBackedSortedColumns> factory = new Factory<ArrayBackedSortedColumns>() { @@@ -114,15 -80,15 +110,103 @@@ return reversed; } - private Comparator<ByteBuffer> internalComparator() ++ public BatchRemoveIterator<Cell> batchRemoveIterator() ++ { ++ maybeSortCells(); ++ ++ return new BatchRemoveIterator<Cell>() ++ { ++ private final Iterator<Cell> iter = iterator(); ++ private BitSet removedIndexes = new BitSet(size); ++ private int idx = -1; ++ private boolean shouldCallNext = false; ++ private boolean isCommitted = false; ++ private boolean removedAnything = false; ++ ++ public void commit() ++ { ++ if (isCommitted) ++ throw new IllegalStateException(); ++ isCommitted = true; ++ ++ if (!removedAnything) ++ return; ++ ++ // the lowest index both not visited and known to be not removed ++ int keepIdx = removedIndexes.nextClearBit(0); ++ // the running total of kept items ++ int resultLength = 0; ++ // start from the first not-removed cell, and shift left. ++ int removeIdx = removedIndexes.nextSetBit(keepIdx + 1); ++ while (removeIdx >= 0) ++ { ++ int length = removeIdx - keepIdx; ++ if (length > 0) ++ { ++ copy(keepIdx, resultLength, length); ++ resultLength += length; ++ } ++ keepIdx = removedIndexes.nextClearBit(removeIdx + 1); ++ if (keepIdx < 0) ++ keepIdx = size; ++ removeIdx = removedIndexes.nextSetBit(keepIdx + 1); ++ } ++ // Copy everything after the last deleted column ++ int length = size - keepIdx; ++ if (length > 0) ++ { ++ copy(keepIdx, resultLength, length); ++ resultLength += length; ++ } ++ ++ for (int i = resultLength; i < size; i++) ++ cells[i] = null; ++ ++ size = sortedSize = resultLength; ++ } ++ ++ private void copy(int src, int dst, int len) ++ { ++ // [src, src+len) and [dst, dst+len) might overlap but it's okay because we're going from left to right ++ assert dst <= src : "dst must not be greater than src"; ++ ++ if (dst < src) ++ System.arraycopy(cells, src, cells, dst, len); ++ } ++ ++ public boolean hasNext() ++ { ++ return iter.hasNext(); ++ } ++ ++ public Cell next() ++ { ++ idx++; ++ shouldCallNext = false; ++ return iter.next(); ++ } ++ ++ public void remove() ++ { ++ if (shouldCallNext) ++ throw new IllegalStateException(); ++ ++ removedIndexes.set(reversed ? size - idx - 1 : idx); ++ removedAnything = true; ++ shouldCallNext = true; ++ } ++ }; ++ } ++ + private Comparator<Composite> internalComparator() { - return reversed ? getComparator().reverseComparator : getComparator(); + return reversed ? getComparator().reverseComparator() : getComparator(); } - public Column getColumn(ByteBuffer name) + private void maybeSortCells() { - int pos = binarySearch(name); - return pos >= 0 ? columns.get(pos) : null; + if (!isSorted) + sortCells(); } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/06cd494c/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/AtomicBTreeColumns.java index dc2b5ee,0000000..47f0b85 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java +++ b/src/java/org/apache/cassandra/db/AtomicBTreeColumns.java @@@ -1,559 -1,0 +1,564 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.util.AbstractCollection; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + +import com.google.common.base.Function; +import com.google.common.base.Functions; +import com.google.common.collect.AbstractIterator; +import com.google.common.collect.Iterators; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.db.composites.CellName; +import org.apache.cassandra.db.composites.Composite; +import org.apache.cassandra.db.filter.ColumnSlice; +import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.btree.BTree; +import org.apache.cassandra.utils.btree.UpdateFunction; +import org.apache.cassandra.utils.concurrent.Locks; +import org.apache.cassandra.utils.concurrent.OpOrder; +import org.apache.cassandra.utils.memory.HeapAllocator; +import org.apache.cassandra.utils.memory.MemtableAllocator; +import org.apache.cassandra.utils.memory.NativePool; + +import static org.apache.cassandra.db.index.SecondaryIndexManager.Updater; + +/** + * A thread-safe and atomic ISortedColumns implementation. + * Operations (in particular addAll) on this implemenation are atomic and + * isolated (in the sense of ACID). Typically a addAll is guaranteed that no + * other thread can see the state where only parts but not all columns have + * been added. + * <p/> + * WARNING: removing element through getSortedColumns().iterator() is *not* supported + */ +public class AtomicBTreeColumns extends ColumnFamily +{ + static final long EMPTY_SIZE = ObjectSizes.measure(new AtomicBTreeColumns(CFMetaData.IndexCf, null)) + + ObjectSizes.measure(new Holder(null, null)); + + // Reserved values for wasteTracker field. These values must not be consecutive (see avoidReservedValues) + private static final int TRACKER_NEVER_WASTED = 0; + private static final int TRACKER_PESSIMISTIC_LOCKING = Integer.MAX_VALUE; + + // The granularity with which we track wasted allocation/work; we round up + private static final int ALLOCATION_GRANULARITY_BYTES = 1024; + // The number of bytes we have to waste in excess of our acceptable realtime rate of waste (defined below) + private static final long EXCESS_WASTE_BYTES = 10 * 1024 * 1024L; + private static final int EXCESS_WASTE_OFFSET = (int) (EXCESS_WASTE_BYTES / ALLOCATION_GRANULARITY_BYTES); + // Note this is a shift, because dividing a long time and then picking the low 32 bits doesn't give correct rollover behavior + private static final int CLOCK_SHIFT = 17; + // CLOCK_GRANULARITY = 1^9ns >> CLOCK_SHIFT == 132us == (1/7.63)ms + + /** + * (clock + allocation) granularity are combined to give us an acceptable (waste) allocation rate that is defined by + * the passage of real time of ALLOCATION_GRANULARITY_BYTES/CLOCK_GRANULARITY, or in this case 7.63Kb/ms, or 7.45Mb/s + * + * in wasteTracker we maintain within EXCESS_WASTE_OFFSET before the current time; whenever we waste bytes + * we increment the current value if it is within this window, and set it to the min of the window plus our waste + * otherwise. + */ + private volatile int wasteTracker = TRACKER_NEVER_WASTED; + + private static final AtomicIntegerFieldUpdater<AtomicBTreeColumns> wasteTrackerUpdater = AtomicIntegerFieldUpdater.newUpdater(AtomicBTreeColumns.class, "wasteTracker"); + + private static final Function<Cell, CellName> NAME = new Function<Cell, CellName>() + { + public CellName apply(Cell column) + { + return column.name(); + } + }; + + public static final Factory<AtomicBTreeColumns> factory = new Factory<AtomicBTreeColumns>() + { + public AtomicBTreeColumns create(CFMetaData metadata, boolean insertReversed, int initialCapacity) + { + if (insertReversed) + throw new IllegalArgumentException(); + return new AtomicBTreeColumns(metadata); + } + }; + + private static final DeletionInfo LIVE = DeletionInfo.live(); + // This is a small optimization: DeletionInfo is mutable, but we know that we will always copy it in that class, + // so we can safely alias one DeletionInfo.live() reference and avoid some allocations. + private static final Holder EMPTY = new Holder(BTree.empty(), LIVE); + + private volatile Holder ref; + + private static final AtomicReferenceFieldUpdater<AtomicBTreeColumns, Holder> refUpdater = AtomicReferenceFieldUpdater.newUpdater(AtomicBTreeColumns.class, Holder.class, "ref"); + + private AtomicBTreeColumns(CFMetaData metadata) + { + this(metadata, EMPTY); + } + + private AtomicBTreeColumns(CFMetaData metadata, Holder holder) + { + super(metadata); + this.ref = holder; + } + + public Factory getFactory() + { + return factory; + } + + public ColumnFamily cloneMe() + { + return new AtomicBTreeColumns(metadata, ref); + } + + public DeletionInfo deletionInfo() + { + return ref.deletionInfo; + } + + public void delete(DeletionTime delTime) + { + delete(new DeletionInfo(delTime)); + } + + protected void delete(RangeTombstone tombstone) + { + delete(new DeletionInfo(tombstone, getComparator())); + } + + public void delete(DeletionInfo info) + { + if (info.isLive()) + return; + + // Keeping deletion info for max markedForDeleteAt value + while (true) + { + Holder current = ref; + DeletionInfo curDelInfo = current.deletionInfo; + DeletionInfo newDelInfo = info.mayModify(curDelInfo) ? curDelInfo.copy().add(info) : curDelInfo; + if (refUpdater.compareAndSet(this, current, current.with(newDelInfo))) + break; + } + } + + public void setDeletionInfo(DeletionInfo newInfo) + { + ref = ref.with(newInfo); + } + + public void purgeTombstones(int gcBefore) + { + while (true) + { + Holder current = ref; + if (!current.deletionInfo.hasPurgeableTombstones(gcBefore)) + break; + + DeletionInfo purgedInfo = current.deletionInfo.copy(); + purgedInfo.purge(gcBefore); + if (refUpdater.compareAndSet(this, current, current.with(purgedInfo))) + break; + } + } + + /** + * This is only called by Memtable.resolve, so only AtomicBTreeColumns needs to implement it. + * + * @return the difference in size seen after merging the given columns + */ + public Pair<Long, Long> addAllWithSizeDelta(final ColumnFamily cm, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer) + { + ColumnUpdater updater = new ColumnUpdater(this, cm.metadata, allocator, writeOp, indexer); + DeletionInfo inputDeletionInfoCopy = null; + + boolean monitorOwned = false; + try + { + if (usePessimisticLocking()) + { + Locks.monitorEnterUnsafe(this); + monitorOwned = true; + } + while (true) + { + Holder current = ref; + updater.ref = current; + updater.reset(); + + DeletionInfo deletionInfo; + if (cm.deletionInfo().mayModify(current.deletionInfo)) + { + if (inputDeletionInfoCopy == null) + inputDeletionInfoCopy = cm.deletionInfo().copy(HeapAllocator.instance); + + deletionInfo = current.deletionInfo.copy().add(inputDeletionInfoCopy); + updater.allocated(deletionInfo.unsharedHeapSize() - current.deletionInfo.unsharedHeapSize()); + } + else + { + deletionInfo = current.deletionInfo; + } + + Object[] tree = BTree.update(current.tree, metadata.comparator.columnComparator(Memtable.MEMORY_POOL instanceof NativePool), cm, cm.getColumnCount(), true, updater); + + if (tree != null && refUpdater.compareAndSet(this, current, new Holder(tree, deletionInfo))) + { + indexer.updateRowLevelIndexes(); + updater.finish(); + return Pair.create(updater.dataSize, updater.colUpdateTimeDelta); + } + else if (!monitorOwned) + { + boolean shouldLock = usePessimisticLocking(); + if (!shouldLock) + { + shouldLock = updateWastedAllocationTracker(updater.heapSize); + } + if (shouldLock) + { + Locks.monitorEnterUnsafe(this); + monitorOwned = true; + } + } + } + } + finally + { + if (monitorOwned) + Locks.monitorExitUnsafe(this); + } + } + + boolean usePessimisticLocking() + { + return wasteTracker == TRACKER_PESSIMISTIC_LOCKING; + } + + /** + * Update the wasted allocation tracker state based on newly wasted allocation information + * + * @param wastedBytes the number of bytes wasted by this thread + * @return true if the caller should now proceed with pessimistic locking because the waste limit has been reached + */ + private boolean updateWastedAllocationTracker(long wastedBytes) { + // Early check for huge allocation that exceeds the limit + if (wastedBytes < EXCESS_WASTE_BYTES) + { + // We round up to ensure work < granularity are still accounted for + int wastedAllocation = ((int) (wastedBytes + ALLOCATION_GRANULARITY_BYTES - 1)) / ALLOCATION_GRANULARITY_BYTES; + + int oldTrackerValue; + while (TRACKER_PESSIMISTIC_LOCKING != (oldTrackerValue = wasteTracker)) + { + // Note this time value has an arbitrary offset, but is a constant rate 32 bit counter (that may wrap) + int time = (int) (System.nanoTime() >>> CLOCK_SHIFT); + int delta = oldTrackerValue - time; + if (oldTrackerValue == TRACKER_NEVER_WASTED || delta >= 0 || delta < -EXCESS_WASTE_OFFSET) + delta = -EXCESS_WASTE_OFFSET; + delta += wastedAllocation; + if (delta >= 0) + break; + if (wasteTrackerUpdater.compareAndSet(this, oldTrackerValue, avoidReservedValues(time + delta))) + return false; + } + } + // We have definitely reached our waste limit so set the state if it isn't already + wasteTrackerUpdater.set(this, TRACKER_PESSIMISTIC_LOCKING); + // And tell the caller to proceed with pessimistic locking + return true; + } + + private static int avoidReservedValues(int wasteTracker) + { + if (wasteTracker == TRACKER_NEVER_WASTED || wasteTracker == TRACKER_PESSIMISTIC_LOCKING) + return wasteTracker + 1; + return wasteTracker; + } + + // no particular reason not to implement these next methods, we just haven't needed them yet + + public void addColumn(Cell column) + { + throw new UnsupportedOperationException(); + } + + public void maybeAppendColumn(Cell cell, DeletionInfo.InOrderTester tester, int gcBefore) + { + throw new UnsupportedOperationException(); + } + + public void addAll(ColumnFamily cf) + { + throw new UnsupportedOperationException(); + } + + public void clear() + { + throw new UnsupportedOperationException(); + } + + public Cell getColumn(CellName name) + { + return (Cell) BTree.find(ref.tree, asymmetricComparator(), name); + } + + private Comparator<Object> asymmetricComparator() + { + return metadata.comparator.asymmetricColumnComparator(Memtable.MEMORY_POOL instanceof NativePool); + } + + public Iterable<CellName> getColumnNames() + { + return collection(false, NAME); + } + + public Collection<Cell> getSortedColumns() + { + return collection(true, Functions.<Cell>identity()); + } + + public Collection<Cell> getReverseSortedColumns() + { + return collection(false, Functions.<Cell>identity()); + } + + private <V> Collection<V> collection(final boolean forwards, final Function<Cell, V> f) + { + final Holder ref = this.ref; + return new AbstractCollection<V>() + { + public Iterator<V> iterator() + { + return Iterators.transform(BTree.<Cell>slice(ref.tree, forwards), f); + } + + public int size() + { + return BTree.slice(ref.tree, true).count(); + } + }; + } + + public int getColumnCount() + { + return BTree.slice(ref.tree, true).count(); + } + + public boolean hasColumns() + { + return !BTree.isEmpty(ref.tree); + } + + public Iterator<Cell> iterator(ColumnSlice[] slices) + { + return slices.length == 1 + ? slice(ref.tree, asymmetricComparator(), slices[0].start, slices[0].finish, true) + : new SliceIterator(ref.tree, asymmetricComparator(), true, slices); + } + + public Iterator<Cell> reverseIterator(ColumnSlice[] slices) + { + return slices.length == 1 + ? slice(ref.tree, asymmetricComparator(), slices[0].finish, slices[0].start, false) + : new SliceIterator(ref.tree, asymmetricComparator(), false, slices); + } + + public boolean isInsertReversed() + { + return false; + } + ++ public BatchRemoveIterator<Cell> batchRemoveIterator() ++ { ++ throw new UnsupportedOperationException(); ++ } ++ + private static final class Holder + { + final DeletionInfo deletionInfo; + // the btree of columns + final Object[] tree; + + Holder(Object[] tree, DeletionInfo deletionInfo) + { + this.tree = tree; + this.deletionInfo = deletionInfo; + } + + Holder with(DeletionInfo info) + { + return new Holder(this.tree, info); + } + } + + // the function we provide to the btree utilities to perform any column replacements + private static final class ColumnUpdater implements UpdateFunction<Cell> + { + final AtomicBTreeColumns updating; + final CFMetaData metadata; + final MemtableAllocator allocator; + final OpOrder.Group writeOp; + final Updater indexer; + Holder ref; + long dataSize; + long heapSize; + long colUpdateTimeDelta = Long.MAX_VALUE; + final MemtableAllocator.DataReclaimer reclaimer; + List<Cell> inserted; // TODO: replace with walk of aborted BTree + + private ColumnUpdater(AtomicBTreeColumns updating, CFMetaData metadata, MemtableAllocator allocator, OpOrder.Group writeOp, Updater indexer) + { + this.updating = updating; + this.allocator = allocator; + this.writeOp = writeOp; + this.indexer = indexer; + this.metadata = metadata; + this.reclaimer = allocator.reclaimer(); + } + + public Cell apply(Cell insert) + { + indexer.insert(insert); + insert = insert.localCopy(metadata, allocator, writeOp); + this.dataSize += insert.cellDataSize(); + this.heapSize += insert.unsharedHeapSizeExcludingData(); + if (inserted == null) + inserted = new ArrayList<>(); + inserted.add(insert); + return insert; + } + + public Cell apply(Cell existing, Cell update) + { + Cell reconciled = existing.reconcile(update); + indexer.update(existing, reconciled); + if (existing != reconciled) + { + reconciled = reconciled.localCopy(metadata, allocator, writeOp); + dataSize += reconciled.cellDataSize() - existing.cellDataSize(); + heapSize += reconciled.unsharedHeapSizeExcludingData() - existing.unsharedHeapSizeExcludingData(); + if (inserted == null) + inserted = new ArrayList<>(); + inserted.add(reconciled); + discard(existing); + //Getting the minimum delta for an update containing multiple columns + colUpdateTimeDelta = Math.min(Math.abs(existing.timestamp() - update.timestamp()), colUpdateTimeDelta); + } + return reconciled; + } + + protected void reset() + { + this.dataSize = 0; + this.heapSize = 0; + if (inserted != null) + { + for (Cell cell : inserted) + abort(cell); + inserted.clear(); + } + reclaimer.cancel(); + } + + protected void abort(Cell abort) + { + reclaimer.reclaimImmediately(abort); + } + + protected void discard(Cell discard) + { + reclaimer.reclaim(discard); + } + + public boolean abortEarly() + { + return updating.ref != ref; + } + + public void allocated(long heapSize) + { + this.heapSize += heapSize; + } + + protected void finish() + { + allocator.onHeap().allocate(heapSize, writeOp); + reclaimer.commit(); + } + } + + private static class SliceIterator extends AbstractIterator<Cell> + { + private final Object[] btree; + private final boolean forwards; + private final Comparator<Object> comparator; + private final ColumnSlice[] slices; + + private int idx = 0; + private Iterator<Cell> currentSlice; + + SliceIterator(Object[] btree, Comparator<Object> comparator, boolean forwards, ColumnSlice[] slices) + { + this.btree = btree; + this.comparator = comparator; + this.slices = slices; + this.forwards = forwards; + } + + protected Cell computeNext() + { + while (currentSlice != null || idx < slices.length) + { + if (currentSlice == null) + { + ColumnSlice slice = slices[idx++]; + if (forwards) + currentSlice = slice(btree, comparator, slice.start, slice.finish, true); + else + currentSlice = slice(btree, comparator, slice.finish, slice.start, false); + } + + if (currentSlice.hasNext()) + return currentSlice.next(); + + currentSlice = null; + } + + return endOfData(); + } + } + + private static Iterator<Cell> slice(Object[] btree, Comparator<Object> comparator, Composite start, Composite finish, boolean forwards) + { + return BTree.slice(btree, + comparator, + start.isEmpty() ? null : start, + true, + finish.isEmpty() ? null : finish, + true, + forwards); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/06cd494c/src/java/org/apache/cassandra/db/ColumnFamily.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ColumnFamily.java index 483ecb0,19f8c16..f21d161 --- a/src/java/org/apache/cassandra/db/ColumnFamily.java +++ b/src/java/org/apache/cassandra/db/ColumnFamily.java @@@ -514,6 -532,38 +514,12 @@@ public abstract class ColumnFamily impl return ByteBuffer.wrap(out.getData(), 0, out.getLength()); } + + /** + * @return an iterator where the removes are carried out once everything has been iterated + */ - public BatchRemoveIterator<Column> batchRemoveIterator() - { - // Default implementation is the ordinary iterator - return new BatchRemoveIterator<Column>() - { - private final Iterator<Column> iter = iterator(); - - public void commit() - { - } - - public boolean hasNext() - { - return iter.hasNext(); - } - - public Column next() - { - return iter.next(); - } - - public void remove() - { - iter.remove(); - } - }; - } ++ public abstract BatchRemoveIterator<Cell> batchRemoveIterator(); + public abstract static class Factory <T extends ColumnFamily> { /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/06cd494c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 0c95b0e,34d3f1d..3822648 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -1237,14 -951,15 +1237,14 @@@ public class ColumnFamilyStore implemen * columns that have been dropped from the schema (for CQL3 tables only). * @return the updated ColumnFamily */ - public static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer) + public static ColumnFamily removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore, SecondaryIndexManager.Updater indexer) { - Iterator<Cell> iter = cf.iterator(); - BatchRemoveIterator<Column> iter = cf.batchRemoveIterator(); ++ BatchRemoveIterator<Cell> iter = cf.batchRemoveIterator(); DeletionInfo.InOrderTester tester = cf.inOrderDeletionTester(); boolean hasDroppedColumns = !cf.metadata.getDroppedColumns().isEmpty(); - long removedBytes = 0; while (iter.hasNext()) { - Column c = iter.next(); + Cell c = iter.next(); // remove columns if // (a) the column itself is gcable or // (b) the column is shadowed by a CF tombstone @@@ -1253,10 -968,16 +1253,10 @@@ { iter.remove(); indexer.remove(c); - removedBytes += c.dataSize(); } } - + iter.commit(); - return removedBytes; - } - - public static long removeDeletedColumnsOnly(ColumnFamily cf, int gcBefore) - { - return removeDeletedColumnsOnly(cf, gcBefore, SecondaryIndexManager.nullUpdater); + return cf; } // returns true if @@@ -1273,7 -994,7 +1273,7 @@@ if (cf == null || cf.metadata.getDroppedColumns().isEmpty()) return; - Iterator<Cell> iter = cf.iterator(); - BatchRemoveIterator<Column> iter = cf.batchRemoveIterator(); ++ BatchRemoveIterator<Cell> iter = cf.batchRemoveIterator(); while (iter.hasNext()) if (isDroppedColumn(iter.next(), metadata)) iter.remove(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/06cd494c/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java index 83a58e4,90cd70f..18851d4 --- a/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java +++ b/test/unit/org/apache/cassandra/db/ArrayBackedSortedColumnsTest.java @@@ -25,13 -27,16 +25,16 @@@ import org.junit.Test import static org.junit.Assert.*; -import com.google.common.base.Functions; + import com.google.common.collect.Sets; + import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.Schema; - import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.db.composites.*; +import org.apache.cassandra.db.filter.ColumnSlice; +import org.apache.cassandra.db.marshal.Int32Type; + import org.apache.cassandra.utils.BatchRemoveIterator; + import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.db.filter.ColumnSlice; -import org.apache.cassandra.utils.HeapAllocator; public class ArrayBackedSortedColumnsTest extends SchemaLoader { @@@ -265,4 -195,95 +268,98 @@@ iter.remove(); assertTrue(!iter.hasNext()); } + + @Test(expected = IllegalStateException.class) + public void testBatchRemoveTwice() + { ++ CellNameType type = new SimpleDenseCellNameType(Int32Type.instance); + ColumnFamily map = ArrayBackedSortedColumns.factory.create(metadata(), false); - map.addColumn(new Column(ByteBufferUtil.bytes(1)), HeapAllocator.instance); - map.addColumn(new Column(ByteBufferUtil.bytes(2)), HeapAllocator.instance); ++ map.addColumn(new BufferCell(type.makeCellName(1))); ++ map.addColumn(new BufferCell(type.makeCellName(2))); + - BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator(); ++ BatchRemoveIterator<Cell> batchIter = map.batchRemoveIterator(); + batchIter.next(); + batchIter.remove(); + batchIter.remove(); + } + + @Test(expected = IllegalStateException.class) + public void testBatchCommitTwice() + { ++ CellNameType type = new SimpleDenseCellNameType(Int32Type.instance); + ColumnFamily map = ArrayBackedSortedColumns.factory.create(metadata(), false); - map.addColumn(new Column(ByteBufferUtil.bytes(1)), HeapAllocator.instance); - map.addColumn(new Column(ByteBufferUtil.bytes(2)), HeapAllocator.instance); ++ map.addColumn(new BufferCell(type.makeCellName(1))); ++ map.addColumn(new BufferCell(type.makeCellName(2))); + - BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator(); ++ BatchRemoveIterator<Cell> batchIter = map.batchRemoveIterator(); + batchIter.next(); + batchIter.remove(); + batchIter.commit(); + batchIter.commit(); + } + + @Test + public void testBatchRemove() + { + testBatchRemoveInternal(false); + testBatchRemoveInternal(true); + } + + public void testBatchRemoveInternal(boolean reversed) + { ++ CellNameType type = new SimpleDenseCellNameType(Int32Type.instance); + ColumnFamily map = ArrayBackedSortedColumns.factory.create(metadata(), reversed); + int[] values = new int[]{ 1, 2, 3, 5 }; + + for (int i = 0; i < values.length; ++i) - map.addColumn(new Column(ByteBufferUtil.bytes(values[reversed ? values.length - 1 - i : i])), HeapAllocator.instance); ++ map.addColumn(new BufferCell(type.makeCellName(values[reversed ? values.length - 1 - i : i]))); + - BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator(); ++ BatchRemoveIterator<Cell> batchIter = map.batchRemoveIterator(); + batchIter.next(); + batchIter.remove(); + batchIter.next(); + batchIter.remove(); + - assertEquals("1st column before commit", 1, map.iterator().next().name().getInt(0)); ++ assertEquals("1st column before commit", 1, map.iterator().next().name().toByteBuffer().getInt(0)); + + batchIter.commit(); + - assertEquals("1st column after commit", 3, map.iterator().next().name().getInt(0)); ++ assertEquals("1st column after commit", 3, map.iterator().next().name().toByteBuffer().getInt(0)); + } + + @Test + public void testBatchRemoveCopy() + { + // Test delete some random columns and check the result ++ CellNameType type = new SimpleDenseCellNameType(Int32Type.instance); + ColumnFamily map = ArrayBackedSortedColumns.factory.create(metadata(), false); + int n = 127; + int[] values = new int[n]; - for (int i = 0; i < n; i++) values[i] = i; ++ for (int i = 0; i < n; i++) ++ values[i] = i; + Set<Integer> toRemove = Sets.newHashSet(3, 12, 13, 15, 58, 103, 112); + + for (int value : values) - map.addColumn(new Column(ByteBufferUtil.bytes(value)), HeapAllocator.instance); ++ map.addColumn(new BufferCell(type.makeCellName(value))); + - BatchRemoveIterator<Column> batchIter = map.batchRemoveIterator(); ++ BatchRemoveIterator<Cell> batchIter = map.batchRemoveIterator(); + while (batchIter.hasNext()) - if (toRemove.contains(batchIter.next().name().getInt(0))) ++ if (toRemove.contains(batchIter.next().name().toByteBuffer().getInt(0))) + batchIter.remove(); + + batchIter.commit(); + + int expected = 0; - + while (toRemove.contains(expected)) + expected++; + - for (Column column : map) ++ for (Cell column : map) + { - assertEquals(expected, column.name().getInt(0)); ++ assertEquals(expected, column.name().toByteBuffer().getInt(0)); + expected++; + while (toRemove.contains(expected)) + expected++; + } - + assertEquals(expected, n); + } }