This is an automated email from the ASF dual-hosted git repository.
smiklosovic pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 98ec8970e1 Move training parameters for Zstd dictionary compression to
CQL
98ec8970e1 is described below
commit 98ec8970e11c1f0ac4f81528da0746f4ada42066
Author: Stefan Miklosovic <[email protected]>
AuthorDate: Mon Dec 15 18:26:51 2025 +0100
Move training parameters for Zstd dictionary compression to CQL
It is also possible to override them via nodetool if necessary.
This patch also fixes the computation of sampling ratio to not lose the
precision.
patch by Stefan Miklosovic; reviewed by Jyothsna Konisha, Yifan Cai for
CASSANDRA-21078
---
CHANGES.txt | 1 +
conf/cassandra.yaml | 16 +---
conf/cassandra_latest.yaml | 16 +---
.../pages/managing/operating/compression.adoc | 48 ++++++++---
src/java/org/apache/cassandra/config/Config.java | 2 -
.../cassandra/config/DatabaseDescriptor.java | 13 +--
.../db/compression/CompressionDictionary.java | 5 +-
.../compression/CompressionDictionaryManager.java | 89 ++++++++++++++++----
.../CompressionDictionaryManagerMBean.java | 25 +++++-
.../CompressionDictionaryTrainingConfig.java | 9 ++-
.../compression/ICompressionDictionaryTrainer.java | 19 ++---
.../db/compression/ZstdDictionaryTrainer.java | 49 ++++++-----
.../io/compress/IDictionaryCompressor.java | 39 ++++++++-
.../io/compress/ZstdDictionaryCompressor.java | 12 ++-
src/java/org/apache/cassandra/tools/NodeProbe.java | 18 +++--
.../CompressionDictionaryCommandGroup.java | 63 ++++++++++++++-
test/resources/nodetool/help/compressiondictionary | 15 +++-
.../nodetool/help/compressiondictionary$train | 18 ++++-
.../CompressionDictionaryIntegrationTest.java | 70 +++++++++++-----
.../CompressionDictionarySchedulerTest.java | 10 ++-
.../CompressionDictionaryTrainingConfigTest.java | 6 +-
.../db/compression/ZstdDictionaryTrainerTest.java | 94 +++++++++++-----------
.../nodetool/TrainCompressionDictionaryTest.java | 51 ++++++++++++
.../utils/CompressionDictionaryHelper.java | 4 +-
24 files changed, 502 insertions(+), 190 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 94dedc0593..577b2ee1b5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Move training parameters for Zstd dictionary compression to CQL
(CASSANDRA-21078)
* Add configuration for sorted imports in source files (CASSANDRA-17925)
* Change the eager reference counting of compression dictionaries to lazy
(CASSANDRA-21074)
* Add cursor based optimized compaction path (CASSANDRA-20918)
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 5736890b51..b207ff289b 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -2923,24 +2923,12 @@ compression_dictionary_cache_size: 10
# Min unit: s
compression_dictionary_cache_expire: 24h
-# Dictionary training configuration (advanced settings)
-# These settings control how compression dictionaries are trained from sample
data.
-
-# Maximum size of a trained compression dictionary.
-# Larger dictionaries may provide better compression but use more memory.
-compression_dictionary_training_max_dictionary_size: 64KiB
-
-# Maximum total size of sample data to collect for dictionary training.
-# More sample data generally produces better dictionaries but takes longer to
train.
-# The recommended sample size is 100x the dictionary size.
-compression_dictionary_training_max_total_sample_size: 10MiB
-
# Enable automatic dictionary training based on sampling of write operations.
# When enabled, the system will automatically collect samples and train new
dictionaries.
# Manual training via nodetool is always available regardless of this setting.
compression_dictionary_training_auto_train_enabled: false
-# Sampling rate for automatic dictionary training (1-10000).
-# Value of 100 means 1% of writes are sampled. Lower values reduce overhead
but may
+# Sampling rate for automatic dictionary training (0.01-1).
+# Value of 0.01 means 1% of writes are sampled. Lower values reduce overhead
but may
# result in less representative sample data for dictionary training.
compression_dictionary_training_sampling_rate: 0.01
diff --git a/conf/cassandra_latest.yaml b/conf/cassandra_latest.yaml
index b43f6cf45c..2e5c8fee6b 100644
--- a/conf/cassandra_latest.yaml
+++ b/conf/cassandra_latest.yaml
@@ -2662,24 +2662,12 @@ compression_dictionary_cache_size: 10
# Min unit: s
compression_dictionary_cache_expire: 24h
-# Dictionary training configuration (advanced settings)
-# These settings control how compression dictionaries are trained from sample
data.
-
-# Maximum size of a trained compression dictionary.
-# Larger dictionaries may provide better compression but use more memory.
-compression_dictionary_training_max_dictionary_size: 64KiB
-
-# Maximum total size of sample data to collect for dictionary training.
-# More sample data generally produces better dictionaries but takes longer to
train.
-# The recommended sample size is 100x the dictionary size.
-compression_dictionary_training_max_total_sample_size: 10MiB
-
# Enable automatic dictionary training based on sampling of write operations.
# When enabled, the system will automatically collect samples and train new
dictionaries.
# Manual training via nodetool is always available regardless of this setting.
compression_dictionary_training_auto_train_enabled: false
-# Sampling rate for automatic dictionary training (1-10000).
-# Value of 100 means 1% of writes are sampled. Lower values reduce overhead
but may
+# Sampling rate for automatic dictionary training (0.01-1).
+# Value of 0.01 means 1% of writes are sampled. Lower values reduce overhead
but may
# result in less representative sample data for dictionary training.
compression_dictionary_training_sampling_rate: 0.01
diff --git a/doc/modules/cassandra/pages/managing/operating/compression.adoc
b/doc/modules/cassandra/pages/managing/operating/compression.adoc
index 22214e046b..ca028aff95 100644
--- a/doc/modules/cassandra/pages/managing/operating/compression.adoc
+++ b/doc/modules/cassandra/pages/managing/operating/compression.adoc
@@ -148,12 +148,12 @@ Enable automatic training in `cassandra.yaml`:
[source,yaml]
----
compression_dictionary_training_auto_train_enabled: true
-compression_dictionary_training_sampling_rate: 100 # 1% of writes
+compression_dictionary_training_sampling_rate: 0.01 # 1% of writes
----
When enabled, Cassandra automatically samples write operations and
trains dictionaries in the background based on the configured sampling
-rate (range: 1-10000, where 100 = 1% of writes).
+rate (range: 0.01-1, where 0.01 = 1% of writes).
=== Dictionary Storage and Distribution
@@ -298,17 +298,11 @@ next access.
=== Training Configuration
-* `compression_dictionary_training_max_dictionary_size` (default: `65536`):
-Maximum size of trained dictionaries in bytes. Larger dictionaries can
-capture more patterns but increase memory overhead.
-* `compression_dictionary_training_max_total_sample_size` (default:
-`10485760`): Maximum total size of sample data to collect for training,
-approximately 10MB.
* `compression_dictionary_training_auto_train_enabled` (default: `false`):
Enable automatic background dictionary training. When enabled, Cassandra
samples writes and trains dictionaries automatically.
-* `compression_dictionary_training_sampling_rate` (default: `100`):
-Sampling rate for automatic training, range 1-10000 where 100 = 1% of
+* `compression_dictionary_training_sampling_rate` (default: `0.01`):
+Sampling rate for automatic training, range 0.01-1 where 0.01 = 1% of
writes. Lower values reduce training overhead but may miss data patterns.
Example configuration:
@@ -323,11 +317,34 @@ compression_dictionary_cache_expire: 3600
# Automatic training
compression_dictionary_training_auto_train_enabled: false
-compression_dictionary_training_sampling_rate: 100
-compression_dictionary_training_max_dictionary_size: 65536
-compression_dictionary_training_max_total_sample_size: 10485760
+compression_dictionary_training_sampling_rate: 0.01
----
+=== CQL training parameters:
+
+These parameters are meant to be configured via CQL for each respective table
if defaults are not appropriate.
+
+* `training_max_total_sample_size` (default: `10MiB`): Maximum total size of
sample data to collect for training, approximately 10MB. This parameter is
configured in the
+table's compression options for `ZstdDictionaryCompressor`.
+* `training_max_dictionary_size` (default: `64KiB`): Maximum size of trained
dictionaries in bytes. Larger dictionaries can capture more patterns but
increase memory overhead. This is a parameter of `ZstdDictionaryCompressor` of
a table, in `compression` section.
+
+Example:
+
+[source,cql]
+----
+ALTER TABLE keyspace.table
+ WITH compression = {
+ 'class': 'ZstdDictionaryCompressor',
+ 'compression_level': '3',
+ 'training_max_total_sample_size': '20MiB',
+ 'training_max_dictionary_size': '128KiB'
+ };
+----
+
+It is possible to override these training parameters by the `nodetool
compressiondictionary train` command as
explained in the section further below. If the `train` subcommand does not
override them, the CQL parameters are
taken into account.
+
== Other options
* `crc_check_chance` (default: `1.0`): determines how likely Cassandra
@@ -417,6 +434,11 @@ There are these four commands for now related to
compression dictionaries:
by a specific id, to a file.
* import - a user can import a compression dictionary, exported by above
command, from a file to a cluster.
+For the `train` subcommand, it is possible to specify:
+
+* `--max-dict-size` - overrides `training_max_dictionary_size` in CQL
`compression` configuration.
+* `--max-total-sample-size` - overrides `training_max_total_sample_size` in
CQL `compression` configuration.
+
Importing a dictionary to a table from a file should happen only against one
node at a time as
dictionary will be eventually stored in
`system_distributed.compression_dictionaries` table and reused
cluster-wide. When imports happen from multiple nodes, the highest-version
dictionary will be used.
diff --git a/src/java/org/apache/cassandra/config/Config.java
b/src/java/org/apache/cassandra/config/Config.java
index 7489b29926..193c77c8f4 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -525,8 +525,6 @@ public class Config
public volatile DurationSpec.IntSecondsBound
compression_dictionary_cache_expire = new DurationSpec.IntSecondsBound("24h");
// Dictionary training settings
- public volatile DataStorageSpec.IntKibibytesBound
compression_dictionary_training_max_dictionary_size = new
DataStorageSpec.IntKibibytesBound("64KiB");
- public volatile DataStorageSpec.IntKibibytesBound
compression_dictionary_training_max_total_sample_size = new
DataStorageSpec.IntKibibytesBound("10MiB");
public volatile boolean compression_dictionary_training_auto_train_enabled
= false;
public volatile float compression_dictionary_training_sampling_rate =
0.01f; // samples 1%
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 8565f72be8..809a44fdcc 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1238,6 +1238,9 @@ public class DatabaseDescriptor
{
throw new ConfigurationException(ex.getMessage());
}
+
+ if (conf.compression_dictionary_training_sampling_rate <= 0.0f ||
conf.compression_dictionary_training_sampling_rate > 1.0f)
+ throw new ConfigurationException("Sampling rate has to be between
(0.0;1], it is " + conf.compression_dictionary_training_sampling_rate);
}
@VisibleForTesting
@@ -4425,16 +4428,6 @@ public class DatabaseDescriptor
return conf.compression_dictionary_cache_expire.toSeconds();
}
- public static int getCompressionDictionaryTrainingMaxDictionarySize()
- {
- return
conf.compression_dictionary_training_max_dictionary_size.toBytes();
- }
-
- public static int getCompressionDictionaryTrainingMaxTotalSampleSize()
- {
- return
conf.compression_dictionary_training_max_total_sample_size.toBytes();
- }
-
public static boolean getCompressionDictionaryTrainingAutoTrainEnabled()
{
return conf.compression_dictionary_training_auto_train_enabled;
diff --git
a/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java
b/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java
index 18c5f58a99..bbacacd85c 100644
--- a/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java
+++ b/src/java/org/apache/cassandra/db/compression/CompressionDictionary.java
@@ -345,13 +345,12 @@ public interface CompressionDictionary
@Override
public ICompressionDictionaryTrainer createTrainer(String
keyspaceName,
String
tableName,
-
CompressionDictionaryTrainingConfig config,
ICompressor
compressor)
{
Preconditions.checkArgument(compressor instanceof
ZstdDictionaryCompressor,
"Expected compressor to be
ZstdDictionaryCompressor; actual: %s",
compressor.getClass().getSimpleName());
- return new ZstdDictionaryTrainer(keyspaceName, tableName,
config, ((ZstdDictionaryCompressor) compressor).compressionLevel());
+ return new ZstdDictionaryTrainer(keyspaceName, tableName,
((ZstdDictionaryCompressor) compressor).compressionLevel());
}
};
@@ -378,13 +377,11 @@ public interface CompressionDictionary
*
* @param keyspaceName the keyspace name
* @param tableName the table name
- * @param config the training configuration
* @param compressor the compressor to use for training
* @return a dictionary trainer instance
*/
public abstract ICompressionDictionaryTrainer createTrainer(String
keyspaceName,
String
tableName,
-
CompressionDictionaryTrainingConfig config,
ICompressor compressor);
}
diff --git
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java
index 88bbc6c918..a7875fa3ad 100644
---
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java
+++
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db.compression;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
@@ -32,6 +33,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.DataStorageSpec;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import
org.apache.cassandra.db.compression.CompressionDictionary.LightweightCompressionDictionary;
@@ -43,6 +45,10 @@ import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.MBeanWrapper.OnException;
import static java.lang.String.format;
+import static
org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE;
+import static
org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE;
+import static
org.apache.cassandra.io.compress.IDictionaryCompressor.TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME;
+import static
org.apache.cassandra.io.compress.IDictionaryCompressor.TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME;
public class CompressionDictionaryManager implements
CompressionDictionaryManagerMBean,
ICompressionDictionaryCache,
@@ -77,13 +83,12 @@ public class CompressionDictionaryManager implements
CompressionDictionaryManage
{
// Initialize components
this.trainer = ICompressionDictionaryTrainer.create(keyspaceName,
tableName,
-
columnFamilyStore.metadata().params.compression,
-
createTrainingConfig());
+
columnFamilyStore.metadata().params.compression);
trainer.setDictionaryTrainedListener(this::handleNewDictionary);
scheduler.scheduleRefreshTask();
- trainer.start(false);
+ trainer.start(false, createTrainingConfig());
}
if (registerBookkeeping && isEnabled)
@@ -134,7 +139,7 @@ public class CompressionDictionaryManager implements
CompressionDictionaryManage
}
}
- trainer = ICompressionDictionaryTrainer.create(keyspaceName,
tableName, newParams, createTrainingConfig());
+ trainer = ICompressionDictionaryTrainer.create(keyspaceName,
tableName, newParams);
trainer.setDictionaryTrainedListener(this::handleNewDictionary);
}
@@ -143,7 +148,7 @@ public class CompressionDictionaryManager implements
CompressionDictionaryManage
// Start trainer if it exists
if (trainer != null)
{
- trainer.start(false);
+ trainer.start(false, createTrainingConfig());
}
return;
}
@@ -174,7 +179,7 @@ public class CompressionDictionaryManager implements
CompressionDictionaryManage
dictionaryTrainer.addSample(sample);
}
}
-
+
@Nullable
@Override
public CompressionDictionary getCurrent()
@@ -213,7 +218,7 @@ public class CompressionDictionaryManager implements
CompressionDictionaryManage
}
@Override
- public synchronized void train(boolean force)
+ public synchronized void train(boolean force, Map<String, String>
parameters)
{
// Validate table supports dictionary compression
if (!isEnabled)
@@ -226,12 +231,15 @@ public class CompressionDictionaryManager implements
CompressionDictionaryManage
throw new IllegalStateException("Dictionary trainer is not
available for table " + keyspaceName + '.' + tableName);
}
+ // resolve training config and fail fast when invalid, so we do not
reach logic which would e.g. flush unnecessarily.
+ CompressionDictionaryTrainingConfig trainingConfig =
createTrainingConfig(parameters);
+
// SSTable-based training: sample from existing SSTables
Set<SSTableReader> sstables = columnFamilyStore.getLiveSSTables();
if (sstables.isEmpty())
{
logger.info("No SSTables available for training in table {}.{},
flushing memtable first",
- keyspaceName, tableName);
+ keyspaceName, tableName);
columnFamilyStore.forceBlockingFlush(ColumnFamilyStore.FlushReason.USER_FORCED);
sstables = columnFamilyStore.getLiveSSTables();
@@ -242,10 +250,10 @@ public class CompressionDictionaryManager implements
CompressionDictionaryManage
}
logger.info("Starting SSTable-based training for {}.{} with {}
SSTables",
- keyspaceName, tableName, sstables.size());
+ keyspaceName, tableName, sstables.size());
- trainer.start(true);
- scheduler.scheduleSSTableBasedTraining(trainer, sstables,
createTrainingConfig(), force);
+ trainer.start(true, trainingConfig);
+ scheduler.scheduleSSTableBasedTraining(trainer, sstables,
trainingConfig, force);
}
@Override
@@ -358,18 +366,71 @@ public class CompressionDictionaryManager implements
CompressionDictionaryManage
onNewDictionaryTrained(dictionary.dictId());
}
+ /**
+ * @return training configuration with max dictionary size and total
sample size from CQL table compression params.
+ */
private CompressionDictionaryTrainingConfig createTrainingConfig()
+ {
+ return createTrainingConfig(Map.of());
+ }
+
+ /**
+ * Returns configuration for training where max dictionary size and total
sample size can be supplied by a
+ * user, e.g. upon the invocation of training method via JMX.
+ *
+ * @param parameters user-supplied parameters from training, when not
specified, CQL compression parameters
+ * for a given table will be used
+ * @return training configuration with max dictionary size and total
sample size of supplied arguments.
+ */
+ private CompressionDictionaryTrainingConfig
createTrainingConfig(Map<String, String> parameters)
{
CompressionParams compressionParams =
columnFamilyStore.metadata().params.compression;
return CompressionDictionaryTrainingConfig
.builder()
-
.maxDictionarySize(DatabaseDescriptor.getCompressionDictionaryTrainingMaxDictionarySize())
-
.maxTotalSampleSize(DatabaseDescriptor.getCompressionDictionaryTrainingMaxTotalSampleSize())
+
.maxDictionarySize(getCompressionDictionaryTrainingMaxDictionarySize(compressionParams,
parameters))
+
.maxTotalSampleSize(getCompressionDictionaryTrainingMaxTotalSampleSize(compressionParams,
parameters))
.samplingRate(DatabaseDescriptor.getCompressionDictionaryTrainingSamplingRate())
.chunkSize(compressionParams.chunkLength())
.build();
}
+ private int
getCompressionDictionaryTrainingMaxDictionarySize(CompressionParams
compressionParams, Map<String, String> parameters)
+ {
+ return internalTrainingParameterResolution(compressionParams,
+
parameters.get(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME),
+
TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME,
+
DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE);
+ }
+
+ private int
getCompressionDictionaryTrainingMaxTotalSampleSize(CompressionParams
compressionParams, Map<String, String> parameters)
+ {
+ return internalTrainingParameterResolution(compressionParams,
+
parameters.get(TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME),
+
TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME,
+
DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE);
+ }
+
+ private int internalTrainingParameterResolution(CompressionParams
compressionParams,
+ String userSuppliedValue,
+ String parameterName,
+ String
defaultParameterValue)
+ {
+ String resolvedValue = null;
+ try
+ {
+ if (userSuppliedValue == null)
+ resolvedValue =
compressionParams.getOtherOptions().getOrDefault(parameterName,
defaultParameterValue);
+ else
+ resolvedValue = userSuppliedValue;
+
+ return new
DataStorageSpec.IntKibibytesBound(resolvedValue).toBytes();
+ }
+ catch (Throwable t)
+ {
+ throw new IllegalArgumentException(String.format("Invalid value
for %s: %s", parameterName, resolvedValue));
+ }
+ }
+
private void storeDictionary(CompressionDictionary dictionary)
{
if (!isEnabled)
@@ -385,7 +446,7 @@ public class CompressionDictionaryManager implements
CompressionDictionaryManage
* Determines if a new trainer should be created based on compression
parameter changes.
* A new trainer is needed when no existing trainer exists or when the
existing trainer
* is not compatible with the new compression parameters.
- *
+ * <p>
* The method is (and should be) only invoked inside {@link
#maybeReloadFromSchema(CompressionParams)},
* which is guarded by synchronized.
*
diff --git
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManagerMBean.java
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManagerMBean.java
index 3dc349d115..0d4ad2eb7c 100644
---
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManagerMBean.java
+++
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryManagerMBean.java
@@ -18,6 +18,8 @@
package org.apache.cassandra.db.compression;
+import java.util.Map;
+
import javax.annotation.Nullable;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
@@ -32,12 +34,31 @@ public interface CompressionDictionaryManagerMBean
* If no SSTables are available, automatically flushes the memtable first.
* This operation runs synchronously and blocks until training completes.
*
+ * @param force force the dictionary training even if there are not
enough samples;
+ * otherwise, dictionary training won't start if the
trainer is not ready
+ * @param parameters parameters of training process
+ * @throws UnsupportedOperationException if table doesn't support
dictionary compression
+ * @throws IllegalStateException if no SSTables available after
flush
+ */
+ void train(boolean force, Map<String, String> parameters);
+
+ /**
+ * Starts training from existing SSTables for this table.
+ * Samples chunks from all live SSTables and trains a compression
dictionary.
+ * If no SSTables are available, automatically flushes the memtable first.
+ * This operation runs synchronously and blocks until training completes.
+ * <p>
+ * Training parameters will be taken from CQL's compression section of a
given table training is conducted on.
+ *
* @param force force the dictionary training even if there are not enough
samples;
* otherwise, dictionary training won't start if the trainer
is not ready
* @throws UnsupportedOperationException if table doesn't support
dictionary compression
* @throws IllegalStateException if no SSTables available after
flush
*/
- void train(boolean force);
+ default void train(boolean force)
+ {
+ train(force, Map.of());
+ }
/**
* Gets the current training state for this table.
@@ -77,7 +98,7 @@ public interface CompressionDictionaryManagerMBean
*
* @param dictionary compression dictionary to import
* @throws IllegalArgumentException when dictionary to import is older
(based on dictionary id) than
- * the latest compression dictionary for given table, or when dictionary
data are invalid
+ * the latest compression dictionary for
given table, or when dictionary data are invalid
* @throws IllegalStateException if underlying table does not support
dictionary compression or
* kind of dictionary to import does not
match kind of dictionary table
* is configured for
diff --git
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryTrainingConfig.java
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryTrainingConfig.java
index f580b0acfa..794807974b 100644
---
a/src/java/org/apache/cassandra/db/compression/CompressionDictionaryTrainingConfig.java
+++
b/src/java/org/apache/cassandra/db/compression/CompressionDictionaryTrainingConfig.java
@@ -28,7 +28,7 @@ public class CompressionDictionaryTrainingConfig
public final int maxDictionarySize;
public final int maxTotalSampleSize;
public final int acceptableTotalSampleSize;
- public final int samplingRate;
+ public final float samplingRate;
public final int chunkSize;
private CompressionDictionaryTrainingConfig(Builder builder)
@@ -49,7 +49,7 @@ public class CompressionDictionaryTrainingConfig
{
private int maxDictionarySize = 65536; // 64KB default
private int maxTotalSampleSize = 10 * 1024 * 1024; // 10MB total
- private int samplingRate = 100; // Sampling 1%
+ private float samplingRate = 0.01f; // Sampling 1%
private int chunkSize = 64 * 1024; // 64KB default
public Builder maxDictionarySize(int size)
@@ -66,7 +66,10 @@ public class CompressionDictionaryTrainingConfig
public Builder samplingRate(float samplingRate)
{
- this.samplingRate = Math.round(1 / samplingRate);
+ if (samplingRate <= 0.0f || samplingRate > 1.0f)
+ throw new IllegalArgumentException("Sampling rate has to be
between (0.0;1], it is " + samplingRate);
+
+ this.samplingRate = samplingRate;
return this;
}
diff --git
a/src/java/org/apache/cassandra/db/compression/ICompressionDictionaryTrainer.java
b/src/java/org/apache/cassandra/db/compression/ICompressionDictionaryTrainer.java
index 5612531a4f..76a9b02bb2 100644
---
a/src/java/org/apache/cassandra/db/compression/ICompressionDictionaryTrainer.java
+++
b/src/java/org/apache/cassandra/db/compression/ICompressionDictionaryTrainer.java
@@ -42,11 +42,12 @@ public interface ICompressionDictionaryTrainer extends
AutoCloseable
* Starts the trainer for collecting samples.
*
* @param manualTraining true if this is manual training, false for
automatic
+ * @param trainingConfig training configuration to use
* @return true if the trainer is started; otherwise false. The trainer is
started
* in any of those conditions: 1. trainer closed; 2. not requested
for
* either manual or auto training; 3. failed to start
*/
- boolean start(boolean manualTraining);
+ boolean start(boolean manualTraining, CompressionDictionaryTrainingConfig
trainingConfig);
/**
* @return true if the trainer is ready to take a new sample; otherwise,
false
@@ -87,8 +88,10 @@ public interface ICompressionDictionaryTrainer extends
AutoCloseable
/**
* Clears all collected samples and resets trainer state.
+ *
+ * @param trainingConfig configuration to use upon resetting
*/
- void reset();
+ void reset(CompressionDictionaryTrainingConfig trainingConfig);
/**
* Gets the current training state including status, progress, and failure
details.
@@ -122,10 +125,10 @@ public interface ICompressionDictionaryTrainer extends
AutoCloseable
/**
* Updates the sampling rate for this trainer.
*
- * @param newSamplingRate the new sampling rate. For exmaple, 1 = sample
every time (100%),
- * 2 = expect sample 1/2 of data (50%), n = expect
sample 1/n of data
+ * @param newSamplingRate the new sampling rate. For example, 0.01 -
sample 1% of data,
+ * 1 = sample every time (100%), 0.5 - sample 50%
of data.
*/
- void updateSamplingRate(int newSamplingRate);
+ void updateSamplingRate(float newSamplingRate);
/**
* Factory method to create appropriate trainer based on compression
parameters.
@@ -133,14 +136,12 @@ public interface ICompressionDictionaryTrainer extends
AutoCloseable
* @param keyspaceName the keyspace name for logging
* @param tableName the table name for logging
* @param params the compression parameters
- * @param config the training configuration
* @return a dictionary trainer for the specified compression algorithm
* @throws IllegalArgumentException if no dictionary trainer is available
for the compression algorithm
*/
static ICompressionDictionaryTrainer create(String keyspaceName,
String tableName,
- CompressionParams params,
-
CompressionDictionaryTrainingConfig config)
+ CompressionParams params)
{
ICompressor compressor = params.getSstableCompressor();
if (!(compressor instanceof IDictionaryCompressor))
@@ -149,7 +150,7 @@ public interface ICompressionDictionaryTrainer extends
AutoCloseable
}
IDictionaryCompressor dictionaryCompressor = (IDictionaryCompressor)
compressor;
- return
dictionaryCompressor.acceptableDictionaryKind().createTrainer(keyspaceName,
tableName, config, compressor);
+ return
dictionaryCompressor.acceptableDictionaryKind().createTrainer(keyspaceName,
tableName, compressor);
}
enum TrainingStatus
diff --git
a/src/java/org/apache/cassandra/db/compression/ZstdDictionaryTrainer.java
b/src/java/org/apache/cassandra/db/compression/ZstdDictionaryTrainer.java
index bd7c566dcc..1f370143ef 100644
--- a/src/java/org/apache/cassandra/db/compression/ZstdDictionaryTrainer.java
+++ b/src/java/org/apache/cassandra/db/compression/ZstdDictionaryTrainer.java
@@ -51,42 +51,48 @@ public class ZstdDictionaryTrainer implements
ICompressionDictionaryTrainer
private final String keyspaceName;
private final String tableName;
- private final CompressionDictionaryTrainingConfig config;
+ private volatile CompressionDictionaryTrainingConfig config;
private final AtomicLong totalSampleSize;
private final AtomicLong sampleCount;
private final int compressionLevel; // optimal if using the same level for
training as when compressing.
// Sampling rate can be updated during training
- private volatile int samplingRate;
+ private volatile float samplingRate;
// Minimum number of samples required by ZSTD library
private static final int MIN_SAMPLES_REQUIRED = 11;
private volatile Consumer<CompressionDictionary> dictionaryTrainedListener;
// TODO: manage the samples in this class for auto-train (follow-up). The
ZstdDictTrainer cannot be re-used for multiple training runs.
- private ZstdDictTrainer zstdTrainer;
+ private volatile ZstdDictTrainer zstdTrainer;
private volatile boolean closed = false;
private volatile TrainingStatus currentTrainingStatus;
private volatile String failureMessage;
- public ZstdDictionaryTrainer(String keyspaceName, String tableName,
- CompressionDictionaryTrainingConfig config,
- int compressionLevel)
+ public ZstdDictionaryTrainer(String keyspaceName, String tableName, int
compressionLevel)
+ {
+ this(keyspaceName,
+ tableName,
+ compressionLevel,
+
DatabaseDescriptor.getCompressionDictionaryTrainingSamplingRate());
+ }
+
+ @VisibleForTesting
+ public ZstdDictionaryTrainer(String keyspaceName, String tableName, int
compressionLevel, float samplingRate)
{
this.keyspaceName = keyspaceName;
this.tableName = tableName;
- this.config = config;
this.totalSampleSize = new AtomicLong(0);
this.sampleCount = new AtomicLong(0);
this.compressionLevel = compressionLevel;
- this.samplingRate = config.samplingRate;
+ this.samplingRate = samplingRate;
this.currentTrainingStatus = TrainingStatus.NOT_STARTED;
}
@Override
public boolean shouldSample()
{
- return zstdTrainer != null &&
ThreadLocalRandom.current().nextInt(samplingRate) == 0;
+ return zstdTrainer != null && ThreadLocalRandom.current().nextFloat()
< samplingRate;
}
@Override
@@ -228,6 +234,12 @@ public class ZstdDictionaryTrainer implements
ICompressionDictionaryTrainer
return message.toString();
}
+ if (config == null)
+ {
+ message.append(": configuration not initialized (call start()
first)");
+ return message.toString();
+ }
+
long currentSampleCount = sampleCount.get();
long currentTotalSampleSize = totalSampleSize.get();
@@ -263,6 +275,7 @@ public class ZstdDictionaryTrainer implements
ICompressionDictionaryTrainer
return currentTrainingStatus != TrainingStatus.TRAINING
&& !closed
&& zstdTrainer != null
+ && config != null
&& totalSampleSize.get() >= config.acceptableTotalSampleSize
&& sampleCount.get() >= MIN_SAMPLES_REQUIRED;
}
@@ -291,7 +304,7 @@ public class ZstdDictionaryTrainer implements
ICompressionDictionaryTrainer
}
@Override
- public boolean start(boolean manualTraining)
+ public boolean start(boolean manualTraining,
CompressionDictionaryTrainingConfig trainingConfig)
{
if (closed || !(manualTraining || shouldAutoStartTraining()))
return false;
@@ -299,7 +312,7 @@ public class ZstdDictionaryTrainer implements
ICompressionDictionaryTrainer
try
{
// reset on starting; a new zstdTrainer instance is created during
reset
- reset();
+ reset(trainingConfig);
logger.info("Started dictionary training for {}.{}", keyspaceName,
tableName);
currentTrainingStatus = TrainingStatus.SAMPLING;
failureMessage = null; // Clear any previous failure message
@@ -323,7 +336,7 @@ public class ZstdDictionaryTrainer implements
ICompressionDictionaryTrainer
}
@Override
- public void reset()
+ public void reset(CompressionDictionaryTrainingConfig trainingConfig)
{
if (closed)
{
@@ -335,7 +348,8 @@ public class ZstdDictionaryTrainer implements
ICompressionDictionaryTrainer
{
totalSampleSize.set(0);
sampleCount.set(0);
- zstdTrainer = new ZstdDictTrainer(config.maxTotalSampleSize,
config.maxDictionarySize, compressionLevel);
+ zstdTrainer = new
ZstdDictTrainer(trainingConfig.maxTotalSampleSize,
trainingConfig.maxDictionarySize, compressionLevel);
+ config = trainingConfig;
}
}
@@ -352,12 +366,11 @@ public class ZstdDictionaryTrainer implements
ICompressionDictionaryTrainer
}
@Override
- public void updateSamplingRate(int newSamplingRate)
+ public void updateSamplingRate(float newSamplingRate)
{
- if (newSamplingRate <= 0)
- {
- throw new IllegalArgumentException("Sampling rate must be
positive, got: " + newSamplingRate);
- }
+ if (newSamplingRate <= 0.0f || newSamplingRate > 1.0f)
+ throw new IllegalArgumentException("Sampling rate has to be
between (0.0;1], it is " + newSamplingRate);
+
this.samplingRate = newSamplingRate;
logger.debug("Updated sampling rate to {} for {}.{}", newSamplingRate,
keyspaceName, tableName);
}
diff --git
a/src/java/org/apache/cassandra/io/compress/IDictionaryCompressor.java
b/src/java/org/apache/cassandra/io/compress/IDictionaryCompressor.java
index f6f7681f74..fd4ce62ea3 100644
--- a/src/java/org/apache/cassandra/io/compress/IDictionaryCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/IDictionaryCompressor.java
@@ -18,7 +18,11 @@
package org.apache.cassandra.io.compress;
+import org.apache.cassandra.config.DataStorageSpec;
import org.apache.cassandra.db.compression.CompressionDictionary;
+import org.apache.cassandra.exceptions.ConfigurationException;
+
+import static java.lang.String.format;
/**
* Interface for compressors that support dictionary-based compression.
@@ -26,18 +30,45 @@ import
org.apache.cassandra.db.compression.CompressionDictionary;
* Dictionary compressors can use pre-trained compression dictionaries to
achieve
* better compression ratios, especially for small data chunks that are similar
* to the training data used to create the dictionary.
- *
+ *
* @param <T> the specific type of compression dictionary this compressor
supports
*/
public interface IDictionaryCompressor<T extends CompressionDictionary>
{
+ String TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME =
"training_max_dictionary_size";
+ String DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE = "64KiB";
+
+ String TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME =
"training_max_total_sample_size";
+ String DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE = "10MiB";
+
+ /**
+ * Validates value of a parameter for training purposes. The value to
validate should
+ * be accepted by {@link DataStorageSpec.IntKibibytesBound}. This method
is used upon validation
+ * of input parameters in the implementations of dictionary compressor.
+ *
+ * @param parameterName name of a parameter to validate
+ * @param resolvedValue value to validate
+ */
+ static void validateTrainingParameter(String parameterName, String
resolvedValue)
+ {
+ try
+ {
+ new DataStorageSpec.IntKibibytesBound(resolvedValue).toBytes();
+ }
+ catch (Throwable t)
+ {
+ throw new ConfigurationException(format("Unable to set value to
parameter %s: %s. Reason: %s",
+ parameterName,
resolvedValue, t.getMessage()));
+ }
+ }
+
/**
* Returns a compressor instance configured with the specified compression
dictionary.
* <br>
* This method may return the same instance if it already uses the given
dictionary,
* or create a new instance configured with the dictionary. The
implementation should
* be efficient and avoid unnecessary object creation when possible.
- *
+ *
* @param compressionDictionary the dictionary to use for
compression/decompression
* @return a compressor instance that will use the specified dictionary
*/
@@ -49,7 +80,7 @@ public interface IDictionaryCompressor<T extends
CompressionDictionary>
* This is used to validate dictionary compatibility before attempting to
use
* a dictionary with this compressor. Only dictionaries of the returned
kind
* should be passed to {@link
#getOrCopyWithDictionary(CompressionDictionary)}.
- *
+ *
* @return the compression dictionary kind supported by this compressor
*/
CompressionDictionary.Kind acceptableDictionaryKind();
@@ -60,7 +91,7 @@ public interface IDictionaryCompressor<T extends
CompressionDictionary>
* The default implementation compares the dictionary's kind with the kind
* returned by {@link #acceptableDictionaryKind()}. Compressor
implementations
* may override this method to provide more sophisticated compatibility
checks.
- *
+ *
* @param dictionary the compression dictionary to check for compatibility
* @return true if this compressor can use the dictionary, false otherwise
*/
diff --git
a/src/java/org/apache/cassandra/io/compress/ZstdDictionaryCompressor.java
b/src/java/org/apache/cassandra/io/compress/ZstdDictionaryCompressor.java
index 46a0c3d483..29a4131bc1 100644
--- a/src/java/org/apache/cassandra/io/compress/ZstdDictionaryCompressor.java
+++ b/src/java/org/apache/cassandra/io/compress/ZstdDictionaryCompressor.java
@@ -39,6 +39,8 @@ import
org.apache.cassandra.db.compression.CompressionDictionary.Kind;
import org.apache.cassandra.db.compression.ZstdCompressionDictionary;
import org.apache.cassandra.utils.concurrent.Ref;
+import static
org.apache.cassandra.io.compress.IDictionaryCompressor.validateTrainingParameter;
+
public class ZstdDictionaryCompressor extends ZstdCompressorBase implements
ICompressor, IDictionaryCompressor<ZstdCompressionDictionary>
{
private static final ConcurrentHashMap<Integer, ZstdDictionaryCompressor>
instancesPerLevel = new ConcurrentHashMap<>();
@@ -75,6 +77,12 @@ public class ZstdDictionaryCompressor extends
ZstdCompressorBase implements ICom
{
int level = getOrDefaultCompressionLevel(options);
validateCompressionLevel(level);
+ validateTrainingParameter(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME,
+
options.getOrDefault(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME,
+
DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE));
+
validateTrainingParameter(TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME,
+
options.getOrDefault(TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME,
+
DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE));
return getOrCreate(level, null);
}
@@ -109,7 +117,9 @@ public class ZstdDictionaryCompressor extends
ZstdCompressorBase implements ICom
private ZstdDictionaryCompressor(int level, ZstdCompressionDictionary
dictionary, Ref<ZstdCompressionDictionary> dictionaryRef)
{
- super(level, Set.of(COMPRESSION_LEVEL_OPTION_NAME));
+ super(level, Set.of(COMPRESSION_LEVEL_OPTION_NAME,
+ TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME,
+ TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME));
this.dictionary = dictionary;
this.dictionaryRef = dictionaryRef;
}
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index aa98d2218e..bafb922068 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -2693,15 +2693,19 @@ public class NodeProbe implements AutoCloseable
* Triggers compression dictionary training for the specified table.
* Samples chunks from existing SSTables and trains a dictionary.
*
- * @param keyspace the keyspace name
- * @param table the table name
- * @param force force the dictionary training even if there are not enough
samples
- * @throws IOException if there's an error accessing the MBean
- * @throws IllegalArgumentException if table doesn't support dictionary
compression
+ * @param keyspace the keyspace name
+ * @param table the table name
+ * @param force force the dictionary training even if
there are not enough samples
+ * @param parameters training parameters, if empty,
training parameters will be taken from CQL's
+ * compression section of a given table
training is conducted on.
+ * @throws IOException if there's an error accessing the
MBean
+ * @throws IllegalArgumentException if table doesn't support dictionary
compression
*/
- public void trainCompressionDictionary(String keyspace, String table,
boolean force) throws IOException
+ public void trainCompressionDictionary(String keyspace, String table,
+ boolean force,
+ Map<String, String> parameters)
throws IOException
{
- doWithCompressionDictionaryManagerMBean(proxy -> { proxy.train(force);
return null; }, keyspace, table);
+ doWithCompressionDictionaryManagerMBean(proxy -> {proxy.train(force,
parameters); return null; }, keyspace, table);
}
/**
diff --git
a/src/java/org/apache/cassandra/tools/nodetool/CompressionDictionaryCommandGroup.java
b/src/java/org/apache/cassandra/tools/nodetool/CompressionDictionaryCommandGroup.java
index 3c11597207..757260f66e 100644
---
a/src/java/org/apache/cassandra/tools/nodetool/CompressionDictionaryCommandGroup.java
+++
b/src/java/org/apache/cassandra/tools/nodetool/CompressionDictionaryCommandGroup.java
@@ -19,7 +19,9 @@ package org.apache.cassandra.tools.nodetool;
import java.io.PrintStream;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.management.openmbean.CompositeData;
@@ -28,6 +30,7 @@ import javax.management.openmbean.TabularData;
import com.fasterxml.jackson.databind.exc.ValueInstantiationException;
import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.cassandra.config.DataStorageSpec;
import
org.apache.cassandra.db.compression.CompressionDictionaryDetailsTabularData;
import
org.apache.cassandra.db.compression.CompressionDictionaryDetailsTabularData.CompressionDictionaryDataObject;
import
org.apache.cassandra.db.compression.ICompressionDictionaryTrainer.TrainingStatus;
@@ -48,6 +51,10 @@ import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING;
import static java.nio.file.StandardOpenOption.WRITE;
import static java.util.stream.Collectors.joining;
+import static
org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE;
+import static
org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE;
+import static
org.apache.cassandra.io.compress.IDictionaryCompressor.TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME;
+import static
org.apache.cassandra.io.compress.IDictionaryCompressor.TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME;
@Command(name = "compressiondictionary",
description = "Manage compression dictionaries",
@@ -61,6 +68,9 @@ public class CompressionDictionaryCommandGroup
description = "Manually trigger compression dictionary training
for a table. If no SSTables are available, the memtable will be flushed first.")
public static class TrainDictionary extends AbstractCommand
{
+ private static final String MAX_DICT_SIZE_PARAM_NAME =
"--max-dict-size";
+ private static final String MAX_TOTAL_SAMPLE_SIZE_PARAM_NAME =
"--max-total-sample-size";
+
@Parameters(index = "0", description = "The keyspace name", arity =
"1")
private String keyspace;
@@ -70,18 +80,40 @@ public class CompressionDictionaryCommandGroup
@Option(names = { "-f", "--force" }, description = "Force the
dictionary training even if there are not enough samples")
private boolean force = false;
+ @Option(names = MAX_DICT_SIZE_PARAM_NAME, description = "Maximum size
of a trained compression dictionary. " +
+ "Larger
dictionaries may provide better compression but use more memory. When not set,
" +
+ "the value
from compression configuration from CQL for a given table is used. " +
+ "The default
value is " + DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE + '.')
+ private String trainingMaxDictionarySize;
+
+ @Option(names = MAX_TOTAL_SAMPLE_SIZE_PARAM_NAME, description =
"Maximum total size of sample data to collect for dictionary training. " +
+ "More
sample data generally produces better dictionaries but takes longer to train. "
+
+ "The
recommended sample size is 100x the dictionary size. When not set, " +
+ "the
value from compression configuration from CQL for a given table is used. " +
+ "The
default value is " + DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE +
'.')
+ private String trainingMaxTotalSampleSize;
+
@Override
public void execute(NodeProbe probe)
{
PrintStream out = probe.output().out;
PrintStream err = probe.output().err;
+ validateParameters(err, trainingMaxDictionarySize,
trainingMaxTotalSampleSize);
+
try
{
out.printf("Starting compression dictionary training for
%s.%s...%n", keyspace, table);
out.printf("Training from existing SSTables (flushing first if
needed)%n");
- probe.trainCompressionDictionary(keyspace, table, force);
+ Map<String, String> parameters = new HashMap<>();
+ if (trainingMaxDictionarySize != null)
+
parameters.put(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME,
trainingMaxDictionarySize);
+
+ if (trainingMaxTotalSampleSize != null)
+
parameters.put(TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME,
trainingMaxTotalSampleSize);
+
+ probe.trainCompressionDictionary(keyspace, table, force,
parameters);
// Wait for training completion (10 minutes timeout for
SSTable-based training)
out.println("Sampling from existing SSTables and training.");
@@ -140,6 +172,35 @@ public class CompressionDictionaryCommandGroup
out.printf("\rStatus: %s | Samples: %d | Size: %.2f MiB | Elapsed:
%ds",
status, sampleCount, sampleSizeMB, elapsedSeconds);
}
+
+ private static void validateParameters(PrintStream err, String
trainingMaxDictionarySize, String trainingMaxTotalSampleSize)
+ {
+ if (trainingMaxDictionarySize != null)
+ {
+ try
+ {
+ new
DataStorageSpec.IntKibibytesBound(trainingMaxDictionarySize).toBytes();
+ }
+ catch (Throwable t)
+ {
+ err.println("Invalid value for " +
MAX_DICT_SIZE_PARAM_NAME + ": " + t.getMessage());
+ System.exit(1);
+ }
+ }
+
+ if (trainingMaxTotalSampleSize != null)
+ {
+ try
+ {
+ new
DataStorageSpec.IntKibibytesBound(trainingMaxTotalSampleSize).toBytes();
+ }
+ catch (Throwable t)
+ {
+ err.println("Invalid value for " +
MAX_TOTAL_SAMPLE_SIZE_PARAM_NAME + ": " + t.getMessage());
+ System.exit(1);
+ }
+ }
+ }
}
@Command(name = "list",
diff --git a/test/resources/nodetool/help/compressiondictionary
b/test/resources/nodetool/help/compressiondictionary
index 5a65cae36d..92b9ecbe5d 100644
--- a/test/resources/nodetool/help/compressiondictionary
+++ b/test/resources/nodetool/help/compressiondictionary
@@ -12,7 +12,9 @@ SYNOPSIS
[(-pp | --print-port)] [(-pw <password> | --password
<password>)]
[(-pwf <passwordFilePath> | --password-file
<passwordFilePath>)]
[(-u <username> | --username <username>)]
compressiondictionary train
- [(-f | --force)] [--] <keyspace> <table>
+ [(-f | --force)] [--max-dict-size <trainingMaxDictionarySize>]
+ [--max-total-sample-size <trainingMaxTotalSampleSize>] [--]
<keyspace>
+ <table>
nodetool [(-h <host> | --host <host>)] [(-p <port> | --port <port>)]
[(-pp | --print-port)] [(-pw <password> | --password
<password>)]
@@ -60,6 +62,17 @@ COMMANDS
With --force option, Force the dictionary training even if there
are not
enough samples
+
+ With --max-dict-size option, Maximum size of a trained compression
+ dictionary. Larger dictionaries may provide better compression but
use more
+ memory. When not set, the value from compression configuration
from CQL for
+ a given table is used. The default value is 64KiB.
+
+ With --max-total-sample-size option, Maximum total size of sample
data to
+ collect for dictionary training. More sample data generally
produces better
+ dictionaries but takes longer to train. The recommended sample
size is 100x
+ the dictionary size. When not set, the value from compression
configuration
+ from CQL for a given table is used. The default value is 10MiB.
list
List available dictionaries of specific keyspace and table.
export
diff --git a/test/resources/nodetool/help/compressiondictionary$train
b/test/resources/nodetool/help/compressiondictionary$train
index ce36f648f8..206a6d2b0b 100644
--- a/test/resources/nodetool/help/compressiondictionary$train
+++ b/test/resources/nodetool/help/compressiondictionary$train
@@ -8,7 +8,9 @@ SYNOPSIS
[(-pp | --print-port)] [(-pw <password> | --password
<password>)]
[(-pwf <passwordFilePath> | --password-file
<passwordFilePath>)]
[(-u <username> | --username <username>)]
compressiondictionary train
- [(-f | --force)] [--] <keyspace> <table>
+ [(-f | --force)] [--max-dict-size <trainingMaxDictionarySize>]
+ [--max-total-sample-size <trainingMaxTotalSampleSize>] [--]
<keyspace>
+ <table>
OPTIONS
-f, --force
@@ -17,6 +19,20 @@ OPTIONS
-h <host>, --host <host>
Node hostname or ip address
+ --max-dict-size <trainingMaxDictionarySize>
+ Maximum size of a trained compression dictionary. Larger
+ dictionaries may provide better compression but use more memory.
+ When not set, the value from compression configuration from CQL for
+ a given table is used. The default value is 64KiB.
+
+ --max-total-sample-size <trainingMaxTotalSampleSize>
+ Maximum total size of sample data to collect for dictionary
+ training. More sample data generally produces better dictionaries
+ but takes longer to train. The recommended sample size is 100x the
+ dictionary size. When not set, the value from compression
+ configuration from CQL for a given table is used. The default value
+ is 10MiB.
+
-p <port>, --port <port>
Remote jmx agent port number
diff --git
a/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryIntegrationTest.java
b/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryIntegrationTest.java
index 9227580838..b192b6bd2d 100644
---
a/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryIntegrationTest.java
+++
b/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryIntegrationTest.java
@@ -19,13 +19,13 @@
package org.apache.cassandra.db.compression;
import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.DataStorageSpec;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -37,6 +37,10 @@ import org.apache.cassandra.schema.CompressionParams;
import org.apache.cassandra.utils.Clock;
import static org.apache.cassandra.Util.spinUntilTrue;
+import static
org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE;
+import static
org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE;
+import static
org.apache.cassandra.io.compress.IDictionaryCompressor.TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME;
+import static
org.apache.cassandra.io.compress.IDictionaryCompressor.TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -45,13 +49,14 @@ public class CompressionDictionaryIntegrationTest extends
CQLTester
{
private static final String REPEATED_DATA = "The quick brown fox jumps
over the lazy dog. This text repeats for better compression. ";
+ private final static String TRAINING_MAX_TOTAL_SAMPLE_SIZE = "128KiB";
+ private final static String TRAINING_MAX_DICTIONARY_SIZE = "10KiB";
+
@Before
public void configureDatabaseDescriptor()
{
Config config = DatabaseDescriptor.getRawConfig();
config.compression_dictionary_training_sampling_rate = 1.0f;
- config.compression_dictionary_training_max_total_sample_size = new
DataStorageSpec.IntKibibytesBound("128KiB");
- config.compression_dictionary_training_max_dictionary_size = new
DataStorageSpec.IntKibibytesBound("10KiB");
config.flush_compression = Config.FlushCompression.table;
DatabaseDescriptor.setConfig(config);
}
@@ -59,51 +64,59 @@ public class CompressionDictionaryIntegrationTest extends
CQLTester
@Test
public void testEnableDisableDictionaryCompression()
{
- String table = createTable("CREATE TABLE %s (id int PRIMARY KEY, data
text) WITH compression = {'class': 'ZstdDictionaryCompressor'}");
+ String table = createTable(getTableCql());
ColumnFamilyStore cfs =
Keyspace.open(keyspace()).getColumnFamilyStore(table);
CompressionDictionaryManager manager =
cfs.compressionDictionaryManager();
// Insert data and flush to create SSTables
for (int i = 0; i < 100; i++)
{
- execute("INSERT INTO %s (id, data) VALUES (?, ?)", i,
REPEATED_DATA + " " + i);
+ execute("INSERT INTO %s (pk, data) VALUES (?, ?)",
Integer.toString(i), REPEATED_DATA + " " + i);
}
flush();
assertThatNoException()
.as("Should allow manual training")
- .isThrownBy(() -> manager.train(false));
+ .isThrownBy(() -> manager.train(false,
+
Map.of(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME,
TRAINING_MAX_DICTIONARY_SIZE,
+
TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME,
TRAINING_MAX_TOTAL_SAMPLE_SIZE)));
// Disable dictionary compression
CompressionParams nonDictParams = CompressionParams.lz4();
manager.maybeReloadFromSchema(nonDictParams);
- assertThatThrownBy(() -> manager.train(false))
+ assertThatThrownBy(() -> manager.train(false,
+
Map.of(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME,
TRAINING_MAX_DICTIONARY_SIZE,
+
TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME, TRAINING_MAX_TOTAL_SAMPLE_SIZE)))
.as("Should disallow manual training when using lz4")
.isInstanceOf(UnsupportedOperationException.class)
.hasMessageContaining("does not support dictionary compression");
// Re-enable dictionary compression
CompressionParams dictParams =
CompressionParams.zstd(CompressionParams.DEFAULT_CHUNK_LENGTH, true,
-
Collections.singletonMap("compression_level", "3"));
+
Map.of("compression_level", "3",
+
TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME, TRAINING_MAX_TOTAL_SAMPLE_SIZE,
+
TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME, TRAINING_MAX_DICTIONARY_SIZE));
manager.maybeReloadFromSchema(dictParams);
// Insert more data for the re-enabled compression
for (int i = 100; i < 200; i++)
{
- execute("INSERT INTO %s (id, data) VALUES (?, ?)", i,
REPEATED_DATA + " " + i);
+ execute("INSERT INTO %s (pk, data) VALUES (?, ?)",
Integer.toString(i), REPEATED_DATA + " " + i);
}
flush();
assertThatNoException()
.as("Should allow manual training after switching back to dictionary
compression")
- .isThrownBy(() -> manager.train(false));
+ .isThrownBy(() -> manager.train(false,
+
Map.of(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME,
TRAINING_MAX_DICTIONARY_SIZE,
+
TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME,
TRAINING_MAX_TOTAL_SAMPLE_SIZE)));
}
@Test
public void testCompressionParameterChanges()
{
- String table = createTable("CREATE TABLE %s (id int PRIMARY KEY, data
text) WITH compression = {'class': 'ZstdDictionaryCompressor'}");
+ String table = createTable(getTableCql());
ColumnFamilyStore cfs =
Keyspace.open(keyspace()).getColumnFamilyStore(table);
CompressionDictionaryManager manager =
cfs.compressionDictionaryManager();
ICompressionDictionaryTrainer trainer = manager.trainer();
@@ -124,7 +137,7 @@ public class CompressionDictionaryIntegrationTest extends
CQLTester
@Test
public void testResourceCleanupOnClose() throws Exception
{
- createTable("CREATE TABLE %s (id int PRIMARY KEY, data text) WITH
compression = {'class': 'ZstdDictionaryCompressor'}");
+ createTable(getTableCql());
ColumnFamilyStore cfs = getCurrentColumnFamilyStore();
CompressionDictionaryManager manager =
cfs.compressionDictionaryManager();
@@ -162,8 +175,7 @@ public class CompressionDictionaryIntegrationTest extends
CQLTester
public void testSSTableBasedTraining()
{
DatabaseDescriptor.setFlushCompression(Config.FlushCompression.table);
- String table = createTable("CREATE TABLE %s (pk text PRIMARY KEY, data
text) " +
- "WITH compression = {'class':
'ZstdDictionaryCompressor', 'chunk_length_in_kb' : 4}");
+ String table = createTable(getTableCqlWithChunkLength());
ColumnFamilyStore cfs =
Keyspace.open(keyspace()).getColumnFamilyStore(table);
CompressionDictionaryManager manager =
cfs.compressionDictionaryManager();
@@ -184,7 +196,8 @@ public class CompressionDictionaryIntegrationTest extends
CQLTester
.hasSizeGreaterThan(0);
// Train from existing SSTables
- manager.train(true);
+ manager.train(true,
Map.of(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME,
TRAINING_MAX_DICTIONARY_SIZE,
+
TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME, TRAINING_MAX_TOTAL_SAMPLE_SIZE));
// Training should complete quickly since we're reading from existing
SSTables
spinUntilTrue(() ->
TrainingState.fromCompositeData(manager.getTrainingState()).status ==
TrainingStatus.COMPLETED, 10);
@@ -210,15 +223,36 @@ public class CompressionDictionaryIntegrationTest extends
CQLTester
@Test
public void testSSTableBasedTrainingWithoutSSTables()
{
- String table = createTable("CREATE TABLE %s (pk text PRIMARY KEY, data
text) " +
- "WITH compression = {'class':
'ZstdDictionaryCompressor'}");
+ String table = createTable(getTableCql());
ColumnFamilyStore cfs =
Keyspace.open(keyspace()).getColumnFamilyStore(table);
CompressionDictionaryManager manager =
cfs.compressionDictionaryManager();
// Try to train without any SSTables
- assertThatThrownBy(() -> manager.train(false))
+ assertThatThrownBy(() -> manager.train(false,
Map.of(TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME,
TRAINING_MAX_DICTIONARY_SIZE,
+
TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME, TRAINING_MAX_TOTAL_SAMPLE_SIZE)))
.as("Should fail when no SSTables are available")
.isInstanceOf(IllegalStateException.class)
.hasMessageContaining("No SSTables available for training");
}
+
+ private String getTableCqlWithChunkLength()
+ {
+ return "CREATE TABLE %s (pk text PRIMARY KEY, data text) " +
+ "WITH compression = {" +
+ "'class': 'ZstdDictionaryCompressor'," +
+ "'chunk_length_in_kb' : 4, " +
+ '\'' + TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME + "': '" +
DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE + "'," +
+ '\'' + TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME + "': '" +
DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE + '\'' +
+ '}';
+ }
+
+ private String getTableCql()
+ {
+ return "CREATE TABLE %s (pk text PRIMARY KEY, data text) " +
+ "WITH compression = {" +
+ "'class': 'ZstdDictionaryCompressor'," +
+ '\'' + TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_NAME + "': '" +
DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE + "'," +
+ '\'' + TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME + "': '" +
DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE + '\'' +
+ '}';
+ }
}
diff --git
a/test/unit/org/apache/cassandra/db/compression/CompressionDictionarySchedulerTest.java
b/test/unit/org/apache/cassandra/db/compression/CompressionDictionarySchedulerTest.java
index 98a0110e6f..42f91a82e0 100644
---
a/test/unit/org/apache/cassandra/db/compression/CompressionDictionarySchedulerTest.java
+++
b/test/unit/org/apache/cassandra/db/compression/CompressionDictionarySchedulerTest.java
@@ -25,13 +25,15 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.DataStorageSpec;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import static org.apache.cassandra.Util.spinUntilTrue;
+import static
org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE;
+import static
org.apache.cassandra.io.compress.IDictionaryCompressor.DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE;
import static org.assertj.core.api.Assertions.assertThat;
public class CompressionDictionarySchedulerTest extends CQLTester
@@ -90,7 +92,7 @@ public class CompressionDictionarySchedulerTest extends
CQLTester
assertThat(sstables).isNotEmpty();
CompressionDictionaryTrainingConfig config =
createSampleAllTrainingConfig(cfs);
- manager.trainer().start(true);
+ manager.trainer().start(true, config);
assertThat(manager.getCurrent()).as("There should be no dictionary at
this step").isNull();
scheduler.scheduleSSTableBasedTraining(manager.trainer(), sstables,
config, true);
@@ -118,8 +120,8 @@ public class CompressionDictionarySchedulerTest extends
CQLTester
private static CompressionDictionaryTrainingConfig
createSampleAllTrainingConfig(ColumnFamilyStore cfs) {
return CompressionDictionaryTrainingConfig
.builder()
-
.maxDictionarySize(DatabaseDescriptor.getCompressionDictionaryTrainingMaxDictionarySize())
-
.maxTotalSampleSize(DatabaseDescriptor.getCompressionDictionaryTrainingMaxTotalSampleSize())
+ .maxDictionarySize(new
DataStorageSpec.IntKibibytesBound(DEFAULT_TRAINING_MAX_DICTIONARY_SIZE_PARAMETER_VALUE).toBytes())
+ .maxTotalSampleSize(new
DataStorageSpec.IntKibibytesBound(DEFAULT_TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_VALUE).toBytes())
.samplingRate(1.0f)
.chunkSize(cfs.metadata().params.compression.chunkLength())
.build();
diff --git
a/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryTrainingConfigTest.java
b/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryTrainingConfigTest.java
index 8f3d746d03..a066f629b9 100644
---
a/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryTrainingConfigTest.java
+++
b/test/unit/org/apache/cassandra/db/compression/CompressionDictionaryTrainingConfigTest.java
@@ -36,8 +36,8 @@ public class CompressionDictionaryTrainingConfigTest
.as("Default max total sample size should be 10MB")
.isEqualTo(10 * 1024 * 1024);
assertThat(config.samplingRate)
- .as("Default sampling rate should be 100 (1%)")
- .isEqualTo(100);
+ .as("Default sampling rate should be 0.01 (1%)")
+ .isEqualTo(0.01f);
}
@Test
@@ -57,7 +57,7 @@ public class CompressionDictionaryTrainingConfigTest
assertThat(config.maxDictionarySize).isEqualTo(dictSize);
assertThat(config.maxTotalSampleSize).isEqualTo(sampleSize);
assertThat(config.acceptableTotalSampleSize).isEqualTo(sampleSize / 10
* 8);
- assertThat(config.samplingRate).isEqualTo(Math.round(1 /
samplingRate));
+ assertThat(config.samplingRate).isEqualTo(0.005f);
// Verify relationship between max and acceptable sample sizes
assertThat(config.acceptableTotalSampleSize)
diff --git
a/test/unit/org/apache/cassandra/db/compression/ZstdDictionaryTrainerTest.java
b/test/unit/org/apache/cassandra/db/compression/ZstdDictionaryTrainerTest.java
index 504bb6368b..7794d25cb9 100644
---
a/test/unit/org/apache/cassandra/db/compression/ZstdDictionaryTrainerTest.java
+++
b/test/unit/org/apache/cassandra/db/compression/ZstdDictionaryTrainerTest.java
@@ -70,7 +70,7 @@ public class ZstdDictionaryTrainerTest
callbackResult = new AtomicReference<>();
mockCallback = callbackResult::set;
- trainer = new ZstdDictionaryTrainer(TEST_KEYSPACE, TEST_TABLE,
testConfig, COMPRESSION_LEVEL);
+ trainer = new ZstdDictionaryTrainer(TEST_KEYSPACE, TEST_TABLE,
COMPRESSION_LEVEL, testConfig.samplingRate);
trainer.setDictionaryTrainedListener(mockCallback);
}
@@ -109,7 +109,7 @@ public class ZstdDictionaryTrainerTest
public void testTrainerStart()
{
// Auto start depends on configuration - test both scenarios
- boolean started = trainer.start(false);
+ boolean started = trainer.start(false, testConfig);
if (started)
{
assertThat(trainer.getTrainingState().getStatus())
@@ -127,7 +127,7 @@ public class ZstdDictionaryTrainerTest
@Test
public void testTrainerStartManual()
{
- assertThat(trainer.start(true))
+ assertThat(trainer.start(true, testConfig))
.as("Manual training should start successfully")
.isTrue();
assertThat(trainer.getTrainingState().getStatus())
@@ -141,17 +141,17 @@ public class ZstdDictionaryTrainerTest
@Test
public void testTrainerStartMultipleTimes()
{
- assertThat(trainer.start(true))
+ assertThat(trainer.start(true, testConfig))
.as("First start (manual training) should succeed")
.isTrue();
Object firstTrainer = trainer.trainer();
assertThat(firstTrainer).isNotNull();
- assertThat(trainer.start(true))
+ assertThat(trainer.start(true, testConfig))
.as("Second start (manual training) should succeed and reset")
.isTrue();
Object secondTrainer = trainer.trainer();
assertThat(secondTrainer).isNotNull().isNotSameAs(firstTrainer);
- assertThat(trainer.start(false))
+ assertThat(trainer.start(false, testConfig))
.as("Third start (not manual training) should fail")
.isFalse();
}
@@ -159,7 +159,7 @@ public class ZstdDictionaryTrainerTest
@Test
public void testTrainerCloseIdempotent()
{
- trainer.start(true);
+ trainer.start(true, testConfig);
trainer.close();
trainer.close(); // Should not throw
trainer.close(); // Should not throw
@@ -172,14 +172,14 @@ public class ZstdDictionaryTrainerTest
@Test
public void testTrainerReset()
{
- trainer.start(true);
+ trainer.start(true, testConfig);
addSampleData(1000); // Add some samples
assertThat(trainer.getTrainingState().getSampleCount())
.as("Should have samples before reset")
.isGreaterThan(0);
- trainer.reset();
+ trainer.reset(testConfig);
assertThat(trainer.getTrainingState().getStatus())
.as("Status should be NOT_STARTED after reset")
.isEqualTo(TrainingStatus.NOT_STARTED);
@@ -194,10 +194,10 @@ public class ZstdDictionaryTrainerTest
@Test
public void testStartAfterClose()
{
- trainer.start(true);
+ trainer.start(true, testConfig);
trainer.close();
- assertThat(trainer.start(true))
+ assertThat(trainer.start(true, testConfig))
.as("Should not start after close")
.isFalse();
assertThat(trainer.getTrainingState().getStatus())
@@ -208,7 +208,7 @@ public class ZstdDictionaryTrainerTest
@Test
public void testShouldSample()
{
- trainer.start(true);
+ trainer.start(true, testConfig);
// With sampling rate 1 (100%), should always return true
for (int i = 0; i < 10; i++)
{
@@ -229,8 +229,7 @@ public class ZstdDictionaryTrainerTest
.samplingRate(0.001f) // 0.1%
sampling
.build();
- try (ZstdDictionaryTrainer lowSamplingTrainer = new
ZstdDictionaryTrainer(TEST_KEYSPACE, TEST_TABLE,
-
lowSamplingConfig, COMPRESSION_LEVEL))
+ try (ZstdDictionaryTrainer lowSamplingTrainer = new
ZstdDictionaryTrainer(TEST_KEYSPACE, TEST_TABLE, COMPRESSION_LEVEL,
lowSamplingConfig.samplingRate))
{
lowSamplingTrainer.setDictionaryTrainedListener(mockCallback);
// With very low sampling rate, should mostly return false
@@ -254,7 +253,7 @@ public class ZstdDictionaryTrainerTest
@Test
public void testAddSample()
{
- trainer.start(true);
+ trainer.start(true, testConfig);
assertThat(trainer.getTrainingState().getSampleCount())
.as("Initial sample count should be 0")
@@ -292,7 +291,7 @@ public class ZstdDictionaryTrainerTest
@Test
public void testAddSampleAfterClose()
{
- trainer.start(true);
+ trainer.start(true, testConfig);
trainer.close();
ByteBuffer sample = ByteBuffer.wrap(SAMPLE_DATA.getBytes());
@@ -309,7 +308,7 @@ public class ZstdDictionaryTrainerTest
@Test
public void testAddNullSample()
{
- trainer.start(true);
+ trainer.start(true, testConfig);
trainer.addSample(null); // Should not throw
assertThat(trainer.getTrainingState().getStatus())
@@ -323,7 +322,7 @@ public class ZstdDictionaryTrainerTest
@Test
public void testAddEmptySample()
{
- trainer.start(true);
+ trainer.start(true, testConfig);
ByteBuffer empty = ByteBuffer.allocate(0);
trainer.addSample(empty); // Should not throw
@@ -338,7 +337,7 @@ public class ZstdDictionaryTrainerTest
@Test
public void testIsReady()
{
- trainer.start(true);
+ trainer.start(true, testConfig);
assertThat(trainer.isReady())
.as("Should not be ready initially")
.isFalse();
@@ -363,7 +362,7 @@ public class ZstdDictionaryTrainerTest
@Test
public void testTrainDictionaryWithInsufficientSampleCount()
{
- trainer.start(true);
+ trainer.start(true, testConfig);
// Add sufficient data size but only 5 samples (less than minimum 11)
for (int i = 0; i < 5; i++)
@@ -396,7 +395,7 @@ public class ZstdDictionaryTrainerTest
@Test
public void testTrainDictionaryWithSufficientSampleCount()
{
- trainer.start(true);
+ trainer.start(true, testConfig);
// Add 15 samples with sufficient total size
for (int i = 0; i < 15; i++)
@@ -417,7 +416,7 @@ public class ZstdDictionaryTrainerTest
@Test
public void testTrainDictionaryAsync() throws Exception
{
- Future<CompressionDictionary> future = startTraining(true, false,
testConfig.acceptableTotalSampleSize);
+ Future<CompressionDictionary> future = startTraining(true, false,
testConfig);
CompressionDictionary dictionary = future.get(5, TimeUnit.SECONDS);
assertThat(dictionary).as("Dictionary should not be null").isNotNull();
@@ -432,7 +431,7 @@ public class ZstdDictionaryTrainerTest
public void testTrainDictionaryAsyncForce() throws Exception
{
// Don't add enough samples
- Future<CompressionDictionary> future = startTraining(true, true, 512);
+ Future<CompressionDictionary> future = startTraining(true, true,
testConfig, 512);
CompressionDictionary dictionary = future.get(1, TimeUnit.SECONDS);
assertThat(dictionary)
.as("Forced async training should produce dictionary")
@@ -443,7 +442,7 @@ public class ZstdDictionaryTrainerTest
public void testTrainDictionaryAsyncForceFailsWithNoData() throws Exception
{
AtomicReference<CompressionDictionary> dictRef = new
AtomicReference<>();
- Future<CompressionDictionary> result = startTraining(true, true, 0)
+ Future<CompressionDictionary> result = startTraining(true, true,
testConfig, 0)
.addCallback((dict,
t) -> dictRef.set(dict));
assertThat(result.isDone() && result.cause() != null)
@@ -460,7 +459,7 @@ public class ZstdDictionaryTrainerTest
@Test
public void testDictionaryTrainedListener()
{
- trainer.start(true);
+ trainer.start(true, testConfig);
addSampleData(testConfig.acceptableTotalSampleSize);
// Train dictionary synchronously - callback should be called
@@ -527,10 +526,10 @@ public class ZstdDictionaryTrainerTest
@Test
public void testUpdateSamplingRate()
{
- trainer.start(true);
+ trainer.start(true, testConfig);
// Test updating to different valid sampling rates
- trainer.updateSamplingRate(10);
+ trainer.updateSamplingRate(0.1f);
// With sampling rate 0.1 (10%), should mostly return false
int sampleCount = 0;
@@ -550,7 +549,7 @@ public class ZstdDictionaryTrainerTest
.isLessThan(iterations / 5); // at most 20%
// Test updating to 100% sampling
- trainer.updateSamplingRate(1);
+ trainer.updateSamplingRate(1.0f);
// Should always sample now
for (int i = 0; i < 10; i++)
@@ -564,29 +563,29 @@ public class ZstdDictionaryTrainerTest
@Test
public void testUpdateSamplingRateValidation()
{
- trainer.start(true);
+ trainer.start(true, testConfig);
// Test invalid sampling rates
- assertThatThrownBy(() -> trainer.updateSamplingRate(0))
+ assertThatThrownBy(() -> trainer.updateSamplingRate(0f))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining("Sampling rate must be positive");
+ .hasMessageContaining("Sampling rate has to be between (0.0;1], it is
0.0");
- assertThatThrownBy(() -> trainer.updateSamplingRate(-1))
+ assertThatThrownBy(() -> trainer.updateSamplingRate(-1f))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining("Sampling rate must be positive");
+ .hasMessageContaining("Sampling rate has to be between (0.0;1], it is
-1.0");
- assertThatThrownBy(() -> trainer.updateSamplingRate(-100))
+ assertThatThrownBy(() -> trainer.updateSamplingRate(-100f))
.isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining("Sampling rate must be positive");
+ .hasMessageContaining("Sampling rate has to be between (0.0;1], it is
-100.0");
}
@Test
public void testUpdateSamplingRateBeforeStart()
{
// Should be able to update sampling rate even before start
- trainer.updateSamplingRate(5);
+ trainer.updateSamplingRate(0.2f);
- trainer.start(true);
+ trainer.start(true, testConfig);
// Verify the updated rate is used after start
int sampleCount = 0;
@@ -620,7 +619,7 @@ public class ZstdDictionaryTrainerTest
@Test
public void testTrainDictionaryClosed()
{
- trainer.start(true);
+ trainer.start(true, testConfig);
addSampleData(testConfig.acceptableTotalSampleSize);
trainer.close();
@@ -634,7 +633,7 @@ public class ZstdDictionaryTrainerTest
@Test
public void testTrainDictionaryInsufficientSampleSize()
{
- trainer.start(true);
+ trainer.start(true, testConfig);
// Add enough samples (15) but with insufficient total size
for (int i = 0; i < 15; i++)
@@ -665,7 +664,7 @@ public class ZstdDictionaryTrainerTest
@Test
public void testTrainDictionaryInsufficientBothSampleCountAndSize()
{
- trainer.start(true);
+ trainer.start(true, testConfig);
// Add only 3 samples with small size
for (int i = 0; i < 3; i++)
@@ -690,9 +689,9 @@ public class ZstdDictionaryTrainerTest
.hasMessageContaining("Use --force to train anyway");
}
- private Future<CompressionDictionary> startTraining(boolean
manualTraining, boolean forceTrain, int sampleSize) throws Exception
+ private Future<CompressionDictionary> startTraining(boolean
manualTraining, boolean forceTrain, CompressionDictionaryTrainingConfig config,
int sampleSize) throws Exception
{
- trainer.start(manualTraining);
+ trainer.start(manualTraining, config);
if (sampleSize > 0)
{
addSampleData(sampleSize);
@@ -707,13 +706,18 @@ public class ZstdDictionaryTrainerTest
CountDownLatch latch = new CountDownLatch(1);
Future<CompressionDictionary> future =
trainer.trainDictionaryAsync(forceTrain)
-
.addCallback((dict, throwable) -> latch.countDown());
+ .addCallback((dict,
throwable) -> latch.countDown());
assertThat(latch.await(10, TimeUnit.SECONDS))
.as("Training should complete within timeout")
.isTrue();
return future;
}
+ private Future<CompressionDictionary> startTraining(boolean
manualTraining, boolean forceTrain, CompressionDictionaryTrainingConfig config)
throws Exception
+ {
+ return startTraining(manualTraining, forceTrain, config,
config.acceptableTotalSampleSize);
+ }
+
private void addSampleData(int totalSize)
{
byte[] sampleBytes = SAMPLE_DATA.getBytes();
@@ -738,7 +742,7 @@ public class ZstdDictionaryTrainerTest
.isEqualTo(0);
// Start training
- trainer.start(true);
+ trainer.start(true, testConfig);
// Add some samples
byte[] sampleBytes = SAMPLE_DATA.getBytes();
@@ -758,7 +762,7 @@ public class ZstdDictionaryTrainerTest
.as("Total sample size should match number of samples times sample
size")
.isEqualTo((long) numSamples * sampleSize);
- trainer.reset();
+ trainer.reset(testConfig);
assertThat(trainer.getTrainingState().getSampleCount())
.as("Sample count should be 0 after reset")
diff --git
a/test/unit/org/apache/cassandra/tools/nodetool/TrainCompressionDictionaryTest.java
b/test/unit/org/apache/cassandra/tools/nodetool/TrainCompressionDictionaryTest.java
index 0d50ab08e2..cbe9bd41cb 100644
---
a/test/unit/org/apache/cassandra/tools/nodetool/TrainCompressionDictionaryTest.java
+++
b/test/unit/org/apache/cassandra/tools/nodetool/TrainCompressionDictionaryTest.java
@@ -22,6 +22,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.apache.cassandra.cql3.CQLTester;
+import org.apache.cassandra.io.compress.IDictionaryCompressor;
import org.apache.cassandra.tools.ToolRunner;
import static org.apache.cassandra.tools.ToolRunner.invokeNodetool;
@@ -55,6 +56,56 @@ public class TrainCompressionDictionaryTest extends CQLTester
.contains(table);
}
+ @Test
+ public void testTrainingParameterOverride()
+ {
+ // Create a table with dictionary compression enabled
+ String table = createTable("CREATE TABLE %s (id int PRIMARY KEY, data
text) WITH compression = {'class': 'ZstdDictionaryCompressor'}");
+
+ disableCompaction(keyspace(), table);
+
+ createSSTables(true);
+
+ // Test that the training command without --force fails, since we only have
limited test data
+ ToolRunner.ToolResult result = invokeNodetool("compressiondictionary",
"train", keyspace(), table);
+ result.asserts().failure();
+ assertThat(result.getStderr())
+ .as("Should indicate training not completed")
+ .contains("Trainer is not ready: insufficient sample size")
+ .contains("/8 MiB") // 10MiB / 10 * 8
+ .contains(keyspace())
+ .contains(table);
+
+ ToolRunner.ToolResult resultWithOverrides =
invokeNodetool("compressiondictionary",
+ "train",
+
"--max-total-sample-size", "5MiB",
+ keyspace(),
table);
+
+ assertThat(resultWithOverrides.getStderr())
+ .as("Should indicate training not completed")
+ .contains("Trainer is not ready: insufficient sample size")
+ .contains("/4 MiB") // 5MiB / 10 * 8
+ .contains(keyspace())
+ .contains(table);
+
+ execute(String.format("ALTER TABLE %s.%s WITH " +
+ "compression = {'class':
'ZstdDictionaryCompressor', '%s': '6MiB'}",
+ keyspace(),
+ table,
+
IDictionaryCompressor.TRAINING_MAX_TOTAL_SAMPLE_SIZE_PARAMETER_NAME));
+
+ // we are not overriding, but we have changed
training_max_total_sample_size to 6MiB via CQL, so it sticks
+
+ ToolRunner.ToolResult resultWithoutOverrides =
invokeNodetool("compressiondictionary", "train", keyspace(), table);
+
+ assertThat(resultWithoutOverrides.getStderr())
+ .as("Should indicate training not completed")
+ .contains("Trainer is not ready: insufficient sample size")
+ .contains("/4.8 MiB") // 6MiB / 10 * 8
+ .contains(keyspace())
+ .contains(table);
+ }
+
@Test
public void testTrainCommandWithDataButNoSSTables()
{
diff --git
a/test/unit/org/apache/cassandra/utils/CompressionDictionaryHelper.java
b/test/unit/org/apache/cassandra/utils/CompressionDictionaryHelper.java
index d20c02f847..031d79f3b6 100644
--- a/test/unit/org/apache/cassandra/utils/CompressionDictionaryHelper.java
+++ b/test/unit/org/apache/cassandra/utils/CompressionDictionaryHelper.java
@@ -56,9 +56,9 @@ public class CompressionDictionaryHelper
.maxTotalSampleSize(1024
* 1024) // 1MB total
.build();
- try (ZstdDictionaryTrainer trainer = new
ZstdDictionaryTrainer(keyspace, table, config, 3))
+ try (ZstdDictionaryTrainer trainer = new
ZstdDictionaryTrainer(keyspace, table, 3, 100))
{
- trainer.start(true);
+ trainer.start(true, config);
for (int i = 0; i < 25000; i++)
{
trainer.addSample(UTF8Type.instance.fromString(CompressionDictionaryHelper.INSTANCE.getRandomSample()));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]