Github user QiangCai commented on a diff in the pull request: https://github.com/apache/carbondata/pull/2628#discussion_r213922415 --- Diff: core/src/main/java/org/apache/carbondata/core/datastore/compression/CompressorFactory.java --- @@ -17,42 +17,99 @@ package org.apache.carbondata.core.datastore.compression; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.format.CompressionCodec; public class CompressorFactory { - private static final CompressorFactory COMPRESSOR_FACTORY = new CompressorFactory(); - private final Compressor snappyCompressor; + private final Map<String, SupportedCompressor> compressors = new HashMap<>(); + + public enum SupportedCompressor { + SNAPPY(CompressionCodec.SNAPPY, "snappy", SnappyCompressor.class), + ZSTD(CompressionCodec.ZSTD, "zstd", ZstdCompressor.class); + + private CompressionCodec codec; + private String name; + private Class<Compressor> compressorClass; + private transient Compressor compressor; + + SupportedCompressor(CompressionCodec codec, String name, Class compressorCls) { + this.codec = codec; + this.name = name; + this.compressorClass = compressorCls; + } + + public CompressionCodec getCodec() { + return codec; + } + + public String getName() { + return name; + } + + /** + * we will load the compressor only if it is needed + */ + public Compressor getCompressor() { + if (this.compressor == null) { + try { + this.compressor = compressorClass.newInstance(); + } catch (InstantiationException | IllegalAccessException e) { + throw new RuntimeException("Exception occurs while getting compressor for " + name); + } + } + return this.compressor; + } + } private CompressorFactory() { - String compressorType = CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.COMPRESSOR, CarbonCommonConstants.DEFAULT_COMPRESSOR); - switch 
(compressorType) { - case "snappy": - snappyCompressor = new SnappyCompressor(); - break; - default: - throw new RuntimeException( - "Invalid compressor type provided! Please provide valid compressor type"); + for (SupportedCompressor supportedCompressor : SupportedCompressor.values()) { + compressors.put(supportedCompressor.getName(), supportedCompressor); } } public static CompressorFactory getInstance() { return COMPRESSOR_FACTORY; } + /** + * get the default compressor. + * This method can only be called in data load procedure to compress column page. + * In query procedure, we should read the compressor information from the metadata + * in datafiles when we want to decompress the content. + */ public Compressor getCompressor() { - return getCompressor(CarbonCommonConstants.DEFAULT_COMPRESSOR); + String compressorType = CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.COMPRESSOR, CarbonCommonConstants.DEFAULT_COMPRESSOR); + if (!compressors.keySet().contains(compressorType)) { --- End diff -- Use `compressors.containsKey(compressorType)` directly instead of `compressors.keySet().contains(compressorType)`; it performs the same check without going through the intermediate key-set view.
---