This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 188997026e4a4620538f8c4d519cf38cc160b7b3 Merge: 2ff4569 2a6e4df Author: Alex Petrov <oleksandr.pet...@gmail.com> AuthorDate: Mon Mar 15 12:45:41 2021 +0100 Merge branch 'cassandra-3.11' into trunk .../org/apache/cassandra/db/ColumnFamilyStore.java | 4 +- .../cassandra/io/sstable/format/SSTableWriter.java | 2 +- .../io/sstable/format/big/BigTableWriter.java | 2 +- .../apache/cassandra/schema/TableMetadataRef.java | 21 ++++++++ .../cassandra/distributed/test/AlterTest.java | 60 ++++++++++++++++++++++ 5 files changed, 85 insertions(+), 4 deletions(-) diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index bcb96a5,f4e3c1c..b9309fd --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -330,21 -378,16 +330,21 @@@ public class ColumnFamilyStore implemen public Map<String,String> getCompressionParameters() { - return metadata().params.compression.asMap(); - return metadata.compressionParams().asMap(); ++ return metadata.getLocal().params.compression.asMap(); + } + + public String getCompressionParametersJson() + { + return FBUtilities.json(getCompressionParameters()); } public void setCompressionParameters(Map<String,String> opts) { try { - CompressionParams newParams = CompressionParams.fromMap(opts); - newParams.validate(); - metadata.setLocalCompressionParams(newParams); + CompressionParams params = CompressionParams.fromMap(opts); + params.validate(); - throw new UnsupportedOperationException(); // TODO FIXME CASSANDRA-12949 ++ metadata.setLocalOverrides(metadata().unbuild().compression(params).build()); } catch (ConfigurationException e) { diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java index cce5378,83a07fd..43c50c5 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java @@@ -84,14 -80,12 +84,14 @@@ public abstract class SSTableWriter ext SerializationHeader header, Collection<SSTableFlushObserver> observers) { - super(descriptor, components(metadata.get()), metadata, DatabaseDescriptor.getDiskOptimizationStrategy()); - super(descriptor, components(metadata), metadata, DatabaseDescriptor.getDiskOptimizationStrategy()); ++ super(descriptor, components(metadata.getLocal()), metadata, DatabaseDescriptor.getDiskOptimizationStrategy()); this.keyCount = keyCount; this.repairedAt = repairedAt; + this.pendingRepair = pendingRepair; + this.isTransient = isTransient; this.metadataCollector = metadataCollector; - this.header = header != null ? header : SerializationHeader.makeWithoutStats(metadata); //null header indicates streaming from pre-3.0 sstable - this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata, descriptor.version, header); + this.header = header; + this.rowIndexEntrySerializer = descriptor.version.getSSTableFormat().getIndexSerializer(metadata.get(), descriptor.version, header); this.observers = observers == null ? Collections.emptySet() : observers; } diff --cc src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index 609ca60,b0d9fb1..eeb9153 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@@ -108,45 -104,6 +108,45 @@@ public class BigTableWriter extends SST columnIndexWriter = new ColumnIndex(this.header, dataFile, descriptor.version, this.observers, getRowIndexEntrySerializer().indexInfoSerializer()); } + /** + * Given an OpType, determine the correct Compression Parameters + * @param opType + * @return {@link org.apache.cassandra.schema.CompressionParams} + */ + private CompressionParams compressionFor(final OperationType opType) + { - CompressionParams compressionParams = metadata().params.compression; ++ CompressionParams compressionParams = metadata.getLocal().params.compression; + final ICompressor compressor = compressionParams.getSstableCompressor(); + + if (null != compressor && opType == OperationType.FLUSH) + { + // When we are flushing out of the memtable throughput of the compressor is critical as flushes, + // especially of large tables, can queue up and potentially block writes. + // This optimization allows us to fall back to a faster compressor if a particular + // compression algorithm indicates we should. See CASSANDRA-15379 for more details. + switch (DatabaseDescriptor.getFlushCompression()) + { + // It is relatively easier to insert a Noop compressor than to disable compressed writing + // entirely as the "compression" member field is provided outside the scope of this class. + // It may make sense in the future to refactor the ownership of the compression flag so that + // We can bypass the CompressedSequentialWriter in this case entirely. + case none: + compressionParams = CompressionParams.NOOP; + break; + case fast: + if (!compressor.recommendedUses().contains(ICompressor.Uses.FAST_COMPRESSION)) + { + // The default compressor is generally fast (LZ4 with 16KiB block size) + compressionParams = CompressionParams.DEFAULT; + break; + } + case table: + default: + } + } + return compressionParams; + } + public void mark() { dataMark = dataFile.mark(); diff --cc src/java/org/apache/cassandra/schema/TableMetadataRef.java index 3c45594,0000000..3325510 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/schema/TableMetadataRef.java +++ b/src/java/org/apache/cassandra/schema/TableMetadataRef.java @@@ -1,78 -1,0 +1,99 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.schema; + +import org.github.jamm.Unmetered; + +/** + * Encapsulates a volatile reference to an immutable {@link TableMetadata} instance. + * + * Used in classes that need up-to-date metadata to avoid the cost of looking up {@link Schema} hashmaps. + */ +@Unmetered +public final class TableMetadataRef +{ + public final TableId id; + public final String keyspace; + public final String name; + + private volatile TableMetadata metadata; ++ private volatile TableMetadata localTableMetadata; ++ private volatile CompressionParams localCompressionParams; + + TableMetadataRef(TableMetadata metadata) + { + this.metadata = metadata; + + id = metadata.id; + keyspace = metadata.keyspace; + name = metadata.name; + } + + /** + * Create a new ref to the passed {@link TableMetadata} for use by offline tools only. + * + * @param metadata {@link TableMetadata} to reference + * @return a new TableMetadataRef instance linking to the passed {@link TableMetadata} + */ + public static TableMetadataRef forOfflineTools(TableMetadata metadata) + { + return new TableMetadataRef(metadata); + } + + public TableMetadata get() + { + return metadata; + } + + /** ++ * Returns node-local table metadata ++ */ ++ public TableMetadata getLocal() ++ { ++ if (this.localTableMetadata != null) ++ return localTableMetadata; ++ ++ return metadata; ++ } ++ ++ /** + * Update the reference with the most current version of {@link TableMetadata} + * <p> + * Must only be used by methods in {@link Schema}, *DO NOT* make public + * even for testing purposes, it isn't safe. + */ + void set(TableMetadata metadata) + { + metadata.validateCompatibility(get()); + this.metadata = metadata; ++ this.localTableMetadata = null; ++ } ++ ++ ++ public void setLocalOverrides(TableMetadata metadata) ++ { ++ metadata.validateCompatibility(get()); ++ this.localTableMetadata = metadata; + } + + @Override + public String toString() + { + return get().toString(); + } +} diff --cc test/distributed/org/apache/cassandra/distributed/test/AlterTest.java index 0000000,685adb9..b7cb8b4 mode 000000,100644..100644 --- a/test/distributed/org/apache/cassandra/distributed/test/AlterTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/AlterTest.java @@@ -1,0 -1,60 +1,60 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.cassandra.distributed.test; + + import org.junit.Assert; + import org.junit.Test; + + import org.apache.cassandra.db.Keyspace; + import org.apache.cassandra.distributed.api.ICluster; + import org.apache.cassandra.distributed.api.IInvokableInstance; + import org.apache.cassandra.distributed.api.IIsolatedExecutor; + + public class AlterTest extends TestBaseImpl + { + @Test + public void getAndSetCompressionParametersTest() throws Throwable + { + try (ICluster<IInvokableInstance> cluster = init(builder().withNodes(2) + .start())) + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck));"); + cluster.stream().forEach((i) -> { + i.acceptsOnInstance((IIsolatedExecutor.SerializableConsumer<String>) (ks) -> { + Keyspace.open(ks) + .getColumnFamilyStore("tbl") + .setCompressionParametersJson("{\"chunk_length_in_kb\": \"128\"," + - " \"class\": \"org.apache.cassandra.io.compress.LZ4Compressor\"}"); ++ " \"class\": \"org.apache.cassandra.io.compress.LZ4Compressor\"}"); + Assert.assertTrue(Keyspace.open(ks) + .getColumnFamilyStore("tbl") + .getCompressionParametersJson().contains("128")); + }).accept(KEYSPACE); + }); + cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int"); + + cluster.stream().forEach((i) -> { + i.acceptsOnInstance((IIsolatedExecutor.SerializableConsumer<String>) (ks) -> { + Assert.assertFalse(Keyspace.open(ks) + .getColumnFamilyStore("tbl") + .getCompressionParametersJson().contains("128")); + }).accept(KEYSPACE); + }); + } + } -} ++} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org