yifan-c commented on code in PR #4458: URL: https://github.com/apache/cassandra/pull/4458#discussion_r2515901627
########## src/java/org/apache/cassandra/db/compression/CompressionDictionaryDetailsTabularData.java: ########## @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.compression; + +import java.util.Arrays; +import javax.management.openmbean.ArrayType; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularType; + +import org.apache.cassandra.db.compression.CompressionDictionary.LightweightCompressionDictionary; +import org.apache.cassandra.io.util.FileUtils; + +import static java.lang.String.format; + +public class CompressionDictionaryDetailsTabularData +{ + /** + * Position inside index names of tabular type of tabular data returned upon + * listing dictionaries where raw dictionary is expected to be located. + * We do not need to process this entry as listing does not contain any raw dictionary, + * only exporting does. 
+ */ + public static final int TABULAR_DATA_TYPE_RAW_DICTIONARY_INDEX = 3; + + public static final String KEYSPACE_NAME = "Keyspace"; + public static final String TABLE_NAME = "Table"; + public static final String DICT_ID_NAME = "DictId"; + public static final String DICT_NAME = "Dict"; + public static final String KIND_NAME = "Kind"; + public static final String CHECKSUM_NAME = "Checksum"; + public static final String SIZE_NAME = "Size"; + + + private static final String[] ITEM_NAMES = new String[]{ KEYSPACE_NAME, + TABLE_NAME, + DICT_ID_NAME, + DICT_NAME, + KIND_NAME, + CHECKSUM_NAME, + SIZE_NAME }; + + private static final String[] ITEM_DESCS = new String[]{ "keyspace", + "table", + "dictionary_id", + "dictionary_bytes", + "kind", + "checksum", + "size" }; + + private static final String TYPE_NAME = "DictionaryDetails"; + private static final String ROW_DESC = "DictionaryDetails"; + private static final OpenType<?>[] ITEM_TYPES; + private static final CompositeType COMPOSITE_TYPE; + public static final TabularType TABULAR_TYPE; + + static + { + try + { + ITEM_TYPES = new OpenType[]{ SimpleType.STRING, // keyspace + SimpleType.STRING, // table + SimpleType.LONG, // dict id + new ArrayType<String[]>(SimpleType.BYTE, true), // dict bytes + SimpleType.STRING, // kind + SimpleType.INTEGER, // checksum + SimpleType.INTEGER }; // size of dict bytes + + COMPOSITE_TYPE = new CompositeType(TYPE_NAME, ROW_DESC, ITEM_NAMES, ITEM_DESCS, ITEM_TYPES); + TABULAR_TYPE = new TabularType(TYPE_NAME, ROW_DESC, COMPOSITE_TYPE, ITEM_NAMES); + } + catch (OpenDataException e) + { + throw new RuntimeException(e); + } + } + + /** + * This method is meant to be call when listing dictionaries, we do not need actual dictionary byte arrays. 
+ * + * @param dictionary lightweight dictionary to create composite data from + * @return composite data representing dictionary + */ + public static CompositeData fromLightweightCompressionDictionary(LightweightCompressionDictionary dictionary) + { + try + { + return new CompositeDataSupport(COMPOSITE_TYPE, + ITEM_NAMES, + new Object[] + { + dictionary.keyspaceName, + dictionary.tableName, + dictionary.dictId.id, + null, // on purpose not returning actual dictionary + dictionary.dictId.kind.name(), + dictionary.checksum, + dictionary.size, + }); + } + catch (OpenDataException e) + { + throw new RuntimeException(e); + } + } + + /** + * Used upon exporting a dictionary. + * + * @param keyspace keyspace of a dictionary + * @param table table of a dictionary + * @param dictionary dictionary itself + * @return composite data representing dictionary + */ + public static CompositeData fromCompressionDictionary(String keyspace, String table, CompressionDictionary dictionary) + { + try + { + return new CompositeDataSupport(COMPOSITE_TYPE, + ITEM_NAMES, + new Object[] + { + keyspace, + table, + dictionary.dictId().id, + dictionary.rawDictionary(), + dictionary.kind().name(), + dictionary.checksum(), + dictionary.rawDictionary().length, + }); + } + catch (OpenDataException e) + { + throw new RuntimeException(e); + } + } + + /** + * Used upon deserialisation of data to composite data, e.g. upon importing a dictionary from JSON + * string to CompositeData before they are sent over JMX to import method. 
+ * + * @param dataObject data class object to get composite data from + * @return composite data representing data class + */ + public static CompositeData fromCompressionDictionaryDataObject(CompressionDictionaryDataObject dataObject) + { + try + { + return new CompositeDataSupport(COMPOSITE_TYPE, + ITEM_NAMES, + new Object[] + { + dataObject.keyspace, + dataObject.table, + dataObject.dictId, + dataObject.dict, + dataObject.kind, + dataObject.dictChecksum, + dataObject.dictLength + }); + } + catch (OpenDataException e) + { + throw new RuntimeException(e); + } + } + + /** + * Deserializes data to convenience object to work further with. + * + * @param compositeData data to create data object from + * @return deserialized composite data to convenience object + * @throws IllegalArgumentException if values in deserialized object are invalid. + * @see CompressionDictionaryDataObject#validate() + */ + public static CompressionDictionaryDataObject fromCompositeData(CompositeData compositeData) + { + String keyspace = (String) compositeData.get(CompressionDictionaryDetailsTabularData.KEYSPACE_NAME); + String table = (String) compositeData.get(CompressionDictionaryDetailsTabularData.TABLE_NAME); + long dictId = (Long) compositeData.get(CompressionDictionaryDetailsTabularData.DICT_ID_NAME); + byte[] dictionaryBytes = (byte[]) compositeData.get(CompressionDictionaryDetailsTabularData.DICT_NAME); + String kind = (String) compositeData.get(CompressionDictionaryDetailsTabularData.KIND_NAME); + int checksum = (Integer) compositeData.get(CompressionDictionaryDetailsTabularData.CHECKSUM_NAME); + int size = (Integer) compositeData.get(CompressionDictionaryDetailsTabularData.SIZE_NAME); + + CompressionDictionaryDataObject dataObject = new CompressionDictionaryDataObject(); + dataObject.keyspace = keyspace; + dataObject.table = table; + dataObject.dictId = dictId; + dataObject.dict = dictionaryBytes; + dataObject.kind = kind; + dataObject.dictChecksum = checksum; + 
dataObject.dictLength = size; + + dataObject.validate(); + + return dataObject; + } + + public static class CompressionDictionaryDataObject implements Cloneable + { + public String keyspace; + public String table; + public long dictId; + public byte[] dict; + public String kind; + public int dictChecksum; + public int dictLength; + + /** + * An object of this class is considered to be valid if: + * + * <ul> + * <li>keyspace and table are not null</li> + * <li>dict id is lower than 0</li> + * <li>dict is not null nor empty</li> + * <li>dict lenght is less than or equal to 1MiB</li> + * <li>kind is not null and there is such {@link org.apache.cassandra.db.compression.CompressionDictionary.Kind}</li> + * <li>dictLenght is bigger than 0</li> + * <li>dictLenght has to be equal to dict's length</li> + * <li>dictChecksum has to be equal to checksum computed as part of this method</li> + * </ul> + */ + public void validate() + { + if (keyspace == null) + throw new IllegalArgumentException("keyspace not specified"); + if (table == null) + throw new IllegalArgumentException("table not specified"); + if (dictId <= 0) + throw new IllegalArgumentException("Provided dictionary id is lower than 0, it is '" + dictId + "'.'"); Review Comment: The condition is `dictId <=0`, but the error message says `lower than 0`, so the message is incorrect. Maybe change it to "Dictionary id must be positive. The provided value is {}". The trailing `"'.'"` also seems wrong. ########## src/java/org/apache/cassandra/db/compression/CompressionDictionaryDetailsTabularData.java: ########## @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.compression; + +import java.util.Arrays; +import javax.management.openmbean.ArrayType; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularType; + +import org.apache.cassandra.db.compression.CompressionDictionary.LightweightCompressionDictionary; +import org.apache.cassandra.io.util.FileUtils; + +import static java.lang.String.format; + +public class CompressionDictionaryDetailsTabularData +{ + /** + * Position inside index names of tabular type of tabular data returned upon + * listing dictionaries where raw dictionary is expected to be located. + * We do not need to process this entry as listing does not contain any raw dictionary, + * only exporting does. 
+ */ + public static final int TABULAR_DATA_TYPE_RAW_DICTIONARY_INDEX = 3; + + public static final String KEYSPACE_NAME = "Keyspace"; + public static final String TABLE_NAME = "Table"; + public static final String DICT_ID_NAME = "DictId"; + public static final String DICT_NAME = "Dict"; + public static final String KIND_NAME = "Kind"; + public static final String CHECKSUM_NAME = "Checksum"; + public static final String SIZE_NAME = "Size"; + + + private static final String[] ITEM_NAMES = new String[]{ KEYSPACE_NAME, + TABLE_NAME, + DICT_ID_NAME, + DICT_NAME, + KIND_NAME, + CHECKSUM_NAME, + SIZE_NAME }; + + private static final String[] ITEM_DESCS = new String[]{ "keyspace", + "table", + "dictionary_id", + "dictionary_bytes", + "kind", + "checksum", + "size" }; + + private static final String TYPE_NAME = "DictionaryDetails"; + private static final String ROW_DESC = "DictionaryDetails"; + private static final OpenType<?>[] ITEM_TYPES; + private static final CompositeType COMPOSITE_TYPE; + public static final TabularType TABULAR_TYPE; + + static + { + try + { + ITEM_TYPES = new OpenType[]{ SimpleType.STRING, // keyspace + SimpleType.STRING, // table + SimpleType.LONG, // dict id + new ArrayType<String[]>(SimpleType.BYTE, true), // dict bytes + SimpleType.STRING, // kind + SimpleType.INTEGER, // checksum + SimpleType.INTEGER }; // size of dict bytes + + COMPOSITE_TYPE = new CompositeType(TYPE_NAME, ROW_DESC, ITEM_NAMES, ITEM_DESCS, ITEM_TYPES); + TABULAR_TYPE = new TabularType(TYPE_NAME, ROW_DESC, COMPOSITE_TYPE, ITEM_NAMES); + } + catch (OpenDataException e) + { + throw new RuntimeException(e); + } + } + + /** + * This method is meant to be call when listing dictionaries, we do not need actual dictionary byte arrays. 
+ * + * @param dictionary lightweight dictionary to create composite data from + * @return composite data representing dictionary + */ + public static CompositeData fromLightweightCompressionDictionary(LightweightCompressionDictionary dictionary) + { + try + { + return new CompositeDataSupport(COMPOSITE_TYPE, + ITEM_NAMES, + new Object[] + { + dictionary.keyspaceName, + dictionary.tableName, + dictionary.dictId.id, + null, // on purpose not returning actual dictionary + dictionary.dictId.kind.name(), + dictionary.checksum, + dictionary.size, + }); + } + catch (OpenDataException e) + { + throw new RuntimeException(e); + } + } + + /** + * Used upon exporting a dictionary. + * + * @param keyspace keyspace of a dictionary + * @param table table of a dictionary + * @param dictionary dictionary itself + * @return composite data representing dictionary + */ + public static CompositeData fromCompressionDictionary(String keyspace, String table, CompressionDictionary dictionary) + { + try + { + return new CompositeDataSupport(COMPOSITE_TYPE, + ITEM_NAMES, + new Object[] + { + keyspace, + table, + dictionary.dictId().id, + dictionary.rawDictionary(), + dictionary.kind().name(), + dictionary.checksum(), + dictionary.rawDictionary().length, + }); + } + catch (OpenDataException e) + { + throw new RuntimeException(e); + } + } + + /** + * Used upon deserialisation of data to composite data, e.g. upon importing a dictionary from JSON + * string to CompositeData before they are sent over JMX to import method. 
+ * + * @param dataObject data class object to get composite data from + * @return composite data representing data class + */ + public static CompositeData fromCompressionDictionaryDataObject(CompressionDictionaryDataObject dataObject) + { + try + { + return new CompositeDataSupport(COMPOSITE_TYPE, + ITEM_NAMES, + new Object[] + { + dataObject.keyspace, + dataObject.table, + dataObject.dictId, + dataObject.dict, + dataObject.kind, + dataObject.dictChecksum, + dataObject.dictLength + }); + } + catch (OpenDataException e) + { + throw new RuntimeException(e); + } + } + + /** + * Deserializes data to convenience object to work further with. + * + * @param compositeData data to create data object from + * @return deserialized composite data to convenience object + * @throws IllegalArgumentException if values in deserialized object are invalid. + * @see CompressionDictionaryDataObject#validate() + */ + public static CompressionDictionaryDataObject fromCompositeData(CompositeData compositeData) + { + String keyspace = (String) compositeData.get(CompressionDictionaryDetailsTabularData.KEYSPACE_NAME); + String table = (String) compositeData.get(CompressionDictionaryDetailsTabularData.TABLE_NAME); + long dictId = (Long) compositeData.get(CompressionDictionaryDetailsTabularData.DICT_ID_NAME); + byte[] dictionaryBytes = (byte[]) compositeData.get(CompressionDictionaryDetailsTabularData.DICT_NAME); + String kind = (String) compositeData.get(CompressionDictionaryDetailsTabularData.KIND_NAME); + int checksum = (Integer) compositeData.get(CompressionDictionaryDetailsTabularData.CHECKSUM_NAME); + int size = (Integer) compositeData.get(CompressionDictionaryDetailsTabularData.SIZE_NAME); + + CompressionDictionaryDataObject dataObject = new CompressionDictionaryDataObject(); + dataObject.keyspace = keyspace; + dataObject.table = table; + dataObject.dictId = dictId; + dataObject.dict = dictionaryBytes; + dataObject.kind = kind; + dataObject.dictChecksum = checksum; + 
dataObject.dictLength = size; + + dataObject.validate(); + + return dataObject; + } + + public static class CompressionDictionaryDataObject implements Cloneable + { + public String keyspace; + public String table; + public long dictId; + public byte[] dict; + public String kind; + public int dictChecksum; + public int dictLength; + + /** + * An object of this class is considered to be valid if: + * + * <ul> + * <li>keyspace and table are not null</li> + * <li>dict id is lower than 0</li> + * <li>dict is not null nor empty</li> + * <li>dict lenght is less than or equal to 1MiB</li> Review Comment: typo: ```suggestion * <li>dict length is less than or equal to 1MiB</li> ``` ########## src/java/org/apache/cassandra/tools/NodeProbe.java: ########## @@ -2698,18 +2699,86 @@ public void setMixedMajorVersionRepairEnabled(boolean enabled) */ public void trainCompressionDictionary(String keyspace, String table, boolean force) throws IOException { - CompressionDictionaryManagerMBean proxy = getDictionaryManagerProxy(keyspace, table); + doWithCompressionDictionaryManagerMBean(proxy -> { proxy.train(force); return null; }, keyspace, table); Review Comment: I think this is only my IDE being confused: it warns that `proxy` could be null, which I do not believe is true. ########## src/java/org/apache/cassandra/tools/nodetool/CompressionDictionaryCommandGroup.java: ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.tools.nodetool; + +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; + +import com.google.common.util.concurrent.Uninterruptibles; + +import org.apache.cassandra.db.compression.CompressionDictionaryDetailsTabularData; +import org.apache.cassandra.db.compression.CompressionDictionaryDetailsTabularData.CompressionDictionaryDataObject; +import org.apache.cassandra.db.compression.ICompressionDictionaryTrainer.TrainingStatus; +import org.apache.cassandra.db.compression.TrainingState; +import org.apache.cassandra.io.util.File; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.tools.NodeProbe; +import org.apache.cassandra.tools.nodetool.formatter.TableBuilder; +import org.apache.cassandra.utils.Clock; +import org.apache.cassandra.utils.JsonUtils; +import picocli.CommandLine.Command; +import picocli.CommandLine.Option; +import picocli.CommandLine.Parameters; + +import static java.lang.System.lineSeparator; +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; +import static java.nio.file.StandardOpenOption.WRITE; +import static java.util.stream.Collectors.joining; + +@Command(name = "compressiondictionary", + description = "Manage compression dictionaries", + subcommands = { CompressionDictionaryCommandGroup.TrainDictionary.class, + 
CompressionDictionaryCommandGroup.ListDictionaries.class, + CompressionDictionaryCommandGroup.ExportDictionary.class, + CompressionDictionaryCommandGroup.ImportDictionary.class }) +public class CompressionDictionaryCommandGroup +{ + @Command(name = "train", + description = "Manually trigger compression dictionary training for a table. If no SSTables are available, the memtable will be flushed first.") + public static class TrainDictionary extends AbstractCommand + { + @Parameters(index = "0", description = "The keyspace name", arity = "1") + private String keyspace; + + @Parameters(index = "1", description = "The table name", arity = "1") + private String table; + + @Option(names = { "-f", "--force" }, description = "Force the dictionary training even if there are not enough samples") + private boolean force = false; + + @Override + public void execute(NodeProbe probe) + { + PrintStream out = probe.output().out; + PrintStream err = probe.output().err; + + try + { + out.printf("Starting compression dictionary training for %s.%s...%n", keyspace, table); + out.printf("Training from existing SSTables (flushing first if needed)%n"); + + probe.trainCompressionDictionary(keyspace, table, force); + + // Wait for training completion (10 minutes timeout for SSTable-based training) + out.println("Sampling from existing SSTables and training."); + long maxWaitMillis = TimeUnit.MINUTES.toMillis(10); + long startTime = Clock.Global.currentTimeMillis(); + + while (Clock.Global.currentTimeMillis() - startTime < maxWaitMillis) + { + TrainingState trainingState = probe.getCompressionDictionaryTrainingState(keyspace, table); + TrainingStatus status = trainingState.getStatus(); + displayProgress(trainingState, startTime, out, status); + if (TrainingStatus.COMPLETED == status) + { + out.printf("%nTraining completed successfully for %s.%s%n", keyspace, table); + return; + } + else if (TrainingStatus.FAILED == status) + { + err.printf("%nTraining failed for %s.%s%n", keyspace, table); + 
try + { + String failureMessage = trainingState.getFailureMessage(); + if (failureMessage != null && !failureMessage.isEmpty()) + { + err.printf("Reason: %s%n", failureMessage); + } + } + catch (Exception e) + { + // If we can't get the failure message, just continue without it + } + System.exit(1); + } + + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + } + + err.printf("%nTraining did not complete within expected timeframe (10 minutes).%n"); + System.exit(1); + } + catch (Exception e) + { + err.printf("Failed to trigger training: %s%n", e.getMessage()); + System.exit(1); + } + } + + private static void displayProgress(TrainingState trainingState, long startTime, PrintStream out, TrainingStatus status) + { + // Display meaningful statistics + long sampleCount = trainingState.getSampleCount(); + long totalSampleSize = trainingState.getTotalSampleSize(); + long elapsedSeconds = (Clock.Global.currentTimeMillis() - startTime) / 1000; + double sampleSizeMB = totalSampleSize / (1024.0 * 1024.0); + + out.printf("\rStatus: %s | Samples: %d | Size: %.2f MiB | Elapsed: %ds", + status, sampleCount, sampleSizeMB, elapsedSeconds); + } + } + + @Command(name = "list", + description = "List available dictionaries of specific keyspace and table.") + public static class ListDictionaries extends AbstractCommand + { + @Parameters(index = "0", description = "The keyspace name", arity = "1") + private String keyspace; + + @Parameters(index = "1", description = "The table name", arity = "1") + private String table; + + @Override + protected void execute(NodeProbe probe) + { + try + { + TableBuilder tableBuilder = new TableBuilder(); + TabularData tabularData = probe.listCompressionDictionaries(keyspace, table); + List<String> indexNames = tabularData.getTabularType().getIndexNames(); + + List<String> columns = new ArrayList<>(indexNames); + // ignore raw dict + columns.remove(CompressionDictionaryDetailsTabularData.TABULAR_DATA_TYPE_RAW_DICTIONARY_INDEX); + 
tableBuilder.add(columns); + + for (Object eachDict : tabularData.keySet()) + { + final List<?> dictRow = (List<?>) eachDict; + + List<String> rowValues = new ArrayList<>(); + + for (int i = 0; i < dictRow.size(); i++) + { + if (i == 3) // ignore raw dict + continue; + + rowValues.add(dictRow.get(i).toString()); + } + tableBuilder.add(rowValues); + } + + tableBuilder.printTo(probe.output().out); + } + catch (Exception e) + { + probe.output().err.printf("Failed to list dictionaries: %s%n", e.getMessage()); + System.exit(1); + } + } + } + + @Command(name = "export", + description = "Export dictionary from Cassandra to local file.") + public static class ExportDictionary extends AbstractCommand + { + @Parameters(index = "0", description = "The keyspace name", arity = "1") + private String keyspace; + + @Parameters(index = "1", description = "The table name", arity = "1") + private String table; + + @Parameters(index = "2", description = "File name to save dictionary to", arity = "1") + private String dictionaryPath; + + @Option(paramLabel = "dictId", names = { "-i", "--id" }, + description = "The dictionary id. 
When not specified, the current dictionary is returned.") + private long dictId = -1; + + @Override + protected void execute(NodeProbe probe) + { + if (dictId <= 0 && dictId != -1) + { + probe.output().err.printf("Dictionary id has to be strictly positive number.%n"); + System.exit(1); + } + + try + { + CompositeData compressionDictionary; + + if (dictId == -1) + { + compressionDictionary = probe.getCompressionDictionary(keyspace, table); + } + else + { + compressionDictionary = probe.getCompressionDictionary(keyspace, table, dictId); + } + + if (compressionDictionary == null) + { + probe.output().err.printf("Dictionary does not exist.%n"); + System.exit(1); + } + + CompressionDictionaryDataObject dataObject = CompressionDictionaryDetailsTabularData.fromCompositeData(compressionDictionary); + String dictionary = JsonUtils.writeAsPrettyJsonString(dataObject); + FileUtils.write(new File(dictionaryPath), List.of(dictionary), CREATE, TRUNCATE_EXISTING, WRITE); + } + catch (Throwable e) + { + probe.output().err.printf("Failed to export dictionary: %s%n", e.getMessage()); + System.exit(1); + } + } + } + + @Command(name = "import", + description = "Import local dictionary to Cassandra.") + public static class ImportDictionary extends AbstractCommand Review Comment: Since import runs on the local node, there could still be concurrent imports from other nodes, even though we synchronize locally. We should probably document this somewhere. When imports happen from multiple nodes, the highest-version dictionary wins. ########## src/java/org/apache/cassandra/db/compression/CompressionDictionaryDetailsTabularData.java: ########## @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.compression; + +import java.util.Arrays; +import javax.management.openmbean.ArrayType; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularType; + +import org.apache.cassandra.db.compression.CompressionDictionary.LightweightCompressionDictionary; +import org.apache.cassandra.io.util.FileUtils; + +import static java.lang.String.format; + +public class CompressionDictionaryDetailsTabularData +{ + /** + * Position inside index names of tabular type of tabular data returned upon + * listing dictionaries where raw dictionary is expected to be located. + * We do not need to process this entry as listing does not contain any raw dictionary, + * only exporting does. 
+ */ + public static final int TABULAR_DATA_TYPE_RAW_DICTIONARY_INDEX = 3; + + public static final String KEYSPACE_NAME = "Keyspace"; + public static final String TABLE_NAME = "Table"; + public static final String DICT_ID_NAME = "DictId"; + public static final String DICT_NAME = "Dict"; + public static final String KIND_NAME = "Kind"; + public static final String CHECKSUM_NAME = "Checksum"; + public static final String SIZE_NAME = "Size"; + + + private static final String[] ITEM_NAMES = new String[]{ KEYSPACE_NAME, + TABLE_NAME, + DICT_ID_NAME, + DICT_NAME, + KIND_NAME, + CHECKSUM_NAME, + SIZE_NAME }; + + private static final String[] ITEM_DESCS = new String[]{ "keyspace", + "table", + "dictionary_id", + "dictionary_bytes", + "kind", + "checksum", + "size" }; + + private static final String TYPE_NAME = "DictionaryDetails"; + private static final String ROW_DESC = "DictionaryDetails"; + private static final OpenType<?>[] ITEM_TYPES; + private static final CompositeType COMPOSITE_TYPE; + public static final TabularType TABULAR_TYPE; + + static + { + try + { + ITEM_TYPES = new OpenType[]{ SimpleType.STRING, // keyspace + SimpleType.STRING, // table + SimpleType.LONG, // dict id + new ArrayType<String[]>(SimpleType.BYTE, true), // dict bytes + SimpleType.STRING, // kind + SimpleType.INTEGER, // checksum + SimpleType.INTEGER }; // size of dict bytes + + COMPOSITE_TYPE = new CompositeType(TYPE_NAME, ROW_DESC, ITEM_NAMES, ITEM_DESCS, ITEM_TYPES); + TABULAR_TYPE = new TabularType(TYPE_NAME, ROW_DESC, COMPOSITE_TYPE, ITEM_NAMES); + } + catch (OpenDataException e) + { + throw new RuntimeException(e); + } + } + + /** + * This method is meant to be call when listing dictionaries, we do not need actual dictionary byte arrays. 
+ * + * @param dictionary lightweight dictionary to create composite data from + * @return composite data representing dictionary + */ + public static CompositeData fromLightweightCompressionDictionary(LightweightCompressionDictionary dictionary) + { + try + { + return new CompositeDataSupport(COMPOSITE_TYPE, + ITEM_NAMES, + new Object[] + { + dictionary.keyspaceName, + dictionary.tableName, + dictionary.dictId.id, + null, // on purpose not returning actual dictionary + dictionary.dictId.kind.name(), + dictionary.checksum, + dictionary.size, + }); + } + catch (OpenDataException e) + { + throw new RuntimeException(e); + } + } + + /** + * Used upon exporting a dictionary. + * + * @param keyspace keyspace of a dictionary + * @param table table of a dictionary + * @param dictionary dictionary itself + * @return composite data representing dictionary + */ + public static CompositeData fromCompressionDictionary(String keyspace, String table, CompressionDictionary dictionary) + { + try + { + return new CompositeDataSupport(COMPOSITE_TYPE, + ITEM_NAMES, + new Object[] + { + keyspace, + table, + dictionary.dictId().id, + dictionary.rawDictionary(), + dictionary.kind().name(), + dictionary.checksum(), + dictionary.rawDictionary().length, + }); + } + catch (OpenDataException e) + { + throw new RuntimeException(e); + } + } + + /** + * Used upon deserialisation of data to composite data, e.g. upon importing a dictionary from JSON + * string to CompositeData before they are sent over JMX to import method. 
+ * + * @param dataObject data class object to get composite data from + * @return composite data representing data class + */ + public static CompositeData fromCompressionDictionaryDataObject(CompressionDictionaryDataObject dataObject) + { + try + { + return new CompositeDataSupport(COMPOSITE_TYPE, + ITEM_NAMES, + new Object[] + { + dataObject.keyspace, + dataObject.table, + dataObject.dictId, + dataObject.dict, + dataObject.kind, + dataObject.dictChecksum, + dataObject.dictLength + }); + } + catch (OpenDataException e) + { + throw new RuntimeException(e); + } + } + + /** + * Deserializes data to convenience object to work further with. + * + * @param compositeData data to create data object from + * @return deserialized composite data to convenience object + * @throws IllegalArgumentException if values in deserialized object are invalid. + * @see CompressionDictionaryDataObject#validate() + */ + public static CompressionDictionaryDataObject fromCompositeData(CompositeData compositeData) + { + String keyspace = (String) compositeData.get(CompressionDictionaryDetailsTabularData.KEYSPACE_NAME); + String table = (String) compositeData.get(CompressionDictionaryDetailsTabularData.TABLE_NAME); + long dictId = (Long) compositeData.get(CompressionDictionaryDetailsTabularData.DICT_ID_NAME); + byte[] dictionaryBytes = (byte[]) compositeData.get(CompressionDictionaryDetailsTabularData.DICT_NAME); + String kind = (String) compositeData.get(CompressionDictionaryDetailsTabularData.KIND_NAME); + int checksum = (Integer) compositeData.get(CompressionDictionaryDetailsTabularData.CHECKSUM_NAME); + int size = (Integer) compositeData.get(CompressionDictionaryDetailsTabularData.SIZE_NAME); + + CompressionDictionaryDataObject dataObject = new CompressionDictionaryDataObject(); + dataObject.keyspace = keyspace; + dataObject.table = table; + dataObject.dictId = dictId; + dataObject.dict = dictionaryBytes; + dataObject.kind = kind; + dataObject.dictChecksum = checksum; + 
dataObject.dictLength = size; + + dataObject.validate(); + + return dataObject; + } + + public static class CompressionDictionaryDataObject implements Cloneable + { + public String keyspace; + public String table; + public long dictId; + public byte[] dict; + public String kind; + public int dictChecksum; + public int dictLength; + + /** + * An object of this class is considered to be valid if: + * + * <ul> + * <li>keyspace and table are not null</li> + * <li>dict id is lower than 0</li> + * <li>dict is not null nor empty</li> + * <li>dict lenght is less than or equal to 1MiB</li> + * <li>kind is not null and there is such {@link org.apache.cassandra.db.compression.CompressionDictionary.Kind}</li> + * <li>dictLenght is bigger than 0</li> + * <li>dictLenght has to be equal to dict's length</li> + * <li>dictChecksum has to be equal to checksum computed as part of this method</li> + * </ul> + */ + public void validate() + { + if (keyspace == null) + throw new IllegalArgumentException("keyspace not specified"); + if (table == null) + throw new IllegalArgumentException("table not specified"); + if (dictId <= 0) + throw new IllegalArgumentException("Provided dictionary id is lower than 0, it is '" + dictId + "'.'"); + if (dict == null || dict.length == 0) + throw new IllegalArgumentException("Provided dictionary byte array is null or empty."); + if (dict.length > FileUtils.ONE_MIB) + throw new IllegalArgumentException("Imported dictionary can not be larger than " + + FileUtils.ONE_MIB + " bytes, but it is " + + dict.length + " bytes."); + if (kind == null) + throw new IllegalArgumentException("Provided kind is null."); + + CompressionDictionary.Kind dictionaryKind; + + try + { + dictionaryKind = CompressionDictionary.Kind.valueOf(kind); + } + catch (IllegalArgumentException ex) + { + throw new IllegalArgumentException("There is no such dictionary kind like '" + kind + "'. 
Available kinds: " + Arrays.asList(CompressionDictionary.Kind.values())); + } + + if (dictLength <= 0) + throw new IllegalArgumentException("Size has to be strictly positive number, it is '" + dictLength + "'."); + if (dict.length != dictLength) + throw new IllegalArgumentException("The length of the provided dictionary array (" + dict.length + ") is not equal to provided length value (" + dictLength + ")."); + + int checksumOfDictionaryToImport = CompressionDictionary.calculateChecksum((byte) dictionaryKind.ordinal(), dictId, dict); + if (checksumOfDictionaryToImport != dictChecksum) + { + throw new IllegalArgumentException(format("Computed checksum of dictionary to import (%s) is different from checksum specified on input (%s).", + checksumOfDictionaryToImport, + dictChecksum)); + } + } + + @Override + public CompressionDictionaryDataObject clone() + { + try + { + CompressionDictionaryDataObject clone = (CompressionDictionaryDataObject) super.clone(); + if (this.dict != null) + { + clone.dict = this.dict.clone(); + } + return clone; + } + catch (CloneNotSupportedException e) + { + throw new AssertionError("Clone not supported", e); + } + } + } Review Comment: Why are this method and `Cloneable` necessary? It looks like they are only used in the test code `org.apache.cassandra.db.compression.CompressionDictionaryDataObjectTest#assertInvalid`. ########## src/java/org/apache/cassandra/db/compression/CompressionDictionaryDetailsTabularData.java: ########## @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.compression; + +import java.util.Arrays; +import javax.management.openmbean.ArrayType; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularType; + +import org.apache.cassandra.db.compression.CompressionDictionary.LightweightCompressionDictionary; +import org.apache.cassandra.io.util.FileUtils; + +import static java.lang.String.format; + +public class CompressionDictionaryDetailsTabularData +{ + /** + * Position inside index names of tabular type of tabular data returned upon + * listing dictionaries where raw dictionary is expected to be located. + * We do not need to process this entry as listing does not contain any raw dictionary, + * only exporting does. 
+ */ + public static final int TABULAR_DATA_TYPE_RAW_DICTIONARY_INDEX = 3; + + public static final String KEYSPACE_NAME = "Keyspace"; + public static final String TABLE_NAME = "Table"; + public static final String DICT_ID_NAME = "DictId"; + public static final String DICT_NAME = "Dict"; + public static final String KIND_NAME = "Kind"; + public static final String CHECKSUM_NAME = "Checksum"; + public static final String SIZE_NAME = "Size"; + + + private static final String[] ITEM_NAMES = new String[]{ KEYSPACE_NAME, + TABLE_NAME, + DICT_ID_NAME, + DICT_NAME, + KIND_NAME, + CHECKSUM_NAME, + SIZE_NAME }; + + private static final String[] ITEM_DESCS = new String[]{ "keyspace", + "table", + "dictionary_id", + "dictionary_bytes", + "kind", + "checksum", + "size" }; + + private static final String TYPE_NAME = "DictionaryDetails"; + private static final String ROW_DESC = "DictionaryDetails"; + private static final OpenType<?>[] ITEM_TYPES; + private static final CompositeType COMPOSITE_TYPE; + public static final TabularType TABULAR_TYPE; + + static + { + try + { + ITEM_TYPES = new OpenType[]{ SimpleType.STRING, // keyspace + SimpleType.STRING, // table + SimpleType.LONG, // dict id + new ArrayType<String[]>(SimpleType.BYTE, true), // dict bytes + SimpleType.STRING, // kind + SimpleType.INTEGER, // checksum + SimpleType.INTEGER }; // size of dict bytes + + COMPOSITE_TYPE = new CompositeType(TYPE_NAME, ROW_DESC, ITEM_NAMES, ITEM_DESCS, ITEM_TYPES); + TABULAR_TYPE = new TabularType(TYPE_NAME, ROW_DESC, COMPOSITE_TYPE, ITEM_NAMES); + } + catch (OpenDataException e) + { + throw new RuntimeException(e); + } + } + + /** + * This method is meant to be call when listing dictionaries, we do not need actual dictionary byte arrays. 
+ * + * @param dictionary lightweight dictionary to create composite data from + * @return composite data representing dictionary + */ + public static CompositeData fromLightweightCompressionDictionary(LightweightCompressionDictionary dictionary) + { + try + { + return new CompositeDataSupport(COMPOSITE_TYPE, + ITEM_NAMES, + new Object[] + { + dictionary.keyspaceName, + dictionary.tableName, + dictionary.dictId.id, + null, // on purpose not returning actual dictionary + dictionary.dictId.kind.name(), + dictionary.checksum, + dictionary.size, + }); + } + catch (OpenDataException e) + { + throw new RuntimeException(e); + } + } + + /** + * Used upon exporting a dictionary. + * + * @param keyspace keyspace of a dictionary + * @param table table of a dictionary + * @param dictionary dictionary itself + * @return composite data representing dictionary + */ + public static CompositeData fromCompressionDictionary(String keyspace, String table, CompressionDictionary dictionary) + { + try + { + return new CompositeDataSupport(COMPOSITE_TYPE, + ITEM_NAMES, + new Object[] + { + keyspace, + table, + dictionary.dictId().id, + dictionary.rawDictionary(), + dictionary.kind().name(), + dictionary.checksum(), + dictionary.rawDictionary().length, + }); + } + catch (OpenDataException e) + { + throw new RuntimeException(e); + } + } + + /** + * Used upon deserialisation of data to composite data, e.g. upon importing a dictionary from JSON + * string to CompositeData before they are sent over JMX to import method. 
+ * + * @param dataObject data class object to get composite data from + * @return composite data representing data class + */ + public static CompositeData fromCompressionDictionaryDataObject(CompressionDictionaryDataObject dataObject) + { + try + { + return new CompositeDataSupport(COMPOSITE_TYPE, + ITEM_NAMES, + new Object[] + { + dataObject.keyspace, + dataObject.table, + dataObject.dictId, + dataObject.dict, + dataObject.kind, + dataObject.dictChecksum, + dataObject.dictLength + }); + } + catch (OpenDataException e) + { + throw new RuntimeException(e); + } + } + + /** + * Deserializes data to convenience object to work further with. + * + * @param compositeData data to create data object from + * @return deserialized composite data to convenience object + * @throws IllegalArgumentException if values in deserialized object are invalid. + * @see CompressionDictionaryDataObject#validate() + */ + public static CompressionDictionaryDataObject fromCompositeData(CompositeData compositeData) + { + String keyspace = (String) compositeData.get(CompressionDictionaryDetailsTabularData.KEYSPACE_NAME); + String table = (String) compositeData.get(CompressionDictionaryDetailsTabularData.TABLE_NAME); + long dictId = (Long) compositeData.get(CompressionDictionaryDetailsTabularData.DICT_ID_NAME); + byte[] dictionaryBytes = (byte[]) compositeData.get(CompressionDictionaryDetailsTabularData.DICT_NAME); + String kind = (String) compositeData.get(CompressionDictionaryDetailsTabularData.KIND_NAME); + int checksum = (Integer) compositeData.get(CompressionDictionaryDetailsTabularData.CHECKSUM_NAME); + int size = (Integer) compositeData.get(CompressionDictionaryDetailsTabularData.SIZE_NAME); + + CompressionDictionaryDataObject dataObject = new CompressionDictionaryDataObject(); + dataObject.keyspace = keyspace; + dataObject.table = table; + dataObject.dictId = dictId; + dataObject.dict = dictionaryBytes; + dataObject.kind = kind; + dataObject.dictChecksum = checksum; + 
dataObject.dictLength = size; + + dataObject.validate(); + + return dataObject; + } + + public static class CompressionDictionaryDataObject implements Cloneable + { + public String keyspace; + public String table; + public long dictId; + public byte[] dict; + public String kind; + public int dictChecksum; + public int dictLength; Review Comment: nit: add the `final` modifier to those public fields. The change would require adding a constructor, though. ########## src/java/org/apache/cassandra/db/compression/CompressionDictionaryDetailsTabularData.java: ########## @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.cassandra.db.compression; + +import java.util.Arrays; +import javax.management.openmbean.ArrayType; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.TabularType; + +import org.apache.cassandra.db.compression.CompressionDictionary.LightweightCompressionDictionary; +import org.apache.cassandra.io.util.FileUtils; + +import static java.lang.String.format; + +public class CompressionDictionaryDetailsTabularData +{ + /** + * Position inside index names of tabular type of tabular data returned upon + * listing dictionaries where raw dictionary is expected to be located. + * We do not need to process this entry as listing does not contain any raw dictionary, + * only exporting does. + */ + public static final int TABULAR_DATA_TYPE_RAW_DICTIONARY_INDEX = 3; + + public static final String KEYSPACE_NAME = "Keyspace"; + public static final String TABLE_NAME = "Table"; + public static final String DICT_ID_NAME = "DictId"; + public static final String DICT_NAME = "Dict"; + public static final String KIND_NAME = "Kind"; + public static final String CHECKSUM_NAME = "Checksum"; + public static final String SIZE_NAME = "Size"; + + + private static final String[] ITEM_NAMES = new String[]{ KEYSPACE_NAME, + TABLE_NAME, + DICT_ID_NAME, + DICT_NAME, + KIND_NAME, + CHECKSUM_NAME, + SIZE_NAME }; + + private static final String[] ITEM_DESCS = new String[]{ "keyspace", + "table", + "dictionary_id", + "dictionary_bytes", + "kind", + "checksum", + "size" }; + + private static final String TYPE_NAME = "DictionaryDetails"; + private static final String ROW_DESC = "DictionaryDetails"; + private static final OpenType<?>[] ITEM_TYPES; + private static final CompositeType COMPOSITE_TYPE; 
+ public static final TabularType TABULAR_TYPE; + + static + { + try + { + ITEM_TYPES = new OpenType[]{ SimpleType.STRING, // keyspace + SimpleType.STRING, // table + SimpleType.LONG, // dict id + new ArrayType<String[]>(SimpleType.BYTE, true), // dict bytes + SimpleType.STRING, // kind + SimpleType.INTEGER, // checksum + SimpleType.INTEGER }; // size of dict bytes + + COMPOSITE_TYPE = new CompositeType(TYPE_NAME, ROW_DESC, ITEM_NAMES, ITEM_DESCS, ITEM_TYPES); + TABULAR_TYPE = new TabularType(TYPE_NAME, ROW_DESC, COMPOSITE_TYPE, ITEM_NAMES); + } + catch (OpenDataException e) + { + throw new RuntimeException(e); + } + } + + /** + * This method is meant to be call when listing dictionaries, we do not need actual dictionary byte arrays. + * + * @param dictionary lightweight dictionary to create composite data from + * @return composite data representing dictionary + */ + public static CompositeData fromLightweightCompressionDictionary(LightweightCompressionDictionary dictionary) + { + try + { + return new CompositeDataSupport(COMPOSITE_TYPE, + ITEM_NAMES, + new Object[] + { + dictionary.keyspaceName, + dictionary.tableName, + dictionary.dictId.id, + null, // on purpose not returning actual dictionary + dictionary.dictId.kind.name(), + dictionary.checksum, + dictionary.size, + }); + } + catch (OpenDataException e) + { + throw new RuntimeException(e); + } + } + + /** + * Used upon exporting a dictionary. 
+ * + * @param keyspace keyspace of a dictionary + * @param table table of a dictionary + * @param dictionary dictionary itself + * @return composite data representing dictionary + */ + public static CompositeData fromCompressionDictionary(String keyspace, String table, CompressionDictionary dictionary) + { + try + { + return new CompositeDataSupport(COMPOSITE_TYPE, + ITEM_NAMES, + new Object[] + { + keyspace, + table, + dictionary.dictId().id, + dictionary.rawDictionary(), + dictionary.kind().name(), + dictionary.checksum(), + dictionary.rawDictionary().length, + }); + } + catch (OpenDataException e) + { + throw new RuntimeException(e); + } + } + + /** + * Used upon deserialisation of data to composite data, e.g. upon importing a dictionary from JSON + * string to CompositeData before they are sent over JMX to import method. + * + * @param dataObject data class object to get composite data from + * @return composite data representing data class + */ + public static CompositeData fromCompressionDictionaryDataObject(CompressionDictionaryDataObject dataObject) + { + try + { + return new CompositeDataSupport(COMPOSITE_TYPE, + ITEM_NAMES, + new Object[] + { + dataObject.keyspace, + dataObject.table, + dataObject.dictId, + dataObject.dict, + dataObject.kind, + dataObject.dictChecksum, + dataObject.dictLength + }); + } + catch (OpenDataException e) + { + throw new RuntimeException(e); + } + } + + /** + * Deserializes data to convenience object to work further with. + * + * @param compositeData data to create data object from + * @return deserialized composite data to convenience object + * @throws IllegalArgumentException if values in deserialized object are invalid. 
+ * @see CompressionDictionaryDataObject#validate() + */ + public static CompressionDictionaryDataObject fromCompositeData(CompositeData compositeData) + { + String keyspace = (String) compositeData.get(CompressionDictionaryDetailsTabularData.KEYSPACE_NAME); + String table = (String) compositeData.get(CompressionDictionaryDetailsTabularData.TABLE_NAME); + long dictId = (Long) compositeData.get(CompressionDictionaryDetailsTabularData.DICT_ID_NAME); + byte[] dictionaryBytes = (byte[]) compositeData.get(CompressionDictionaryDetailsTabularData.DICT_NAME); + String kind = (String) compositeData.get(CompressionDictionaryDetailsTabularData.KIND_NAME); + int checksum = (Integer) compositeData.get(CompressionDictionaryDetailsTabularData.CHECKSUM_NAME); + int size = (Integer) compositeData.get(CompressionDictionaryDetailsTabularData.SIZE_NAME); + + CompressionDictionaryDataObject dataObject = new CompressionDictionaryDataObject(); + dataObject.keyspace = keyspace; + dataObject.table = table; + dataObject.dictId = dictId; + dataObject.dict = dictionaryBytes; + dataObject.kind = kind; + dataObject.dictChecksum = checksum; + dataObject.dictLength = size; + + dataObject.validate(); + + return dataObject; + } + + public static class CompressionDictionaryDataObject implements Cloneable + { + public String keyspace; + public String table; + public long dictId; + public byte[] dict; + public String kind; + public int dictChecksum; + public int dictLength; + + /** + * An object of this class is considered to be valid if: + * + * <ul> + * <li>keyspace and table are not null</li> + * <li>dict id is lower than 0</li> + * <li>dict is not null nor empty</li> + * <li>dict lenght is less than or equal to 1MiB</li> + * <li>kind is not null and there is such {@link org.apache.cassandra.db.compression.CompressionDictionary.Kind}</li> + * <li>dictLenght is bigger than 0</li> + * <li>dictLenght has to be equal to dict's length</li> Review Comment: More misspellings of "length" here (`lenght`, `dictLenght`). ########## 
src/java/org/apache/cassandra/tools/nodetool/CompressionDictionaryCommandGroup.java: ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.tools.nodetool; + +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; + +import com.google.common.util.concurrent.Uninterruptibles; + +import org.apache.cassandra.db.compression.CompressionDictionaryDetailsTabularData; +import org.apache.cassandra.db.compression.CompressionDictionaryDetailsTabularData.CompressionDictionaryDataObject; +import org.apache.cassandra.db.compression.ICompressionDictionaryTrainer.TrainingStatus; +import org.apache.cassandra.db.compression.TrainingState; +import org.apache.cassandra.io.util.File; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.tools.NodeProbe; +import org.apache.cassandra.tools.nodetool.formatter.TableBuilder; +import org.apache.cassandra.utils.Clock; +import org.apache.cassandra.utils.JsonUtils; +import picocli.CommandLine.Command; +import picocli.CommandLine.Option; +import 
picocli.CommandLine.Parameters; + +import static java.lang.System.lineSeparator; +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; +import static java.nio.file.StandardOpenOption.WRITE; +import static java.util.stream.Collectors.joining; + +@Command(name = "compressiondictionary", + description = "Manage compression dictionaries", + subcommands = { CompressionDictionaryCommandGroup.TrainDictionary.class, + CompressionDictionaryCommandGroup.ListDictionaries.class, + CompressionDictionaryCommandGroup.ExportDictionary.class, + CompressionDictionaryCommandGroup.ImportDictionary.class }) +public class CompressionDictionaryCommandGroup +{ + @Command(name = "train", + description = "Manually trigger compression dictionary training for a table. If no SSTables are available, the memtable will be flushed first.") + public static class TrainDictionary extends AbstractCommand + { + @Parameters(index = "0", description = "The keyspace name", arity = "1") + private String keyspace; + + @Parameters(index = "1", description = "The table name", arity = "1") + private String table; + + @Option(names = { "-f", "--force" }, description = "Force the dictionary training even if there are not enough samples") + private boolean force = false; + + @Override + public void execute(NodeProbe probe) + { + PrintStream out = probe.output().out; + PrintStream err = probe.output().err; + + try + { + out.printf("Starting compression dictionary training for %s.%s...%n", keyspace, table); + out.printf("Training from existing SSTables (flushing first if needed)%n"); + + probe.trainCompressionDictionary(keyspace, table, force); + + // Wait for training completion (10 minutes timeout for SSTable-based training) + out.println("Sampling from existing SSTables and training."); + long maxWaitMillis = TimeUnit.MINUTES.toMillis(10); + long startTime = Clock.Global.currentTimeMillis(); + + while (Clock.Global.currentTimeMillis() - 
startTime < maxWaitMillis) + { + TrainingState trainingState = probe.getCompressionDictionaryTrainingState(keyspace, table); + TrainingStatus status = trainingState.getStatus(); + displayProgress(trainingState, startTime, out, status); + if (TrainingStatus.COMPLETED == status) + { + out.printf("%nTraining completed successfully for %s.%s%n", keyspace, table); + return; + } + else if (TrainingStatus.FAILED == status) + { + err.printf("%nTraining failed for %s.%s%n", keyspace, table); + try + { + String failureMessage = trainingState.getFailureMessage(); + if (failureMessage != null && !failureMessage.isEmpty()) + { + err.printf("Reason: %s%n", failureMessage); + } + } + catch (Exception e) + { + // If we can't get the failure message, just continue without it + } + System.exit(1); + } + + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + } + + err.printf("%nTraining did not complete within expected timeframe (10 minutes).%n"); + System.exit(1); + } + catch (Exception e) + { + err.printf("Failed to trigger training: %s%n", e.getMessage()); + System.exit(1); + } + } + + private static void displayProgress(TrainingState trainingState, long startTime, PrintStream out, TrainingStatus status) + { + // Display meaningful statistics + long sampleCount = trainingState.getSampleCount(); + long totalSampleSize = trainingState.getTotalSampleSize(); + long elapsedSeconds = (Clock.Global.currentTimeMillis() - startTime) / 1000; + double sampleSizeMB = totalSampleSize / (1024.0 * 1024.0); + + out.printf("\rStatus: %s | Samples: %d | Size: %.2f MiB | Elapsed: %ds", + status, sampleCount, sampleSizeMB, elapsedSeconds); + } + } + + @Command(name = "list", + description = "List available dictionaries of specific keyspace and table.") + public static class ListDictionaries extends AbstractCommand + { + @Parameters(index = "0", description = "The keyspace name", arity = "1") + private String keyspace; + + @Parameters(index = "1", description = "The table name", arity = "1") 
+ private String table; + + @Override + protected void execute(NodeProbe probe) + { + try + { + TableBuilder tableBuilder = new TableBuilder(); + TabularData tabularData = probe.listCompressionDictionaries(keyspace, table); + List<String> indexNames = tabularData.getTabularType().getIndexNames(); + + List<String> columns = new ArrayList<>(indexNames); + // ignore raw dict + columns.remove(CompressionDictionaryDetailsTabularData.TABULAR_DATA_TYPE_RAW_DICTIONARY_INDEX); + tableBuilder.add(columns); + + for (Object eachDict : tabularData.keySet()) + { + final List<?> dictRow = (List<?>) eachDict; + + List<String> rowValues = new ArrayList<>(); + + for (int i = 0; i < dictRow.size(); i++) + { + if (i == 3) // ignore raw dict Review Comment: Update the condition here as well. ```suggestion if (i == CompressionDictionaryDetailsTabularData.TABULAR_DATA_TYPE_RAW_DICTIONARY_INDEX) // ignore raw dict ``` ########## src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java: ########## @@ -244,6 +251,68 @@ public CompositeData getTrainingState() return dictionaryTrainer.getTrainingState().toCompositeData(); } + @Override + public TabularData listCompressionDictionaries() + { + List<LightweightCompressionDictionary> dictionaries = SystemDistributedKeyspace.retrieveLightweightCompressionDictionaries(keyspaceName, tableName); + TabularDataSupport tableData = new TabularDataSupport(CompressionDictionaryDetailsTabularData.TABULAR_TYPE); + + if (dictionaries == null) + { + return tableData; + } + + for (LightweightCompressionDictionary dictionary : dictionaries) + { + tableData.put(CompressionDictionaryDetailsTabularData.fromLightweightCompressionDictionary(dictionary)); + } + + return tableData; + } + + @Override + public CompositeData getCompressionDictionary() + { + CompressionDictionary compressionDictionary = SystemDistributedKeyspace.retrieveLatestCompressionDictionary(keyspaceName, tableName); + if (compressionDictionary == null) + return null; + + return 
CompressionDictionaryDetailsTabularData.fromCompressionDictionary(keyspaceName, tableName, compressionDictionary); + } + + @Override + public CompositeData getCompressionDictionary(long dictId) + { + CompressionDictionary compressionDictionary = SystemDistributedKeyspace.retrieveCompressionDictionary(keyspaceName, tableName, dictId); + if (compressionDictionary == null) + return null; + + return CompressionDictionaryDetailsTabularData.fromCompressionDictionary(keyspaceName, tableName, compressionDictionary); + } + + @Override + public synchronized void importCompressionDictionary(CompositeData compositeData) + { + CompressionDictionaryDataObject dataObject = CompressionDictionaryDetailsTabularData.fromCompositeData(compositeData); + + if (!keyspaceName.equals(dataObject.keyspace) || !tableName.equals(dataObject.table)) + throw new IllegalArgumentException(format("Keyspace and table of a dictionary to import (%s.%s) does not correspond to the keyspace and table this manager is responsible for (%s.%s)", + dataObject.keyspace, dataObject.table, + keyspaceName, tableName)); + + CompressionDictionary.Kind kind = CompressionDictionary.Kind.valueOf(dataObject.kind); + CompressionDictionary.DictId dictId = new CompressionDictionary.DictId(kind, dataObject.dictId); + + LightweightCompressionDictionary latestCompressionDictionary = SystemDistributedKeyspace.retrieveLightweightLatestCompressionDictionary(keyspaceName, tableName); Review Comment: Should we also check that the table has the correct dictionary compression enabled? ########## src/java/org/apache/cassandra/tools/nodetool/CompressionDictionaryCommandGroup.java: ########## @@ -0,0 +1,295 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.tools.nodetool; + +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularData; + +import com.google.common.util.concurrent.Uninterruptibles; + +import org.apache.cassandra.db.compression.CompressionDictionaryDetailsTabularData; +import org.apache.cassandra.db.compression.CompressionDictionaryDetailsTabularData.CompressionDictionaryDataObject; +import org.apache.cassandra.db.compression.ICompressionDictionaryTrainer.TrainingStatus; +import org.apache.cassandra.db.compression.TrainingState; +import org.apache.cassandra.io.util.File; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.tools.NodeProbe; +import org.apache.cassandra.tools.nodetool.formatter.TableBuilder; +import org.apache.cassandra.utils.Clock; +import org.apache.cassandra.utils.JsonUtils; +import picocli.CommandLine.Command; +import picocli.CommandLine.Option; +import picocli.CommandLine.Parameters; + +import static java.lang.System.lineSeparator; +import static java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; +import static java.nio.file.StandardOpenOption.WRITE; +import static java.util.stream.Collectors.joining; + +@Command(name = 
"compressiondictionary", + description = "Manage compression dictionaries", + subcommands = { CompressionDictionaryCommandGroup.TrainDictionary.class, + CompressionDictionaryCommandGroup.ListDictionaries.class, + CompressionDictionaryCommandGroup.ExportDictionary.class, + CompressionDictionaryCommandGroup.ImportDictionary.class }) +public class CompressionDictionaryCommandGroup +{ + @Command(name = "train", + description = "Manually trigger compression dictionary training for a table. If no SSTables are available, the memtable will be flushed first.") + public static class TrainDictionary extends AbstractCommand + { + @Parameters(index = "0", description = "The keyspace name", arity = "1") + private String keyspace; + + @Parameters(index = "1", description = "The table name", arity = "1") + private String table; + + @Option(names = { "-f", "--force" }, description = "Force the dictionary training even if there are not enough samples") + private boolean force = false; + + @Override + public void execute(NodeProbe probe) + { + PrintStream out = probe.output().out; + PrintStream err = probe.output().err; + + try + { + out.printf("Starting compression dictionary training for %s.%s...%n", keyspace, table); + out.printf("Training from existing SSTables (flushing first if needed)%n"); + + probe.trainCompressionDictionary(keyspace, table, force); + + // Wait for training completion (10 minutes timeout for SSTable-based training) + out.println("Sampling from existing SSTables and training."); + long maxWaitMillis = TimeUnit.MINUTES.toMillis(10); + long startTime = Clock.Global.currentTimeMillis(); + + while (Clock.Global.currentTimeMillis() - startTime < maxWaitMillis) + { + TrainingState trainingState = probe.getCompressionDictionaryTrainingState(keyspace, table); + TrainingStatus status = trainingState.getStatus(); + displayProgress(trainingState, startTime, out, status); + if (TrainingStatus.COMPLETED == status) + { + out.printf("%nTraining completed successfully for 
%s.%s%n", keyspace, table); + return; + } + else if (TrainingStatus.FAILED == status) + { + err.printf("%nTraining failed for %s.%s%n", keyspace, table); + try + { + String failureMessage = trainingState.getFailureMessage(); + if (failureMessage != null && !failureMessage.isEmpty()) + { + err.printf("Reason: %s%n", failureMessage); + } + } + catch (Exception e) + { + // If we can't get the failure message, just continue without it + } + System.exit(1); + } + + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); + } + + err.printf("%nTraining did not complete within expected timeframe (10 minutes).%n"); + System.exit(1); + } + catch (Exception e) + { + err.printf("Failed to trigger training: %s%n", e.getMessage()); + System.exit(1); + } + } + + private static void displayProgress(TrainingState trainingState, long startTime, PrintStream out, TrainingStatus status) + { + // Display meaningful statistics + long sampleCount = trainingState.getSampleCount(); + long totalSampleSize = trainingState.getTotalSampleSize(); + long elapsedSeconds = (Clock.Global.currentTimeMillis() - startTime) / 1000; + double sampleSizeMB = totalSampleSize / (1024.0 * 1024.0); + + out.printf("\rStatus: %s | Samples: %d | Size: %.2f MiB | Elapsed: %ds", + status, sampleCount, sampleSizeMB, elapsedSeconds); + } + } + + @Command(name = "list", + description = "List available dictionaries of specific keyspace and table.") + public static class ListDictionaries extends AbstractCommand + { + @Parameters(index = "0", description = "The keyspace name", arity = "1") + private String keyspace; + + @Parameters(index = "1", description = "The table name", arity = "1") + private String table; + + @Override + protected void execute(NodeProbe probe) + { + try + { + TableBuilder tableBuilder = new TableBuilder(); + TabularData tabularData = probe.listCompressionDictionaries(keyspace, table); + List<String> indexNames = tabularData.getTabularType().getIndexNames(); + + List<String> columns = new 
ArrayList<>(indexNames); + // ignore raw dict + columns.remove(CompressionDictionaryDetailsTabularData.TABULAR_DATA_TYPE_RAW_DICTIONARY_INDEX); + tableBuilder.add(columns); + + for (Object eachDict : tabularData.keySet()) + { + final List<?> dictRow = (List<?>) eachDict; + + List<String> rowValues = new ArrayList<>(); + + for (int i = 0; i < dictRow.size(); i++) + { + if (i == 3) // ignore raw dict + continue; + + rowValues.add(dictRow.get(i).toString()); + } + tableBuilder.add(rowValues); + } + + tableBuilder.printTo(probe.output().out); + } + catch (Exception e) + { + probe.output().err.printf("Failed to list dictionaries: %s%n", e.getMessage()); + System.exit(1); + } + } + } + + @Command(name = "export", + description = "Export dictionary from Cassandra to local file.") + public static class ExportDictionary extends AbstractCommand + { + @Parameters(index = "0", description = "The keyspace name", arity = "1") + private String keyspace; + + @Parameters(index = "1", description = "The table name", arity = "1") + private String table; + + @Parameters(index = "2", description = "File name to save dictionary to", arity = "1") + private String dictionaryPath; + + @Option(paramLabel = "dictId", names = { "-i", "--id" }, + description = "The dictionary id. 
When not specified, the current dictionary is returned.") + private long dictId = -1; + + @Override + protected void execute(NodeProbe probe) + { + if (dictId <= 0 && dictId != -1) + { + probe.output().err.printf("Dictionary id has to be strictly positive number.%n"); + System.exit(1); + } + + try + { + CompositeData compressionDictionary; + + if (dictId == -1) + { + compressionDictionary = probe.getCompressionDictionary(keyspace, table); + } + else + { + compressionDictionary = probe.getCompressionDictionary(keyspace, table, dictId); + } + + if (compressionDictionary == null) + { + probe.output().err.printf("Dictionary does not exist.%n"); Review Comment: nit: adding a bit more information in the output ```suggestion probe.output().err.printf("Dictionary%s does not exist for %s.%s.%n", dictId == -1 ? "" : " with id " + dictId, keyspace, table); ``` ########## src/java/org/apache/cassandra/db/compression/CompressionDictionaryManager.java: ########## @@ -267,6 +336,11 @@ private void handleNewDictionary(CompressionDictionary dictionary) onNewDictionaryTrained(dictionary.dictId()); } + public void importDictionary(CompressionDictionary dictionary) Review Comment: ~this method could be `private`~ Actually, should you just remove the method? I think the only call-site `CompressionDictionaryManager#importCompressionDictionary` can call `handleNewDictionary` instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]

