This is an automated email from the ASF dual-hosted git repository. blerer pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 186c7316648bae39b4271223fcc1cd628669c337 Merge: f87e0ea a9ab7bd Author: Benjamin Lerer <b.le...@gmail.com> AuthorDate: Mon Jan 25 11:42:13 2021 +0100 Merge branch cassandra-3.11 into trunk CHANGES.txt | 1 + .../org/apache/cassandra/db/rows/ArrayCell.java | 12 +- .../org/apache/cassandra/db/rows/BTreeRow.java | 12 +- .../org/apache/cassandra/db/rows/BufferCell.java | 5 + src/java/org/apache/cassandra/db/rows/Cell.java | 7 + .../cassandra/db/rows/ComplexColumnData.java | 15 ++- .../org/apache/cassandra/db/rows/NativeCell.java | 6 + .../db/SSTableAndMemTableDigestMatchTest.java | 148 +++++++++++++++++++++ 8 files changed, 196 insertions(+), 10 deletions(-) diff --cc CHANGES.txt index 7f2410b,0993ba5..6827031 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,18 -1,10 +1,19 @@@ -3.11.10 +4.0-beta5 + * Restore validation of each message's protocol version (CASSANDRA-16374) + * Upgrade netty and chronicle-queue dependencies to get Auditing and native library loading working on arm64 architectures (CASSANDRA-16384,CASSANDRA-16392) + * Release StreamingTombstoneHistogramBuilder spool when switching writers (CASSANDRA-14834) + * Correct memtable on-heap size calculations to match actual use (CASSANDRA-16318) + * Fix client notifications in CQL protocol v5 (CASSANDRA-16353) + * Too defensive check when picking sstables for preview repair (CASSANDRA-16284) + * Ensure pre-negotiation native protocol responses have correct stream id (CASSANDRA-16376) + * Fix check for -Xlog in cassandra-env.sh (CASSANDRA-16279) + * SSLFactory should initialize SSLContext before setting protocols (CASSANDRA-16362) + * Restore sasi dependencies jflex, snowball-stemmer, and concurrent-trees, in the cassandra-all pom (CASSANDRA-16303) + * Fix DecimalDeserializer#toString OOM (CASSANDRA-14925) +Merged from 3.11: + * Fix digest computation for queries with fetched but non queried columns (CASSANDRA-15962) * Reduce amount of allocations during batch statement execution (CASSANDRA-16201) * Update jflex-1.6.0.jar to match upstream (CASSANDRA-16393) - * Fix DecimalDeserializer#toString OOM (CASSANDRA-14925) - * Rate limit validation compactions using compaction_throughput_mb_per_sec (CASSANDRA-16161) - * SASI's `max_compaction_flush_memory_in_mb` settings over 100GB revert to default of 1GB (CASSANDRA-16071) Merged from 3.0: * Prevent unbounded number of pending flushing tasks (CASSANDRA-16261) * Improve empty hint file handling during startup (CASSANDRA-16162) diff --cc src/java/org/apache/cassandra/db/rows/ArrayCell.java index 5911e42,0000000..edeb65c mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/rows/ArrayCell.java +++ b/src/java/org/apache/cassandra/db/rows/ArrayCell.java @@@ -1,111 -1,0 +1,117 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.rows; + +import java.nio.ByteBuffer; + - import org.apache.cassandra.db.ExpirationDateOverflowHandling; +import org.apache.cassandra.db.marshal.ByteArrayAccessor; +import org.apache.cassandra.db.marshal.ByteType; +import org.apache.cassandra.db.marshal.ValueAccessor; +import org.apache.cassandra.schema.ColumnMetadata; - import org.apache.cassandra.utils.ByteArrayUtil; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.ObjectSizes; +import org.apache.cassandra.utils.memory.AbstractAllocator; + ++import static org.apache.cassandra.utils.ByteArrayUtil.EMPTY_BYTE_ARRAY; ++ +public class ArrayCell extends AbstractCell<byte[]> +{ - private static final long EMPTY_SIZE = ObjectSizes.measure(new ArrayCell(ColumnMetadata.regularColumn("", "", "", ByteType.instance), 0L, 0, 0, ByteArrayUtil.EMPTY_BYTE_ARRAY, null)); ++ private static final long EMPTY_SIZE = ObjectSizes.measure(new ArrayCell(ColumnMetadata.regularColumn("", "", "", ByteType.instance), 0L, 0, 0, EMPTY_BYTE_ARRAY, null)); + + private final long timestamp; + private final int ttl; + private final int localDeletionTime; + + private final byte[] value; + private final CellPath path; + + public ArrayCell(ColumnMetadata column, long timestamp, int ttl, int localDeletionTime, byte[] value, CellPath path) + { + super(column); + this.timestamp = timestamp; + this.ttl = ttl; + this.localDeletionTime = localDeletionTime; + this.value = value; + this.path = path; + } + + public long timestamp() + { + return timestamp; + } + + public int ttl() + { + return ttl; + } + + public int localDeletionTime() + { + return localDeletionTime; + } + + public byte[] value() + { + return value; + } + + public ValueAccessor<byte[]> accessor() + { + return ByteArrayAccessor.instance; + } + + public CellPath path() + { + return path; + } + + public Cell<?> withUpdatedColumn(ColumnMetadata newColumn) + { + return new ArrayCell(newColumn, timestamp, ttl, localDeletionTime, value, path); + } + + public Cell<?> withUpdatedValue(ByteBuffer newValue) + { + return new ArrayCell(column, timestamp, ttl, localDeletionTime, ByteBufferUtil.getArray(newValue), path); + } + + public Cell<?> withUpdatedTimestampAndLocalDeletionTime(long newTimestamp, int newLocalDeletionTime) + { + return new ArrayCell(column, newTimestamp, ttl, newLocalDeletionTime, value, path); + } + ++ @Override ++ public Cell<?> withSkippedValue() ++ { ++ return new ArrayCell(column, timestamp, ttl, localDeletionTime, EMPTY_BYTE_ARRAY, path); ++ } ++ + public Cell<?> copy(AbstractAllocator allocator) + { + if (value.length == 0) + return this; + + return new BufferCell(column, timestamp, ttl, localDeletionTime, allocator.clone(value), path == null ? null : path.copy(allocator)); + } + + public long unsharedHeapSizeExcludingData() + { + return EMPTY_SIZE + ObjectSizes.sizeOfArray(value) + value.length + (path == null ? 0 : path.unsharedHeapSizeExcludingData()); + } +} diff --cc src/java/org/apache/cassandra/db/rows/BTreeRow.java index b971307,7540c11..bd44b66 --- a/src/java/org/apache/cassandra/db/rows/BTreeRow.java +++ b/src/java/org/apache/cassandra/db/rows/BTreeRow.java @@@ -322,13 -269,12 +322,13 @@@ public class BTreeRow extends AbstractR return filter(filter, DeletionTime.LIVE, false, metadata); } - public Row filter(ColumnFilter filter, DeletionTime activeDeletion, boolean setActiveDeletionToRow, CFMetaData metadata) + public Row filter(ColumnFilter filter, DeletionTime activeDeletion, boolean setActiveDeletionToRow, TableMetadata metadata) { - Map<ByteBuffer, CFMetaData.DroppedColumn> droppedColumns = metadata.getDroppedColumns(); + Map<ByteBuffer, DroppedColumn> droppedColumns = metadata.droppedColumns; - boolean mayFilterColumns = !filter.fetchesAllColumns(isStatic()); - boolean mayFilterColumns = !filter.fetchesAllColumns() || !filter.allFetchedColumnsAreQueried(); - boolean mayHaveShadowed = activeDeletion.supersedes(deletion.time()); ++ boolean mayFilterColumns = !filter.fetchesAllColumns(isStatic()) || !filter.allFetchedColumnsAreQueried(); + // When merging sstable data in Row.Merger#merge(), rowDeletion is removed if it doesn't supersede activeDeletion. + boolean mayHaveShadowed = !activeDeletion.isLive() && !deletion.time().supersedes(activeDeletion); if (!mayFilterColumns && !mayHaveShadowed && droppedColumns.isEmpty()) return this; diff --cc src/java/org/apache/cassandra/db/rows/BufferCell.java index 786ac3c,e445049..3795b0c --- a/src/java/org/apache/cassandra/db/rows/BufferCell.java +++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java @@@ -127,7 -120,12 +127,12 @@@ public class BufferCell extends Abstrac return new BufferCell(column, newTimestamp, ttl, newLocalDeletionTime, value, path); } - public Cell withSkippedValue() ++ public Cell<?> withSkippedValue() + { + return withUpdatedValue(ByteBufferUtil.EMPTY_BYTE_BUFFER); + } + - public Cell copy(AbstractAllocator allocator) + public Cell<?> copy(AbstractAllocator allocator) { if (!value.hasRemaining()) return this; diff --cc src/java/org/apache/cassandra/db/rows/Cell.java index 6dd8f61,9e59246..38d1c84 --- a/src/java/org/apache/cassandra/db/rows/Cell.java +++ b/src/java/org/apache/cassandra/db/rows/Cell.java @@@ -144,13 -138,20 +144,20 @@@ public abstract class Cell<V> extends C */ public abstract CellPath path(); - public abstract Cell withUpdatedColumn(ColumnDefinition newColumn); + public abstract Cell<?> withUpdatedColumn(ColumnMetadata newColumn); - public abstract Cell withUpdatedValue(ByteBuffer newValue); + public abstract Cell<?> withUpdatedValue(ByteBuffer newValue); - public abstract Cell withUpdatedTimestampAndLocalDeletionTime(long newTimestamp, int newLocalDeletionTime); + public abstract Cell<?> withUpdatedTimestampAndLocalDeletionTime(long newTimestamp, int newLocalDeletionTime); + /** + * Used to apply the same optimization as in {@link Cell.Serializer#deserialize} when + * the column is not queried but eventhough it's used for digest calculation. + * @return a cell with an empty buffer as value + */ - public abstract Cell withSkippedValue(); ++ public abstract Cell<?> withSkippedValue(); + - public abstract Cell copy(AbstractAllocator allocator); + public abstract Cell<?> copy(AbstractAllocator allocator); @Override // Overrides super type to provide a more precise return type. diff --cc src/java/org/apache/cassandra/db/rows/ComplexColumnData.java index d421af2,2dea162..9f35437 --- a/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java +++ b/src/java/org/apache/cassandra/db/rows/ComplexColumnData.java @@@ -160,10 -143,11 +160,11 @@@ public class ComplexColumnData extends return transformAndFilter(complexDeletion, Cell::markCounterLocalToBeCleared); } - public ComplexColumnData filter(ColumnFilter filter, DeletionTime activeDeletion, CFMetaData.DroppedColumn dropped, LivenessInfo rowLiveness) + public ComplexColumnData filter(ColumnFilter filter, DeletionTime activeDeletion, DroppedColumn dropped, LivenessInfo rowLiveness) { ColumnFilter.Tester cellTester = filter.newTester(column); - if (cellTester == null && activeDeletion.isLive() && dropped == null) + boolean isQueriedColumn = filter.fetchedColumnIsQueried(column); + if (cellTester == null && activeDeletion.isLive() && dropped == null && isQueriedColumn) return this; DeletionTime newDeletion = activeDeletion.supersedes(complexDeletion) ? DeletionTime.LIVE : complexDeletion; diff --cc src/java/org/apache/cassandra/db/rows/NativeCell.java index fe6f403,1f8c258..52d9ab9 --- a/src/java/org/apache/cassandra/db/rows/NativeCell.java +++ b/src/java/org/apache/cassandra/db/rows/NativeCell.java @@@ -20,9 -20,8 +20,10 @@@ package org.apache.cassandra.db.rows import java.nio.ByteBuffer; import java.nio.ByteOrder; -import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.marshal.ByteBufferAccessor; +import org.apache.cassandra.db.marshal.ValueAccessor; +import org.apache.cassandra.schema.ColumnMetadata; + import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.ObjectSizes; import org.apache.cassandra.utils.concurrent.OpOrder; import org.apache.cassandra.utils.memory.MemoryUtil; diff --cc test/unit/org/apache/cassandra/db/SSTableAndMemTableDigestMatchTest.java index 0000000,54ed80b..30f9628 mode 000000,100644..100644 --- a/test/unit/org/apache/cassandra/db/SSTableAndMemTableDigestMatchTest.java +++ b/test/unit/org/apache/cassandra/db/SSTableAndMemTableDigestMatchTest.java @@@ -1,0 -1,157 +1,148 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.db; + + import java.nio.ByteBuffer; + import java.util.HashMap; + import java.util.Map; + import java.util.NavigableSet; + import java.util.function.Function; + + import com.google.common.collect.Sets; + import org.junit.Test; + -import com.sun.xml.internal.xsom.impl.scd.Iterators; + import org.apache.cassandra.config.DatabaseDescriptor; + import org.apache.cassandra.cql3.CQLTester; + import org.apache.cassandra.cql3.ColumnIdentifier; + import org.apache.cassandra.db.filter.ClusteringIndexNamesFilter; + import org.apache.cassandra.db.filter.ColumnFilter; + import org.apache.cassandra.db.marshal.Int32Type; -import org.apache.cassandra.db.marshal.IntegerType; + import org.apache.cassandra.db.partitions.SingletonUnfilteredPartitionIterator; + import org.apache.cassandra.db.rows.CellPath; + import org.apache.cassandra.db.rows.UnfilteredRowIterator; + import org.apache.cassandra.net.MessagingService; ++import org.apache.cassandra.schema.TableMetadata; + import org.apache.cassandra.utils.ByteBufferUtil; + + import static org.junit.Assert.assertEquals; + + public class SSTableAndMemTableDigestMatchTest extends CQLTester + { + private final static long writeTime = System.currentTimeMillis() * 1000L; + + @Test + public void testSelectAllColumns() throws Throwable + { - testWithFilter(cfs -> ColumnFilter.all(cfs.metadata)); ++ testWithFilter(tableMetadata -> ++ ColumnFilter.all(tableMetadata)); + } + + @Test + public void testSelectNoColumns() throws Throwable + { - testWithFilter(cfs -> ColumnFilter.selection(cfs.metadata, PartitionColumns.NONE)); ++ testWithFilter(tableMetadata -> ++ ColumnFilter.selection(tableMetadata, RegularAndStaticColumns.builder().build())); + } + + @Test + public void testSelectEmptyColumn() throws Throwable + { - testWithFilter(cfs -> ColumnFilter.selection(cfs.metadata, PartitionColumns.of(cfs.metadata.getColumnDefinition(ColumnIdentifier.getInterned("e", false))))); ++ testWithFilter(tableMetadata -> ++ ColumnFilter.selection(tableMetadata, RegularAndStaticColumns.of(tableMetadata.getColumn(ColumnIdentifier.getInterned("e", false))))); + } + + @Test + public void testSelectNonEmptyColumn() throws Throwable + { - testWithFilter(cfs -> ColumnFilter.selection(cfs.metadata, PartitionColumns.of(cfs.metadata.getColumnDefinition(ColumnIdentifier.getInterned("v1", false))))); ++ testWithFilter(tableMetadata -> ++ ColumnFilter.selection(tableMetadata, RegularAndStaticColumns.of(tableMetadata.getColumn(ColumnIdentifier.getInterned("v1", false))))); + } + + @Test + public void testSelectEachNonEmptyColumn() throws Throwable + { - testWithFilter(cfs -> ColumnFilter.selection(cfs.metadata, - PartitionColumns.builder() - .add(cfs.metadata.getColumnDefinition(ColumnIdentifier.getInterned("v1", false))) - .add(cfs.metadata.getColumnDefinition(ColumnIdentifier.getInterned("v2", false))) - .add(cfs.metadata.getColumnDefinition(ColumnIdentifier.getInterned("m", false))) - .build())); - } - - @Test - public void testSelectEmptyComplexColumn() throws Throwable - { - testWithFilter(cfs -> ColumnFilter.selection(cfs.metadata, - PartitionColumns.builder() - .add(cfs.metadata.getColumnDefinition(ColumnIdentifier.getInterned("em", false))) ++ testWithFilter(tableMetadata -> ++ ColumnFilter.selection(tableMetadata, ++ RegularAndStaticColumns.builder() ++ .add(tableMetadata.getColumn(ColumnIdentifier.getInterned("v1", false))) ++ .add(tableMetadata.getColumn(ColumnIdentifier.getInterned("v2", false))) + .build())); + } + + @Test + public void testSelectCellsFromEmptyComplexColumn() throws Throwable + { - testWithFilter(cfs -> ColumnFilter.selectionBuilder() - .select(cfs.metadata.getColumnDefinition(ColumnIdentifier.getInterned("em", false)), - CellPath.create(Int32Type.instance.decompose(5))).build()); ++ testWithFilter(tableMetadata -> ColumnFilter.selectionBuilder().select(tableMetadata.getColumn(ColumnIdentifier.getInterned("em", false)), ++ CellPath.create(Int32Type.instance.decompose(5))).build()); + } + + @Test + public void testSelectNonEmptyCellsFromComplexColumn() throws Throwable + { - testWithFilter(cfs -> ColumnFilter.selectionBuilder() - .select(cfs.metadata.getColumnDefinition(ColumnIdentifier.getInterned("m", false)), - CellPath.create(Int32Type.instance.decompose(1))).build()); ++ testWithFilter(tableMetadata -> ColumnFilter.selectionBuilder().select(tableMetadata.getColumn(ColumnIdentifier.getInterned("m", false)), ++ CellPath.create(Int32Type.instance.decompose(1))).build()); + } + + @Test + public void testSelectEmptyCellsFromNonEmptyComplexColumn() throws Throwable + { - testWithFilter(cfs -> ColumnFilter.selectionBuilder() - .select(cfs.metadata.getColumnDefinition(ColumnIdentifier.getInterned("m", false)), - CellPath.create(Int32Type.instance.decompose(5))).build()); ++ testWithFilter(tableMetadata -> ColumnFilter.selectionBuilder().select(tableMetadata.getColumn(ColumnIdentifier.getInterned("m", false)), ++ CellPath.create(Int32Type.instance.decompose(5))).build()); + } + - private void testWithFilter(Function<ColumnFamilyStore, ColumnFilter> filterFactory) throws Throwable ++ private void testWithFilter(Function<TableMetadata, ColumnFilter> filterFactory) throws Throwable + { + Map<Integer, Integer> m = new HashMap<>(); + m.put(1, 10); + createTable("CREATE TABLE %s (k int PRIMARY KEY, v1 int, v2 int, e text, m map<int, int>, em map<int, int>)"); + execute("INSERT INTO %s (k, v1, v2, m) values (?, ?, ?, ?) USING TIMESTAMP ?", 1, 2, 3, m, writeTime); + + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); - ColumnFilter filter = filterFactory.apply(cfs); ++ ColumnFilter filter = filterFactory.apply(cfs.metadata()); + String digest1 = getDigest(filter); + flush(); + String digest2 = getDigest(filter); + + assertEquals(digest1, digest2); + } + + private String getDigest(ColumnFilter filter) + { + ColumnFamilyStore cfs = getCurrentColumnFamilyStore(); - NavigableSet<Clustering> clusterings = Sets.newTreeSet(new ClusteringComparator()); ++ NavigableSet<Clustering<?>> clusterings = Sets.newTreeSet(new ClusteringComparator()); + clusterings.add(Clustering.EMPTY); + BufferDecoratedKey key = new BufferDecoratedKey(DatabaseDescriptor.getPartitioner().getToken(Int32Type.instance.decompose(1)), + Int32Type.instance.decompose(1)); + SinglePartitionReadCommand cmd = SinglePartitionReadCommand - .create(cfs.metadata, ++ .create(cfs.metadata(), + (int) (System.currentTimeMillis() / 1000), + key, + filter, + new ClusteringIndexNamesFilter(clusterings, false)).copyAsDigestQuery(); + cmd.setDigestVersion(MessagingService.current_version); + ReadResponse resp; + try (ReadExecutionController ctrl = ReadExecutionController.forCommand(cmd); UnfilteredRowIterator iterator = cmd.queryMemtableAndDisk(cfs, ctrl)) + { - resp = ReadResponse.createDataResponse(new SingletonUnfilteredPartitionIterator(iterator, false), cmd); ++ resp = ReadResponse.createDataResponse(new SingletonUnfilteredPartitionIterator(iterator), cmd); + logger.info("Response is: {}", resp.toDebugString(cmd, key)); + ByteBuffer digest = resp.digest(cmd); + return ByteBufferUtil.bytesToHex(digest); + } + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org