smiklosovic commented on code in PR #4399: URL: https://github.com/apache/cassandra/pull/4399#discussion_r2410967130
########## src/java/org/apache/cassandra/db/compression/ICompressionDictionaryTrainer.java: ########## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.compression; + +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +import org.apache.cassandra.concurrent.ScheduledExecutors; +import org.apache.cassandra.io.compress.ICompressor; +import org.apache.cassandra.io.compress.IDictionaryCompressor; +import org.apache.cassandra.io.compress.ZstdDictionaryCompressor; +import org.apache.cassandra.schema.CompressionParams; + +/** + * Interface for training compression dictionaries from sample data. + * <p> + * Implementations handle: + * - Sample collection and management + * - Dictionary training lifecycle + * - Asynchronous training execution + * - Training status tracking + */ +public interface ICompressionDictionaryTrainer extends AutoCloseable +{ + /** + * Starts the trainer for collecting samples. + * + * @param manualTraining true if this is manual training, false for automatic + * @return true if the trainer is started; otherwise false. The trainer is started + * in any of those conditions: 1. 
trainer closed; 2. not requested for + * either manual or auto training; 3. failed to start + */ + boolean start(boolean manualTraining); + + /** + * @return true if the trainer is ready to take a new sample; otherwise, false + */ + boolean shouldSample(); + + /** + * Adds a sample to the training dataset. + * + * @param sample the sample data to add for training + */ + void addSample(ByteBuffer sample); + + /** + * Trains and produces a compression dictionary from collected samples synchronously. + * + * @param force force the dictionary training even if there are not enough samples; + * otherwise, dictionary training won't start if the trainer is not ready + * @return the trained compression dictionary + */ + CompressionDictionary trainDictionary(boolean force); + + /** + * Trains and produces a compression dictionary from collected samples asynchronously. + * + * @param force force the dictionary training even if there are not enough samples + * @return CompletableFuture that completes when training is done + */ + default CompletableFuture<CompressionDictionary> trainDictionaryAsync(boolean force) + { + return CompletableFuture.supplyAsync(() -> trainDictionary(force), ScheduledExecutors.nonPeriodicTasks); + } + + /** + * @return true if enough samples have been collected for training + */ + boolean isReady(); + + /** + * Clears all collected samples and resets trainer state. + */ + void reset(); + + /** + * @return the current training status + */ + TrainingStatus getTrainingStatus(); + + /** + * @return the compression algorithm kind this trainer supports + */ + CompressionDictionary.Kind kind(); + + /** + * Determines if this trainer is compatible with the given compression parameters. + * This method allows the trainer to decide whether it can continue operating + * with new compression parameters or if a new trainer instance is needed. 
+ * + * @param newParams the new compression parameters to check compatibility against + * @return true if this trainer is compatible with the new parameters, false otherwise + */ + boolean isCompatibleWith(CompressionParams newParams); + + /** + * Sets the listener for dictionary training events. + * + * @param listener the listener to be notified when dictionaries are trained, null to remove listener + */ + void setDictionaryTrainedListener(Consumer<CompressionDictionary> listener); + + /** + * Updates the sampling rate for this trainer. + * + * @param newSamplingRate the new sampling rate. For exmaple, 1 = sample every time (100%), + * 2 = expect sample 1/2 of data (50%), n = expect sample 1/n of data + */ + void updateSamplingRate(int newSamplingRate); + + /** + * Factory method to create appropriate trainer based on compression parameters. + * + * @param keyspaceName the keyspace name for logging + * @param tableName the table name for logging + * @param params the compression parameters + * @param config the training configuration + * @return a dictionary trainer for the specified compression algorithm + * @throws IllegalArgumentException if no dictionary trainer is available for the compression algorithm + */ + static ICompressionDictionaryTrainer create(String keyspaceName, + String tableName, + CompressionParams params, + CompressionDictionaryTrainingConfig config) + { + ICompressor compressor = params.getSstableCompressor(); + if (!(compressor instanceof IDictionaryCompressor)) + { + throw new IllegalArgumentException("Compressor does not support dictionary training: " + params.getSstableCompressor()); + } + + IDictionaryCompressor dictionaryCompressor = (IDictionaryCompressor) compressor; + if (dictionaryCompressor.acceptableDictionaryKind() == CompressionDictionary.Kind.ZSTD) Review Comment: This factory is ZSTD-specific here; I am not sure that is really needed.
########## src/java/org/apache/cassandra/db/compression/CompressionDictionaryEventHandler.java: ########## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.compression; + +import org.apache.cassandra.concurrent.ScheduledExecutors; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.Verb; +import org.apache.cassandra.schema.SystemDistributedKeyspace; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.utils.FBUtilities; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.concurrent.CompletableFuture; + +/** + * Handles compression dictionary events including training completion and cluster notifications. 
+ * <p> + * This class handles: + * - Broadcasting dictionary updates to cluster nodes + * - Retrieving new dictionaries when notified by other nodes + * - Managing dictionary cache updates + */ +public class CompressionDictionaryEventHandler implements ICompressionDictionaryEventHandler +{ + private static final Logger logger = LoggerFactory.getLogger(CompressionDictionaryEventHandler.class); + + private final ColumnFamilyStore cfs; + private final String keyspaceName; + private final String tableName; + private final ICompressionDictionaryCache cache; + + public CompressionDictionaryEventHandler(ColumnFamilyStore cfs, ICompressionDictionaryCache cache) + { + this.cfs = cfs; + this.keyspaceName = cfs.keyspace.getName(); + this.tableName = cfs.getTableName(); + this.cache = cache; + } + + @Override + public void onNewDictionaryTrained(long dictionaryId) + { + logger.info("Notifying cluster about dictionary update for {}.{} with dictionaryId {}", + keyspaceName, tableName, dictionaryId); + + CompressionDictionaryUpdateMessage message = new CompressionDictionaryUpdateMessage(cfs.metadata().id, dictionaryId); + Collection<InetAddressAndPort> allNodes = ClusterMetadata.current().directory.allJoinedEndpoints(); + // Broadcast notification using the fire-and-forget fashion + for (InetAddressAndPort node : allNodes) + { + if (node.equals(FBUtilities.getBroadcastAddressAndPort())) // skip ourself + continue; + sendNotification(node, message); + } + } + + @Override + public void onNewDictionaryAvailable(long dictId) + { + // Best effort to retrieve the dictionary; otherwise, the periodic task should retrieve the dictionary later + CompletableFuture.runAsync(() -> { + try + { + if (!cfs.metadata().params.compression.isDictionaryCompressionEnabled()) + { + return; + } + + CompressionDictionary dictionary = SystemDistributedKeyspace.retrieveCompressionDictionary(keyspaceName, tableName, dictId); + cache.setCurrentIfNewer(dictionary); + } + catch (Exception e) + { + 
logger.warn("Failed to retrieve compression dictionary for {}.{}. dictionaryId={}", + keyspaceName, tableName, dictId, e); + } + }, ScheduledExecutors.nonPeriodicTasks); + } + + // Best effort to notify the peer regarding the new dictionary being available to pull. + // If the request fails, each peer has periodic task scheduled to pull. + private void sendNotification(InetAddressAndPort target, CompressionDictionaryUpdateMessage message) + { + logger.debug("Sending dictionary update notification to {}", target); Review Comment: Could we be more verbose? A dictionary of what kind, and with what id? Here `DictId` would come in handy. The same applies to all the other debug messages, like the one in the listener below. ########## src/java/org/apache/cassandra/db/compression/CompressionDictionaryScheduler.java: ########## @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.cassandra.db.compression; + +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.ScheduledExecutors; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.compression.ICompressionDictionaryTrainer.TrainingStatus; +import org.apache.cassandra.schema.SystemDistributedKeyspace; + +/** + * Manages scheduled tasks for compression dictionary operations. + * <p> + * This class handles: + * - Periodic refresh of dictionaries from system tables + * - Manual training task scheduling and monitoring + * - Cleanup of scheduled tasks + */ +public class CompressionDictionaryScheduler implements ICompressionDictionaryScheduler +{ + private static final Logger logger = LoggerFactory.getLogger(CompressionDictionaryScheduler.class); + + private final String keyspaceName; + private final String tableName; + private final ICompressionDictionaryCache cache; + + private volatile ScheduledFuture<?> scheduledRefreshTask; + private volatile ScheduledFuture<?> scheduledManualTrainingTask; + private volatile boolean isEnabled; + + public CompressionDictionaryScheduler(String keyspaceName, + String tableName, + ICompressionDictionaryCache cache, + boolean isEnabled) + { + this.keyspaceName = keyspaceName; + this.tableName = tableName; + this.cache = cache; + this.isEnabled = isEnabled; + } + + /** + * Schedules the periodic dictionary refresh task if not already scheduled. 
+ */ + public void scheduleRefreshTask() + { + if (scheduledRefreshTask != null) + return; + + this.scheduledRefreshTask = ScheduledExecutors.scheduledTasks.scheduleWithFixedDelay( + this::refreshDictionaryFromSystemTable, + DatabaseDescriptor.getCompressionDictionaryRefreshInitialDelaySeconds(), + DatabaseDescriptor.getCompressionDictionaryRefreshIntervalSeconds(), + TimeUnit.SECONDS + ); + } + + @Override + public void scheduleManualTraining(ManualTrainingOptions options, ICompressionDictionaryTrainer trainer) + { + if (scheduledManualTrainingTask != null) + { + throw new IllegalStateException("Training already in progress for table " + keyspaceName + '.' + tableName); + } + + int maxSamplingDurationSeconds = options.getMaxSamplingDurationSeconds(); + + logger.info("Starting manual dictionary training for {}.{} with max sampling duration: {} seconds", + keyspaceName, tableName, maxSamplingDurationSeconds); + + long deadlineMillis = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(maxSamplingDurationSeconds); + + ManualTrainingTask task = new ManualTrainingTask(deadlineMillis, trainer); + + // Check every second whether it gets enough samples and completes training + scheduledManualTrainingTask = ScheduledExecutors.scheduledTasks + .scheduleWithFixedDelay(task, 1, 1, TimeUnit.SECONDS); + } + + @Override + public void cancelManualTraining() + { + ScheduledFuture<?> future = scheduledManualTrainingTask; + if (future != null) + { + future.cancel(false); + } + scheduledManualTrainingTask = null; + } + + /** + * Sets the enabled state of the scheduler. When disabled, refresh tasks will not execute. + * + * @param enabled whether the scheduler should be enabled + */ + @Override + public void setEnabled(boolean enabled) + { + this.isEnabled = enabled; + } + + /** + * Refreshes dictionary from system table and updates the cache. + * This method is called periodically by the scheduled refresh task. 
+ */ + private void refreshDictionaryFromSystemTable() + { + try + { + if (!isEnabled) + { + return; + } + + CompressionDictionary dictionary = SystemDistributedKeyspace.retrieveLatestCompressionDictionary(keyspaceName, tableName); + cache.setCurrentIfNewer(dictionary); + } + catch (Exception e) + { + logger.warn("Failed to refresh compression dictionary for {}.{}", + keyspaceName, tableName, e); + } + } + + @Override + public void close() + { + if (scheduledRefreshTask != null) + { + scheduledRefreshTask.cancel(false); + scheduledRefreshTask = null; + } + + if (scheduledManualTrainingTask != null) + { + scheduledManualTrainingTask.cancel(false); + scheduledManualTrainingTask = null; + } + } + + private class ManualTrainingTask implements Runnable + { + private final long deadlineMillis; + private final ICompressionDictionaryTrainer trainer; + private boolean isTraining = false; + + private ManualTrainingTask(long deadlineMillis, ICompressionDictionaryTrainer trainer) + { + this.deadlineMillis = deadlineMillis; + this.trainer = trainer; + } + + @Override + public void run() + { + if (trainer.getTrainingStatus() == TrainingStatus.NOT_STARTED) + { + logger.warn("Trainer is not started. Stop training dictionary for table {}.{}", keyspaceName, tableName); + cancelManualTraining(); + return; + } + + long now = System.currentTimeMillis(); Review Comment: same as above ########## src/java/org/apache/cassandra/db/compression/CompressionDictionaryCache.java: ########## @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.compression; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicReference; +import javax.annotation.Nullable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import org.apache.cassandra.config.DatabaseDescriptor; + +/** + * Manages caching and current dictionary state for compression dictionaries. + * <p> + * This class handles: + * - Local caching of compression dictionaries with automatic cleanup + * - Managing the current active dictionary for write operations + * - Thread-safe access to cached dictionaries + */ +public class CompressionDictionaryCache implements ICompressionDictionaryCache +{ + private static final Logger logger = LoggerFactory.getLogger(CompressionDictionaryCache.class); + + private final Cache<CompressionDictionary.DictId, CompressionDictionary> cache; + private final AtomicReference<CompressionDictionary> currentDictionary = new AtomicReference<>(); + + public CompressionDictionaryCache() + { + Duration expiryTime = Duration.ofSeconds(DatabaseDescriptor.getCompressionDictionaryCacheExpireSeconds()); Review Comment: Can this just be inlined into `expireAfterAccess`? `.expireAfterAccess(Duration.ofSeconds(DatabaseDescriptor.getCompressionDictionaryCacheExpireSeconds()))` We are not introducing a dedicated variable for `DatabaseDescriptor.getCompressionDictionaryCacheSize()` in the `maximumSize` call either.
########## src/java/org/apache/cassandra/db/compression/ICompressionDictionaryTrainer.java: ########## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.compression; + +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; Review Comment: illegal import -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

