This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 188997026e4a4620538f8c4d519cf38cc160b7b3
Merge: 2ff4569 2a6e4df
Author: Alex Petrov <oleksandr.pet...@gmail.com>
AuthorDate: Mon Mar 15 12:45:41 2021 +0100

    Merge branch 'cassandra-3.11' into trunk

 .../org/apache/cassandra/db/ColumnFamilyStore.java |  4 +-
 .../cassandra/io/sstable/format/SSTableWriter.java |  2 +-
 .../io/sstable/format/big/BigTableWriter.java      |  2 +-
 .../apache/cassandra/schema/TableMetadataRef.java  | 21 ++++++++
 .../cassandra/distributed/test/AlterTest.java      | 60 ++++++++++++++++++++++
 5 files changed, 85 insertions(+), 4 deletions(-)

diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index bcb96a5,f4e3c1c..b9309fd
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -330,21 -378,16 +330,21 @@@ public class ColumnFamilyStore implemen
  
      public Map<String,String> getCompressionParameters()
      {
-         return metadata().params.compression.asMap();
 -        return metadata.compressionParams().asMap();
++        return metadata.getLocal().params.compression.asMap();
 +    }
 +
 +    public String getCompressionParametersJson()
 +    {
 +        return FBUtilities.json(getCompressionParameters());
      }
  
      public void setCompressionParameters(Map<String,String> opts)
      {
          try
          {
 -            CompressionParams newParams = CompressionParams.fromMap(opts);
 -            newParams.validate();
 -            metadata.setLocalCompressionParams(newParams);
 +            CompressionParams params = CompressionParams.fromMap(opts);
 +            params.validate();
-             throw new UnsupportedOperationException(); // TODO FIXME CASSANDRA-12949
++            metadata.setLocalOverrides(metadata().unbuild().compression(params).build());
          }
          catch (ConfigurationException e)
          {
diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index cce5378,83a07fd..43c50c5
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@@ -84,14 -80,12 +84,14 @@@ public abstract class SSTableWriter ext
                              SerializationHeader header,
                              Collection<SSTableFlushObserver> observers)
      {
-         super(descriptor, components(metadata.get()), metadata, DatabaseDescriptor.getDiskOptimizationStrategy());
 -        super(descriptor, components(metadata), metadata, DatabaseDescriptor.getDiskOptimizationStrategy());
++        super(descriptor, components(metadata.getLocal()), metadata, DatabaseDescriptor.getDiskOptimizationStrategy());
          this.keyCount = keyCount;
          this.repairedAt = repairedAt;
 +        this.pendingRepair = pendingRepair;
 +        this.isTransient = isTransient;
          this.metadataCollector = metadataCollector;
 -        this.header = header != null ? header : SerializationHeader.makeWithoutStats(metadata); //null header indicates streaming from pre-3.0 sstable
 -        this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata, descriptor.version, header);
 +        this.header = header;
 +        this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata.get(), descriptor.version, header);
          this.observers = observers == null ? Collections.emptySet() : observers;
      }
  
diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 609ca60,b0d9fb1..eeb9153
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@@ -108,45 -104,6 +108,45 @@@ public class BigTableWriter extends SST
          columnIndexWriter = new ColumnIndex(this.header, dataFile, descriptor.version, this.observers, getRowIndexEntrySerializer().indexInfoSerializer());
      }
  
 +    /**
 +     * Given an OpType, determine the correct Compression Parameters
 +     * @param opType
 +     * @return {@link org.apache.cassandra.schema.CompressionParams}
 +     */
 +    private CompressionParams compressionFor(final OperationType opType)
 +    {
-         CompressionParams compressionParams = metadata().params.compression;
++        CompressionParams compressionParams = metadata.getLocal().params.compression;
 +        final ICompressor compressor = compressionParams.getSstableCompressor();
 +
 +        if (null != compressor && opType == OperationType.FLUSH)
 +        {
 +            // When we are flushing out of the memtable throughput of the compressor is critical as flushes,
 +            // especially of large tables, can queue up and potentially block writes.
 +            // This optimization allows us to fall back to a faster compressor if a particular
 +            // compression algorithm indicates we should. See CASSANDRA-15379 for more details.
 +            switch (DatabaseDescriptor.getFlushCompression())
 +            {
 +                // It is relatively easier to insert a Noop compressor than to disable compressed writing
 +                // entirely as the "compression" member field is provided outside the scope of this class.
 +                // It may make sense in the future to refactor the ownership of the compression flag so that
 +                // We can bypass the CompressedSequentialWriter in this case entirely.
 +                case none:
 +                    compressionParams = CompressionParams.NOOP;
 +                    break;
 +                case fast:
 +                    if (!compressor.recommendedUses().contains(ICompressor.Uses.FAST_COMPRESSION))
 +                    {
 +                        // The default compressor is generally fast (LZ4 with 16KiB block size)
 +                        compressionParams = CompressionParams.DEFAULT;
 +                        break;
 +                    }
 +                case table:
 +                default:
 +            }
 +        }
 +        return compressionParams;
 +    }
 +
      public void mark()
      {
          dataMark = dataFile.mark();
diff --cc src/java/org/apache/cassandra/schema/TableMetadataRef.java
index 3c45594,0000000..3325510
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/TableMetadataRef.java
+++ b/src/java/org/apache/cassandra/schema/TableMetadataRef.java
@@@ -1,78 -1,0 +1,99 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.schema;
 +
 +import org.github.jamm.Unmetered;
 +
 +/**
 + * Encapsulates a volatile reference to an immutable {@link TableMetadata} instance.
 + *
 + * Used in classes that need up-to-date metadata to avoid the cost of looking up {@link Schema} hashmaps.
 + */
 +@Unmetered
 +public final class TableMetadataRef
 +{
 +    public final TableId id;
 +    public final String keyspace;
 +    public final String name;
 +
 +    private volatile TableMetadata metadata;
++    private volatile TableMetadata localTableMetadata;
++    private volatile CompressionParams localCompressionParams;
 +
 +    TableMetadataRef(TableMetadata metadata)
 +    {
 +        this.metadata = metadata;
 +
 +        id = metadata.id;
 +        keyspace = metadata.keyspace;
 +        name = metadata.name;
 +    }
 +
 +    /**
 +     * Create a new ref to the passed {@link TableMetadata} for use by offline tools only.
 +     *
 +     * @param metadata {@link TableMetadata} to reference
 +     * @return a new TableMetadataRef instance linking to the passed {@link TableMetadata}
 +     */
 +    public static TableMetadataRef forOfflineTools(TableMetadata metadata)
 +    {
 +        return new TableMetadataRef(metadata);
 +    }
 +
 +    public TableMetadata get()
 +    {
 +        return metadata;
 +    }
 +
 +    /**
++     * Returns node-local table metadata
++     */
++    public TableMetadata getLocal()
++    {
++        if (this.localTableMetadata != null)
++            return localTableMetadata;
++
++        return metadata;
++    }
++
++    /**
 +     * Update the reference with the most current version of {@link TableMetadata}
 +     * <p>
 +     * Must only be used by methods in {@link Schema}, *DO NOT* make public
 +     * even for testing purposes, it isn't safe.
 +     */
 +    void set(TableMetadata metadata)
 +    {
 +        metadata.validateCompatibility(get());
 +        this.metadata = metadata;
++        this.localTableMetadata = null;
++    }
++
++
++    public void setLocalOverrides(TableMetadata metadata)
++    {
++        metadata.validateCompatibility(get());
++        this.localTableMetadata = metadata;
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        return get().toString();
 +    }
 +}
diff --cc test/distributed/org/apache/cassandra/distributed/test/AlterTest.java
index 0000000,685adb9..b7cb8b4
mode 000000,100644..100644
--- a/test/distributed/org/apache/cassandra/distributed/test/AlterTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/AlterTest.java
@@@ -1,0 -1,60 +1,60 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.cassandra.distributed.test;
+ 
+ import org.junit.Assert;
+ import org.junit.Test;
+ 
+ import org.apache.cassandra.db.Keyspace;
+ import org.apache.cassandra.distributed.api.ICluster;
+ import org.apache.cassandra.distributed.api.IInvokableInstance;
+ import org.apache.cassandra.distributed.api.IIsolatedExecutor;
+ 
+ public class AlterTest extends TestBaseImpl
+ {
+     @Test
+     public void getAndSetCompressionParametersTest() throws Throwable
+     {
+         try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(2)
+                                                                   .start()))
+         {
+             cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck));");
+             cluster.stream().forEach((i) -> {
+                 i.acceptsOnInstance((IIsolatedExecutor.SerializableConsumer<String>) (ks) -> {
+                     Keyspace.open(ks)
+                             .getColumnFamilyStore("tbl")
+                             .setCompressionParametersJson("{\"chunk_length_in_kb\": \"128\"," +
 -                                                          " \"class\": \"org.apache.cassandra.io.compress.LZ4Compressor\"}");
++                                                          "  \"class\": \"org.apache.cassandra.io.compress.LZ4Compressor\"}");
+                     Assert.assertTrue(Keyspace.open(ks)
+                                               .getColumnFamilyStore("tbl")
+                                               .getCompressionParametersJson().contains("128"));
+                 }).accept(KEYSPACE);
+             });
+             cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int");
+ 
+             cluster.stream().forEach((i) -> {
+                 i.acceptsOnInstance((IIsolatedExecutor.SerializableConsumer<String>) (ks) -> {
+                     Assert.assertFalse(Keyspace.open(ks)
+                                                .getColumnFamilyStore("tbl")
+                                                .getCompressionParametersJson().contains("128"));
+                 }).accept(KEYSPACE);
+             });
+         }
+     }
 -}
++}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to