KYLIN-2552 Fix backward compatibility (#617)
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/bb1517bb Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/bb1517bb Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/bb1517bb Branch: refs/heads/master-hadoop3.0 Commit: bb1517bb56cd9230836ab2df807680e960aa943a Parents: 675c0d8 Author: FAN XIE <xiefan.s...@outlook.com> Authored: Tue Apr 18 15:20:00 2017 +0800 Committer: hongbin ma <m...@kyligence.io> Committed: Tue Apr 18 15:20:00 2017 +0800 ---------------------------------------------------------------------- .../mr/steps/FactDistinctColumnsMapper.java | 54 ++++++++++++++++++-- 1 file changed, 50 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/bb1517bb/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java index e6cea2b..993dafa 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java @@ -24,6 +24,8 @@ import java.util.Collection; import java.util.List; import org.apache.hadoop.io.Text; +import org.apache.kylin.common.KylinVersion; +import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.StringUtil; import org.apache.kylin.cube.cuboid.CuboidScheduler; @@ -64,6 +66,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB private int samplingPercentage; //private ByteArray[] row_hashcodes = null; private long[] rowHashCodesLong = null; + private ByteArray[] 
row_hashcodes = null; private ByteBuffer tmpbuf; private static final Text EMPTY_TEXT = new Text(); public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE; @@ -74,6 +77,9 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB private SelfDefineSortableKey sortableKey = new SelfDefineSortableKey(); + //about details of the new algorithm, please see KYLIN-2518 + private boolean isUsePutRowKeyToHllNewAlgorithm; + @Override protected void setup(Context context) throws IOException { super.setup(context); @@ -96,8 +102,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB allCuboidsHLL[i] = new HLLCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision(), RegisterType.DENSE); } - hf = Hashing.murmur3_128(); - rowHashCodesLong = new long[nRowKey]; + TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef(); if (partitionColRef != null) { @@ -111,7 +116,21 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB } else { needFetchPartitionCol = true; } + //for KYLIN-2518 backward compatibility + if (KylinVersion.isBefore200(cubeDesc.getVersion())) { + isUsePutRowKeyToHllNewAlgorithm = false; + row_hashcodes = new ByteArray[nRowKey]; + for (int i = 0; i < nRowKey; i++) { + row_hashcodes[i] = new ByteArray(); + } + hf = Hashing.murmur3_32(); + } else { + isUsePutRowKeyToHllNewAlgorithm = true; + rowHashCodesLong = new long[nRowKey]; + hf = Hashing.murmur3_128(); + } } + } private void addCuboidBitSet(long cuboidId, List<Integer[]> allCuboidsBitSet, List<Long> allCuboids) { @@ -176,7 +195,11 @@ public class FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB if (collectStatistics) { if (rowCount % 100 < samplingPercentage) { - putRowKeyToHLL(row); + if (isUsePutRowKeyToHllNewAlgorithm) { + putRowKeyToHLLNew(row); + } else { + putRowKeyToHLLOld(row); + } } if (needFetchPartitionCol == true) { @@ -208,7 +231,30 @@ public class 
FactDistinctColumnsMapper<KEYIN> extends FactDistinctColumnsMapperB return size; } - private void putRowKeyToHLL(String[] row) { + private void putRowKeyToHLLOld(String[] row) { + //generate hash for each row key column + for (int i = 0; i < nRowKey; i++) { + Hasher hc = hf.newHasher(); + String colValue = row[intermediateTableDesc.getRowKeyColumnIndexes()[i]]; + if (colValue != null) { + row_hashcodes[i].set(hc.putString(colValue).hash().asBytes()); + } else { + row_hashcodes[i].set(hc.putInt(0).hash().asBytes()); + } + } + + // use the row key column hash to get a consolidated hash for each cuboid + for (int i = 0, n = allCuboidsBitSet.length; i < n; i++) { + Hasher hc = hf.newHasher(); + for (int position = 0; position < allCuboidsBitSet[i].length; position++) { + hc.putBytes(row_hashcodes[allCuboidsBitSet[i][position]].array()); + } + + allCuboidsHLL[i].add(hc.hash().asBytes()); + } + } + + private void putRowKeyToHLLNew(String[] row) { //generate hash for each row key column for (int i = 0; i < nRowKey; i++) { Hasher hc = hf.newHasher();