http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/Rows.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Rows.java b/src/java/org/apache/cassandra/db/rows/Rows.java index 76dcf60..122f7d3 100644 --- a/src/java/org/apache/cassandra/db/rows/Rows.java +++ b/src/java/org/apache/cassandra/db/rows/Rows.java @@ -20,14 +20,12 @@ package org.apache.cassandra.db.rows; import java.util.*; import com.google.common.collect.Iterators; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.google.common.collect.PeekingIterator; -import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.partitions.PartitionStatisticsCollector; import org.apache.cassandra.db.index.SecondaryIndexManager; -import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.SearchIterator; /** @@ -35,110 +33,198 @@ import org.apache.cassandra.utils.SearchIterator; */ public abstract class Rows { - private static final Logger logger = LoggerFactory.getLogger(Rows.class); - - private Rows() {} - - public static final Row EMPTY_STATIC_ROW = new AbstractRow() + // TODO: we could have that in a more generic place... + private static final SearchIterator<ColumnDefinition, ColumnData> EMPTY_SEARCH_ITERATOR = new SearchIterator<ColumnDefinition, ColumnData>() { - public Columns columns() + public boolean hasNext() { - return Columns.NONE; + return false; } - public LivenessInfo primaryKeyLivenessInfo() + public ColumnData next(ColumnDefinition column) { - return LivenessInfo.NONE; + return null; } + }; - public DeletionTime deletion() - { - return DeletionTime.LIVE; - } + private Rows() {} - public boolean isEmpty() - { - return true; - } + public static final Row EMPTY_STATIC_ROW = ArrayBackedRow.emptyRow(Clustering.STATIC_CLUSTERING); - public boolean hasComplexDeletion() + public static Row.Builder copy(Row row, Row.Builder builder) + { + builder.newRow(row.clustering()); + builder.addPrimaryKeyLivenessInfo(row.primaryKeyLivenessInfo()); + builder.addRowDeletion(row.deletion()); + for (ColumnData cd : row) { - return false; + if (cd.column().isSimple()) + { + builder.addCell((Cell)cd); + } + else + { + ComplexColumnData complexData = (ComplexColumnData)cd; + builder.addComplexDeletion(complexData.column(), complexData.complexDeletion()); + for (Cell cell : complexData) + builder.addCell(cell); + } } + return builder; + } - public Clustering clustering() - { - return Clustering.STATIC_CLUSTERING; - } + /** + * Collect statistics on a given row. + * + * @param row the row for which to collect stats. + * @param collector the stats collector. + * @return the total number of cells in {@code row}.
+ */ + public static int collectStats(Row row, PartitionStatisticsCollector collector) + { + assert !row.isEmpty(); - public Cell getCell(ColumnDefinition c) - { - return null; - } + collector.update(row.primaryKeyLivenessInfo()); + collector.update(row.deletion()); - public Cell getCell(ColumnDefinition c, CellPath path) + int columnCount = 0; + int cellCount = 0; + for (ColumnData cd : row) { - return null; - } + if (cd.column().isSimple()) + { + ++columnCount; + ++cellCount; + Cells.collectStats((Cell)cd, collector); + } + else + { + ComplexColumnData complexData = (ComplexColumnData)cd; + collector.update(complexData.complexDeletion()); + if (complexData.hasCells()) + { + ++columnCount; + for (Cell cell : complexData) + { + ++cellCount; + Cells.collectStats(cell, collector); + } + } + } - public Iterator<Cell> getCells(ColumnDefinition c) - { - return null; } + collector.updateColumnSetPerRow(columnCount); + return cellCount; + } - public DeletionTime getDeletion(ColumnDefinition c) + /** + * Given the result ({@code merged}) of merging multiple {@code inputs}, signals the difference between + * each input and {@code merged} to {@code diffListener}. + * + * @param merged the result of merging {@code inputs}. + * @param columns a superset of all the columns in any of {@code merged}/{@code inputs}. + * @param inputs the inputs whose merge yielded {@code merged}. + * @param diffListener the listener to which to signal the differences between the inputs and the merged + * result. + */ + public static void diff(Row merged, Columns columns, Row[] inputs, RowDiffListener diffListener) + { + Clustering clustering = merged.clustering(); + LivenessInfo mergedInfo = merged.primaryKeyLivenessInfo().isEmpty() ? null : merged.primaryKeyLivenessInfo(); + DeletionTime mergedDeletion = merged.deletion().isLive() ? null : merged.deletion(); + for (int i = 0; i < inputs.length; i++) { - return DeletionTime.LIVE; + Row input = inputs[i]; + LivenessInfo inputInfo = input == null || input.primaryKeyLivenessInfo().isEmpty() ? null : input.primaryKeyLivenessInfo(); + DeletionTime inputDeletion = input == null || input.deletion().isLive() ? null : input.deletion(); + + if (mergedInfo != null || inputInfo != null) + diffListener.onPrimaryKeyLivenessInfo(i, clustering, mergedInfo, inputInfo); + if (mergedDeletion != null || inputDeletion != null) + diffListener.onDeletion(i, clustering, mergedDeletion, inputDeletion); } - public Iterator<Cell> iterator() + SearchIterator<ColumnDefinition, ColumnData> mergedIterator = merged.searchIterator(); + List<SearchIterator<ColumnDefinition, ColumnData>> inputIterators = new ArrayList<>(inputs.length); + + for (Row row : inputs) + inputIterators.add(row == null ? 
EMPTY_SEARCH_ITERATOR : row.searchIterator()); + + Iterator<ColumnDefinition> simpleColumns = columns.simpleColumns(); + while (simpleColumns.hasNext()) { - return Iterators.<Cell>emptyIterator(); + ColumnDefinition column = simpleColumns.next(); + Cell mergedCell = (Cell)mergedIterator.next(column); + for (int i = 0; i < inputs.length; i++) + { + Cell inputCell = (Cell)inputIterators.get(i).next(column); + if (mergedCell != null || inputCell != null) + diffListener.onCell(i, clustering, mergedCell, inputCell); + } } - public SearchIterator<ColumnDefinition, ColumnData> searchIterator() + Iterator<ColumnDefinition> complexColumns = columns.complexColumns(); + while (complexColumns.hasNext()) { - return new SearchIterator<ColumnDefinition, ColumnData>() + ColumnDefinition column = complexColumns.next(); + ComplexColumnData mergedData = (ComplexColumnData)mergedIterator.next(column); + // Doing one input at a time is not the most efficient, but it's a lot simpler for now + for (int i = 0; i < inputs.length; i++) { - public boolean hasNext() + ComplexColumnData inputData = (ComplexColumnData)inputIterators.get(i).next(column); + if (mergedData == null) { - return false; + if (inputData == null) + continue; + + // Everything in inputData has been shadowed + if (!inputData.complexDeletion().isLive()) + diffListener.onComplexDeletion(i, clustering, column, null, inputData.complexDeletion()); + for (Cell inputCell : inputData) + diffListener.onCell(i, clustering, null, inputCell); } - - public ColumnData next(ColumnDefinition column) + else if (inputData == null) { - return null; + // Everything in mergedData is new + if (!mergedData.complexDeletion().isLive()) + diffListener.onComplexDeletion(i, clustering, column, mergedData.complexDeletion(), null); + for (Cell mergedCell : mergedData) + diffListener.onCell(i, clustering, mergedCell, null); } - }; - } - - public Kind kind() - { - return Unfiltered.Kind.ROW; - } - - public Row takeAlias() - { - return this; + else + { + PeekingIterator<Cell> mergedCells = Iterators.peekingIterator(mergedData.iterator()); + PeekingIterator<Cell> inputCells = Iterators.peekingIterator(inputData.iterator()); + while (mergedCells.hasNext() && inputCells.hasNext()) + { + int cmp = column.cellPathComparator().compare(mergedCells.peek().path(), inputCells.peek().path()); + if (cmp == 0) + diffListener.onCell(i, clustering, mergedCells.next(), inputCells.next()); + else if (cmp < 0) + diffListener.onCell(i, clustering, mergedCells.next(), null); + else // cmp > 0 + diffListener.onCell(i, clustering, null, inputCells.next()); + } + while (mergedCells.hasNext()) + diffListener.onCell(i, clustering, mergedCells.next(), null); + while (inputCells.hasNext()) + diffListener.onCell(i, clustering, null, inputCells.next()); + } + } } - }; - - public interface SimpleMergeListener - { - public void onAdded(Cell newCell); - public void onRemoved(Cell removedCell); - public void onUpdated(Cell existingCell, Cell updatedCell); } - public static void writeClustering(Clustering clustering, Row.Writer writer) + public static Row merge(Row row1, Row row2, int nowInSec) { - for (int i = 0; i < clustering.size(); i++) - writer.writeClusteringValue(clustering.get(i)); + Columns mergedColumns = row1.columns().mergeTo(row2.columns()); + Row.Builder builder = ArrayBackedRow.sortedBuilder(mergedColumns); + merge(row1, row2, mergedColumns, builder, nowInSec, SecondaryIndexManager.nullUpdater); + return builder.build(); } - public static void merge(Row row1, Row row2, Columns mergedColumns,
Row.Writer writer, int nowInSec) + public static void merge(Row row1, Row row2, Columns mergedColumns, Row.Builder builder, int nowInSec) { - merge(row1, row2, mergedColumns, writer, nowInSec, SecondaryIndexManager.nullUpdater); + merge(row1, row2, mergedColumns, builder, nowInSec, SecondaryIndexManager.nullUpdater); } // Merge rows in memtable @@ -146,26 +232,26 @@ public abstract class Rows public static long merge(Row existing, Row update, Columns mergedColumns, - Row.Writer writer, + Row.Builder builder, int nowInSec, SecondaryIndexManager.Updater indexUpdater) { Clustering clustering = existing.clustering(); - writeClustering(clustering, writer); + builder.newRow(clustering); LivenessInfo existingInfo = existing.primaryKeyLivenessInfo(); LivenessInfo updateInfo = update.primaryKeyLivenessInfo(); - LivenessInfo mergedInfo = existingInfo.mergeWith(updateInfo); + LivenessInfo mergedInfo = existingInfo.supersedes(updateInfo) ? existingInfo : updateInfo; long timeDelta = Math.abs(existingInfo.timestamp() - mergedInfo.timestamp()); DeletionTime deletion = existing.deletion().supersedes(update.deletion()) ? existing.deletion() : update.deletion(); if (deletion.deletes(mergedInfo)) - mergedInfo = LivenessInfo.NONE; + mergedInfo = LivenessInfo.EMPTY; - writer.writePartitionKeyLivenessInfo(mergedInfo); - writer.writeRowDeletion(deletion); + builder.addPrimaryKeyLivenessInfo(mergedInfo); + builder.addRowDeletion(deletion); indexUpdater.maybeIndex(clustering, mergedInfo.timestamp(), mergedInfo.ttl(), deletion); @@ -178,7 +264,7 @@ public abstract class Rows existingCell, updateCell, deletion, - writer, + builder, nowInSec, indexUpdater)); } @@ -186,20 +272,22 @@ public abstract class Rows for (int i = 0; i < mergedColumns.complexColumnCount(); i++) { ColumnDefinition c = mergedColumns.getComplex(i); - DeletionTime existingDt = existing.getDeletion(c); - DeletionTime updateDt = update.getDeletion(c); + ComplexColumnData existingData = existing.getComplexColumnData(c); + ComplexColumnData updateData = update.getComplexColumnData(c); + + DeletionTime existingDt = existingData == null ? DeletionTime.LIVE : existingData.complexDeletion(); + DeletionTime updateDt = updateData == null ? DeletionTime.LIVE : updateData.complexDeletion(); DeletionTime maxDt = existingDt.supersedes(updateDt) ? existingDt : updateDt; if (maxDt.supersedes(deletion)) - writer.writeComplexDeletion(c, maxDt); + builder.addComplexDeletion(c, maxDt); else maxDt = deletion; - Iterator<Cell> existingCells = existing.getCells(c); - Iterator<Cell> updateCells = update.getCells(c); - timeDelta = Math.min(timeDelta, Cells.reconcileComplex(clustering, c, existingCells, updateCells, maxDt, writer, nowInSec, indexUpdater)); + Iterator<Cell> existingCells = existingData == null ? null : existingData.iterator(); + Iterator<Cell> updateCells = updateData == null ? null : updateData.iterator(); + timeDelta = Math.min(timeDelta, Cells.reconcileComplex(clustering, c, existingCells, updateCells, maxDt, builder, nowInSec, indexUpdater)); } - writer.endOfRow(); return timeDelta; } }
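The Rows.java hunks above replace the old streaming Row.Writer protocol (writeClustering(...), writePartitionKeyLivenessInfo(...), endOfRow()) with the Row.Builder API: a caller starts a row with newRow(clustering), adds the primary key liveness info, row deletion, cells and complex deletions, and obtains the finished row from build(). The sketch below shows how that API is driven, using only calls visible in the hunks above (ArrayBackedRow.sortedBuilder, Rows.copy, Rows.merge); it is an illustrative sketch against the tree at this commit, and the wrapper class and method names are hypothetical.

    import org.apache.cassandra.db.rows.*;

    public final class RowBuilderSketch
    {
        // Rebuilds a copy of the given row through the new builder API.
        // Rows.copy(...) replays the clustering, primary key liveness info,
        // row deletion, and all simple/complex column data into the builder.
        public static Row copyOf(Row source)
        {
            Row.Builder builder = ArrayBackedRow.sortedBuilder(source.columns());
            return Rows.copy(source, builder).build();
        }

        // Reconciles two versions of the same row; the new Rows.merge overload
        // merges their column sets and reconciles liveness info, deletions and cells.
        public static Row reconcile(Row version1, Row version2, int nowInSec)
        {
            return Rows.merge(version1, version2, nowInSec);
        }
    }

Compared to the removed Row.Writer, the builder owns the row's lifecycle, which is why the explicit writer.endOfRow() call disappears from Rows.merge above.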
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/SerializationHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java index 56b993c..6b4bc2e 100644 --- a/src/java/org/apache/cassandra/db/rows/SerializationHelper.java +++ b/src/java/org/apache/cassandra/db/rows/SerializationHelper.java @@ -18,12 +18,13 @@ package org.apache.cassandra.db.rows; import java.nio.ByteBuffer; +import java.util.*; +import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; import org.apache.cassandra.db.context.CounterContext; import org.apache.cassandra.db.filter.ColumnFilter; -import org.apache.cassandra.utils.ByteBufferUtil; public class SerializationHelper { @@ -38,100 +39,90 @@ public class SerializationHelper * when we must ensure that deserializing and reserializing the * result yield the exact same bytes. Streaming uses this. */ - public static enum Flag + public enum Flag { - LOCAL, FROM_REMOTE, PRESERVE_SIZE; + LOCAL, FROM_REMOTE, PRESERVE_SIZE } private final Flag flag; public final int version; - private final ReusableLivenessInfo livenessInfo = new ReusableLivenessInfo(); - - // The currently read row liveness infos (timestamp, ttl and localDeletionTime). - private long rowTimestamp; - private int rowTTL; - private int rowLocalDeletionTime; - private final ColumnFilter columnsToFetch; private ColumnFilter.Tester tester; - public SerializationHelper(int version, Flag flag, ColumnFilter columnsToFetch) + private final Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns; + private CFMetaData.DroppedColumn currentDroppedComplex; + + + public SerializationHelper(CFMetaData metadata, int version, Flag flag, ColumnFilter columnsToFetch) { this.flag = flag; this.version = version; this.columnsToFetch = columnsToFetch; + this.droppedColumns = metadata.getDroppedColumns(); } - public SerializationHelper(int version, Flag flag) + public SerializationHelper(CFMetaData metadata, int version, Flag flag) { - this(version, flag, null); + this(metadata, version, flag, null); } - public void writePartitionKeyLivenessInfo(Row.Writer writer, long timestamp, int ttl, int localDeletionTime) + public Columns fetchedStaticColumns(SerializationHeader header) { - livenessInfo.setTo(timestamp, ttl, localDeletionTime); - writer.writePartitionKeyLivenessInfo(livenessInfo); - - rowTimestamp = timestamp; - rowTTL = ttl; - rowLocalDeletionTime = localDeletionTime; + return columnsToFetch == null ? header.columns().statics : columnsToFetch.fetchedColumns().statics; } - public long getRowTimestamp() + public Columns fetchedRegularColumns(SerializationHeader header) { - return rowTimestamp; + return columnsToFetch == null ? 
header.columns().regulars : columnsToFetch.fetchedColumns().regulars; } - public int getRowTTL() + public boolean includes(ColumnDefinition column) { - return rowTTL; + return columnsToFetch == null || columnsToFetch.includes(column); } - public int getRowLocalDeletionTime() + public boolean includes(CellPath path) { - return rowLocalDeletionTime; + return path == null || tester == null || tester.includes(path); } - public boolean includes(ColumnDefinition column) + public boolean canSkipValue(ColumnDefinition column) { - return columnsToFetch == null || columnsToFetch.includes(column); + return columnsToFetch != null && columnsToFetch.canSkipValue(column); } - public boolean canSkipValue(ColumnDefinition column) + public boolean canSkipValue(CellPath path) { - return columnsToFetch != null && columnsToFetch.canSkipValue(column); + return path != null && tester != null && tester.canSkipValue(path); } public void startOfComplexColumn(ColumnDefinition column) { this.tester = columnsToFetch == null ? null : columnsToFetch.newTester(column); + this.currentDroppedComplex = droppedColumns.get(column.name.bytes); } - public void endOfComplexColumn(ColumnDefinition column) + public void endOfComplexColumn() { this.tester = null; } - public void writeCell(Row.Writer writer, - ColumnDefinition column, - boolean isCounter, - ByteBuffer value, - long timestamp, - int localDelTime, - int ttl, - CellPath path) + public boolean isDropped(Cell cell, boolean isComplex) { - livenessInfo.setTo(timestamp, ttl, localDelTime); + CFMetaData.DroppedColumn dropped = isComplex ? currentDroppedComplex : droppedColumns.get(cell.column().name.bytes); + return dropped != null && cell.timestamp() <= dropped.droppedTime; + } - if (isCounter && ((flag == Flag.FROM_REMOTE || (flag == Flag.LOCAL && CounterContext.instance().shouldClearLocal(value))))) - value = CounterContext.instance().clearAllLocal(value); + public boolean isDroppedComplexDeletion(DeletionTime complexDeletion) + { + return currentDroppedComplex != null && complexDeletion.markedForDeleteAt() <= currentDroppedComplex.droppedTime; + } - if (!column.isComplex() || tester == null || tester.includes(path)) - { - if (tester != null && tester.canSkipValue(path)) - value = ByteBufferUtil.EMPTY_BYTE_BUFFER; - writer.writeCell(column, isCounter, value, livenessInfo, path); - } + public ByteBuffer maybeClearCounterValue(ByteBuffer value) + { + return flag == Flag.FROM_REMOTE || (flag == Flag.LOCAL && CounterContext.instance().shouldClearLocal(value)) + ? CounterContext.instance().clearAllLocal(value) + : value; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/SimpleRowDataBlock.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/SimpleRowDataBlock.java b/src/java/org/apache/cassandra/db/rows/SimpleRowDataBlock.java deleted file mode 100644 index 08f37fd..0000000 --- a/src/java/org/apache/cassandra/db/rows/SimpleRowDataBlock.java +++ /dev/null @@ -1,188 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.rows; - -import java.nio.ByteBuffer; - -import com.google.common.collect.UnmodifiableIterator; - -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.db.*; -import org.apache.cassandra.utils.ObjectSizes; - -/** - * Holds cells data for the simple columns of one or more rows. - * <p> - * In practice, a {@code SimpleRowDataBlock} contains a single {@code CellData} "array" and - * the (simple) columns for which the {@code SimplerowDataBlock} has data for. The cell for - * a row i and a column c is stored in the {@code CellData} at index 'i * index(c)'. - * <p> - * This does mean that we store cells in a "dense" way: if column doesn't have a cell for a - * given row, the correspond index in the cell data array will simple have a {@code null} value. - * We might want to switch to a more sparse encoding in the future but we keep it simple for - * now (having a sparse encoding make things a tad more complex because we need to be able to - * swap the cells for 2 given rows as seen in ComplexRowDataBlock). - */ -public class SimpleRowDataBlock -{ - private static final long EMPTY_SIZE = ObjectSizes.measure(new SimpleRowDataBlock(Columns.NONE, 0, false)); - - final Columns columns; - final CellData data; - - public SimpleRowDataBlock(Columns columns, int rows, boolean isCounter) - { - this.columns = columns; - this.data = new CellData(rows * columns.simpleColumnCount(), isCounter); - } - - public Columns columns() - { - return columns; - } - - // Swap row i and j - public void swap(int i, int j) - { - int s = columns.simpleColumnCount(); - for (int k = 0; k < s; k++) - data.swapCell(i * s + k, j * s + k); - } - - // Merge row i into j - public void merge(int i, int j, int nowInSec) - { - int s = columns.simpleColumnCount(); - for (int k = 0; k < s; k++) - data.mergeCell(i * s + k, j * s + k, nowInSec); - } - - // Move row i into j - public void move(int i, int j) - { - int s = columns.simpleColumnCount(); - for (int k = 0; k < s; k++) - data.moveCell(i * s + k, j * s + k); - } - - public long unsharedHeapSizeExcludingData() - { - return EMPTY_SIZE + data.unsharedHeapSizeExcludingData(); - } - - public int dataSize() - { - return data.dataSize(); - } - - public CellWriter cellWriter(boolean inOrderCells) - { - return new CellWriter(inOrderCells); - } - - public static CellData.ReusableCell reusableCell() - { - return new CellData.ReusableCell(); - } - - public static ReusableIterator reusableIterator() - { - return new ReusableIterator(); - } - - public void clear() - { - data.clear(); - } - - static class ReusableIterator extends UnmodifiableIterator<Cell> - { - private SimpleRowDataBlock dataBlock; - private final CellData.ReusableCell cell = new CellData.ReusableCell(); - - private int base; - private int column; - - private ReusableIterator() - { - } - - public ReusableIterator setTo(SimpleRowDataBlock dataBlock, int row) - { - this.dataBlock = dataBlock; - this.base = dataBlock == null ? 
-1 : row * dataBlock.columns.simpleColumnCount(); - this.column = 0; - return this; - } - - public boolean hasNext() - { - if (dataBlock == null) - return false; - - int columnCount = dataBlock.columns.simpleColumnCount(); - // iterate over column until we find one with data - while (column < columnCount && !dataBlock.data.hasCell(base + column)) - ++column; - - return column < columnCount; - } - - public Cell next() - { - cell.setTo(dataBlock.data, dataBlock.columns.getSimple(column), base + column); - ++column; - return cell; - } - } - - public class CellWriter - { - private final boolean inOrderCells; - - private int base; - private int lastColumnIdx; - - public CellWriter(boolean inOrderCells) - { - this.inOrderCells = inOrderCells; - } - - public void addCell(ColumnDefinition column, ByteBuffer value, LivenessInfo info) - { - int fromIdx = inOrderCells ? lastColumnIdx : 0; - lastColumnIdx = columns.simpleIdx(column, fromIdx); - assert lastColumnIdx >= 0 : "Cannot find column " + column.name + " in " + columns + " from " + fromIdx; - int idx = base + lastColumnIdx; - data.setCell(idx, value, info); - } - - public void reset() - { - base = 0; - lastColumnIdx = 0; - data.clear(); - } - - public void endOfRow() - { - base += columns.simpleColumnCount(); - lastColumnIdx = 0; - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/StaticRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/StaticRow.java b/src/java/org/apache/cassandra/db/rows/StaticRow.java deleted file mode 100644 index 2ad9fb4..0000000 --- a/src/java/org/apache/cassandra/db/rows/StaticRow.java +++ /dev/null @@ -1,193 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.db.rows; - -import java.nio.ByteBuffer; -import java.util.Iterator; - -import org.apache.cassandra.db.*; - -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.utils.SearchIterator; - -public class StaticRow extends AbstractRow -{ - private final DeletionTime deletion; - private final RowDataBlock data; - - private StaticRow(DeletionTime deletion, RowDataBlock data) - { - this.deletion = deletion.takeAlias(); - this.data = data; - } - - public Columns columns() - { - return data.columns(); - } - - public Cell getCell(ColumnDefinition c) - { - assert !c.isComplex(); - if (data.simpleData == null) - return null; - - int idx = columns().simpleIdx(c, 0); - if (idx < 0) - return null; - - return SimpleRowDataBlock.reusableCell().setTo(data.simpleData.data, c, idx); - } - - public Cell getCell(ColumnDefinition c, CellPath path) - { - assert c.isComplex(); - - ComplexRowDataBlock dataBlock = data.complexData; - if (dataBlock == null) - return null; - - int idx = dataBlock.cellIdx(0, c, path); - if (idx < 0) - return null; - - return SimpleRowDataBlock.reusableCell().setTo(dataBlock.cellData(0), c, idx); - } - - public Iterator<Cell> getCells(ColumnDefinition c) - { - assert c.isComplex(); - return ComplexRowDataBlock.reusableComplexCells().setTo(data.complexData, 0, c); - } - - public boolean hasComplexDeletion() - { - return data.hasComplexDeletion(0); - } - - public DeletionTime getDeletion(ColumnDefinition c) - { - assert c.isComplex(); - if (data.complexData == null) - return DeletionTime.LIVE; - - int idx = data.complexData.complexDeletionIdx(0, c); - return idx < 0 - ? DeletionTime.LIVE - : ComplexRowDataBlock.complexDeletionCursor().setTo(data.complexData.complexDelTimes, idx); - } - - public Iterator<Cell> iterator() - { - return RowDataBlock.reusableIterator().setTo(data, 0); - } - - public SearchIterator<ColumnDefinition, ColumnData> searchIterator() - { - return new SearchIterator<ColumnDefinition, ColumnData>() - { - private int simpleIdx = 0; - - public boolean hasNext() - { - // TODO: we can do better, but we expect users to no rely on this anyway - return true; - } - - public ColumnData next(ColumnDefinition column) - { - if (column.isComplex()) - { - // TODO: this is sub-optimal - - Iterator<Cell> cells = getCells(column); - return cells == null ? null : new ColumnData(column, null, cells, getDeletion(column)); - } - else - { - simpleIdx = columns().simpleIdx(column, simpleIdx); - assert simpleIdx >= 0; - - Cell cell = SimpleRowDataBlock.reusableCell().setTo(data.simpleData.data, column, simpleIdx); - ++simpleIdx; - return cell == null ? 
null : new ColumnData(column, cell, null, null); - } - } - }; - } - - public Row takeAlias() - { - return this; - } - - public Clustering clustering() - { - return Clustering.STATIC_CLUSTERING; - } - - public LivenessInfo primaryKeyLivenessInfo() - { - return LivenessInfo.NONE; - } - - public DeletionTime deletion() - { - return deletion; - } - - public static Builder builder(Columns columns, boolean inOrderCells, boolean isCounter) - { - return new Builder(columns, inOrderCells, isCounter); - } - - public static class Builder extends RowDataBlock.Writer - { - private final RowDataBlock data; - private DeletionTime deletion = DeletionTime.LIVE; - - public Builder(Columns columns, boolean inOrderCells, boolean isCounter) - { - super(inOrderCells); - this.data = new RowDataBlock(columns, 1, false, isCounter); - updateWriter(data); - } - - public void writeClusteringValue(ByteBuffer buffer) - { - throw new UnsupportedOperationException(); - } - - public void writePartitionKeyLivenessInfo(LivenessInfo info) - { - // Static rows are special and don't really have an existence unless they have live cells, - // so we shouldn't have any partition key liveness info. - assert info.equals(LivenessInfo.NONE); - } - - public void writeRowDeletion(DeletionTime deletion) - { - this.deletion = deletion; - } - - public StaticRow build() - { - return new StaticRow(deletion, data); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/TombstoneFilteringRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/TombstoneFilteringRow.java b/src/java/org/apache/cassandra/db/rows/TombstoneFilteringRow.java deleted file mode 100644 index a6167ea..0000000 --- a/src/java/org/apache/cassandra/db/rows/TombstoneFilteringRow.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.db.rows; - -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.db.*; - -public class TombstoneFilteringRow extends FilteringRow -{ - private final int nowInSec; - - public TombstoneFilteringRow(int nowInSec) - { - this.nowInSec = nowInSec; - } - - @Override - protected boolean include (LivenessInfo info) - { - return info.isLive(nowInSec); - } - - @Override - protected boolean include(DeletionTime dt) - { - return false; - } - - @Override - protected boolean include(Cell cell) - { - return cell.isLive(nowInSec); - } - - @Override - protected boolean include(ColumnDefinition c, DeletionTime dt) - { - return false; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/Unfiltered.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Unfiltered.java b/src/java/org/apache/cassandra/db/rows/Unfiltered.java index b1692e3..ba03741 100644 --- a/src/java/org/apache/cassandra/db/rows/Unfiltered.java +++ b/src/java/org/apache/cassandra/db/rows/Unfiltered.java @@ -57,4 +57,14 @@ public interface Unfiltered extends Clusterable public String toString(CFMetaData metadata); public String toString(CFMetaData metadata, boolean fullDetails); + + default boolean isRow() + { + return kind() == Kind.ROW; + } + + default boolean isRangeTombstoneMarker() + { + return kind() == Kind.RANGE_TOMBSTONE_MARKER; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java index 8abd228..129ed50 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIteratorSerializer.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.db.rows; -import java.io.DataInput; import java.io.IOException; import java.io.IOError; @@ -30,7 +29,6 @@ import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; /** * Serialize/Deserialize an unfiltered row iterator. @@ -38,7 +36,7 @@ import org.apache.cassandra.utils.FBUtilities; * The serialization is composed of a header, follows by the rows and range tombstones of the iterator serialized * until we read the end of the partition (see UnfilteredSerializer for details). The header itself * is: - * <cfid><key><flags><s_header>[<partition_deletion>][<static_row>] + * <cfid><key><flags><s_header>[<partition_deletion>][<static_row>][<row_estimate>] * where: * <cfid> is the table cfid. * <key> is the partition key. @@ -49,23 +47,17 @@ import org.apache.cassandra.utils.FBUtilities; * - has partition deletion: whether or not there is a <partition_deletion> following * - has static row: whether or not there is a <static_row> following * - has row estimate: whether or not there is a <row_estimate> following - * <s_header> is the SerializationHeader. 
More precisely it's - <min_timetamp><min_localDelTime><min_ttl>[<static_columns>]<columns> - * where: - * - <min_timestamp> is the base timestamp used for delta-encoding timestamps - * - <min_localDelTime> is the base localDeletionTime used for delta-encoding local deletion times - * - <min_ttl> is the base localDeletionTime used for delta-encoding ttls - * - <static_columns> is the static columns if a static row is present. It's - * the number of columns as an unsigned short, followed by the column names. - * - <columns> is the columns of the rows of the iterator. It's serialized as <static_columns>. + <s_header> is the {@code SerializationHeader}. It contains in particular the columns contained in the serialized + * iterator as well as other information necessary to decode the serialized rows + * (see {@code SerializationHeader.Serializer} for details). * <partition_deletion> is the deletion time for the partition (delta-encoded) * <static_row> is the static row for this partition as serialized by UnfilteredSerializer. - * <row_estimate> is the (potentially estimated) number of rows serialized. This is only use for - * the purpose of some sizing on the receiving end and should not be relied upon too strongly. + * <row_estimate> is the (potentially estimated) number of rows serialized. This is only used for + * the purpose of sizing on the receiving end and should not be relied upon too strongly. + * - * !!! Please note that the serialized value depends on the schema and as such should not be used as is if - * it might be deserialized after the schema as changed !!! - * TODO: we should add a flag to include the relevant metadata in the header for commit log etc..... + * Please note that the format described above is the on-wire format. On-disk, the format is basically the + * same, but the header is written once per sstable, not once per partition. Further, the actual row and + * range tombstones are not written using this class, but rather by {@link ColumnIndex}. */ public class UnfilteredRowIteratorSerializer { @@ -79,11 +71,13 @@ public class UnfilteredRowIteratorSerializer public static final UnfilteredRowIteratorSerializer serializer = new UnfilteredRowIteratorSerializer(); + // Should only be used for the on-wire format. public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, int version) throws IOException { serialize(iterator, out, version, -1); } + // Should only be used for the on-wire format. public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, int version, int rowEstimate) throws IOException { SerializationHeader header = new SerializationHeader(iterator.metadata(), @@ -92,6 +86,7 @@ public class UnfilteredRowIteratorSerializer serialize(iterator, out, header, version, rowEstimate); } + // Should only be used for the on-wire format.
public void serialize(UnfilteredRowIterator iterator, DataOutputPlus out, SerializationHeader header, int version, int rowEstimate) throws IOException { CFMetaData.serializer.serialize(iterator.metadata(), out, version); @@ -129,7 +124,7 @@ public class UnfilteredRowIteratorSerializer UnfilteredSerializer.serializer.serialize(staticRow, header, out, version); if (rowEstimate >= 0) - out.writeInt(rowEstimate); + out.writeVInt(rowEstimate); while (iterator.hasNext()) UnfilteredSerializer.serializer.serialize(iterator.next(), header, out, version); @@ -137,7 +132,7 @@ public class UnfilteredRowIteratorSerializer } // Please note that this consume the iterator, and as such should not be called unless we have a simple way to - // recreate an iterator for both serialize and serializedSize, which is mostly only PartitionUpdate + // recreate an iterator for both serialize and serializedSize, which is mostly only PartitionUpdate/ArrayBackedCachedPartition. public long serializedSize(UnfilteredRowIterator iterator, int version, int rowEstimate) { SerializationHeader header = new SerializationHeader(iterator.metadata(), @@ -166,7 +161,7 @@ public class UnfilteredRowIteratorSerializer size += UnfilteredSerializer.serializer.serializedSize(staticRow, header, version); if (rowEstimate >= 0) - size += TypeSizes.sizeof(rowEstimate); + size += TypeSizes.sizeofVInt(rowEstimate); while (iterator.hasNext()) size += UnfilteredSerializer.serializer.serializedSize(iterator.next(), header, version); @@ -197,41 +192,29 @@ public class UnfilteredRowIteratorSerializer Row staticRow = Rows.EMPTY_STATIC_ROW; if (hasStatic) - staticRow = UnfilteredSerializer.serializer.deserializeStaticRow(in, header, new SerializationHelper(version, flag)); + staticRow = UnfilteredSerializer.serializer.deserializeStaticRow(in, header, new SerializationHelper(metadata, version, flag)); - int rowEstimate = hasRowEstimate ? in.readInt() : -1; + int rowEstimate = hasRowEstimate ? 
(int)in.readVInt() : -1; return new Header(header, metadata, key, isReversed, false, partitionDeletion, staticRow, rowEstimate); } - public void deserialize(DataInput in, SerializationHelper helper, SerializationHeader header, Row.Writer rowWriter, RangeTombstoneMarker.Writer markerWriter) throws IOException + public UnfilteredRowIterator deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag, Header header) throws IOException { - while (UnfilteredSerializer.serializer.deserialize(in, header, helper, rowWriter, markerWriter) != null); - } - - public UnfilteredRowIterator deserialize(final DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException - { - final Header h = deserializeHeader(in, version, flag); - - if (h.isEmpty) - return UnfilteredRowIterators.emptyIterator(h.metadata, h.key, h.isReversed); - - final int clusteringSize = h.metadata.clusteringColumns().size(); - final SerializationHelper helper = new SerializationHelper(version, flag); + if (header.isEmpty) + return UnfilteredRowIterators.emptyIterator(header.metadata, header.key, header.isReversed); - return new AbstractUnfilteredRowIterator(h.metadata, h.key, h.partitionDeletion, h.sHeader.columns(), h.staticRow, h.isReversed, h.sHeader.stats()) + final SerializationHelper helper = new SerializationHelper(header.metadata, version, flag); + final SerializationHeader sHeader = header.sHeader; + return new AbstractUnfilteredRowIterator(header.metadata, header.key, header.partitionDeletion, sHeader.columns(), header.staticRow, header.isReversed, sHeader.stats()) { - private final ReusableRow row = new ReusableRow(clusteringSize, h.sHeader.columns().regulars, true, h.metadata.isCounter()); - private final RangeTombstoneMarker.Builder markerBuilder = new RangeTombstoneMarker.Builder(clusteringSize); + private final Row.Builder builder = ArrayBackedRow.sortedBuilder(sHeader.columns().regulars); protected Unfiltered computeNext() { try { - Unfiltered.Kind kind = UnfilteredSerializer.serializer.deserialize(in, h.sHeader, helper, row.writer(), markerBuilder.reset()); - if (kind == null) - return endOfData(); - - return kind == Unfiltered.Kind.ROW ? row : markerBuilder.build(); + Unfiltered unfiltered = UnfilteredSerializer.serializer.deserialize(in, sHeader, helper, builder); + return unfiltered == null ? 
endOfData() : unfiltered; } catch (IOException e) { @@ -241,30 +224,34 @@ public class UnfilteredRowIteratorSerializer }; } + public UnfilteredRowIterator deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException + { + return deserialize(in, version, flag, deserializeHeader(in, version, flag)); + } + public static void writeDelTime(DeletionTime dt, SerializationHeader header, DataOutputPlus out) throws IOException { - out.writeLong(header.encodeTimestamp(dt.markedForDeleteAt())); - out.writeInt(header.encodeDeletionTime(dt.localDeletionTime())); + out.writeVInt(header.encodeTimestamp(dt.markedForDeleteAt())); + out.writeVInt(header.encodeDeletionTime(dt.localDeletionTime())); } public static long delTimeSerializedSize(DeletionTime dt, SerializationHeader header) { - return TypeSizes.sizeof(header.encodeTimestamp(dt.markedForDeleteAt())) - + TypeSizes.sizeof(header.encodeDeletionTime(dt.localDeletionTime())); + return TypeSizes.sizeofVInt(header.encodeTimestamp(dt.markedForDeleteAt())) + + TypeSizes.sizeofVInt(header.encodeDeletionTime(dt.localDeletionTime())); } - public static DeletionTime readDelTime(DataInput in, SerializationHeader header) throws IOException + public static DeletionTime readDelTime(DataInputPlus in, SerializationHeader header) throws IOException { - long markedAt = header.decodeTimestamp(in.readLong()); - int localDelTime = header.decodeDeletionTime(in.readInt()); - return new SimpleDeletionTime(markedAt, localDelTime); + long markedAt = header.decodeTimestamp(in.readVInt()); + int localDelTime = header.decodeDeletionTime((int)in.readVInt()); + return new DeletionTime(markedAt, localDelTime); } - public static void skipDelTime(DataInput in, SerializationHeader header) throws IOException + public static void skipDelTime(DataInputPlus in, SerializationHeader header) throws IOException { - // Note that since we might use VINT, we shouldn't assume the size of a long or an int - in.readLong(); - in.readInt(); + in.readVInt(); + in.readVInt(); } public static class Header http://git-wip-us.apache.org/repos/asf/cassandra/blob/24575994/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java index 2c71cf3..6b6ec67 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredRowIterators.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.db.rows; -import java.nio.ByteBuffer; import java.util.*; import java.security.MessageDigest; @@ -26,13 +25,10 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.AbstractIterator; import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.sstable.CorruptSSTableException; import org.apache.cassandra.serializers.MarshalException; -import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.IMergeIterator; import org.apache.cassandra.utils.MergeIterator; @@ -49,13 +45,9 @@ public abstract class UnfilteredRowIterators public interface MergeListener { - public void onMergePartitionLevelDeletion(DeletionTime 
mergedDeletion, DeletionTime[] versions); - - public void onMergingRows(Clustering clustering, LivenessInfo mergedInfo, DeletionTime mergedDeletion, Row[] versions); - public void onMergedComplexDeletion(ColumnDefinition c, DeletionTime mergedComplexDeletion, DeletionTime[] versions); - public void onMergedCells(Cell mergedCell, Cell[] versions); - public void onRowDone(); + public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions); + public void onMergedRows(Row merged, Columns columns, Row[] versions); public void onMergedRangeTombstoneMarkers(RangeTombstoneMarker merged, RangeTombstoneMarker[] versions); public void close(); @@ -87,14 +79,13 @@ public abstract class UnfilteredRowIterators } /** - * Returns an iterator that is the result of merging other iterators, and using + * Returns an iterator that is the result of merging other iterators, and (optionally) using * specific MergeListener. * * Note that this method assumes that there is at least 2 iterators to merge. */ public static UnfilteredRowIterator merge(List<UnfilteredRowIterator> iterators, int nowInSec, MergeListener mergeListener) { - assert mergeListener != null; return UnfilteredRowMergeIterator.create(iterators, nowInSec, mergeListener); } @@ -175,10 +166,7 @@ public abstract class UnfilteredRowIterators while (iterator.hasNext()) { Unfiltered unfiltered = iterator.next(); - if (unfiltered.kind() == Unfiltered.Kind.ROW) - ((Row) unfiltered).digest(digest); - else - ((RangeTombstoneMarker) unfiltered).digest(digest); + unfiltered.digest(digest); } } @@ -198,12 +186,12 @@ public abstract class UnfilteredRowIterators && iter1.staticRow().equals(iter2.staticRow()); return new AbstractUnfilteredRowIterator(iter1.metadata(), - iter1.partitionKey(), - iter1.partitionLevelDeletion(), - iter1.columns(), - iter1.staticRow(), - iter1.isReverseOrder(), - iter1.stats()) + iter1.partitionKey(), + iter1.partitionLevelDeletion(), + iter1.columns(), + iter1.staticRow(), + iter1.isReverseOrder(), + iter1.stats()) { protected Unfiltered computeNext() { @@ -230,155 +218,35 @@ public abstract class UnfilteredRowIterators public static UnfilteredRowIterator cloningIterator(UnfilteredRowIterator iterator, final AbstractAllocator allocator) { - return new WrappingUnfilteredRowIterator(iterator) + return new AlteringUnfilteredRowIterator(iterator) { - private final CloningRow cloningRow = new CloningRow(); - private final RangeTombstoneMarker.Builder markerBuilder = new RangeTombstoneMarker.Builder(iterator.metadata().comparator.size()); - - public Unfiltered next() - { - Unfiltered next = super.next(); - return next.kind() == Unfiltered.Kind.ROW - ? 
cloningRow.setTo((Row)next) - : clone((RangeTombstoneMarker)next); - } - - private RangeTombstoneMarker clone(RangeTombstoneMarker marker) - { - markerBuilder.reset(); + private Row.Builder regularBuilder; - RangeTombstone.Bound bound = marker.clustering(); - for (int i = 0; i < bound.size(); i++) - markerBuilder.writeClusteringValue(allocator.clone(bound.get(i))); - markerBuilder.writeBoundKind(bound.kind()); - if (marker.isBoundary()) - { - RangeTombstoneBoundaryMarker bm = (RangeTombstoneBoundaryMarker)marker; - markerBuilder.writeBoundaryDeletion(bm.endDeletionTime(), bm.startDeletionTime()); - } - else - { - markerBuilder.writeBoundDeletion(((RangeTombstoneBoundMarker)marker).deletionTime()); - } - markerBuilder.endOfMarker(); - return markerBuilder.build(); - } - - class CloningRow extends WrappingRow + @Override + protected Row computeNextStatic(Row row) { - private final CloningClustering cloningClustering = new CloningClustering(); - private final CloningCell cloningCell = new CloningCell(); - - protected Cell filterCell(Cell cell) - { - return cloningCell.setTo(cell); - } - - @Override - public Clustering clustering() - { - return cloningClustering.setTo(super.clustering()); - } + Row.Builder staticBuilder = allocator.cloningArrayBackedRowBuilder(columns().statics); + return Rows.copy(row, staticBuilder).build(); } - class CloningClustering extends Clustering + @Override + protected Row computeNext(Row row) { - private Clustering wrapped; - - public Clustering setTo(Clustering wrapped) - { - this.wrapped = wrapped; - return this; - } - - public int size() - { - return wrapped.size(); - } - - public ByteBuffer get(int i) - { - ByteBuffer value = wrapped.get(i); - return value == null ? null : allocator.clone(value); - } + if (regularBuilder == null) + regularBuilder = allocator.cloningArrayBackedRowBuilder(columns().regulars); - public ByteBuffer[] getRawValues() - { - throw new UnsupportedOperationException(); - } + return Rows.copy(row, regularBuilder).build(); } - class CloningCell extends AbstractCell + @Override + protected RangeTombstoneMarker computeNext(RangeTombstoneMarker marker) { - private Cell wrapped; - - public Cell setTo(Cell wrapped) - { - this.wrapped = wrapped; - return this; - } - - public ColumnDefinition column() - { - return wrapped.column(); - } - - public boolean isCounterCell() - { - return wrapped.isCounterCell(); - } - - public ByteBuffer value() - { - return allocator.clone(wrapped.value()); - } - - public LivenessInfo livenessInfo() - { - return wrapped.livenessInfo(); - } - - public CellPath path() - { - CellPath path = wrapped.path(); - if (path == null) - return null; - - assert path.size() == 1; - return CellPath.create(allocator.clone(path.get(0))); - } + return marker.copy(allocator); } }; } /** - * Turns the given iterator into an update. - * - * Warning: this method does not close the provided iterator, it is up to - * the caller to close it. 
- */ - public static PartitionUpdate toUpdate(UnfilteredRowIterator iterator) - { - PartitionUpdate update = new PartitionUpdate(iterator.metadata(), iterator.partitionKey(), iterator.columns(), 1); - - update.addPartitionDeletion(iterator.partitionLevelDeletion()); - - if (iterator.staticRow() != Rows.EMPTY_STATIC_ROW) - iterator.staticRow().copyTo(update.staticWriter()); - - while (iterator.hasNext()) - { - Unfiltered unfiltered = iterator.next(); - if (unfiltered.kind() == Unfiltered.Kind.ROW) - ((Row) unfiltered).copyTo(update.writer()); - else - ((RangeTombstoneMarker) unfiltered).copyTo(update.markerWriter(iterator.isReverseOrder())); - } - - return update; - } - - /** * Validate that the data of the provided iterator is valid, that is that the values * it contains are valid for the type they represent, and more generally that the * infos stored are sensible. @@ -393,15 +261,34 @@ public abstract class UnfilteredRowIterators */ public static UnfilteredRowIterator withValidation(UnfilteredRowIterator iterator, final String filename) { - return new WrappingUnfilteredRowIterator(iterator) + return new AlteringUnfilteredRowIterator(iterator) { - public Unfiltered next() + @Override + protected Row computeNextStatic(Row row) + { + validate(row); + return row; + } + + @Override + protected Row computeNext(Row row) + { + validate(row); + return row; + } + + @Override + protected RangeTombstoneMarker computeNext(RangeTombstoneMarker marker) + { + validate(marker); + return marker; + } + + private void validate(Unfiltered unfiltered) { - Unfiltered next = super.next(); try { - next.validateData(metadata()); - return next; + unfiltered.validateData(iterator.metadata()); } catch (MarshalException me) { @@ -412,56 +299,6 @@ public abstract class UnfilteredRowIterators } /** - * Convert all expired cells to equivalent tombstones. - * <p> - * Once a cell expires, it acts exactly as a tombstone and this until it is purged. But in particular that - * means we don't care about the value of an expired cell, and it is thus equivalent but more efficient to - * replace the expired cell by an equivalent tombstone (that has no value). - * - * @param iterator the iterator in which to conver expired cells. - * @param nowInSec the current time to use to decide if a cell is expired. - * @return an iterator that returns the same data than {@code iterator} but with all expired cells converted - * to equivalent tombstones. - */ - public static UnfilteredRowIterator convertExpiredCellsToTombstones(UnfilteredRowIterator iterator, final int nowInSec) - { - return new FilteringRowIterator(iterator) - { - protected FilteringRow makeRowFilter() - { - return new FilteringRow() - { - @Override - protected Cell filterCell(Cell cell) - { - Cell filtered = super.filterCell(cell); - if (filtered == null) - return null; - - LivenessInfo info = filtered.livenessInfo(); - if (info.hasTTL() && !filtered.isLive(nowInSec)) - { - // The column is now expired, we can safely return a simple tombstone. Note that as long as the expiring - // column and the tombstone put together live longer than GC grace seconds, we'll fulfil our responsibility - // to repair. 
See discussion at - // http://cassandra-user-incubator-apache-org.3065146.n2.nabble.com/repair-compaction-and-tombstone-rows-td7583481.html - return Cells.create(filtered.column(), - filtered.isCounterCell(), - ByteBufferUtil.EMPTY_BYTE_BUFFER, - SimpleLivenessInfo.forDeletion(info.timestamp(), info.localDeletionTime() - info.ttl()), - filtered.path()); - } - else - { - return filtered; - } - } - }; - } - }; - } - - /** * Wraps the provided iterator so it logs the returned atoms for debugging purposes. * <p> * Note that this is only meant for debugging as this can log a very large amount of @@ -478,26 +315,28 @@ public abstract class UnfilteredRowIterators iterator.isReverseOrder(), iterator.partitionLevelDeletion().markedForDeleteAt()); - return new WrappingUnfilteredRowIterator(iterator) + return new AlteringUnfilteredRowIterator(iterator) { @Override - public Row staticRow() + protected Row computeNextStatic(Row row) { - Row row = super.staticRow(); if (!row.isEmpty()) logger.info("[{}] {}", id, row.toString(metadata(), fullDetails)); return row; } @Override - public Unfiltered next() + protected Row computeNext(Row row) { - Unfiltered next = super.next(); - if (next.kind() == Unfiltered.Kind.ROW) - logger.info("[{}] {}", id, ((Row)next).toString(metadata(), fullDetails)); - else - logger.info("[{}] {}", id, ((RangeTombstoneMarker)next).toString(metadata())); - return next; + logger.info("[{}] {}", id, row.toString(metadata(), fullDetails)); + return row; + } + + @Override + protected RangeTombstoneMarker computeNext(RangeTombstoneMarker marker) + { + logger.info("[{}] {}", id, marker.toString(metadata())); + return marker; } }; } @@ -526,10 +365,10 @@ public abstract class UnfilteredRowIterators reversed, mergeStats(iterators)); - this.listener = listener; this.mergeIterator = MergeIterator.get(iterators, reversed ? metadata.comparator.reversed() : metadata.comparator, - new MergeReducer(metadata, iterators.size(), reversed, nowInSec)); + new MergeReducer(iterators.size(), reversed, nowInSec, listener)); + this.listener = listener; } private static UnfilteredRowMergeIterator create(List<UnfilteredRowIterator> iterators, int nowInSec, MergeListener listener) @@ -591,7 +430,7 @@ public abstract class UnfilteredRowIterators delTime = iterDeletion; } if (listener != null && !delTime.isLive()) - listener.onMergePartitionLevelDeletion(delTime, versions); + listener.onMergedPartitionLevelDeletion(delTime, versions); return delTime; } @@ -605,14 +444,19 @@ public abstract class UnfilteredRowIterators if (columns.isEmpty()) return Rows.EMPTY_STATIC_ROW; - Row.Merger merger = Row.Merger.createStatic(metadata, iterators.size(), nowInSec, columns, listener); + if (iterators.stream().allMatch(iter -> iter.staticRow().isEmpty())) + return Rows.EMPTY_STATIC_ROW; + + Row.Merger merger = new Row.Merger(iterators.size(), nowInSec, columns); for (int i = 0; i < iterators.size(); i++) merger.add(i, iterators.get(i).staticRow()); - // Note that we should call 'takeAlias' on the result in theory, but we know that we - // won't reuse the merger and so that it's ok not to. Row merged = merger.merge(partitionDeletion); - return merged == null ? 
Rows.EMPTY_STATIC_ROW : merged; + if (merged == null) + merged = Rows.EMPTY_STATIC_ROW; + if (listener != null) + listener.onMergedRows(merged, columns, merger.mergedRows()); + return merged; } private static PartitionColumns collectColumns(List<UnfilteredRowIterator> iterators) @@ -659,26 +503,26 @@ public abstract class UnfilteredRowIterators listener.close(); } - /** - * Specific reducer for merge operations that rewrite the same reusable - * row every time. This also skip cells shadowed by range tombstones when writing. - */ private class MergeReducer extends MergeIterator.Reducer<Unfiltered, Unfiltered> { + private final MergeListener listener; + private Unfiltered.Kind nextKind; private final Row.Merger rowMerger; private final RangeTombstoneMarker.Merger markerMerger; - private MergeReducer(CFMetaData metadata, int size, boolean reversed, int nowInSec) + private MergeReducer(int size, boolean reversed, int nowInSec, MergeListener listener) { - this.rowMerger = Row.Merger.createRegular(metadata, size, nowInSec, columns().regulars, listener); - this.markerMerger = new RangeTombstoneMarker.Merger(metadata, size, partitionLevelDeletion(), reversed, listener); + this.rowMerger = new Row.Merger(size, nowInSec, columns().regulars); + this.markerMerger = new RangeTombstoneMarker.Merger(size, partitionLevelDeletion(), reversed); + this.listener = listener; } @Override public boolean trivialReduceIsTrivial() { + // If we have a listener, we must signal it even when we have a single version return listener == null; } @@ -693,9 +537,20 @@ public abstract class UnfilteredRowIterators protected Unfiltered getReduced() { - return nextKind == Unfiltered.Kind.ROW - ? rowMerger.merge(markerMerger.activeDeletion()) - : markerMerger.merge(); + if (nextKind == Unfiltered.Kind.ROW) + { + Row merged = rowMerger.merge(markerMerger.activeDeletion()); + if (listener != null) + listener.onMergedRows(merged == null ? ArrayBackedRow.emptyRow(rowMerger.mergedClustering()) : merged, columns().regulars, rowMerger.mergedRows()); + return merged; + } + else + { + RangeTombstoneMarker merged = markerMerger.merge(); + if (merged != null && listener != null) + listener.onMergedRangeTombstoneMarkers(merged, markerMerger.mergedMarkers()); + return merged; + } } protected void onKeyChange() @@ -712,13 +567,11 @@ public abstract class UnfilteredRowIterators { private final UnfilteredRowIterator iter; private final int nowInSec; - private final TombstoneFilteringRow filter; public FilteringIterator(UnfilteredRowIterator iter, int nowInSec) { this.iter = iter; this.nowInSec = nowInSec; - this.filter = new TombstoneFilteringRow(nowInSec); } public CFMetaData metadata() @@ -744,7 +597,11 @@ public abstract class UnfilteredRowIterators public Row staticRow() { Row row = iter.staticRow(); - return row.isEmpty() ? row : new TombstoneFilteringRow(nowInSec).setTo(row); + if (row.isEmpty()) + return Rows.EMPTY_STATIC_ROW; + + row = row.purge(DeletionPurger.PURGE_ALL, nowInSec); + return row == null ? Rows.EMPTY_STATIC_ROW : row; } protected Row computeNext() @@ -752,11 +609,11 @@ public abstract class UnfilteredRowIterators while (iter.hasNext()) { Unfiltered next = iter.next(); - if (next.kind() != Unfiltered.Kind.ROW) + if (next.isRangeTombstoneMarker()) continue; - Row row = filter.setTo((Row)next); - if (!row.isEmpty()) + Row row = ((Row)next).purge(DeletionPurger.PURGE_ALL, nowInSec); + if (row != null) return row; } return endOfData();
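Although the UnfilteredRowIterators hunk is cut off above, its FilteringIterator already shows the pattern that replaces the deleted TombstoneFilteringRow: instead of wrapping a row in a filtering view, Row.purge(DeletionPurger.PURGE_ALL, nowInSec) rebuilds the row without deleted or expired content and returns null when nothing live remains. Below is a minimal sketch of that loop, assuming only the calls visible in the hunks above (Unfiltered.isRangeTombstoneMarker(), Row.purge); the helper class is hypothetical and wildcard imports are used because exact package locations are not shown in the diff.

    import org.apache.cassandra.db.*;
    import org.apache.cassandra.db.rows.*;

    public final class PurgeFilterSketch
    {
        // Returns the next live row from the iterator, skipping range tombstone
        // markers and dropping rows that purge to nothing, mirroring
        // FilteringIterator.computeNext() above. Returns null once exhausted.
        public static Row nextLiveRow(UnfilteredRowIterator iter, int nowInSec)
        {
            while (iter.hasNext())
            {
                Unfiltered next = iter.next();
                if (next.isRangeTombstoneMarker())
                    continue; // the filtered view exposes no range tombstones

                // purge() returns null if the whole row is shadowed or expired.
                Row purged = ((Row) next).purge(DeletionPurger.PURGE_ALL, nowInSec);
                if (purged != null)
                    return purged;
            }
            return null;
        }
    }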