Merge branch 'cassandra-3.11' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2ee9d685 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2ee9d685 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2ee9d685 Branch: refs/heads/trunk Commit: 2ee9d6854bd82608cd36eb3fb606108e84714932 Parents: 9a7db29 1084ad9 Author: Benedict Elliott Smith <bened...@apache.org> Authored: Thu Nov 29 14:54:21 2018 +0000 Committer: Benedict Elliott Smith <bened...@apache.org> Committed: Thu Nov 29 14:54:21 2018 +0000 ---------------------------------------------------------------------- CHANGES.txt | 2 + src/java/org/apache/cassandra/db/Columns.java | 2 + .../cassandra/db/SerializationHeader.java | 44 +++--- .../org/apache/cassandra/db/ColumnsTest.java | 29 +++- .../cassandra/db/SerializationHeaderTest.java | 145 +++++++++++++++++++ 5 files changed, 203 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ee9d685/CHANGES.txt ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ee9d685/src/java/org/apache/cassandra/db/Columns.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ee9d685/src/java/org/apache/cassandra/db/SerializationHeader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/SerializationHeader.java index 4266629,8e2844b..deadf68 --- a/src/java/org/apache/cassandra/db/SerializationHeader.java +++ b/src/java/org/apache/cassandra/db/SerializationHeader.java @@@ -21,20 -21,21 +21,21 @@@ import java.io.IOException import java.nio.ByteBuffer; import java.util.*; + import com.google.common.collect.ImmutableList; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.filter.ColumnFilter; -import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.marshal.AbstractType; -import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.marshal.TypeParser; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.rows.*; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.Version; -import org.apache.cassandra.io.sstable.metadata.MetadataType; -import org.apache.cassandra.io.sstable.metadata.MetadataComponent; import org.apache.cassandra.io.sstable.metadata.IMetadataComponentSerializer; +import org.apache.cassandra.io.sstable.metadata.MetadataComponent; +import org.apache.cassandra.io.sstable.metadata.MetadataType; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.ByteBufferUtil; public class SerializationHeader @@@ -291,33 -295,40 +292,40 @@@ return MetadataType.HEADER; } - public SerializationHeader toHeader(CFMetaData metadata) + public SerializationHeader toHeader(TableMetadata metadata) { Map<ByteBuffer, AbstractType<?>> typeMap = new HashMap<>(staticColumns.size() + regularColumns.size()); - typeMap.putAll(staticColumns); - typeMap.putAll(regularColumns); - PartitionColumns.Builder builder = PartitionColumns.builder(); + RegularAndStaticColumns.Builder builder = RegularAndStaticColumns.builder(); - for (ByteBuffer name : typeMap.keySet()) + for (Map<ByteBuffer, AbstractType<?>> map : ImmutableList.of(staticColumns, regularColumns)) { - ColumnMetadata column = metadata.getColumn(name); - if (column == null) + boolean isStatic = map == staticColumns; + for (Map.Entry<ByteBuffer, AbstractType<?>> e : map.entrySet()) { - // TODO: this imply we don't read data for a column we don't yet know about, which imply this is theoretically - // racy with column addition. Currently, it is up to the user to not write data before the schema has propagated - // and this is far from being the only place that has such problem in practice. This doesn't mean we shouldn't - // improve this. - - // If we don't find the definition, it could be we have data for a dropped column, and we shouldn't - // fail deserialization because of that. So we grab a "fake" ColumnMetadata that ensure proper - // deserialization. The column will be ignore later on anyway. - boolean isStatic = staticColumns.containsKey(name); - column = metadata.getDroppedColumn(name, isStatic); - if (column == null) - throw new RuntimeException("Unknown column " + UTF8Type.instance.getString(name) + " during deserialization"); + ByteBuffer name = e.getKey(); + AbstractType<?> other = typeMap.put(name, e.getValue()); + if (other != null && !other.equals(e.getValue())) + throw new IllegalStateException("Column " + name + " occurs as both regular and static with types " + other + "and " + e.getValue()); + - ColumnDefinition column = metadata.getColumnDefinition(name); ++ ColumnMetadata column = metadata.getColumn(name); + if (column == null || column.isStatic() != isStatic) + { + // TODO: this imply we don't read data for a column we don't yet know about, which imply this is theoretically + // racy with column addition. Currently, it is up to the user to not write data before the schema has propagated + // and this is far from being the only place that has such problem in practice. This doesn't mean we shouldn't + // improve this. + + // If we don't find the definition, it could be we have data for a dropped column, and we shouldn't + // fail deserialization because of that. So we grab a "fake" ColumnDefinition that ensure proper + // deserialization. The column will be ignore later on anyway. - column = metadata.getDroppedColumnDefinition(name, isStatic); ++ column = metadata.getDroppedColumn(name, isStatic); + if (column == null) + throw new RuntimeException("Unknown column " + UTF8Type.instance.getString(name) + " during deserialization"); + } + builder.add(column); } - builder.add(column); } + return new SerializationHeader(true, keyType, clusteringTypes, builder.build(), stats, typeMap); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ee9d685/test/unit/org/apache/cassandra/db/ColumnsTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/ColumnsTest.java index dae0d0a,1f34c88..a5d267e --- a/test/unit/org/apache/cassandra/db/ColumnsTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnsTest.java @@@ -51,8 -53,33 +53,33 @@@ public class ColumnsTes DatabaseDescriptor.daemonInitialization(); } - private static final CFMetaData cfMetaData = MockSchema.newCFS().metadata; + private static final TableMetadata TABLE_METADATA = MockSchema.newCFS().metadata(); + @Test + public void testDeserializeCorruption() throws IOException + { + ColumnsCheck check = randomSmall(1, 0, 3, 0); + Columns superset = check.columns; - List<ColumnDefinition> minus1 = new ArrayList<>(check.definitions); ++ List<ColumnMetadata> minus1 = new ArrayList<>(check.definitions); + minus1.remove(3); + Columns minus2 = check.columns + .without(check.columns.getSimple(3)) + .without(check.columns.getSimple(2)); + try (DataOutputBuffer out = new DataOutputBuffer()) + { + // serialize a subset + Columns.serializer.serializeSubset(minus1, superset, out); + try (DataInputBuffer in = new DataInputBuffer(out.toByteArray())) + { + Columns.serializer.deserializeSubset(minus2, in); + Assert.assertFalse(true); + } + catch (IOException e) + { + } + } + } + // this tests most of our functionality, since each subset we perform // reasonably comprehensive tests of basic functionality against @Test @@@ -451,34 -478,33 +478,34 @@@ { int i = 0; for (String name : names) - results.add(ColumnDefinition.clusteringDef(cfMetaData, bytes(name), UTF8Type.instance, i++)); + results.add(ColumnMetadata.clusteringColumn(TABLE_METADATA, bytes(name), UTF8Type.instance, i++)); } - private static void addRegular(List<String> names, List<ColumnDefinition> results) + private static void addRegular(List<String> names, List<ColumnMetadata> results) { for (String name : names) - results.add(ColumnDefinition.regularDef(cfMetaData, bytes(name), UTF8Type.instance)); + results.add(ColumnMetadata.regularColumn(TABLE_METADATA, bytes(name), UTF8Type.instance)); } - private static <V> void addComplex(List<String> names, List<ColumnMetadata> results) - private static void addComplex(List<String> names, List<ColumnDefinition> results) ++ private static void addComplex(List<String> names, List<ColumnMetadata> results) { for (String name : names) - results.add(ColumnDefinition.regularDef(cfMetaData, bytes(name), SetType.getInstance(UTF8Type.instance, true))); + results.add(ColumnMetadata.regularColumn(TABLE_METADATA, bytes(name), SetType.getInstance(UTF8Type.instance, true))); } - private static ColumnDefinition def(String name, AbstractType<?> type, ColumnDefinition.Kind kind) + private static ColumnMetadata def(String name, AbstractType<?> type, ColumnMetadata.Kind kind) { - return new ColumnDefinition(cfMetaData, bytes(name), type, ColumnDefinition.NO_POSITION, kind); + return new ColumnMetadata(TABLE_METADATA, bytes(name), type, ColumnMetadata.NO_POSITION, kind); } - private static CFMetaData mock(Columns columns) + private static TableMetadata mock(Columns columns) { if (columns.isEmpty()) - return cfMetaData; - CFMetaData.Builder builder = CFMetaData.Builder.create(cfMetaData.ksName, cfMetaData.cfName); + return TABLE_METADATA; + + TableMetadata.Builder builder = TableMetadata.builder(TABLE_METADATA.keyspace, TABLE_METADATA.name); boolean hasPartitionKey = false; - for (ColumnDefinition def : columns) + for (ColumnMetadata def : columns) { switch (def.kind) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ee9d685/test/unit/org/apache/cassandra/db/SerializationHeaderTest.java ---------------------------------------------------------------------- diff --cc test/unit/org/apache/cassandra/db/SerializationHeaderTest.java index 0000000,84fb51c..1092a90 mode 000000,100644..100644 --- a/test/unit/org/apache/cassandra/db/SerializationHeaderTest.java +++ b/test/unit/org/apache/cassandra/db/SerializationHeaderTest.java @@@ -1,0 -1,136 +1,145 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.db; + + import com.google.common.io.Files; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; + import org.apache.cassandra.config.DatabaseDescriptor; + import org.apache.cassandra.cql3.ColumnIdentifier; + import org.apache.cassandra.db.compaction.OperationType; + import org.apache.cassandra.db.lifecycle.LifecycleTransaction; + import org.apache.cassandra.db.marshal.Int32Type; + import org.apache.cassandra.db.partitions.PartitionUpdate; + import org.apache.cassandra.db.rows.BTreeRow; + import org.apache.cassandra.db.rows.BufferCell; + import org.apache.cassandra.db.rows.Cell; + import org.apache.cassandra.db.rows.Row; + import org.apache.cassandra.db.rows.UnfilteredRowIterator; + import org.apache.cassandra.io.sstable.Component; + import org.apache.cassandra.io.sstable.Descriptor; + import org.apache.cassandra.io.sstable.ISSTableScanner; + import org.apache.cassandra.io.sstable.format.SSTableFormat; + import org.apache.cassandra.io.sstable.format.SSTableReader; + import org.apache.cassandra.io.sstable.format.SSTableWriter; + import org.apache.cassandra.io.sstable.format.big.BigFormat; + import org.apache.cassandra.io.sstable.format.big.BigTableWriter; + import org.apache.cassandra.io.util.FileUtils; ++import org.apache.cassandra.schema.ColumnMetadata; ++import org.apache.cassandra.schema.TableMetadata; ++import org.apache.cassandra.schema.TableMetadataRef; + import org.junit.Assert; + import org.junit.Test; + + import java.io.File; + import java.nio.ByteBuffer; + import java.util.Collections; + import java.util.concurrent.Callable; + import java.util.concurrent.atomic.AtomicInteger; + import java.util.function.BiFunction; + import java.util.function.Function; + + public class SerializationHeaderTest + { + private static String KEYSPACE = "SerializationHeaderTest"; + + static + { + DatabaseDescriptor.daemonInitialization(); + } + + @Test + public void testWrittenAsDifferentKind() throws Exception + { + final String tableName = "testWrittenAsDifferentKind"; - final String schemaCqlWithStatic = String.format("CREATE TABLE %s (k int, c int, v int static, PRIMARY KEY(k, c))", tableName); - final String schemaCqlWithRegular = String.format("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY(k, c))", tableName); ++// final String schemaCqlWithStatic = String.format("CREATE TABLE %s (k int, c int, v int static, PRIMARY KEY(k, c))", tableName); ++// final String schemaCqlWithRegular = String.format("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY(k, c))", tableName); + ColumnIdentifier v = ColumnIdentifier.getInterned("v", false); - CFMetaData schemaWithStatic = CFMetaData.compile(schemaCqlWithStatic, KEYSPACE); - CFMetaData schemaWithRegular = CFMetaData.compile(schemaCqlWithRegular, KEYSPACE); - ColumnDefinition columnStatic = schemaWithStatic.getColumnDefinition(v); - ColumnDefinition columnRegular = schemaWithRegular.getColumnDefinition(v); - schemaWithStatic.recordColumnDrop(columnRegular, 0L); - schemaWithRegular.recordColumnDrop(columnStatic, 0L); ++ TableMetadata schemaWithStatic = TableMetadata.builder(KEYSPACE, tableName) ++ .addPartitionKeyColumn("k", Int32Type.instance) ++ .addClusteringColumn("c", Int32Type.instance) ++ .addStaticColumn("v", Int32Type.instance) ++ .build(); ++ TableMetadata schemaWithRegular = TableMetadata.builder(KEYSPACE, tableName) ++ .addPartitionKeyColumn("k", Int32Type.instance) ++ .addClusteringColumn("c", Int32Type.instance) ++ .addRegularColumn("v", Int32Type.instance) ++ .build(); ++ ColumnMetadata columnStatic = schemaWithStatic.getColumn(v); ++ ColumnMetadata columnRegular = schemaWithRegular.getColumn(v); ++ schemaWithStatic = schemaWithStatic.unbuild().recordColumnDrop(columnRegular, 0L).build(); ++ schemaWithRegular = schemaWithRegular.unbuild().recordColumnDrop(columnStatic, 0L).build(); + + final AtomicInteger generation = new AtomicInteger(); + File dir = Files.createTempDir(); + try + { - BiFunction<CFMetaData, Function<ByteBuffer, Clustering>, Callable<Descriptor>> writer = (schema, clusteringFunction) -> () -> { - Descriptor descriptor = new Descriptor(BigFormat.latestVersion, dir, schema.ksName, schema.cfName, generation.incrementAndGet(), SSTableFormat.Type.BIG, Component.DIGEST_CRC32); ++ BiFunction<TableMetadata, Function<ByteBuffer, Clustering>, Callable<Descriptor>> writer = (schema, clusteringFunction) -> () -> { ++ Descriptor descriptor = new Descriptor(BigFormat.latestVersion, dir, schema.keyspace, schema.name, generation.incrementAndGet(), SSTableFormat.Type.BIG); + + SerializationHeader header = SerializationHeader.makeWithoutStats(schema); + try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE); - SSTableWriter sstableWriter = BigTableWriter.create(schema, descriptor, 1, 0L, 0, header, Collections.emptyList(), txn)) ++ SSTableWriter sstableWriter = BigTableWriter.create(TableMetadataRef.forOfflineTools(schema), descriptor, 1, 0L, null, false, 0, header, Collections.emptyList(), txn)) + { - ColumnDefinition cd = schema.getColumnDefinition(v); ++ ColumnMetadata cd = schema.getColumn(v); + for (int i = 0 ; i < 5 ; ++i) { + final ByteBuffer value = Int32Type.instance.decompose(i); + Cell cell = BufferCell.live(cd, 1L, value); + Clustering clustering = clusteringFunction.apply(value); + Row row = BTreeRow.singleCellRow(clustering, cell); + sstableWriter.append(PartitionUpdate.singleRowUpdate(schema, value, row).unfilteredIterator()); + } + sstableWriter.finish(false); + txn.finish(); + } + return descriptor; + }; + + Descriptor sstableWithRegular = writer.apply(schemaWithRegular, BufferClustering::new).call(); + Descriptor sstableWithStatic = writer.apply(schemaWithStatic, value -> Clustering.STATIC_CLUSTERING).call(); - SSTableReader readerWithStatic = SSTableReader.openNoValidation(sstableWithStatic, schemaWithRegular); - SSTableReader readerWithRegular = SSTableReader.openNoValidation(sstableWithRegular, schemaWithStatic); ++ SSTableReader readerWithStatic = SSTableReader.openNoValidation(sstableWithStatic, TableMetadataRef.forOfflineTools(schemaWithRegular)); ++ SSTableReader readerWithRegular = SSTableReader.openNoValidation(sstableWithRegular, TableMetadataRef.forOfflineTools(schemaWithStatic)); + + try (ISSTableScanner partitions = readerWithStatic.getScanner()) { + for (int i = 0 ; i < 5 ; ++i) + { + UnfilteredRowIterator partition = partitions.next(); + Assert.assertFalse(partition.hasNext()); + long value = Int32Type.instance.compose(partition.staticRow().getCell(columnStatic).value()); + Assert.assertEquals(value, (long)i); + } + Assert.assertFalse(partitions.hasNext()); + } + try (ISSTableScanner partitions = readerWithRegular.getScanner()) { + for (int i = 0 ; i < 5 ; ++i) + { + UnfilteredRowIterator partition = partitions.next(); + long value = Int32Type.instance.compose(((Row)partition.next()).getCell(columnRegular).value()); + Assert.assertEquals(value, (long)i); + Assert.assertTrue(partition.staticRow().isEmpty()); + Assert.assertFalse(partition.hasNext()); + } + Assert.assertFalse(partitions.hasNext()); + } + } + finally + { + FileUtils.deleteRecursive(dir); + } + } + + } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org