KYLIN-976 Clean up, remove all explicit checks on TopN & HLLC
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/7f50c17a Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/7f50c17a Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/7f50c17a Branch: refs/heads/2.x-staging Commit: 7f50c17a5202be9508c21d9cc301d231c8b5843e Parents: 636aeec Author: Li, Yang <yang...@ebay.com> Authored: Fri Dec 4 16:38:04 2015 +0800 Committer: Yang Li <liy...@apache.org> Committed: Sat Dec 5 06:55:39 2015 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/common/KylinConfig.java | 16 ++- .../apache/kylin/common/topn/TopNCounter.java | 2 +- .../kylin/common/util/CliCommandExecutor.java | 4 +- .../java/org/apache/kylin/common/util/Pair.java | 3 +- .../kylin/common/topn/TopNCounterTest.java | 4 +- .../kylin/cube/CubeCapabilityChecker.java | 5 - .../org/apache/kylin/cube/CubeDescManager.java | 15 ++- .../org/apache/kylin/cube/CubeInstance.java | 3 - .../java/org/apache/kylin/cube/CubeManager.java | 4 +- .../kylin/cube/gridtable/CubeCodeSystem.java | 2 +- .../gridtable/CuboidToGridTableMapping.java | 30 +---- .../cube/gridtable/TrimmedCubeCodeSystem.java | 2 +- .../InMemCubeBuilderInputConverter.java | 3 +- .../cube/inmemcubing/InMemCubeBuilderUtils.java | 30 +---- .../org/apache/kylin/cube/model/CubeDesc.java | 11 +- .../model/validation/rule/FunctionRule.java | 4 - .../kylin/gridtable/GTSampleCodeSystem.java | 2 +- .../cube/inmemcubing/InMemCubeBuilderTest.java | 25 ++-- .../apache/kylin/dict/lookup/LookupTable.java | 2 +- .../apache/kylin/measure/IMeasureFactory.java | 24 ---- .../apache/kylin/measure/MeasureAggregator.java | 6 +- .../kylin/measure/MeasureAggregators.java | 2 +- .../org/apache/kylin/measure/MeasureType.java | 50 ++------ .../kylin/measure/MeasureTypeFactory.java | 89 ++++++++++++++ .../measure/basic/BasicMeasureFactory.java | 32 ----- .../kylin/measure/basic/BasicMeasureType.java | 74 +++++++----- 
.../measure/hllc/HLLCAggregationFactory.java | 36 ------ .../kylin/measure/hllc/HLLCMeasureType.java | 76 ++++++++---- .../kylin/measure/hllc/HLLCSerializer.java | 3 +- .../kylin/measure/hllc/LDCAggregator.java | 63 ---------- .../kylin/measure/topn/TopNMeasureFactory.java | 36 ------ .../kylin/measure/topn/TopNMeasureType.java | 116 +++++++++++++------ .../kylin/metadata/datatype/DataType.java | 33 ++++-- .../metadata/datatype/DataTypeSerializer.java | 38 +++--- .../kylin/metadata/model/FunctionDesc.java | 65 ++--------- .../kylin/metadata/tuple/TeeTupleIterator.java | 5 - .../org/apache/kylin/metadata/tuple/Tuple.java | 12 +- .../kylin/storage/translate/HBaseKeyRange.java | 2 +- .../apache/kylin/engine/mr/DFSFileTable.java | 2 +- .../engine/mr/steps/BaseCuboidMapperBase.java | 3 +- .../apache/kylin/engine/mr/steps/CuboidJob.java | 5 +- .../mr/steps/MergeCuboidFromStorageMapper.java | 2 +- .../engine/mr/steps/MergeCuboidMapper.java | 2 +- .../apache/kylin/engine/spark/SparkCubing.java | 11 +- .../measure/FixedLenMeasureCodec.java | 3 +- .../cube/v1/CubeSegmentTopNTupleIterator.java | 86 -------------- .../hbase/cube/v1/CubeSegmentTupleIterator.java | 2 +- .../storage/hbase/cube/v1/CubeStorageQuery.java | 46 ++------ .../hbase/cube/v1/CubeTupleConverter.java | 60 +--------- .../cube/v1/SerializedHBaseTupleIterator.java | 14 +-- .../observer/ObserverAggregators.java | 20 +++- .../hbase/cube/v1/filter/FuzzyRowFilterV2.java | 6 +- .../storage/hbase/cube/v2/CubeHBaseRPC.java | 4 +- .../storage/hbase/cube/v2/CubeStorageQuery.java | 67 +++-------- .../hbase/cube/v2/CubeTupleConverter.java | 65 +---------- .../hbase/cube/v2/HBaseReadonlyStore.java | 1 - .../v2/SequentialCubeTopNTupleIterator.java | 68 ----------- .../cube/v2/SequentialCubeTupleIterator.java | 2 +- .../endpoint/EndpointAggregators.java | 7 +- .../endpoint/EndpointTupleIterator.java | 3 +- .../storage/hbase/steps/CreateHTableJob.java | 7 +- .../storage/hbase/steps/HBaseMROutput2.java | 2 +- 
.../hbase/steps/HBaseMROutput2Transition.java | 2 +- .../storage/hbase/steps/RowValueDecoder.java | 21 +--- .../hbase/util/GridTableHBaseBenchmark.java | 2 +- .../v1/filter/TestFuzzyRowFilterV2EndToEnd.java | 8 +- 66 files changed, 473 insertions(+), 977 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java index d2a94c5..a4b2b1a 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java @@ -279,14 +279,20 @@ public class KylinConfig implements Serializable { return getOptional(KYLIN_STORAGE_URL); } + /** was for route to hive, not used any more */ + @Deprecated public String getHiveUrl() { return getOptional(HIVE_URL, ""); } + /** was for route to hive, not used any more */ + @Deprecated public String getHiveUser() { return getOptional(HIVE_USER, ""); } + /** was for route to hive, not used any more */ + @Deprecated public String getHivePassword() { return getOptional(HIVE_PASSWORD, ""); } @@ -302,6 +308,10 @@ public class KylinConfig implements Serializable { public String getHBaseClusterFs() { return getOptional(KYLIN_HBASE_CLUSTER_FS, ""); } + + public String[] getMeasureTypeFactories() { + return getOptionalStringArray("kylin.measure.type.factories", new String[0]); + } public String getKylinJobLogDir() { return getOptional(KYLIN_JOB_LOG_DIR, "/tmp/kylin/logs"); @@ -445,7 +455,7 @@ public class KylinConfig implements Serializable { } public String[] getRestServers() { - return getOptionalStringArray(KYLIN_REST_SERVERS); + return getOptionalStringArray(KYLIN_REST_SERVERS, new String[0]); } public 
String getAdminDls() { @@ -532,12 +542,12 @@ public class KylinConfig implements Serializable { return property != null ? property : properties.getProperty(prop); } - private String[] getOptionalStringArray(String prop) { + private String[] getOptionalStringArray(String prop, String[] dft) { final String property = getOptional(prop); if (!StringUtils.isBlank(property)) { return property.split("\\s*,\\s*"); } else { - return new String[] {}; + return dft; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java index 4a95c66..065d969 100644 --- a/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java +++ b/core-common/src/main/java/org/apache/kylin/common/topn/TopNCounter.java @@ -108,7 +108,7 @@ public class TopNCounter<T> implements Iterable<Counter<T>> { incrementCounter(counterNode, incrementCount); - return new Pair<Boolean, T>(isNewItem, droppedItem); + return Pair.newPair(isNewItem, droppedItem); } protected void incrementCounter(ListNode2<Counter<T>> counterNode, double incrementCount) { http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java b/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java index cc5346c..aa25c22 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/CliCommandExecutor.java @@ -104,7 +104,7 @@ public class CliCommandExecutor { sshOutput = 
ssh.execCommand(command, remoteTimeoutSeconds, logAppender); int exitCode = sshOutput.getExitCode(); String output = sshOutput.getText(); - return new Pair<Integer, String>(exitCode, output); + return Pair.newPair(exitCode, output); } catch (IOException e) { throw e; } catch (Exception e) { @@ -140,7 +140,7 @@ public class CliCommandExecutor { try { int exitCode = proc.waitFor(); - return new Pair<Integer, String>(exitCode, result.toString()); + return Pair.newPair(exitCode, result.toString()); } catch (InterruptedException e) { throw new IOException(e); } http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-common/src/main/java/org/apache/kylin/common/util/Pair.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/Pair.java b/core-common/src/main/java/org/apache/kylin/common/util/Pair.java index 9efdf3b..d28b05f 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/Pair.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/Pair.java @@ -28,6 +28,7 @@ import java.io.Serializable; */ public class Pair<T1, T2> implements Serializable { private static final long serialVersionUID = -3986244606585552569L; + protected T1 first = null; protected T2 second = null; @@ -96,7 +97,7 @@ public class Pair<T1, T2> implements Serializable { } @Override - @SuppressWarnings("unchecked") + @SuppressWarnings("rawtypes") public boolean equals(Object other) { return other instanceof Pair && equals(first, ((Pair) other).first) && equals(second, ((Pair) other).second); } http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java ---------------------------------------------------------------------- diff --git a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java index a6aa610..821a0fc 
100644 --- a/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java +++ b/core-common/src/test/java/org/apache/kylin/common/topn/TopNCounterTest.java @@ -248,7 +248,7 @@ public class TopNCounterTest { List<Pair<String, Double>> allRecords = Lists.newArrayList(); for (Counter<String> counter : tops) - allRecords.add(new Pair(counter.getItem(), counter.getCount())); + allRecords.add(Pair.newPair(counter.getItem(), counter.getCount())); timeSpent += (System.currentTimeMillis() - startTime); return allRecords; } @@ -284,7 +284,7 @@ public class TopNCounterTest { List<Pair<String, Double>> allRecords = Lists.newArrayList(); for (Map.Entry<String, Double> entry : hashMap.entrySet()) { - allRecords.add(new Pair(entry.getKey(), entry.getValue())); + allRecords.add(Pair.newPair(entry.getKey(), entry.getValue())); } Collections.sort(allRecords, new Comparator<Pair<String, Double>>() { http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java index 624bb0b..28252a3 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java @@ -169,11 +169,6 @@ public class CubeCapabilityChecker { continue; } - // calcite can not handle distinct count - if (functionDesc.isCountDistinct()) { - continue; - } - // calcite can do aggregation from columns on-the-fly List<TblColRef> neededCols = functionDesc.getParameter().getColRefs(); if (neededCols.size() > 0 && cubeDesc.listDimensionColumnsIncludingDerived().containsAll(neededCols)) { http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java 
---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java index 53cd00f..93bde26 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java @@ -34,6 +34,7 @@ import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.validation.CubeMetadataValidator; import org.apache.kylin.cube.model.validation.ValidateContext; +import org.apache.kylin.measure.MeasureTypeFactory; import org.apache.kylin.metadata.MetadataConstants; import org.apache.kylin.metadata.MetadataManager; import org.slf4j.Logger; @@ -53,17 +54,13 @@ public class CubeDescManager { // static cached instances private static final ConcurrentHashMap<KylinConfig, CubeDescManager> CACHE = new ConcurrentHashMap<KylinConfig, CubeDescManager>(); - // ============================================================================ - - private KylinConfig config; - // name ==> CubeDesc - private CaseInsensitiveStringCache<CubeDesc> cubeDescMap; - public static CubeDescManager getInstance(KylinConfig config) { CubeDescManager r = CACHE.get(config); if (r != null) { return r; } + + MeasureTypeFactory.init(config); synchronized (CubeDescManager.class) { r = CACHE.get(config); @@ -87,6 +84,12 @@ public class CubeDescManager { CACHE.clear(); } + // ============================================================================ + + private KylinConfig config; + // name ==> CubeDesc + private CaseInsensitiveStringCache<CubeDesc> cubeDescMap; + private CubeDescManager(KylinConfig config) throws IOException { logger.info("Initializing CubeDescManager with config " + config); this.config = config; http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java 
---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java index 81c7909..dccc3f1 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java @@ -421,9 +421,6 @@ public class CubeInstance extends RootPersistentEntity implements IRealization, if (!this.getDescriptor().getModel().getPartitionDesc().isPartitioned()) return false; - if (this.getDescriptor().hasHolisticCountDistinctMeasures()) - return false; - return this.getDescriptor().getAutoMergeTimeRanges() != null && this.getDescriptor().getAutoMergeTimeRanges().length > 0; } http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 24286cc..885dafa 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -399,7 +399,7 @@ public class CubeManager implements IRealizationProvider { CubeUpdate cubeBuilder = new CubeUpdate(cube).setToAddSegs(appendSegment, mergeSegment); updateCube(cubeBuilder); - return new Pair<CubeSegment, CubeSegment>(appendSegment, mergeSegment); + return Pair.newPair(appendSegment, mergeSegment); } public CubeSegment appendSegments(CubeInstance cube, long endDate) throws IOException { @@ -491,7 +491,7 @@ public class CubeManager implements IRealizationProvider { } } } - return new Pair<Long, Long>(start, end); + return Pair.newPair(start, end); } private boolean hasOverlap(long startDate, long endDate, long anotherStartDate, long anotherEndDate) { 
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java index 7f38c26..d766776 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java @@ -125,7 +125,7 @@ public class CubeCodeSystem implements IGTCodeSystem { MeasureAggregator<?>[] result = new MeasureAggregator[aggrFunctions.length]; for (int i = 0; i < result.length; i++) { int col = columns.trueBitAt(i); - result[i] = MeasureAggregator.create(aggrFunctions[i], info.getColumnType(col).toString()); + result[i] = MeasureAggregator.create(aggrFunctions[i], info.getColumnType(col)); } // deal with holistic distinct count http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java index 361d96a..d0fca02 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java @@ -15,8 +15,6 @@ import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; -import com.google.common.collect.LinkedListMultimap; -import com.google.common.collect.ListMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -32,7 +30,7 @@ 
public class CuboidToGridTableMapping { private ImmutableBitSet gtPrimaryKey; private int nMetrics; - private ListMultimap<FunctionDesc, Integer> metrics2gt; // because count distinct may have a holistic version + private Map<FunctionDesc, Integer> metrics2gt; // because count distinct may have a holistic version public CuboidToGridTableMapping(Cuboid cuboid) { this.cuboid = cuboid; @@ -68,19 +66,12 @@ public class CuboidToGridTableMapping { } // metrics - metrics2gt = LinkedListMultimap.create(); + metrics2gt = Maps.newHashMap(); for (MeasureDesc measure :cuboid.getCubeDesc().getMeasures()) { // Count distinct & holistic count distinct are equals() but different. // Ensure the holistic version if exists is always the first. FunctionDesc func = measure.getFunction(); - if (func.isHolisticCountDistinct()) { - List<Integer> existing = metrics2gt.removeAll(func); - metrics2gt.put(func, gtColIdx); - metrics2gt.putAll(func, existing); - } else { - metrics2gt.put(func, gtColIdx); - } - + metrics2gt.put(func, gtColIdx); gtDataTypes.add(func.getReturnDataType()); // map to column block @@ -135,19 +126,8 @@ public class CuboidToGridTableMapping { } public int getIndexOf(FunctionDesc metric) { - List<Integer> list = metrics2gt.get(metric); - // normal case - if (list.size() == 1) { - return list.get(0); - } - // count distinct & its holistic version - else if (list.size() == 2) { - assert metric.isCountDistinct(); - return metric.isHolisticCountDistinct() ? list.get(0) : list.get(1); - } - // unexpected - else - return -1; + Integer r = metrics2gt.get(metric); + return r == null ? 
-1 : r.intValue(); } public List<TblColRef> getCuboidDimensionsInGTOrder() { http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java index c4d0a7e..19ebaef 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java @@ -94,7 +94,7 @@ public class TrimmedCubeCodeSystem implements IGTCodeSystem { MeasureAggregator<?>[] result = new MeasureAggregator[aggrFunctions.length]; for (int i = 0; i < result.length; i++) { int col = columns.trueBitAt(i); - result[i] = MeasureAggregator.create(aggrFunctions[i], info.getColumnType(col).toString()); + result[i] = MeasureAggregator.create(aggrFunctions[i], info.getColumnType(col)); } // deal with holistic distinct count http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java index fed9479..3e9a67d 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java @@ -99,8 +99,7 @@ public class InMemCubeBuilderInputConverter { int paramColIdx = 0; // index among parameters of column type for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) { String value; - 
if (function.isCount() || function.isHolisticCountDistinct()) { - // note for holistic count distinct, this value will be ignored + if (function.isCount()) { value = "1"; } else if (param.isColumnType()) { value = row.get(colIdxOnFlatTable[paramColIdx++]); http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java index f8a887d..8fa99cd 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java @@ -17,43 +17,15 @@ */ package org.apache.kylin.cube.inmemcubing; -import com.google.common.collect.Maps; +import java.util.BitSet; -import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.ImmutableBitSet; import org.apache.kylin.common.util.Pair; -import org.apache.kylin.cube.model.CubeDesc; -import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; -import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.MeasureDesc; -import org.apache.kylin.metadata.model.TblColRef; - -import java.util.BitSet; -import java.util.HashMap; -import java.util.Map; /** */ public final class InMemCubeBuilderUtils { - public static final HashMap<Integer, Dictionary<String>> createTopNLiteralColDictionaryMap(CubeDesc cubeDesc, CubeJoinedFlatTableDesc intermediateTableDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) { - HashMap<Integer, Dictionary<String>> result = Maps.newHashMap(); - for (int measureIdx = 0; measureIdx < cubeDesc.getMeasures().size(); measureIdx++) { - MeasureDesc measureDesc = cubeDesc.getMeasures().get(measureIdx); - FunctionDesc func = 
measureDesc.getFunction(); - if (func.isTopN()) { - int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[measureIdx]; - int literalColIdx = flatTableIdx[flatTableIdx.length - 1]; - TblColRef literalCol = func.getTopNLiteralColumn(); - @SuppressWarnings("unchecked") - Dictionary<String> dictionary = (Dictionary<String>) dictionaryMap.get(literalCol); - //Preconditions.checkNotNull(dictionary);//FIXME disable check since dictionary is null when building empty segment - result.put(literalColIdx, dictionary); - } - } - return result; - } - public static final Pair<ImmutableBitSet, ImmutableBitSet> getDimensionAndMetricColumnBitSet(final long cuboidId, final int measureCount) { int cardinality = Long.bitCount(cuboidId); BitSet dimension = new BitSet(); http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index 8c7d891..7e79f64 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -642,11 +642,6 @@ public class CubeDesc extends RootPersistentEntity { FunctionDesc func = m.getFunction(); func.init(factTable); allColumns.addAll(func.getParameter().getColRefs()); - - // verify holistic count distinct as a dependent measure - if (func.isHolisticCountDistinct() && StringUtils.isBlank(m.getDependentMeasureRef())) { - throw new IllegalStateException(m + " is a holistic count distinct but it has no DependentMeasureRef defined!"); - } } } @@ -677,9 +672,9 @@ public class CubeDesc extends RootPersistentEntity { } } - public boolean hasHolisticCountDistinctMeasures() { + public boolean hasMemoryHungryMeasures() { for (MeasureDesc measure : measures) { - if 
(measure.getFunction().isHolisticCountDistinct()) { + if (measure.getFunction().getMeasureType().isMemoryHungry()) { return true; } } @@ -766,7 +761,7 @@ public class CubeDesc extends RootPersistentEntity { } for (MeasureDesc measure : measures) { - MeasureType aggrType = measure.getFunction().getMeasureType(); + MeasureType<?> aggrType = measure.getFunction().getMeasureType(); result.addAll(aggrType.getColumnsNeedDictionary(measure.getFunction())); } return result; http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java index c918798..530ce07 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java @@ -114,10 +114,6 @@ public class FunctionRule implements IValidatorRule<CubeDesc> { if (rtype.isIntegerFamily() == false) { context.addResult(ResultLevel.ERROR, "Return type for function " + func + " must be one of " + DataType.INTEGER_FAMILY); } - } else if (funcDesc.isCountDistinct()) { - if (rtype.isHLLC() == false && funcDesc.isHolisticCountDistinct() == false) { - context.addResult(ResultLevel.ERROR, "Return type for function " + func + " must be hllc(10), hllc(12) etc."); - } } else if (funcDesc.isMax() || funcDesc.isMin() || funcDesc.isSum()) { if (rtype.isNumberFamily() == false) { context.addResult(ResultLevel.ERROR, "Return type for function " + func + " must be one of " + DataType.NUMBER_FAMILY); http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java 
---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java index b3133be..77eb430 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java @@ -60,7 +60,7 @@ public class GTSampleCodeSystem implements IGTCodeSystem { MeasureAggregator<?>[] result = new MeasureAggregator[aggrFunctions.length]; for (int i = 0; i < result.length; i++) { int col = columns.trueBitAt(i); - result[i] = MeasureAggregator.create(aggrFunctions[i], info.getColumnType(col).toString()); + result[i] = MeasureAggregator.create(aggrFunctions[i], info.getColumnType(col)); } return result; } http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java index ab87c2b..b593e48 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java @@ -186,15 +186,22 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase { for (int measureIdx = 0; measureIdx < cube.getDescriptor().getMeasures().size(); measureIdx++) { MeasureDesc measureDesc = cube.getDescriptor().getMeasures().get(measureIdx); FunctionDesc func = measureDesc.getFunction(); - if (func.isTopN()) { - int[] flatTableIdx = flatTableDesc.getMeasureColumnIndexes()[measureIdx]; - int literalColIdx = flatTableIdx[flatTableIdx.length - 1]; - TblColRef literalCol = func.getTopNLiteralColumn(); - 
logger.info("Building dictionary for " + literalCol); - List<byte[]> valueList = readValueList(flatTable, nColumns, literalColIdx); - Dictionary<String> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(literalCol.getType(), new IterableDictionaryValueEnumerator(valueList)); - - result.put(literalCol, dict); + List<TblColRef> dictCols = func.getMeasureType().getColumnsNeedDictionary(func); + if (dictCols.isEmpty()) + continue; + + int[] flatTableIdx = flatTableDesc.getMeasureColumnIndexes()[measureIdx]; + List<TblColRef> paramCols = func.getParameter().getColRefs(); + for (int i = 0; i < paramCols.size(); i++) { + TblColRef col = paramCols.get(i); + if (dictCols.contains(col)) { + int colIdxOnFlat = flatTableIdx[i]; + logger.info("Building dictionary for " + col); + List<byte[]> valueList = readValueList(flatTable, nColumns, colIdxOnFlat); + Dictionary<String> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(col.getType(), new IterableDictionaryValueEnumerator(valueList)); + + result.put(col, dict); + } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java index eb2b963..9abd28c 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java @@ -125,7 +125,7 @@ abstract public class LookupTable<T extends Comparable<T>> { if (returnBegin == null && returnEnd == null) return null; else - return new Pair<T, T>(returnBegin, returnEnd); + return Pair.newPair(returnBegin, returnEnd); } public Set<T> mapValues(String col, Set<T> values, String returnCol) { 
http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/measure/IMeasureFactory.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/IMeasureFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/IMeasureFactory.java deleted file mode 100644 index 0ab547a..0000000 --- a/core-metadata/src/main/java/org/apache/kylin/measure/IMeasureFactory.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package org.apache.kylin.measure; - -public interface IMeasureFactory { - - public MeasureType createMeasureType(String funcName, String dataType); -} http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java index 32e5128..4c031c7 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java @@ -20,13 +20,15 @@ package org.apache.kylin.measure; import java.io.Serializable; +import org.apache.kylin.metadata.datatype.DataType; + /** */ @SuppressWarnings("serial") abstract public class MeasureAggregator<V> implements Serializable { - public static MeasureAggregator<?> create(String funcName, String dataType) { - return MeasureType.create(funcName, dataType).newAggregator(); + public static MeasureAggregator<?> create(String funcName, DataType dataType) { + return MeasureTypeFactory.create(funcName, dataType).newAggregator(); } public static int guessBigDecimalMemBytes() { http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java index c6b456e..b3edbc3 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java @@ -45,7 +45,7 @@ public class MeasureAggregators implements Serializable { Map<String, Integer> measureIndexMap 
= new HashMap<String, Integer>(); for (int i = 0; i < descLength; i++) { FunctionDesc func = measureDescs[i].getFunction(); - aggs[i] = MeasureAggregator.create(func.getExpression(), func.getReturnType()); + aggs[i] = func.getMeasureType().newAggregator(); measureIndexMap.put(measureDescs[i].getName(), i); } // fill back dependent aggregator http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java index d4b2700..cf04c60 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java @@ -24,11 +24,6 @@ import java.util.List; import java.util.Map; import org.apache.kylin.common.util.Dictionary; -import org.apache.kylin.measure.basic.BasicMeasureFactory; -import org.apache.kylin.measure.hllc.HLLCAggregationFactory; -import org.apache.kylin.measure.topn.TopNMeasureFactory; -import org.apache.kylin.metadata.datatype.DataType; -import org.apache.kylin.metadata.datatype.DataTypeSerializer; import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; @@ -37,54 +32,27 @@ import org.apache.kylin.metadata.realization.SQLDigest; import org.apache.kylin.metadata.tuple.Tuple; import org.apache.kylin.metadata.tuple.TupleInfo; -import com.google.common.collect.Maps; - -abstract public class MeasureType { - - private static final Map<String, IMeasureFactory> factoryRegistry = Maps.newConcurrentMap(); - private static final IMeasureFactory defaultFactory = new BasicMeasureFactory(); - - static { - factoryRegistry.put(FunctionDesc.FUNC_COUNT_DISTINCT, new HLLCAggregationFactory()); - 
factoryRegistry.put(FunctionDesc.FUNC_TOP_N, new TopNMeasureFactory()); - } - - public static MeasureType create(String funcName, String dataType) { - funcName = funcName.toUpperCase(); - dataType = dataType.toLowerCase(); - - IMeasureFactory factory = factoryRegistry.get(funcName); - if (factory == null) - factory = defaultFactory; - - MeasureType result = factory.createMeasureType(funcName, dataType); - - // register serializer for aggr data type - DataType aggregationDataType = result.getAggregationDataType(); - if (DataTypeSerializer.hasRegistered(aggregationDataType.getName()) == false) { - DataTypeSerializer.register(aggregationDataType.getName(), result.getAggregationDataSeralizer()); - } - - return result; - } +abstract public class MeasureType<T> { /* ============================================================================ * Define * ---------------------------------------------------------------------------- */ - abstract public DataType getAggregationDataType(); - - abstract public Class<? 
extends DataTypeSerializer<?>> getAggregationDataSeralizer(); + public void validate(FunctionDesc functionDesc) throws IllegalArgumentException { + return; + } - abstract public void validate(MeasureDesc measureDesc) throws IllegalArgumentException; + public boolean isMemoryHungry() { + return false; + } /* ============================================================================ * Build * ---------------------------------------------------------------------------- */ - abstract public MeasureIngester<?> newIngester(); + abstract public MeasureIngester<T> newIngester(); - abstract public MeasureAggregator<?> newAggregator(); + abstract public MeasureAggregator<T> newAggregator(); public List<TblColRef> getColumnsNeedDictionary(FunctionDesc functionDesc) { return Collections.emptyList(); http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java new file mode 100644 index 0000000..48eddb2 --- /dev/null +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.measure; + +import java.util.List; +import java.util.Map; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.ClassUtil; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.measure.basic.BasicMeasureType; +import org.apache.kylin.measure.hllc.HLLCMeasureType; +import org.apache.kylin.measure.topn.TopNMeasureType; +import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.datatype.DataTypeSerializer; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +abstract public class MeasureTypeFactory<T> { + + abstract public MeasureType<T> createMeasureType(String funcName, DataType dataType); + + abstract public String getAggrFunctionName(); + abstract public String getAggrDataTypeName(); + abstract public Class<? 
extends DataTypeSerializer<T>> getAggrDataTypeSerializer(); + + // ============================================================================ + + private static Map<Pair<String, String>, MeasureTypeFactory<?>> factories = Maps.newHashMap(); + private static MeasureTypeFactory<?> defaultFactory = new BasicMeasureType.Factory(); + + public static synchronized void init(KylinConfig config) { + if (factories.isEmpty() == false) + return; + + List<MeasureTypeFactory<?>> factoryInsts = Lists.newArrayList(); + + // two built-in advanced measure types + factoryInsts.add(new HLLCMeasureType.Factory()); + factoryInsts.add(new TopNMeasureType.Factory()); + + // more custom measure types + for (String factoryClz : config.getMeasureTypeFactories()) { + factoryInsts.add((MeasureTypeFactory<?>) ClassUtil.newInstance(factoryClz)); + } + + // register factories & data type serializers + for (MeasureTypeFactory<?> factory : factoryInsts) { + String funcName = factory.getAggrFunctionName().toUpperCase(); + String dataTypeName = factory.getAggrDataTypeName().toLowerCase(); + Class<? 
extends DataTypeSerializer<?>> serializer = factory.getAggrDataTypeSerializer(); + + DataType.register(dataTypeName); + DataTypeSerializer.register(dataTypeName, serializer); + factories.put(Pair.newPair(funcName, dataTypeName), factory); + } + } + + public static MeasureType<?> create(String funcName, String dataType) { + return create(funcName, DataType.getType(dataType)); + } + + public static MeasureType<?> create(String funcName, DataType dataType) { + funcName = funcName.toUpperCase(); + + MeasureTypeFactory<?> factory = factories.get(Pair.newPair(funcName, dataType.getName())); + if (factory == null) + factory = defaultFactory; + + return factory.createMeasureType(funcName, dataType); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureFactory.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureFactory.java deleted file mode 100644 index 7bfee49..0000000 --- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureFactory.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.measure.basic; - -import org.apache.kylin.measure.IMeasureFactory; -import org.apache.kylin.measure.MeasureType; -import org.apache.kylin.metadata.datatype.DataType; - -public class BasicMeasureFactory implements IMeasureFactory { - - @Override - public MeasureType createMeasureType(String funcName, String dataType) { - return new BasicMeasureType(funcName, DataType.getType(dataType)); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java index fea6ba5..bc9e6b1 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java @@ -21,47 +21,59 @@ package org.apache.kylin.measure.basic; import org.apache.kylin.measure.MeasureAggregator; import org.apache.kylin.measure.MeasureIngester; import org.apache.kylin.measure.MeasureType; -import org.apache.kylin.metadata.datatype.BigDecimalSerializer; +import org.apache.kylin.measure.MeasureTypeFactory; import org.apache.kylin.metadata.datatype.DataType; -import org.apache.kylin.metadata.datatype.DataTypeSerializer; -import org.apache.kylin.metadata.datatype.DoubleSerializer; -import org.apache.kylin.metadata.datatype.LongSerializer; import org.apache.kylin.metadata.model.FunctionDesc; -import org.apache.kylin.metadata.model.MeasureDesc; +@SuppressWarnings("rawtypes") public class BasicMeasureType extends MeasureType { + public static class Factory extends MeasureTypeFactory { + + @Override + public MeasureType createMeasureType(String 
funcName, DataType dataType) { + return new BasicMeasureType(funcName, dataType); + } + + @Override + public String getAggrFunctionName() { + return null; + } + + @Override + public String getAggrDataTypeName() { + return null; + } + + @Override + public Class getAggrDataTypeSerializer() { + return null; + } + } + private final String funcName; private final DataType dataType; public BasicMeasureType(String funcName, DataType dataType) { + validate(funcName, dataType); this.funcName = funcName; this.dataType = dataType; } @Override - public DataType getAggregationDataType() { - return dataType; + public void validate(FunctionDesc functionDesc) throws IllegalArgumentException { + validate(functionDesc.getExpression(), functionDesc.getReturnDataType()); } - public Class<? extends DataTypeSerializer<?>> getAggregationDataSeralizer() { - if (dataType.isIntegerFamily()) - return LongSerializer.class; - else if (dataType.isDecimal()) - return BigDecimalSerializer.class; - else if (dataType.isNumberFamily()) - return DoubleSerializer.class; - else - throw new IllegalArgumentException("No serializer for aggregation type " + dataType); + private void validate(String funcName, DataType dataType) throws IllegalArgumentException { + if ((funcName.equals(FunctionDesc.FUNC_SUM) // + || funcName.equals(FunctionDesc.FUNC_COUNT) // + || funcName.equals(FunctionDesc.FUNC_MAX) // + || funcName.equals(FunctionDesc.FUNC_MIN)) == false) + throw new IllegalArgumentException(); } @Override - public void validate(MeasureDesc measureDesc) throws IllegalArgumentException { - // TODO Auto-generated method stub - - } - - @Override public MeasureIngester<?> newIngester() { if (dataType.isIntegerFamily()) return new LongIngester(); @@ -72,7 +84,7 @@ public class BasicMeasureType extends MeasureType { else throw new IllegalArgumentException("No ingester for aggregation type " + dataType); } - + @Override public MeasureAggregator<?> newAggregator() { if (isSum() || isCount()) { @@ -99,21 +111,21 
@@ public class BasicMeasureType extends MeasureType { } throw new IllegalArgumentException("No aggregator for func '" + funcName + "' and return type '" + dataType + "'"); } - + private boolean isSum() { - return FunctionDesc.FUNC_SUM.equalsIgnoreCase(funcName); + return FunctionDesc.FUNC_SUM.equals(funcName); } private boolean isCount() { - return FunctionDesc.FUNC_COUNT.equalsIgnoreCase(funcName); + return FunctionDesc.FUNC_COUNT.equals(funcName); } - + private boolean isMax() { - return FunctionDesc.FUNC_MAX.equalsIgnoreCase(funcName); + return FunctionDesc.FUNC_MAX.equals(funcName); } - + private boolean isMin() { - return FunctionDesc.FUNC_MIN.equalsIgnoreCase(funcName); + return FunctionDesc.FUNC_MIN.equals(funcName); } @Override @@ -126,5 +138,5 @@ public class BasicMeasureType extends MeasureType { // TODO Auto-generated method stub return null; } - + } http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregationFactory.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregationFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregationFactory.java deleted file mode 100644 index 13e5520..0000000 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregationFactory.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.measure.hllc; - -import org.apache.kylin.measure.IMeasureFactory; -import org.apache.kylin.measure.MeasureType; -import org.apache.kylin.metadata.datatype.DataType; -import org.apache.kylin.metadata.model.FunctionDesc; - -public class HLLCAggregationFactory implements IMeasureFactory { - - @Override - public MeasureType createMeasureType(String funcName, String dataType) { - if (FunctionDesc.FUNC_COUNT_DISTINCT.equalsIgnoreCase(funcName) == false) - throw new IllegalArgumentException(); - - return new HLLCMeasureType(DataType.getType(dataType)); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java index 83fcf3b..073de6d 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java @@ -25,43 +25,72 @@ import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.measure.MeasureAggregator; import org.apache.kylin.measure.MeasureIngester; import org.apache.kylin.measure.MeasureType; +import org.apache.kylin.measure.MeasureTypeFactory; import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.datatype.DataTypeSerializer; +import 
org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.TblColRef; -public class HLLCMeasureType extends MeasureType { +public class HLLCMeasureType extends MeasureType<HyperLogLogPlusCounter> { + + public static final String FUNC_COUNT_DISTINCT = "COUNT_DISTINCT"; + public static final String DATATYPE_HLLC = "hllc"; + + public static class Factory extends MeasureTypeFactory<HyperLogLogPlusCounter> { + + @Override + public MeasureType<HyperLogLogPlusCounter> createMeasureType(String funcName, DataType dataType) { + return new HLLCMeasureType(funcName, dataType); + } + + @Override + public String getAggrFunctionName() { + return FUNC_COUNT_DISTINCT; + } + + @Override + public String getAggrDataTypeName() { + return DATATYPE_HLLC; + } + + @Override + public Class<? extends DataTypeSerializer<HyperLogLogPlusCounter>> getAggrDataTypeSerializer() { + return HLLCSerializer.class; + } + } + + // ============================================================================ private final DataType dataType; - public HLLCMeasureType(DataType dataType) { - if ("hllc".equals(dataType.getName()) == false) - throw new IllegalArgumentException(); - + public HLLCMeasureType(String funcName, DataType dataType) { + validate(funcName, dataType); this.dataType = dataType; - - if (this.dataType.getPrecision() < 10 || this.dataType.getPrecision() > 16) - throw new IllegalArgumentException("HLLC precision must be between 10 and 16"); } - @Override - public DataType getAggregationDataType() { - return dataType; + public void validate(FunctionDesc functionDesc) throws IllegalArgumentException { + validate(functionDesc.getExpression(), functionDesc.getReturnDataType()); } - @Override - public Class<? 
extends DataTypeSerializer<?>> getAggregationDataSeralizer() { - return HLLCSerializer.class; + private void validate(String funcName, DataType dataType) { + if (FUNC_COUNT_DISTINCT.equals(funcName) == false) + throw new IllegalArgumentException(); + + if (DATATYPE_HLLC.equals(dataType.getName()) == false) + throw new IllegalArgumentException(); + + if (dataType.getPrecision() < 1 || dataType.getPrecision() > 5000) + throw new IllegalArgumentException(); } @Override - public void validate(MeasureDesc measureDesc) throws IllegalArgumentException { - // TODO Auto-generated method stub - + public boolean isMemoryHungry() { + return true; } @Override - public MeasureIngester<?> newIngester() { + public MeasureIngester<HyperLogLogPlusCounter> newIngester() { return new MeasureIngester<HyperLogLogPlusCounter>() { HyperLogLogPlusCounter current = new HyperLogLogPlusCounter(dataType.getPrecision()); @@ -77,11 +106,8 @@ public class HLLCMeasureType extends MeasureType { } @Override - public MeasureAggregator<?> newAggregator() { - if (dataType.isHLLC()) - return new HLLCAggregator(dataType.getPrecision()); - else - return new LDCAggregator(); + public MeasureAggregator<HyperLogLogPlusCounter> newAggregator() { + return new HLLCAggregator(dataType.getPrecision()); } @Override @@ -95,4 +121,8 @@ public class HLLCMeasureType extends MeasureType { return null; } + public static boolean isCountDistinct(FunctionDesc func) { + return FUNC_COUNT_DISTINCT.equalsIgnoreCase(func.getExpression()); + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java index 7131201..7424493 100644 --- 
a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java @@ -81,7 +81,8 @@ public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounter> { @Override public int getStorageBytesEstimate() { - return current().maxLength(); + // for HLL, it will be compressed when export to bytes + return (int) (current().maxLength() * 0.75); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/LDCAggregator.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/LDCAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/LDCAggregator.java deleted file mode 100644 index 5d96450..0000000 --- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/LDCAggregator.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ - -package org.apache.kylin.measure.hllc; - -import org.apache.kylin.measure.MeasureAggregator; -import org.apache.kylin.metadata.datatype.LongMutable; - -/** - * Long Distinct Count - */ -@SuppressWarnings("serial") -public class LDCAggregator extends MeasureAggregator<LongMutable> { - - private static LongMutable ZERO = new LongMutable(0); - - private HLLCAggregator hllAgg = null; - private LongMutable state = new LongMutable(0); - - @SuppressWarnings("rawtypes") - public void setDependentAggregator(MeasureAggregator agg) { - this.hllAgg = (HLLCAggregator) agg; - } - - @Override - public void reset() { - } - - @Override - public void aggregate(LongMutable value) { - } - - @Override - public LongMutable getState() { - if (hllAgg == null) { - return ZERO; - } else { - state.set(hllAgg.getState().getCountEstimate()); - return state; - } - } - - @Override - public int getMemBytesEstimate() { - return guessLongMemBytes(); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureFactory.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureFactory.java deleted file mode 100644 index 1ceb607..0000000 --- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureFactory.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.measure.topn; - -import org.apache.kylin.measure.IMeasureFactory; -import org.apache.kylin.measure.MeasureType; -import org.apache.kylin.metadata.datatype.DataType; -import org.apache.kylin.metadata.model.FunctionDesc; - -public class TopNMeasureFactory implements IMeasureFactory { - - @Override - public MeasureType createMeasureType(String funcName, String dataType) { - if (FunctionDesc.FUNC_TOP_N.equalsIgnoreCase(funcName) == false) - throw new IllegalArgumentException(); - - return new TopNMeasureType(DataType.getType(dataType)); - } - -} http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java index 01582d0..c3d36ca 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java @@ -32,7 +32,7 @@ import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.measure.MeasureAggregator; import org.apache.kylin.measure.MeasureIngester; import org.apache.kylin.measure.MeasureType; -import org.apache.kylin.measure.hllc.HLLCSerializer; +import org.apache.kylin.measure.MeasureTypeFactory; import org.apache.kylin.metadata.datatype.DataType; import 
org.apache.kylin.metadata.datatype.DataTypeSerializer; import org.apache.kylin.metadata.model.FunctionDesc; @@ -47,44 +47,70 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; -public class TopNMeasureType extends MeasureType { +public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> { private static final Logger logger = LoggerFactory.getLogger(TopNMeasureType.class); - private final DataType dataType; + public static final String FUNC_TOP_N = "TOP_N"; + public static final String DATATYPE_TOPN = "topn"; + + public static class Factory extends MeasureTypeFactory<TopNCounter<ByteArray>> { - public TopNMeasureType(DataType dataType) { - if ("topn".equals(dataType.getName()) == false) - throw new IllegalArgumentException(); + @Override + public MeasureType<TopNCounter<ByteArray>> createMeasureType(String funcName, DataType dataType) { + return new TopNMeasureType(funcName, dataType); + } - this.dataType = dataType; + @Override + public String getAggrFunctionName() { + return FUNC_TOP_N; + } + + @Override + public String getAggrDataTypeName() { + return DATATYPE_TOPN; + } - if (this.dataType.getPrecision() < 1 || this.dataType.getPrecision() > 1000) - throw new IllegalArgumentException("TopN precision must be between 1 and 1000"); + @Override + public Class<? extends DataTypeSerializer<TopNCounter<ByteArray>>> getAggrDataTypeSerializer() { + return TopNCounterSerializer.class; + } } + + // ============================================================================ - @Override - public DataType getAggregationDataType() { - return dataType; + private final DataType dataType; + + public TopNMeasureType(String funcName, DataType dataType) { + validate(funcName, dataType); + this.dataType = dataType; } - @Override - public Class<? 
extends DataTypeSerializer<?>> getAggregationDataSeralizer() { - return HLLCSerializer.class; + public void validate(FunctionDesc functionDesc) throws IllegalArgumentException { + validate(functionDesc.getExpression(), functionDesc.getReturnDataType()); } - @Override - public void validate(MeasureDesc measureDesc) throws IllegalArgumentException { - // TODO Auto-generated method stub + private void validate(String funcName, DataType dataType) { + if (FUNC_TOP_N.equals(funcName) == false) + throw new IllegalArgumentException(); + + if (DATATYPE_TOPN.equals(dataType.getName()) == false) + throw new IllegalArgumentException(); + if (dataType.getPrecision() < 1 || dataType.getPrecision() > 5000) + throw new IllegalArgumentException(); } - @SuppressWarnings("rawtypes") @Override - public MeasureIngester<?> newIngester() { - return new MeasureIngester<TopNCounter>() { + public boolean isMemoryHungry() { + return true; + } + + @Override + public MeasureIngester<TopNCounter<ByteArray>> newIngester() { + return new MeasureIngester<TopNCounter<ByteArray>>() { @Override - public TopNCounter valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { + public TopNCounter<ByteArray> valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) { if (values.length != 2) throw new IllegalArgumentException(); @@ -92,7 +118,7 @@ public class TopNMeasureType extends MeasureType { String literal = values[1]; // encode literal using dictionary - TblColRef literalCol = measureDesc.getFunction().getTopNLiteralColumn(); + TblColRef literalCol = getTopNLiteralColumn(measureDesc.getFunction()); Dictionary<String> dictionary = dictionaryMap.get(literalCol); int keyEncodedValue = dictionary.getIdFromValue(literal); @@ -104,12 +130,11 @@ public class TopNMeasureType extends MeasureType { return topNCounter; } - @SuppressWarnings("unchecked") @Override - public TopNCounter reEncodeDictionary(TopNCounter value, 
MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts) { - TopNCounter<ByteArray> topNCounter = (TopNCounter<ByteArray>) value; + public TopNCounter<ByteArray> reEncodeDictionary(TopNCounter<ByteArray> value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts) { + TopNCounter<ByteArray> topNCounter = value; - TblColRef colRef = measureDesc.getFunction().getTopNLiteralColumn(); + TblColRef colRef = getTopNLiteralColumn(measureDesc.getFunction()); Dictionary<String> sourceDict = oldDicts.get(colRef); Dictionary<String> mergedDict = newDicts.get(colRef); @@ -138,7 +163,7 @@ public class TopNMeasureType extends MeasureType { } @Override - public MeasureAggregator<?> newAggregator() { + public MeasureAggregator<TopNCounter<ByteArray>> newAggregator() { return new TopNAggregator(); } @@ -159,11 +184,11 @@ public class TopNMeasureType extends MeasureType { // the measure function must be SUM FunctionDesc onlyFunction = digest.aggregations.iterator().next(); - if (onlyFunction.isSum() == false) + if (isTopNCompatibleSum(topN.getFunction(), onlyFunction) == false) return null; - TblColRef literalCol = topN.getFunction().getTopNLiteralColumn(); - if (unmatchedDimensions.contains(literalCol) && topN.getFunction().isTopNCompatibleSum(onlyFunction)) { + TblColRef literalCol = getTopNLiteralColumn(topN.getFunction()); + if (unmatchedDimensions.contains(literalCol)) { unmatchedDimensions.remove(literalCol); unmatchedAggregations.remove(onlyFunction); return new CapabilityInfluence() { @@ -176,6 +201,21 @@ public class TopNMeasureType extends MeasureType { return null; } + private boolean isTopNCompatibleSum(FunctionDesc topN, FunctionDesc sum) { + if (sum == null) + return false; + + if (!isTopN(topN) || !sum.isSum()) + return false; + + if (sum.getParameter().getColRefs().isEmpty()) + return false; + + TblColRef sumCol = 
sum.getParameter().getColRefs().get(0); + TblColRef topnNumCol = getTopNNumericColumn(topN); + return sumCol.equals(topnNumCol); + } + @Override public boolean needRewrite() { return false; @@ -189,7 +229,7 @@ public class TopNMeasureType extends MeasureType { @Override public void beforeStorageQuery(MeasureDesc measureDesc, SQLDigest sqlDigest) { FunctionDesc topnFunc = measureDesc.getFunction(); - TblColRef topnLiteralCol = topnFunc.getTopNLiteralColumn(); + TblColRef topnLiteralCol = getTopNLiteralColumn(topnFunc); if (sqlDigest.groupbyColumns.contains(topnLiteralCol) == false) return; @@ -213,12 +253,12 @@ public class TopNMeasureType extends MeasureType { public boolean needAdvancedTupleFilling() { return true; } - + @Override public void fillTupleSimply(Tuple tuple, int indexInTuple, Object measureValue) { throw new UnsupportedOperationException(); } - + @Override public IAdvMeasureFiller getAdvancedTupleFiller(FunctionDesc function, TupleInfo tupleInfo, Map<TblColRef, Dictionary<String>> dictionaryMap) { final TblColRef literalCol = getTopNLiteralColumn(function); @@ -227,7 +267,7 @@ public class TopNMeasureType extends MeasureType { final int literalTupleIdx = tupleInfo.hasColumn(literalCol) ? tupleInfo.getColumnIndex(literalCol) : -1; // for TopN, the aggr must be SUM, so the number fill into the column position (without rewrite) final int numericTupleIdx = tupleInfo.hasColumn(numericCol) ? 
tupleInfo.getColumnIndex(numericCol) : -1; - + return new IAdvMeasureFiller() { private TopNCounter<ByteArray> topNCounter; private Iterator<Counter<ByteArray>> topNCounterIterator; @@ -249,7 +289,7 @@ public class TopNMeasureType extends MeasureType { public void fillTuplle(Tuple tuple, int row) { if (expectRow++ != row) throw new IllegalStateException(); - + Counter<ByteArray> counter = topNCounterIterator.next(); int key = BytesUtil.readUnsigned(counter.getItem().array(), 0, counter.getItem().array().length); String colValue = topNColDict.getValueFromId(key); @@ -266,4 +306,8 @@ public class TopNMeasureType extends MeasureType { private TblColRef getTopNLiteralColumn(FunctionDesc functionDesc) { return functionDesc.getParameter().getColRefs().get(1); } + + private boolean isTopN(FunctionDesc functionDesc) { + return FUNC_TOP_N.equalsIgnoreCase(functionDesc.getExpression()); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java index 235c99f..05e6744 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java @@ -21,6 +21,7 @@ package org.apache.kylin.metadata.datatype; import java.io.Serializable; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -28,21 +29,37 @@ import java.util.concurrent.ConcurrentMap; import java.util.regex.Matcher; import java.util.regex.Pattern; +import org.apache.commons.lang.StringUtils; + /** */ @SuppressWarnings("serial") public class DataType implements Serializable { - // 
standard sql types, ref: http://www.w3schools.com/sql/sql_datatypes_general.asp - public static final String VALID_TYPES_STRING = "any|char|varchar|boolean|binary" // - + "|integer|tinyint|smallint|bigint|decimal|numeric|float|real|double" // - + "|date|time|datetime|timestamp|byte|int|short|long|string|hllc|topn"; + private static final LinkedHashSet<String> VALID_TYPES = new LinkedHashSet<String>(); + private static Pattern TYPE_PATTERN = null; private static final String TYPE_PATTEN_TAIL = "\\s*" // + "(?:" + "[(]" + "([\\d\\s,]+)" + "[)]" + ")?"; - private static final Pattern TYPE_PATTERN = Pattern.compile( // - "(" + VALID_TYPES_STRING + ")" + TYPE_PATTEN_TAIL, Pattern.CASE_INSENSITIVE); + public static synchronized void register(String... typeNames) { + for (String typeName : typeNames) { + VALID_TYPES.add(typeName); + } + + TYPE_PATTERN = Pattern.compile( // + "(" + StringUtils.join(VALID_TYPES, "|") + ")" // + + TYPE_PATTEN_TAIL, Pattern.CASE_INSENSITIVE); + } + + // standard sql types, ref: http://www.w3schools.com/sql/sql_datatypes_general.asp + static { + register("any", "char", "varchar", "string", // + "boolean", "byte", "binary", // + "int", "short", "long", "integer", "tinyint", "smallint", "bigint", // + "float", "real", "double", "decimal", "numeric", // + "date", "time", "datetime", "timestamp"); + } public static final Set<String> INTEGER_FAMILY = new HashSet<String>(); public static final Set<String> NUMBER_FAMILY = new HashSet<String>(); @@ -219,10 +236,6 @@ public class DataType implements Serializable { return name.equals("decimal"); } - public boolean isHLLC() { - return name.equals("hllc"); - } - public String getName() { return name; } http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java ---------------------------------------------------------------------- diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java index fd3121f..01f8cc6 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java @@ -19,13 +19,9 @@ package org.apache.kylin.metadata.datatype; import java.nio.ByteBuffer; -import java.util.Collections; -import java.util.HashMap; import java.util.Map; import org.apache.kylin.common.util.BytesSerializer; -import org.apache.kylin.measure.hllc.HLLCSerializer; -import org.apache.kylin.measure.topn.TopNCounterSerializer; import com.google.common.collect.Maps; @@ -36,28 +32,20 @@ import com.google.common.collect.Maps; */ abstract public class DataTypeSerializer<T> implements BytesSerializer<T> { - final static Map<String, Class<?>> implementations; + final static Map<String, Class<?>> implementations = Maps.newHashMap(); static { - HashMap<String, Class<?>> impl = Maps.newHashMap(); - impl.put("varchar", StringSerializer.class); - impl.put("decimal", BigDecimalSerializer.class); - impl.put("double", DoubleSerializer.class); - impl.put("float", DoubleSerializer.class); - impl.put("bigint", LongSerializer.class); - impl.put("long", LongSerializer.class); - impl.put("integer", LongSerializer.class); - impl.put("int", LongSerializer.class); - impl.put("smallint", LongSerializer.class); - impl.put("date", DateTimeSerializer.class); - impl.put("datetime", DateTimeSerializer.class); - impl.put("timestamp", DateTimeSerializer.class); - impl.put("topn", TopNCounterSerializer.class); - impl.put("hllc", HLLCSerializer.class); - implementations = Collections.unmodifiableMap(impl); - } - - public static boolean hasRegistered(String dataTypeName) { - return implementations.containsKey(dataTypeName); + implementations.put("varchar", StringSerializer.class); + 
implementations.put("decimal", BigDecimalSerializer.class); + implementations.put("double", DoubleSerializer.class); + implementations.put("float", DoubleSerializer.class); + implementations.put("bigint", LongSerializer.class); + implementations.put("long", LongSerializer.class); + implementations.put("integer", LongSerializer.class); + implementations.put("int", LongSerializer.class); + implementations.put("smallint", LongSerializer.class); + implementations.put("date", DateTimeSerializer.class); + implementations.put("datetime", DateTimeSerializer.class); + implementations.put("timestamp", DateTimeSerializer.class); } public static void register(String dataTypeName, Class<? extends DataTypeSerializer<?>> impl) { http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java index 104f3c7..39fe6b3 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java @@ -22,6 +22,8 @@ import java.util.ArrayList; import java.util.Collection; import org.apache.kylin.measure.MeasureType; +import org.apache.kylin.measure.MeasureTypeFactory; +import org.apache.kylin.measure.basic.BasicMeasureType; import org.apache.kylin.metadata.datatype.DataType; import com.fasterxml.jackson.annotation.JsonAutoDetect; @@ -38,8 +40,6 @@ public class FunctionDesc { public static final String FUNC_MIN = "MIN"; public static final String FUNC_MAX = "MAX"; public static final String FUNC_COUNT = "COUNT"; - public static final String FUNC_COUNT_DISTINCT = "COUNT_DISTINCT"; - public static final String FUNC_TOP_N = "TOP_N"; public static final String PARAMETER_TYPE_CONSTANT = 
"constant"; public static final String PARAMETER_TYPE_COLUMN = "column"; @@ -52,7 +52,7 @@ public class FunctionDesc { private String returnType; private DataType returnDataType; - private MeasureType measureType; + private MeasureType<?> measureType; private boolean isDimensionAsMetric = false; public void init(TableDesc factTable) { @@ -80,12 +80,12 @@ public class FunctionDesc { } } - public MeasureType getMeasureType() { + public MeasureType<?> getMeasureType() { if (isDimensionAsMetric) return null; if (measureType == null) { - measureType = MeasureType.create(getExpression(), getReturnType()); + measureType = MeasureTypeFactory.create(getExpression(), getReturnDataType()); } return measureType; } @@ -108,12 +108,12 @@ public class FunctionDesc { } public DataType getRewriteFieldType() { - if (isCountDistinct() || isTopN()) - return DataType.ANY; - else if (isSum() || isMax() || isMin()) + if (isSum() || isMax() || isMin()) return parameter.getColRefs().get(0).getType(); - else + else if (getMeasureType() instanceof BasicMeasureType) return returnDataType; + else + return DataType.ANY; } public ColumnDesc newFakeRewriteColumn(TableDesc sourceTable) { @@ -142,22 +142,6 @@ public class FunctionDesc { return FUNC_COUNT.equalsIgnoreCase(expression); } - public boolean isCountDistinct() { - return FUNC_COUNT_DISTINCT.equalsIgnoreCase(expression); - } - - public boolean isTopN() { - return FUNC_TOP_N.equalsIgnoreCase(expression); - } - - public boolean isHolisticCountDistinct() { - if (isCountDistinct() && returnDataType != null && returnDataType.isBigInt()) { - return true; - } else { - return false; - } - } - /** * Get Full Expression such as sum(amount), count(1), count(*)... */ @@ -272,35 +256,4 @@ public class FunctionDesc { return "FunctionDesc [expression=" + expression + ", parameter=" + parameter + ", returnType=" + returnType + "]"; } - // cols[0] is numeric (e.g. GMV), cols[1] is literal (e.g. 
SELLER) - public TblColRef getTopNNumericColumn() { - if (isTopN() == false) - throw new IllegalStateException(); - - return parameter.getColRefs().get(0); - } - - // cols[0] is numeric (e.g. GMV), cols[1] is literal (e.g. SELLER) - public TblColRef getTopNLiteralColumn() { - if (isTopN() == false) - throw new IllegalStateException(); - - return parameter.getColRefs().get(1); - } - - public boolean isTopNCompatibleSum(FunctionDesc sum) { - if (isTopN() == false) - throw new IllegalStateException(); - - if (sum == null) { - return false; - } - - if (this.isTopN() && sum.isSum()) { - if (this.getParameter().getColRefs().get(0).equals(sum.getParameter().getColRefs().get(0))) - return true; - } - - return false; - } } http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleIterator.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleIterator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleIterator.java index 49328e9..c12f18a 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleIterator.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/TeeTupleIterator.java @@ -2,9 +2,6 @@ package org.apache.kylin.metadata.tuple; import java.util.List; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.google.common.collect.Lists; /** @@ -13,8 +10,6 @@ import com.google.common.collect.Lists; */ public class TeeTupleIterator implements ITupleIterator { - private static final Logger logger = LoggerFactory.getLogger(TeeTupleIterator.class); - private ITupleIterator underlying; private List<ITuple> duplicatedData; private List<TeeTupleItrListener> listeners = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java 
---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java index c79e1f8..d38aafd 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java @@ -24,8 +24,6 @@ import java.util.List; import net.sf.ehcache.pool.sizeof.annotations.IgnoreSizeOf; -import org.apache.kylin.common.hll.HyperLogLogPlusCounter; -import org.apache.kylin.common.topn.TopNCounter; import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.metadata.datatype.DoubleMutable; import org.apache.kylin.metadata.datatype.LongMutable; @@ -61,15 +59,7 @@ public class Tuple implements ITuple { public ITuple makeCopy() { Tuple ret = new Tuple(this.info); for (int i = 0; i < this.values.length; ++i) { - if (this.values[i] == null) { - ret.values[i] = null; - } else if (this.values[i] instanceof HyperLogLogPlusCounter) { - ret.values[i] = new HyperLogLogPlusCounter((HyperLogLogPlusCounter) this.values[i]); - } else if (this.values[i] instanceof TopNCounter) { - ret.values[i] = null; - } else { - ret.values[i] = this.values[i]; - } + ret.values[i] = this.values[i]; } return ret; }