Merge branch 'cassandra-3.11' into trunk

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2ee9d685
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2ee9d685
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2ee9d685

Branch: refs/heads/trunk
Commit: 2ee9d6854bd82608cd36eb3fb606108e84714932
Parents: 9a7db29 1084ad9
Author: Benedict Elliott Smith <bened...@apache.org>
Authored: Thu Nov 29 14:54:21 2018 +0000
Committer: Benedict Elliott Smith <bened...@apache.org>
Committed: Thu Nov 29 14:54:21 2018 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 src/java/org/apache/cassandra/db/Columns.java   |   2 +
 .../cassandra/db/SerializationHeader.java       |  44 +++---
 .../org/apache/cassandra/db/ColumnsTest.java    |  29 +++-
 .../cassandra/db/SerializationHeaderTest.java   | 145 +++++++++++++++++++
 5 files changed, 203 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
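
This merge forward-ports the cassandra-3.11 fix to trunk. In
SerializationHeader.Component.toHeader(), the static and regular column maps
are now walked separately instead of being flattened into one keyset: a column
whose kind (static vs. regular) no longer matches the live schema is resolved
through the dropped-column lookup rather than deserialized with the wrong
definition, and a name that occurs as both kinds with conflicting types now
fails fast with an IllegalStateException. ColumnsTest gains a check that
deserializing a column subset against a mismatched superset raises an
IOException, and the new SerializationHeaderTest round-trips sstables whose
'v' column was written with the opposite kind.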


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ee9d685/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ee9d685/src/java/org/apache/cassandra/db/Columns.java
----------------------------------------------------------------------
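
(No combined hunks appear for CHANGES.txt or Columns.java: git's --cc output
suppresses hunks whose resolution simply took one parent's side, so only the
regions that needed genuine merging are shown inline below.)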

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ee9d685/src/java/org/apache/cassandra/db/SerializationHeader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/SerializationHeader.java
index 4266629,8e2844b..deadf68
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@@ -21,20 -21,21 +21,21 @@@ import java.io.IOException;
  import java.nio.ByteBuffer;
  import java.util.*;
  
+ import com.google.common.collect.ImmutableList;
 -import org.apache.cassandra.config.CFMetaData;
 -import org.apache.cassandra.config.ColumnDefinition;
  import org.apache.cassandra.db.filter.ColumnFilter;
 -import org.apache.cassandra.db.rows.*;
  import org.apache.cassandra.db.marshal.AbstractType;
 -import org.apache.cassandra.db.marshal.UTF8Type;
  import org.apache.cassandra.db.marshal.TypeParser;
 +import org.apache.cassandra.db.marshal.UTF8Type;
 +import org.apache.cassandra.db.rows.*;
  import org.apache.cassandra.io.sstable.format.SSTableReader;
  import org.apache.cassandra.io.sstable.format.Version;
 -import org.apache.cassandra.io.sstable.metadata.MetadataType;
 -import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
  import org.apache.cassandra.io.sstable.metadata.IMetadataComponentSerializer;
 +import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
 +import org.apache.cassandra.io.sstable.metadata.MetadataType;
  import org.apache.cassandra.io.util.DataInputPlus;
  import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.schema.ColumnMetadata;
 +import org.apache.cassandra.schema.TableMetadata;
  import org.apache.cassandra.utils.ByteBufferUtil;
  
  public class SerializationHeader
@@@ -291,33 -295,40 +292,40 @@@
              return MetadataType.HEADER;
          }
  
 -        public SerializationHeader toHeader(CFMetaData metadata)
 +        public SerializationHeader toHeader(TableMetadata metadata)
          {
              Map<ByteBuffer, AbstractType<?>> typeMap = new HashMap<>(staticColumns.size() + regularColumns.size());
-             typeMap.putAll(staticColumns);
-             typeMap.putAll(regularColumns);
  
 -            PartitionColumns.Builder builder = PartitionColumns.builder();
 +            RegularAndStaticColumns.Builder builder = RegularAndStaticColumns.builder();
-             for (ByteBuffer name : typeMap.keySet())
+             for (Map<ByteBuffer, AbstractType<?>> map : ImmutableList.of(staticColumns, regularColumns))
              {
-                 ColumnMetadata column = metadata.getColumn(name);
-                 if (column == null)
+                 boolean isStatic = map == staticColumns;
+                 for (Map.Entry<ByteBuffer, AbstractType<?>> e : map.entrySet())
                  {
-                     // TODO: this imply we don't read data for a column we don't yet know about, which imply this is theoretically
-                     // racy with column addition. Currently, it is up to the user to not write data before the schema has propagated
-                     // and this is far from being the only place that has such problem in practice. This doesn't mean we shouldn't
-                     // improve this.
- 
-                     // If we don't find the definition, it could be we have data for a dropped column, and we shouldn't
-                     // fail deserialization because of that. So we grab a "fake" ColumnMetadata that ensure proper
-                     // deserialization. The column will be ignore later on anyway.
-                     boolean isStatic = staticColumns.containsKey(name);
-                     column = metadata.getDroppedColumn(name, isStatic);
-                     if (column == null)
-                         throw new RuntimeException("Unknown column " + UTF8Type.instance.getString(name) + " during deserialization");
+                     ByteBuffer name = e.getKey();
+                     AbstractType<?> other = typeMap.put(name, e.getValue());
+                     if (other != null && !other.equals(e.getValue()))
+                         throw new IllegalStateException("Column " + name + " occurs as both regular and static with types " + other + " and " + e.getValue());
+ 
 -                    ColumnDefinition column = metadata.getColumnDefinition(name);
++                    ColumnMetadata column = metadata.getColumn(name);
+                     if (column == null || column.isStatic() != isStatic)
+                     {
+                         // TODO: this imply we don't read data for a column we don't yet know about, which imply this is theoretically
+                         // racy with column addition. Currently, it is up to the user to not write data before the schema has propagated
+                         // and this is far from being the only place that has such problem in practice. This doesn't mean we shouldn't
+                         // improve this.
+ 
+                         // If we don't find the definition, it could be we have data for a dropped column, and we shouldn't
+                         // fail deserialization because of that. So we grab a "fake" ColumnDefinition that ensure proper
+                         // deserialization. The column will be ignore later on anyway.
 -                        column = metadata.getDroppedColumnDefinition(name, isStatic);
++                        column = metadata.getDroppedColumn(name, isStatic);
+                         if (column == null)
+                             throw new RuntimeException("Unknown column " + UTF8Type.instance.getString(name) + " during deserialization");
+                     }
+                     builder.add(column);
                  }
-                 builder.add(column);
              }
+ 
              return new SerializationHeader(true, keyType, clusteringTypes, 
builder.build(), stats, typeMap);
          }
  

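The resolution above reduces to a small map-merge pattern: fold the static and
regular column maps into a single typeMap, and use Map.put()'s return value to
detect a name that occurs on both sides with conflicting types. A minimal,
self-contained sketch of that pattern, using plain java.util types and
hypothetical names rather than the Cassandra classes above:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class TypeMapMergeSketch
    {
        static Map<String, String> mergeColumnTypes(Map<String, String> statics,
                                                    Map<String, String> regulars)
        {
            Map<String, String> typeMap = new LinkedHashMap<>(statics.size() + regulars.size());
            for (Map<String, String> map : Arrays.asList(statics, regulars))
            {
                for (Map.Entry<String, String> e : map.entrySet())
                {
                    // put() returns the previous mapping, so a conflicting
                    // duplicate is caught in the same pass that builds the map
                    String other = typeMap.put(e.getKey(), e.getValue());
                    if (other != null && !other.equals(e.getValue()))
                        throw new IllegalStateException("Column " + e.getKey()
                                + " occurs as both regular and static with types "
                                + other + " and " + e.getValue());
                }
            }
            return typeMap;
        }

        public static void main(String[] args)
        {
            Map<String, String> statics = new HashMap<>();
            statics.put("s", "Int32Type");
            Map<String, String> regulars = new HashMap<>();
            regulars.put("v", "UTF8Type");
            System.out.println(mergeColumnTypes(statics, regulars));

            regulars.put("s", "UTF8Type"); // same name, conflicting type
            try
            {
                mergeColumnTypes(statics, regulars);
            }
            catch (IllegalStateException e)
            {
                System.out.println(e.getMessage());
            }
        }
    }

Note that a name present as both kinds with the same type passes this check;
the merged code then relies on column.isStatic() != isStatic to route such a
column through the dropped-column lookup instead.
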
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ee9d685/test/unit/org/apache/cassandra/db/ColumnsTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ColumnsTest.java
index dae0d0a,1f34c88..a5d267e
--- a/test/unit/org/apache/cassandra/db/ColumnsTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnsTest.java
@@@ -51,8 -53,33 +53,33 @@@ public class ColumnsTest
          DatabaseDescriptor.daemonInitialization();
      }
  
 -    private static final CFMetaData cfMetaData = MockSchema.newCFS().metadata;
 +    private static final TableMetadata TABLE_METADATA = MockSchema.newCFS().metadata();
  
+     @Test
+     public void testDeserializeCorruption() throws IOException
+     {
+         ColumnsCheck check = randomSmall(1, 0, 3, 0);
+         Columns superset = check.columns;
 -        List<ColumnDefinition> minus1 = new ArrayList<>(check.definitions);
++        List<ColumnMetadata> minus1 = new ArrayList<>(check.definitions);
+         minus1.remove(3);
+         Columns minus2 = check.columns
+                 .without(check.columns.getSimple(3))
+                 .without(check.columns.getSimple(2));
+         try (DataOutputBuffer out = new DataOutputBuffer())
+         {
+             // serialize a subset
+             Columns.serializer.serializeSubset(minus1, superset, out);
+             try (DataInputBuffer in = new DataInputBuffer(out.toByteArray()))
+             {
+                 Columns.serializer.deserializeSubset(minus2, in);
+                 Assert.fail();
+             }
+             catch (IOException e)
+             {
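+                 // expected: the subset was encoded against a different superset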
+             }
+         }
+     }
+ 
      // this tests most of our functionality, since each subset we perform
      // reasonably comprehensive tests of basic functionality against
      @Test
@@@ -451,34 -478,33 +478,34 @@@
      {
          int i = 0;
          for (String name : names)
 -            results.add(ColumnDefinition.clusteringDef(cfMetaData, bytes(name), UTF8Type.instance, i++));
 +            results.add(ColumnMetadata.clusteringColumn(TABLE_METADATA, bytes(name), UTF8Type.instance, i++));
      }
  
 -    private static void addRegular(List<String> names, List<ColumnDefinition> results)
 +    private static void addRegular(List<String> names, List<ColumnMetadata> results)
      {
          for (String name : names)
 -            results.add(ColumnDefinition.regularDef(cfMetaData, bytes(name), UTF8Type.instance));
 +            results.add(ColumnMetadata.regularColumn(TABLE_METADATA, bytes(name), UTF8Type.instance));
      }
  
-     private static <V> void addComplex(List<String> names, List<ColumnMetadata> results)
 -    private static void addComplex(List<String> names, List<ColumnDefinition> results)
++    private static void addComplex(List<String> names, List<ColumnMetadata> results)
      {
          for (String name : names)
 -            results.add(ColumnDefinition.regularDef(cfMetaData, bytes(name), SetType.getInstance(UTF8Type.instance, true)));
 +            results.add(ColumnMetadata.regularColumn(TABLE_METADATA, bytes(name), SetType.getInstance(UTF8Type.instance, true)));
      }
  
 -    private static ColumnDefinition def(String name, AbstractType<?> type, ColumnDefinition.Kind kind)
 +    private static ColumnMetadata def(String name, AbstractType<?> type, ColumnMetadata.Kind kind)
      {
 -        return new ColumnDefinition(cfMetaData, bytes(name), type, ColumnDefinition.NO_POSITION, kind);
 +        return new ColumnMetadata(TABLE_METADATA, bytes(name), type, ColumnMetadata.NO_POSITION, kind);
      }
  
 -    private static CFMetaData mock(Columns columns)
 +    private static TableMetadata mock(Columns columns)
      {
          if (columns.isEmpty())
 -            return cfMetaData;
 -        CFMetaData.Builder builder = CFMetaData.Builder.create(cfMetaData.ksName, cfMetaData.cfName);
 +            return TABLE_METADATA;
 +
 +        TableMetadata.Builder builder = TableMetadata.builder(TABLE_METADATA.keyspace, TABLE_METADATA.name);
          boolean hasPartitionKey = false;
 -        for (ColumnDefinition def : columns)
 +        for (ColumnMetadata def : columns)
          {
              switch (def.kind)
              {

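The new testDeserializeCorruption above serializes a column subset against one
superset and deserializes it against a smaller one, asserting that the
mismatch surfaces as an IOException rather than a silently wrong column set.
Columns.serializer's actual wire format is more involved; the sketch below,
with hypothetical names and a simple bitmask encoding, only illustrates the
failure mode and the bounds check that turns it into a loud error:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class SubsetCodecSketch
    {
        // Encode a subset as a bitmask of positions in the agreed superset
        // (capped at 64 columns for the sake of the illustration).
        static long serializeSubset(List<String> subset, List<String> superset)
        {
            long mask = 0;
            for (String column : subset)
            {
                int i = superset.indexOf(column);
                if (i < 0)
                    throw new IllegalArgumentException(column + " is not in the superset");
                mask |= 1L << i;
            }
            return mask;
        }

        // Decode against a (possibly different) superset. A set bit beyond the
        // end of the superset means writer and reader disagree, so fail loudly
        // instead of returning garbage.
        static List<String> deserializeSubset(long mask, List<String> superset) throws IOException
        {
            if (mask != 0 && 63 - Long.numberOfLeadingZeros(mask) >= superset.size())
                throw new IOException("Encoded subset does not fit the superset");
            List<String> subset = new ArrayList<>();
            for (int i = 0; i < superset.size(); i++)
                if ((mask & (1L << i)) != 0)
                    subset.add(superset.get(i));
            return subset;
        }

        public static void main(String[] args) throws IOException
        {
            List<String> superset = Arrays.asList("a", "b", "c", "d");
            long mask = serializeSubset(Arrays.asList("a", "c"), superset);
            System.out.println(deserializeSubset(mask, superset)); // [a, c]
            try
            {
                // a reader whose superset lost two columns, as in the test
                deserializeSubset(mask, Arrays.asList("a", "b"));
            }
            catch (IOException e)
            {
                System.out.println("detected: " + e.getMessage());
            }
        }
    }
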
http://git-wip-us.apache.org/repos/asf/cassandra/blob/2ee9d685/test/unit/org/apache/cassandra/db/SerializationHeaderTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/SerializationHeaderTest.java
index 0000000,84fb51c..1092a90
mode 000000,100644..100644
--- a/test/unit/org/apache/cassandra/db/SerializationHeaderTest.java
+++ b/test/unit/org/apache/cassandra/db/SerializationHeaderTest.java
@@@ -1,0 -1,136 +1,145 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.db;
+ 
+ import com.google.common.io.Files;
 -import org.apache.cassandra.config.CFMetaData;
 -import org.apache.cassandra.config.ColumnDefinition;
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.cql3.ColumnIdentifier;
+ import org.apache.cassandra.db.compaction.OperationType;
+ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+ import org.apache.cassandra.db.marshal.Int32Type;
+ import org.apache.cassandra.db.partitions.PartitionUpdate;
+ import org.apache.cassandra.db.rows.BTreeRow;
+ import org.apache.cassandra.db.rows.BufferCell;
+ import org.apache.cassandra.db.rows.Cell;
+ import org.apache.cassandra.db.rows.Row;
+ import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+ import org.apache.cassandra.io.sstable.Component;
+ import org.apache.cassandra.io.sstable.Descriptor;
+ import org.apache.cassandra.io.sstable.ISSTableScanner;
+ import org.apache.cassandra.io.sstable.format.SSTableFormat;
+ import org.apache.cassandra.io.sstable.format.SSTableReader;
+ import org.apache.cassandra.io.sstable.format.SSTableWriter;
+ import org.apache.cassandra.io.sstable.format.big.BigFormat;
+ import org.apache.cassandra.io.sstable.format.big.BigTableWriter;
+ import org.apache.cassandra.io.util.FileUtils;
++import org.apache.cassandra.schema.ColumnMetadata;
++import org.apache.cassandra.schema.TableMetadata;
++import org.apache.cassandra.schema.TableMetadataRef;
+ import org.junit.Assert;
+ import org.junit.Test;
+ 
+ import java.io.File;
+ import java.nio.ByteBuffer;
+ import java.util.Collections;
+ import java.util.concurrent.Callable;
+ import java.util.concurrent.atomic.AtomicInteger;
+ import java.util.function.BiFunction;
+ import java.util.function.Function;
+ 
+ public class SerializationHeaderTest
+ {
+     private static String KEYSPACE = "SerializationHeaderTest";
+ 
+     static
+     {
+         DatabaseDescriptor.daemonInitialization();
+     }
+     
+     @Test
+     public void testWrittenAsDifferentKind() throws Exception
+     {
+         final String tableName = "testWrittenAsDifferentKind";
 -        final String schemaCqlWithStatic = String.format("CREATE TABLE %s (k int, c int, v int static, PRIMARY KEY(k, c))", tableName);
 -        final String schemaCqlWithRegular = String.format("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY(k, c))", tableName);
++//        final String schemaCqlWithStatic = String.format("CREATE TABLE %s (k int, c int, v int static, PRIMARY KEY(k, c))", tableName);
++//        final String schemaCqlWithRegular = String.format("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY(k, c))", tableName);
+         ColumnIdentifier v = ColumnIdentifier.getInterned("v", false);
 -        CFMetaData schemaWithStatic = CFMetaData.compile(schemaCqlWithStatic, KEYSPACE);
 -        CFMetaData schemaWithRegular = CFMetaData.compile(schemaCqlWithRegular, KEYSPACE);
 -        ColumnDefinition columnStatic = schemaWithStatic.getColumnDefinition(v);
 -        ColumnDefinition columnRegular = schemaWithRegular.getColumnDefinition(v);
 -        schemaWithStatic.recordColumnDrop(columnRegular, 0L);
 -        schemaWithRegular.recordColumnDrop(columnStatic, 0L);
++        TableMetadata schemaWithStatic = TableMetadata.builder(KEYSPACE, tableName)
++                .addPartitionKeyColumn("k", Int32Type.instance)
++                .addClusteringColumn("c", Int32Type.instance)
++                .addStaticColumn("v", Int32Type.instance)
++                .build();
++        TableMetadata schemaWithRegular = TableMetadata.builder(KEYSPACE, tableName)
++                .addPartitionKeyColumn("k", Int32Type.instance)
++                .addClusteringColumn("c", Int32Type.instance)
++                .addRegularColumn("v", Int32Type.instance)
++                .build();
++        ColumnMetadata columnStatic = schemaWithStatic.getColumn(v);
++        ColumnMetadata columnRegular = schemaWithRegular.getColumn(v);
++        schemaWithStatic = schemaWithStatic.unbuild().recordColumnDrop(columnRegular, 0L).build();
++        schemaWithRegular = schemaWithRegular.unbuild().recordColumnDrop(columnStatic, 0L).build();
+ 
+         final AtomicInteger generation = new AtomicInteger();
+         File dir = Files.createTempDir();
+         try
+         {
 -            BiFunction<CFMetaData, Function<ByteBuffer, Clustering>, Callable<Descriptor>> writer = (schema, clusteringFunction) -> () -> {
 -                Descriptor descriptor = new Descriptor(BigFormat.latestVersion, dir, schema.ksName, schema.cfName, generation.incrementAndGet(), SSTableFormat.Type.BIG, Component.DIGEST_CRC32);
++            BiFunction<TableMetadata, Function<ByteBuffer, Clustering>, Callable<Descriptor>> writer = (schema, clusteringFunction) -> () -> {
++                Descriptor descriptor = new Descriptor(BigFormat.latestVersion, dir, schema.keyspace, schema.name, generation.incrementAndGet(), SSTableFormat.Type.BIG);
+ 
+                 SerializationHeader header = SerializationHeader.makeWithoutStats(schema);
+                 try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);
 -                     SSTableWriter sstableWriter = BigTableWriter.create(schema, descriptor, 1, 0L, 0, header, Collections.emptyList(),  txn))
++                     SSTableWriter sstableWriter = BigTableWriter.create(TableMetadataRef.forOfflineTools(schema), descriptor, 1, 0L, null, false, 0, header, Collections.emptyList(),  txn))
+                 {
 -                    ColumnDefinition cd = schema.getColumnDefinition(v);
++                    ColumnMetadata cd = schema.getColumn(v);
+                     for (int i = 0 ; i < 5 ; ++i) {
+                         final ByteBuffer value = Int32Type.instance.decompose(i);
+                         Cell cell = BufferCell.live(cd, 1L, value);
+                         Clustering clustering = clusteringFunction.apply(value);
+                         Row row = BTreeRow.singleCellRow(clustering, cell);
+                         sstableWriter.append(PartitionUpdate.singleRowUpdate(schema, value, row).unfilteredIterator());
+                     }
+                     sstableWriter.finish(false);
+                     txn.finish();
+                 }
+                 return descriptor;
+             };
+ 
+             Descriptor sstableWithRegular = writer.apply(schemaWithRegular, BufferClustering::new).call();
+             Descriptor sstableWithStatic = writer.apply(schemaWithStatic, value -> Clustering.STATIC_CLUSTERING).call();
 -            SSTableReader readerWithStatic = SSTableReader.openNoValidation(sstableWithStatic, schemaWithRegular);
 -            SSTableReader readerWithRegular = SSTableReader.openNoValidation(sstableWithRegular, schemaWithStatic);
++            SSTableReader readerWithStatic = SSTableReader.openNoValidation(sstableWithStatic, TableMetadataRef.forOfflineTools(schemaWithRegular));
++            SSTableReader readerWithRegular = SSTableReader.openNoValidation(sstableWithRegular, TableMetadataRef.forOfflineTools(schemaWithStatic));
+ 
+             try (ISSTableScanner partitions = readerWithStatic.getScanner()) {
+                 for (int i = 0 ; i < 5 ; ++i)
+                 {
+                     UnfilteredRowIterator partition = partitions.next();
+                     Assert.assertFalse(partition.hasNext());
+                     long value = Int32Type.instance.compose(partition.staticRow().getCell(columnStatic).value());
+                     Assert.assertEquals(value, (long)i);
+                 }
+                 Assert.assertFalse(partitions.hasNext());
+             }
+             try (ISSTableScanner partitions = readerWithRegular.getScanner()) {
+                 for (int i = 0 ; i < 5 ; ++i)
+                 {
+                     UnfilteredRowIterator partition = partitions.next();
+                     long value = Int32Type.instance.compose(((Row)partition.next()).getCell(columnRegular).value());
+                     Assert.assertEquals(value, (long)i);
+                     Assert.assertTrue(partition.staticRow().isEmpty());
+                     Assert.assertFalse(partition.hasNext());
+                 }
+                 Assert.assertFalse(partitions.hasNext());
+             }
+         }
+         finally
+         {
+             FileUtils.deleteRecursive(dir);
+         }
+     }
+ 
+ }

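The new test builds two versions of the same table, one with 'v' static and
one with 'v' regular, records the opposite-kind column as dropped in each, and
writes five rows through each schema. Each sstable is then opened with the
schema in which 'v' has the other kind: the values must still round-trip
through the dropped-column definition, with static values surfacing via
staticRow() and regular values via the clustered rows, exercising exactly the
fallback added to toHeader() above.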
