Drop/add column name with different Kind can result in corruption. Patch by Benedict; reviewed by Sam for CASSANDRA-14843.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4b1f40d5 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4b1f40d5 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4b1f40d5 Branch: refs/heads/cassandra-3.11 Commit: 4b1f40d5382638bf3913293b713d5d22b57c844d Parents: f7630e4 Author: Benedict Elliott Smith <bened...@apache.org> Authored: Tue Nov 27 16:22:05 2018 +0000 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Thu Nov 29 14:29:20 2018 +0000 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/SerializationHeader.java | 44 ++++--- .../cassandra/db/SerializationHeaderTest.java | 129 +++++++++++++++++++ 3 files changed, 155 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b1f40d5/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6e18de1..060fa9d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,6 +1,7 @@ 3.0.18 * RangeTombstoneList doesn't properly clean up mergeable or superseded rts in some cases (CASSANDRA-14894) * Fix handling of collection tombstones for dropped columns from legacy sstables (CASSANDRA-14912) + * Drop/add column name with different Kind can result in corruption (CASSANDRA-14843) * Fix missing rows when reading 2.1 SSTables with static columns in 3.0 (CASSANDRA-14873) * Move TWCS message 'No compaction necessary for bucket size' to Trace level (CASSANDRA-14884) * Sstable min/max metadata can cause data loss (CASSANDRA-14861) http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b1f40d5/src/java/org/apache/cassandra/db/SerializationHeader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java 
b/src/java/org/apache/cassandra/db/SerializationHeader.java index b2ed26e..5c4f518 100644 --- a/src/java/org/apache/cassandra/db/SerializationHeader.java +++ b/src/java/org/apache/cassandra/db/SerializationHeader.java @@ -325,31 +325,37 @@ public class SerializationHeader public SerializationHeader toHeader(CFMetaData metadata) { Map<ByteBuffer, AbstractType<?>> typeMap = new HashMap<>(staticColumns.size() + regularColumns.size()); - typeMap.putAll(staticColumns); - typeMap.putAll(regularColumns); PartitionColumns.Builder builder = PartitionColumns.builder(); - for (ByteBuffer name : typeMap.keySet()) + for (Map<ByteBuffer, AbstractType<?>> map : ImmutableList.of(staticColumns, regularColumns)) { - ColumnDefinition column = metadata.getColumnDefinition(name); - - if (column == null) + boolean isStatic = map == staticColumns; + for (Map.Entry<ByteBuffer, AbstractType<?>> e : map.entrySet()) { - // TODO: this imply we don't read data for a column we don't yet know about, which imply this is theoretically - // racy with column addition. Currently, it is up to the user to not write data before the schema has propagated - // and this is far from being the only place that has such problem in practice. This doesn't mean we shouldn't - // improve this. - - // If we don't find the definition, it could be we have data for a dropped column, and we shouldn't - // fail deserialization because of that. So we grab a "fake" ColumnDefinition that ensure proper - // deserialization. The column will be ignore later on anyway. 
- boolean isStatic = staticColumns.containsKey(name); - column = metadata.getDroppedColumnDefinition(name, isStatic); - if (column == null) - throw new RuntimeException("Unknown column " + UTF8Type.instance.getString(name) + " during deserialization"); + ByteBuffer name = e.getKey(); + AbstractType<?> other = typeMap.put(name, e.getValue()); + if (other != null && !other.equals(e.getValue())) + throw new IllegalStateException("Column " + name + " occurs as both regular and static with types " + other + "and " + e.getValue()); + + ColumnDefinition column = metadata.getColumnDefinition(name); + if (column == null || column.isStatic() != isStatic) + { + // TODO: this imply we don't read data for a column we don't yet know about, which imply this is theoretically + // racy with column addition. Currently, it is up to the user to not write data before the schema has propagated + // and this is far from being the only place that has such problem in practice. This doesn't mean we shouldn't + // improve this. + + // If we don't find the definition, it could be we have data for a dropped column, and we shouldn't + // fail deserialization because of that. So we grab a "fake" ColumnDefinition that ensure proper + // deserialization. The column will be ignore later on anyway. 
+ column = metadata.getDroppedColumnDefinition(name, isStatic); + if (column == null) + throw new RuntimeException("Unknown column " + UTF8Type.instance.getString(name) + " during deserialization"); + } + builder.add(column); } - builder.add(column); } + return new SerializationHeader(true, keyType, clusteringTypes, builder.build(), stats, typeMap); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b1f40d5/test/unit/org/apache/cassandra/db/SerializationHeaderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/SerializationHeaderTest.java b/test/unit/org/apache/cassandra/db/SerializationHeaderTest.java new file mode 100644 index 0000000..3e9f3bc --- /dev/null +++ b/test/unit/org/apache/cassandra/db/SerializationHeaderTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db; + +import com.google.common.io.Files; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.BTreeRow; +import org.apache.cassandra.db.rows.BufferCell; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.io.sstable.Component; +import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.sstable.ISSTableScanner; +import org.apache.cassandra.io.sstable.format.SSTableFormat; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.io.sstable.format.SSTableWriter; +import org.apache.cassandra.io.sstable.format.big.BigFormat; +import org.apache.cassandra.io.sstable.format.big.BigTableWriter; +import org.apache.cassandra.io.util.FileUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; +import java.util.function.Function; + +public class SerializationHeaderTest +{ + private static String KEYSPACE = "SerializationHeaderTest"; + + @Test + public void testWrittenAsDifferentKind() throws Exception + { + final String tableName = "testWrittenAsDifferentKind"; + final String schemaCqlWithStatic = String.format("CREATE TABLE %s (k int, c int, v int static, PRIMARY KEY(k, c))", tableName); + final String schemaCqlWithRegular = String.format("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY(k, c))", tableName); + 
ColumnIdentifier v = ColumnIdentifier.getInterned("v", false); + CFMetaData schemaWithStatic = CFMetaData.compile(schemaCqlWithStatic, KEYSPACE); + CFMetaData schemaWithRegular = CFMetaData.compile(schemaCqlWithRegular, KEYSPACE); + ColumnDefinition columnStatic = schemaWithStatic.getColumnDefinition(v); + ColumnDefinition columnRegular = schemaWithRegular.getColumnDefinition(v); + schemaWithStatic.recordColumnDrop(columnRegular, 0L); + schemaWithRegular.recordColumnDrop(columnStatic, 0L); + + final AtomicInteger generation = new AtomicInteger(); + File dir = Files.createTempDir(); + try + { + BiFunction<CFMetaData, Function<ByteBuffer, Clustering>, Callable<Descriptor>> writer = (schema, clusteringFunction) -> () -> { + Descriptor descriptor = new Descriptor(BigFormat.latestVersion, dir, schema.ksName, schema.cfName, generation.incrementAndGet(), SSTableFormat.Type.BIG, Component.DIGEST_CRC32); + + SerializationHeader header = SerializationHeader.makeWithoutStats(schema); + try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE); + SSTableWriter sstableWriter = BigTableWriter.create(schema, descriptor, 1, 0L, 0, header, txn)) + { + ColumnDefinition cd = schema.getColumnDefinition(v); + for (int i = 0 ; i < 5 ; ++i) { + final ByteBuffer value = Int32Type.instance.decompose(i); + Cell cell = BufferCell.live(schema, cd, 1L, value); + Clustering clustering = clusteringFunction.apply(value); + Row row = BTreeRow.singleCellRow(clustering, cell); + sstableWriter.append(PartitionUpdate.singleRowUpdate(schema, value, row).unfilteredIterator()); + } + sstableWriter.finish(false); + txn.finish(); + } + return descriptor; + }; + + Descriptor sstableWithRegular = writer.apply(schemaWithRegular, Clustering::new).call(); + Descriptor sstableWithStatic = writer.apply(schemaWithStatic, value -> Clustering.STATIC_CLUSTERING).call(); + SSTableReader readerWithStatic = SSTableReader.openNoValidation(sstableWithStatic, schemaWithRegular); + SSTableReader 
readerWithRegular = SSTableReader.openNoValidation(sstableWithRegular, schemaWithStatic); + + try (ISSTableScanner partitions = readerWithStatic.getScanner()) { + for (int i = 0 ; i < 5 ; ++i) + { + UnfilteredRowIterator partition = partitions.next(); + Assert.assertFalse(partition.hasNext()); + long value = Int32Type.instance.compose(partition.staticRow().getCell(columnStatic).value()); + Assert.assertEquals(value, (long)i); + } + Assert.assertFalse(partitions.hasNext()); + } + try (ISSTableScanner partitions = readerWithRegular.getScanner()) { + for (int i = 0 ; i < 5 ; ++i) + { + UnfilteredRowIterator partition = partitions.next(); + long value = Int32Type.instance.compose(((Row)partition.next()).getCell(columnRegular).value()); + Assert.assertEquals(value, (long)i); + Assert.assertTrue(partition.staticRow().isEmpty()); + Assert.assertFalse(partition.hasNext()); + } + Assert.assertFalse(partitions.hasNext()); + } + } + finally + { + FileUtils.deleteRecursive(dir); + } + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org