smiklosovic commented on code in PR #4399:
URL: https://github.com/apache/cassandra/pull/4399#discussion_r2410508310


##########
src/java/org/apache/cassandra/db/compression/CompressionDictionary.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compression;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Objects;
+import javax.annotation.Nullable;
+
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+
+import org.apache.cassandra.cql3.UntypedResultSet;
+
+public interface CompressionDictionary extends AutoCloseable
+{
+    /**
+     * Get the dictionary id
+     *
+     * @return dictionary id
+     */
+    DictId identifier();
+
+    /**
+     * Get the raw bytes of the compression dictionary
+     *
+     * @return raw compression dictionary
+     */
+    byte[] rawDictionary();
+
+    /**
+     * Get the kind of the compression algorithm
+     *
+     * @return compression algorithm kind
+     */
+    default Kind kind()
+    {
+        return identifier().kind;
+    }
+
+    /**
+     * Write compression dictionary to file
+     *
+     * @param out file output stream
+     * @throws IOException on any I/O exception when writing to the file
+     */
+    default void serialize(DataOutput out) throws IOException
+    {
+        DictId dictId = identifier();
+        int ordinal = dictId.kind.ordinal();
+        out.writeByte(ordinal);
+        out.writeLong(dictId.id);
+        byte[] dict = rawDictionary();
+        out.writeInt(dict.length);
+        out.write(dict);
+        int checksum = calculateChecksum((byte) ordinal, dictId.id, dict);
+        out.writeInt(checksum);
+    }
+
+    /**
+     * A factory method to create concrete CompressionDictionary from the file content
+     *
+     * @param input   file input stream
+     * @param manager compression dictionary manager that caches the dictionaries
+     * @return compression dictionary; otherwise, null if there is no dictionary
+     * @throws IOException on any I/O exception when reading from the file
+     */
+    @Nullable
+    static CompressionDictionary deserialize(DataInput input, @Nullable CompressionDictionaryManager manager) throws IOException
+    {
+        int kindOrdinal;
+        try
+        {
+            kindOrdinal = input.readByte();
+        }
+        catch (EOFException eof)
+        {
+            // no dictionary
+            return null;
+        }
+
+        if (kindOrdinal < 0 || kindOrdinal >= Kind.values().length)
+        {
+            throw new IOException("Invalid compression dictionary kind: " + kindOrdinal);
+        }
+        Kind kind = Kind.values()[kindOrdinal];
+        long id = input.readLong();
+        DictId dictId = new DictId(kind, id);
+
+        if (manager != null)
+        {
+            CompressionDictionary dictionary = manager.get(dictId);
+            if (dictionary != null)
+            {
+                return dictionary;
+            }
+        }
+
+        int length = input.readInt();
+        byte[] dict = new byte[length];
+        input.readFully(dict);
+        int checksum = input.readInt();
+        int calculatedChecksum = calculateChecksum((byte) kindOrdinal, id, dict);
+        if (checksum != calculatedChecksum)
+        {
+            throw new IOException("Compression dictionary checksum does not match");
+        }
+
+        CompressionDictionary dictionary = null;
+        if (kind == Kind.ZSTD)
+        {
+            dictionary = new ZstdCompressionDictionary(dictId, dict);
+        }
+
+        if (dictionary == null)
+        {
+            throw new IOException(kind + " compression dictionary is not created");
+        }
+
+        // update the dictionary manager if it exists
+        if (manager != null)
+        {
+            manager.add(dictionary);
+        }
+        return dictionary;
+    }
+
+    static CompressionDictionary createFromRow(UntypedResultSet.Row row)
+    {
+        String kindStr = row.getString("kind");
+        long id = row.getLong("dict_id");
+        byte[] dict = row.getByteArray("dict");
+        CompressionDictionary.DictId dictId = new CompressionDictionary.DictId(CompressionDictionary.Kind.valueOf(kindStr), id);

Review Comment:
   I think this whole logic is too complex. I would add a factory method to `Kind`:
   
       enum Kind
       {
           // Order matters: the enum ordinal is serialized
           ZSTD
           {
               public CompressionDictionary getDictionary(DictId dictId, byte[] dict)
               {
                   return new ZstdCompressionDictionary(dictId, dict);
               }
           };
   
           public abstract CompressionDictionary getDictionary(CompressionDictionary.DictId dictId, byte[] dict);
       }
   
   And then `createFromRow` becomes:
   
       static CompressionDictionary createFromRow(UntypedResultSet.Row row)
       {
           String kindStr = row.getString("kind");
   
           Kind kind;
   
           try
           {
               kind = CompressionDictionary.Kind.valueOf(kindStr);
           }
           catch (IllegalArgumentException ex)
           {
               throw new IllegalStateException(kindStr + " compression dictionary is not created");
           }
   
           return kind.getDictionary(new DictId(kind, row.getLong("dict_id")), row.getByteArray("dict"));
       }
   
   
   Once we have successfully materialized a `Kind` from the `kind` string in the `Row`, we are pretty much guaranteed to be able to create a dictionary from it.
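
For illustration, here is a minimal, self-contained sketch of the enum-as-factory idea suggested above. It does not use the real Cassandra classes: `KindFactorySketch`, `Dictionary`, `ZstdDictionary`, `createDictionary` and `fromRow` are hypothetical stand-ins, and the row is reduced to three plain arguments. The point it tries to show is that every constant of `Kind` has to implement the abstract factory method, so no `if (kind == ...)` branch or null check is needed after `valueOf` succeeds.

```java
// Hypothetical, simplified stand-ins for the PR's types; not the real Cassandra classes.
class KindFactorySketch
{
    enum Kind
    {
        // Order matters if the ordinal is serialized, as in the PR.
        ZSTD
        {
            @Override
            Dictionary createDictionary(long id, byte[] raw)
            {
                return new ZstdDictionary(id, raw);
            }
        };

        // Every constant must implement this, so a Kind without a factory cannot compile.
        abstract Dictionary createDictionary(long id, byte[] raw);
    }

    interface Dictionary
    {
        Kind kind();
        long id();
        byte[] raw();
    }

    static final class ZstdDictionary implements Dictionary
    {
        private final long id;
        private final byte[] raw;

        ZstdDictionary(long id, byte[] raw)
        {
            this.id = id;
            this.raw = raw.clone(); // defensive copy of the dictionary bytes
        }

        public Kind kind() { return Kind.ZSTD; }
        public long id() { return id; }
        public byte[] raw() { return raw.clone(); }
    }

    // Stand-in for createFromRow: the three arguments play the role of the row's columns.
    static Dictionary fromRow(String kindStr, long id, byte[] raw)
    {
        Kind kind;
        try
        {
            kind = Kind.valueOf(kindStr);
        }
        catch (IllegalArgumentException ex)
        {
            // Unknown kind name; keep the original exception as the cause.
            throw new IllegalStateException("Unknown compression dictionary kind: " + kindStr, ex);
        }
        return kind.createDictionary(id, raw);
    }

    public static void main(String[] args)
    {
        Dictionary d = fromRow("ZSTD", 42L, new byte[]{ 1, 2, 3 });
        System.out.println(d.kind() + " " + d.id() + " " + d.raw().length);
    }
}
```

Note that `Kind.valueOf` throws `NullPointerException` rather than `IllegalArgumentException` for a null name, so a null `kind` column would still need its own handling if that can occur.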



##########
src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java:
##########
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.compression;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compression.ICompressionDictionaryTrainer.TrainingStatus;
+import org.apache.cassandra.schema.CompressionParams;
+import org.apache.cassandra.schema.SystemDistributedKeyspace;
+import org.apache.cassandra.utils.MBeanWrapper;
+import org.apache.cassandra.utils.MBeanWrapper.OnException;
+
+public class CompressionDictionaryManager implements CompressionDictionaryManagerMBean,
+                                                     ICompressionDictionaryCache,
+                                                     ICompressionDictionaryEventHandler,
+                                                     AutoCloseable
+{
+    private static final Logger logger = LoggerFactory.getLogger(CompressionDictionaryManager.class);
+
+    private final String keyspaceName;
+    private final String tableName;
+    private volatile boolean mbeanRegistered;
+    private volatile boolean isEnabled;
+
+    // Components
+    private final ICompressionDictionaryEventHandler eventHandler;
+    private final ICompressionDictionaryCache cache;
+    private final ICompressionDictionaryScheduler scheduler;
+    private ICompressionDictionaryTrainer trainer = null;
+
+    public CompressionDictionaryManager(ColumnFamilyStore columnFamilyStore, boolean registerBookkeeping)
+    {
+        this.keyspaceName = columnFamilyStore.keyspace.getName();
+        this.tableName = columnFamilyStore.getTableName();
+
+        this.isEnabled = columnFamilyStore.metadata().params.compression.isDictionaryCompressionEnabled();
+        this.cache = new CompressionDictionaryCache();
+        this.eventHandler = new CompressionDictionaryEventHandler(columnFamilyStore, cache);
+        this.scheduler = new CompressionDictionaryScheduler(keyspaceName, tableName, cache, isEnabled);
+        if (isEnabled)
+        {
+            // Initialize components
+            this.trainer = ICompressionDictionaryTrainer.create(keyspaceName, tableName,
+                                                                columnFamilyStore.metadata().params.compression,
+                                                                createTrainingConfig());
+            trainer.setDictionaryTrainedListener(this::handleNewDictionary);
+
+            scheduler.scheduleRefreshTask();
+
+            trainer.start(false);
+        }
+
+        if (registerBookkeeping)
+        {
+            MBeanWrapper.instance.registerMBean(this, mbeanName(keyspaceName, tableName));
+        }
+        mbeanRegistered = registerBookkeeping;
+    }
+
+    static String mbeanName(String keyspaceName, String tableName)
+    {
+        return "org.apache.cassandra.db.compression:type=CompressionDictionaryManager" +

Review Comment:
   For other MBeans, this name is extracted into a constant, like

       public static final String MBEAN_NAME = "org.apache.cassandra.hints:type=HintsService";

   I would do the same here: move that constant to the MBean interface itself and just reference it from this method.
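
A rough sketch of what that could look like, under stated assumptions: `MBeanNameSketch` and `ManagerMBean` are hypothetical names, and the `keyspace`/`table` key properties are an assumption for illustration only, since the quoted diff is cut off before the rest of the name is built. Only the `javax.management.ObjectName` calls are real JDK API.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical sketch; the interface and key names below are illustrative only.
class MBeanNameSketch
{
    interface ManagerMBean
    {
        // Interface fields are implicitly public static final.
        String MBEAN_NAME = "org.apache.cassandra.db.compression:type=CompressionDictionaryManager";
    }

    // Assumption: the per-table name appends keyspace and table key properties.
    static String mbeanName(String keyspaceName, String tableName)
    {
        return ManagerMBean.MBEAN_NAME + ",keyspace=" + keyspaceName + ",table=" + tableName;
    }

    public static void main(String[] args) throws MalformedObjectNameException
    {
        // Parsing the string checks that it is a well-formed JMX object name.
        ObjectName name = new ObjectName(mbeanName("ks", "tbl"));
        System.out.println(name.getDomain() + " / " + name.getKeyProperty("type"));
    }
}
```

Keeping the prefix on the interface means that callers which only know the MBean interface, such as tooling or tests, can reference the same constant instead of repeating the string.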



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

