draft
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/18e7999d Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/18e7999d Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/18e7999d Branch: refs/heads/KYLIN-942 Commit: 18e7999dc1d256723cef7649fef8a36e2d6a84b2 Parents: 38f7d83 Author: honma <ho...@ebay.com> Authored: Mon Oct 12 14:47:58 2015 +0800 Committer: honma <ho...@ebay.com> Committed: Wed Oct 14 12:48:54 2015 +0800 ---------------------------------------------------------------------- .../apache/kylin/common/util/ShardingHash.java | 2 +- .../cube/common/FuzzyValueCombination.java | 130 +++++++++++++++++++ .../kylin/cube/gridtable/CubeCodeSystem.java | 12 +- .../cube/gridtable/TrimmedCubeCodeSystem.java | 2 +- .../apache/kylin/cube/kv/FuzzyKeyEncoder.java | 2 +- .../apache/kylin/cube/kv/FuzzyMaskEncoder.java | 8 +- .../org/apache/kylin/cube/kv/RowConstants.java | 7 +- .../org/apache/kylin/cube/kv/RowKeyEncoder.java | 2 +- .../org/apache/kylin/gridtable/GTBuilder.java | 4 +- .../java/org/apache/kylin/gridtable/GTInfo.java | 16 ++- .../org/apache/kylin/gridtable/GTRecord.java | 34 +---- .../org/apache/kylin/gridtable/GTScanRange.java | 30 +---- .../kylin/gridtable/GTScanRangePlanner.java | 106 +++++++++++---- .../apache/kylin/gridtable/GTScanRequest.java | 9 +- .../kylin/gridtable/DictGridTableTest.java | 4 +- .../translate/FuzzyValueCombination.java | 30 ++--- dev-support/test_all.sh | 11 ++ .../mr/steps/MapContextGTRecordWriter.java | 2 +- .../kylin/engine/mr/steps/NDCuboidMapper.java | 4 +- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 8 +- .../storage/hbase/cube/v2/CubeHBaseRPC.java | 94 ++++++++++++-- .../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 10 +- .../hbase/cube/v2/CubeSegmentScanner.java | 64 ++++++++- .../kylin/storage/hbase/cube/v2/HBaseScan.java | 88 +++++++++++++ .../kylin/storage/hbase/cube/v2/RawScan.java | 20 +++ .../storage/hbase/steps/HBaseCuboidWriter.java | 2 +- 26 files changed, 542 insertions(+), 159 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java b/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java index 97feda1..8d728c8 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/ShardingHash.java @@ -50,7 +50,7 @@ public class ShardingHash { return _getShard(hash, totalShards); } - public static short getShard(short cuboidShardBase, short shardOffset, int totalShards) { + public static short normalize(short cuboidShardBase, short shardOffset, int totalShards) { if (totalShards <= 1) { return 0; } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-cube/src/main/java/org/apache/kylin/cube/common/FuzzyValueCombination.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/common/FuzzyValueCombination.java b/core-cube/src/main/java/org/apache/kylin/cube/common/FuzzyValueCombination.java new file mode 100644 index 0000000..4ddb06a --- /dev/null +++ b/core-cube/src/main/java/org/apache/kylin/cube/common/FuzzyValueCombination.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.cube.common; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +public class FuzzyValueCombination { + + private static class Dim<K, V> { + K col; + Set<V> values; + } + + private static final Set SINGLE_NULL_SET = Sets.newHashSet(); + + static { + SINGLE_NULL_SET.add(null); + } + + public static <K, V> List<Map<K, V>> calculate(Map<K, Set<V>> fuzzyValues, long cap) { + Collections.emptyMap(); + Dim<K, V>[] dims = toDims(fuzzyValues); + // If a query has many IN clause and each IN clause has many values, then it will easily generate + // thousands of fuzzy keys. When there are lots of fuzzy keys, the scan performance is bottle necked + // on it. So simply choose to abandon all fuzzy keys in this case. + if (exceedCap(dims, cap)) { + return Lists.newArrayList(); + } else { + return combination(dims); + } + } + + @SuppressWarnings("unchecked") + private static <K, V> List<Map<K, V>> combination(Dim<K, V>[] dims) { + + List<Map<K, V>> result = Lists.newArrayList(); + + int emptyDims = 0; + for (Dim dim : dims) { + if (dim.values.isEmpty()) { + dim.values = SINGLE_NULL_SET; + emptyDims++; + } + } + if (emptyDims == dims.length) { + return result; + } + + Map<K, V> r = Maps.newHashMap(); + Iterator<V>[] iters = new Iterator[dims.length]; + int level = 0; + while (true) { + Dim<K, V> dim = dims[level]; + if (iters[level] == null) { + iters[level] = dim.values.iterator(); + } + + Iterator<V> it = iters[level]; + if (it.hasNext() == false) { + if (level == 0) + break; + r.remove(dim.col); + iters[level] = null; + level--; + continue; + } + + r.put(dim.col, it.next()); + if (level == dims.length - 1) { + result.add(new HashMap<K, V>(r)); + } else { + level++; + } + } + return result; + } + + private static <K, V> Dim<K, V>[] toDims(Map<K, Set<V>> fuzzyValues) { + Dim[] dims = new Dim[fuzzyValues.size()]; + int i = 0; + for (Entry<K, Set<V>> entry : fuzzyValues.entrySet()) { + dims[i] = new Dim<K, V>(); + dims[i].col = entry.getKey(); + dims[i].values = entry.getValue(); + if (dims[i].values == null) + dims[i].values = Collections.emptySet(); + i++; + } + return dims; + } + + private static boolean exceedCap(Dim[] dims, long cap) { + return combCount(dims) > cap; + } + + private static long combCount(Dim[] dims) { + long count = 1; + for (Dim dim : dims) { + count *= Math.max(dim.values.size(), 1); + } + return count; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java index e52a6e1..99258e9 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java @@ -18,17 +18,15 @@ import org.apache.kylin.gridtable.IGTComparator; import org.apache.kylin.metadata.measure.MeasureAggregator; import org.apache.kylin.metadata.measure.serializer.DataTypeSerializer; import org.apache.kylin.metadata.measure.serializer.StringSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** - * Created by shaoshi on 3/23/15. - * This implementation uses Dictionary to encode and decode the table; If a column doesn't have dictionary, will check - * its data type to serialize/deserialize it; + * defines how column values will be encoded to/ decoded from GTRecord + * + * Cube meta can provide which columns are dictionary encoded (dict encoded dimensions) or fixed length encoded (fixed length dimensions) + * Metrics columns are more flexible, they will use DataTypeSerializer according to their data type. */ @SuppressWarnings({ "rawtypes", "unchecked" }) public class CubeCodeSystem implements IGTCodeSystem { - private static final Logger logger = LoggerFactory.getLogger(CubeCodeSystem.class); // ============================================================================ @@ -113,7 +111,7 @@ public class CubeCodeSystem implements IGTCodeSystem { if (serializer instanceof DictionarySerializer) { ((DictionarySerializer) serializer).serializeWithRounding(value, roundingFlag, buf); } else { - if ((!(serializer instanceof StringSerializer || serializer instanceof FixLenSerializer)) && (value instanceof String)) { + if ((value instanceof String) && (!(serializer instanceof StringSerializer || serializer instanceof FixLenSerializer))) { value = serializer.valueOf((String) value); } serializer.serialize(value, buf); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java index e662a82..ea020f3 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java @@ -78,7 +78,7 @@ public class TrimmedCubeCodeSystem implements IGTCodeSystem { //TODO: remove this check throw new IllegalStateException("Encode dictionary value in coprocessor"); } else { - if ((!(serializer instanceof StringSerializer || serializer instanceof CubeCodeSystem.FixLenSerializer)) && (value instanceof String)) { + if (((value instanceof String) && !(serializer instanceof StringSerializer || serializer instanceof CubeCodeSystem.FixLenSerializer))) { value = serializer.valueOf((String) value); } serializer.serialize(value, buf); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java index a17bb28..2185bc5 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyKeyEncoder.java @@ -37,7 +37,7 @@ public class FuzzyKeyEncoder extends RowKeyEncoder { @Override protected byte[] defaultValue(int length) { byte[] keyBytes = new byte[length]; - Arrays.fill(keyBytes, RowConstants.FUZZY_MASK_ZERO); + Arrays.fill(keyBytes, RowConstants.BYTE_ZERO); return keyBytes; } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java index 254482c..bf67538 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/FuzzyMaskEncoder.java @@ -37,18 +37,18 @@ public class FuzzyMaskEncoder extends RowKeyEncoder { @Override protected int fillHeader(byte[] bytes) { - Arrays.fill(bytes, 0, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.FUZZY_MASK_ONE); + Arrays.fill(bytes, 0, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.BYTE_ONE); // always fuzzy match cuboid ID to lock on the selected cuboid - Arrays.fill(bytes, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_HEADER_LEN, RowConstants.FUZZY_MASK_ZERO); + Arrays.fill(bytes, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_HEADER_LEN, RowConstants.BYTE_ZERO); return this.headerLength; } @Override protected void fillColumnValue(TblColRef column, int columnLen, byte[] value, int valueLen, byte[] outputValue, int outputValueOffset) { if (value == null) { - Arrays.fill(outputValue, outputValueOffset, outputValueOffset + columnLen, RowConstants.FUZZY_MASK_ONE); + Arrays.fill(outputValue, outputValueOffset, outputValueOffset + columnLen, RowConstants.BYTE_ONE); } else { - Arrays.fill(outputValue, outputValueOffset, outputValueOffset + columnLen, RowConstants.FUZZY_MASK_ZERO); + Arrays.fill(outputValue, outputValueOffset, outputValueOffset + columnLen, RowConstants.BYTE_ZERO); } } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java index c5adfb5..6a8eeb5 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowConstants.java @@ -33,10 +33,9 @@ public class RowConstants { public static final int ROWKEY_SHARDID_LEN = 2; public static final int ROWKEY_HEADER_LEN = ROWKEY_CUBOIDID_LEN + ROWKEY_SHARDID_LEN; - - // fuzzy mask - public static final byte FUZZY_MASK_ZERO = 0; - public static final byte FUZZY_MASK_ONE = 1; + + public static final byte BYTE_ZERO = 0; + public static final byte BYTE_ONE = 1; // row value delimiter public static final byte ROWVALUE_DELIMITER_BYTE = 7; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java index bc4a927..0676df6 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyEncoder.java @@ -99,7 +99,7 @@ public class RowKeyEncoder extends AbstractRowKeyEncoder { if (encodeShard) { short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId()); short shardOffset = ShardingHash.getShard(bytes, RowConstants.ROWKEY_HEADER_LEN, bytes.length - RowConstants.ROWKEY_HEADER_LEN, cuboidShardNum); - short finalShard = ShardingHash.getShard(cubeSeg.getCuboidBaseShard(cuboid.getId()), shardOffset, cubeSeg.getTotalShards()); + short finalShard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), shardOffset, cubeSeg.getTotalShards()); BytesUtil.writeShort(finalShard, bytes, offset, RowConstants.ROWKEY_SHARDID_LEN); } else { BytesUtil.writeShort((short) 0, bytes, offset, RowConstants.ROWKEY_SHARDID_LEN); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java index 31ea9e2..5eefa54 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTBuilder.java @@ -19,9 +19,9 @@ public class GTBuilder implements Closeable { this.info = info; if (append) { - storeWriter = store.append(shard); + storeWriter = store.append(); } else { - storeWriter = store.rebuild(shard); + storeWriter = store.rebuild(); } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java index d0559ad..c0ffcc1 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java @@ -53,7 +53,7 @@ public class GTInfo { public ImmutableBitSet getPrimaryKey() { return primaryKey; } - + public ImmutableBitSet getAllColumns() { return colAll; } @@ -112,7 +112,7 @@ public class GTInfo { public void validateColRef(TblColRef ref) { TblColRef expected = colRef(ref.getColumnDesc().getZeroBasedIndex()); - if (expected.equals(ref) == false) + if (!expected.equals(ref)) throw new IllegalArgumentException(); } @@ -155,11 +155,11 @@ public class GTInfo { for (int i = 0; i < colBlocks.length; i++) { merge = merge.or(colBlocks[i]); } - if (merge.equals(colAll) == false) + if (!merge.equals(colAll)) throw new IllegalStateException(); // primary key must be the first column block - if (primaryKey.equals(colBlocks[0]) == false) + if (!primaryKey.equals(colBlocks[0])) throw new IllegalStateException(); // drop empty column block @@ -170,7 +170,7 @@ public class GTInfo { if (cb.isEmpty()) it.remove(); } - colBlocks = (ImmutableBitSet[]) list.toArray(new ImmutableBitSet[list.size()]); + colBlocks = list.toArray(new ImmutableBitSet[list.size()]); } public static class Builder { @@ -243,8 +243,12 @@ public class GTInfo { return KryoUtils.serialize(info); } } - + public static GTInfo deserialize(byte[] bytes) { return KryoUtils.deserialize(bytes, GTInfo.class); } + + public IGTCodeSystem getCodeSystem() { + return codeSystem; + } } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java index dbfdf57..0d02655 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java @@ -2,7 +2,6 @@ package org.apache.kylin.gridtable; import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.BitSet; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.ImmutableBitSet; @@ -159,7 +158,7 @@ public class GTRecord implements Comparable<GTRecord> { return false; for (int i = 0; i < maskForEqualHashComp.trueBitCount(); i++) { int c = maskForEqualHashComp.trueBitAt(i); - if (this.cols[c].equals(o.cols[c]) == false) { + if (!this.cols[c].equals(o.cols[c])) { return false; } } @@ -228,7 +227,10 @@ public class GTRecord implements Comparable<GTRecord> { buf.setLength(pos); } - /** write data to given buffer, like serialize, UNLIKE other export this will put a prefix indicating null or not*/ + /** + * write data to given buffer, like serialize, UNLIKE other export this will put a prefix indicating null or not. + * for saving space + */ public void exportAllColumns(ByteBuffer buf) { for (int i = 0; i < info.colAll.trueBitCount(); i++) { int c = info.colAll.trueBitAt(i); @@ -300,30 +302,4 @@ public class GTRecord implements Comparable<GTRecord> { } } - /** similar to export(primaryKey) but will stop at the first null value */ - public static ByteArray exportScanKey(GTRecord rec) { - if (rec == null) - return null; - - GTInfo info = rec.getInfo(); - - BitSet selectedColumns = new BitSet(); - int len = 0; - for (int i = 0; i < info.primaryKey.trueBitCount(); i++) { - int c = info.primaryKey.trueBitAt(i); - if (rec.cols[c].array() == null) { - break; - } - selectedColumns.set(c); - len += rec.cols[c].length(); - } - - if (selectedColumns.cardinality() == 0) - return null; - - ByteArray buf = ByteArray.allocate(len); - rec.exportColumns(new ImmutableBitSet(selectedColumns), buf); - return buf; - } - } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java index 197fde4..eefe88e 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRange.java @@ -7,42 +7,26 @@ public class GTScanRange { final public GTRecord pkStart; // inclusive, record must not be null, col[pk].array() can be null to mean unbounded final public GTRecord pkEnd; // inclusive, record must not be null, col[pk].array() can be null to mean unbounded - final public List<GTRecord> hbaseFuzzyKeys; // partial matching primary keys + final public List<GTRecord> fuzzyKeys; // partial matching primary keys public GTScanRange(GTRecord pkStart, GTRecord pkEnd) { this(pkStart, pkEnd, null); } - public GTScanRange(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> hbaseFuzzyKeys) { + public GTScanRange(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys) { GTInfo info = pkStart.info; assert info == pkEnd.info; - validateRangeKey(pkStart); - validateRangeKey(pkEnd); - this.pkStart = pkStart; this.pkEnd = pkEnd; - this.hbaseFuzzyKeys = hbaseFuzzyKeys == null ? Collections.<GTRecord> emptyList() : hbaseFuzzyKeys; - } - - private void validateRangeKey(GTRecord pk) { - pk.maskForEqualHashComp(pk.info.primaryKey); - boolean afterNull = false; - for (int i = 0; i < pk.info.primaryKey.trueBitCount(); i++) { - int c = pk.info.primaryKey.trueBitAt(i); - if (afterNull) { - pk.cols[c].set(null, 0, 0); - } else { - afterNull = pk.cols[c].array() == null; - } - } + this.fuzzyKeys = fuzzyKeys == null ? Collections.<GTRecord> emptyList() : fuzzyKeys; } @Override public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + ((hbaseFuzzyKeys == null) ? 0 : hbaseFuzzyKeys.hashCode()); + result = prime * result + ((fuzzyKeys == null) ? 0 : fuzzyKeys.hashCode()); result = prime * result + ((pkEnd == null) ? 0 : pkEnd.hashCode()); result = prime * result + ((pkStart == null) ? 0 : pkStart.hashCode()); return result; @@ -57,10 +41,10 @@ public class GTScanRange { if (getClass() != obj.getClass()) return false; GTScanRange other = (GTScanRange) obj; - if (hbaseFuzzyKeys == null) { - if (other.hbaseFuzzyKeys != null) + if (fuzzyKeys == null) { + if (other.fuzzyKeys != null) return false; - } else if (!hbaseFuzzyKeys.equals(other.hbaseFuzzyKeys)) + } else if (!fuzzyKeys.equals(other.fuzzyKeys)) return false; if (pkEnd == null) { if (other.pkEnd != null) http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java index 58114d7..d01fe2c 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java @@ -1,6 +1,7 @@ package org.apache.kylin.gridtable; import java.util.ArrayList; +import java.util.BitSet; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -11,32 +12,45 @@ import java.util.List; import java.util.Map; import java.util.Set; -import com.google.common.collect.Maps; +import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.ImmutableBitSet; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.cube.common.FuzzyValueCombination; import org.apache.kylin.metadata.filter.CompareTupleFilter; import org.apache.kylin.metadata.filter.LogicalTupleFilter; import org.apache.kylin.metadata.filter.TupleFilter; import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum; import org.apache.kylin.metadata.model.TblColRef; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; public class GTScanRangePlanner { + private static final Logger logger = LoggerFactory.getLogger(GTScanRangePlanner.class); + private static final int MAX_HBASE_FUZZY_KEYS = 100; final private GTInfo info; + final private Pair<ByteArray, ByteArray> segmentStartAndEnd; + final private TblColRef partitionColRef; + final private ComparatorEx<ByteArray> byteUnknownIsSmaller; final private ComparatorEx<ByteArray> byteUnknownIsBigger; final private ComparatorEx<GTRecord> recordUnknownIsSmaller; final private ComparatorEx<GTRecord> recordUnknownIsBigger; - public GTScanRangePlanner(GTInfo info) { + public GTScanRangePlanner(GTInfo info, Pair<ByteArray, ByteArray> segmentStartAndEnd, TblColRef partitionColRef) { this.info = info; + this.segmentStartAndEnd = segmentStartAndEnd; + this.partitionColRef = partitionColRef; IGTComparator comp = info.codeSystem.getComparator(); + this.byteUnknownIsSmaller = byteComparatorTreatsUnknownSmaller(comp); this.byteUnknownIsBigger = byteComparatorTreatsUnknownBigger(comp); this.recordUnknownIsSmaller = recordComparatorTreatsUnknownSmaller(comp); @@ -58,7 +72,8 @@ public class GTScanRangePlanner { List<GTScanRange> scanRanges = Lists.newArrayListWithCapacity(orAndDimRanges.size()); for (Collection<ColumnRange> andDimRanges : orAndDimRanges) { GTScanRange scanRange = newScanRange(andDimRanges); - scanRanges.add(scanRange); + if (scanRange != null) + scanRanges.add(scanRange); } List<GTScanRange> mergedRanges = mergeOverlapRanges(scanRanges); @@ -70,40 +85,75 @@ public class GTScanRangePlanner { private GTScanRange newScanRange(Collection<ColumnRange> andDimRanges) { GTRecord pkStart = new GTRecord(info); GTRecord pkEnd = new GTRecord(info); - Map<Integer,Set<ByteArray>> fuzzyValues = Maps.newHashMap(); - - List<GTRecord> hbaseFuzzyKeys = Lists.newArrayList(); + Map<Integer, Set<ByteArray>> fuzzyValues = Maps.newHashMap(); + + List<GTRecord> fuzzyKeys; for (ColumnRange range : andDimRanges) { + + if (partitionColRef != null && range.column.equals(partitionColRef)) { + boolean rangeEndGTESegStart = true; + + if (segmentStartAndEnd.getFirst().array() != null && range.end.array() != null) { + rangeEndGTESegStart = info.getCodeSystem().getComparator().compare(range.end, segmentStartAndEnd.getFirst()) >= 0; + } + + if (!rangeEndGTESegStart) { + return null; + } + + boolean rangeStartLTESegEnd = true; + if (segmentStartAndEnd.getSecond().array() != null && range.begin.array() != null) { + rangeStartLTESegEnd = info.getCodeSystem().getComparator().compare(range.begin, segmentStartAndEnd.getSecond()) <= 0; + } + + if (!rangeStartLTESegEnd) { + return null; + } + } + int col = range.column.getColumnDesc().getZeroBasedIndex(); - if (info.primaryKey.get(col) == false) + if (!info.primaryKey.get(col)) continue; pkStart.set(col, range.begin); pkEnd.set(col, range.end); -// if (range.equals != null) { -// ImmutableBitSet fuzzyMask = new ImmutableBitSet(col); -// for (ByteArray v : range.equals) { -// GTRecord fuzzy = new GTRecord(info); -// fuzzy.set(col, v); -// fuzzy.maskForEqualHashComp(fuzzyMask); -// hbaseFuzzyKeys.add(fuzzy); -// } -// } - - if(range.valueSet != null) - { - for (ByteArray v : range.equals) { -// GTRecord fuzzy = new GTRecord(info); -// fuzzy.set(col, v); -// fuzzy.maskForEqualHashComp(fuzzyMask); -// hbaseFuzzyKeys.add(fuzzy); -// } + if (range.valueSet != null && !range.valueSet.isEmpty()) { + fuzzyValues.put(col, range.valueSet); } } - return new GTScanRange(pkStart, pkEnd, hbaseFuzzyKeys); + fuzzyKeys = buildFuzzyKeys(fuzzyValues); + + return new GTScanRange(pkStart, pkEnd, fuzzyKeys); + } + + private List<GTRecord> buildFuzzyKeys(Map<Integer, Set<ByteArray>> fuzzyValueSet) { + ArrayList<GTRecord> result = Lists.newArrayList(); + + if (fuzzyValueSet.isEmpty()) + return result; + + // debug/profiling purpose + if (BackdoorToggles.getDisableFuzzyKey()) { + logger.info("The execution of this query will not use fuzzy key"); + return result; + } + + List<Map<Integer, ByteArray>> fuzzyValueCombinations = FuzzyValueCombination.calculate(fuzzyValueSet, MAX_HBASE_FUZZY_KEYS); + + for (Map<Integer, ByteArray> fuzzyValue : fuzzyValueCombinations) { + GTRecord fuzzy = new GTRecord(info); + BitSet bitSet = new BitSet(info.getColumnCount()); + for (Map.Entry<Integer, ByteArray> entry : fuzzyValue.entrySet()) { + bitSet.set(entry.getKey()); + fuzzy.set(entry.getKey(), entry.getValue()); + } + fuzzy.maskForEqualHashComp(new ImmutableBitSet(bitSet)); + result.add(fuzzy); + } + return result; } private TupleFilter flattenToOrAndFilter(TupleFilter filter) { @@ -252,8 +302,8 @@ public class GTScanRangePlanner { boolean hasNonFuzzyRange = false; for (GTScanRange range : ranges) { - hasNonFuzzyRange = hasNonFuzzyRange || range.hbaseFuzzyKeys.isEmpty(); - newFuzzyKeys.addAll(range.hbaseFuzzyKeys); + hasNonFuzzyRange = hasNonFuzzyRange || range.fuzzyKeys.isEmpty(); + newFuzzyKeys.addAll(range.fuzzyKeys); end = recordUnknownIsBigger.max(end, range.pkEnd); } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java index 2b31e70..c81dd63 100644 --- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java +++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java @@ -2,6 +2,7 @@ package org.apache.kylin.gridtable; import java.io.IOException; import java.util.Arrays; +import java.util.List; import java.util.Set; import org.apache.kylin.common.util.ImmutableBitSet; @@ -88,7 +89,7 @@ public class GTScanRequest { } private void validateFilterPushDown(GTInfo info) { - if (hasFilterPushDown() == false) + if (!hasFilterPushDown()) return; Set<TblColRef> filterColumns = Sets.newHashSet(); @@ -102,7 +103,7 @@ public class GTScanRequest { } // un-evaluatable filter must be removed - if (TupleFilter.isEvaluableRecursively(filterPushDown) == false) { + if (!TupleFilter.isEvaluableRecursively(filterPushDown)) { Set<TblColRef> unevaluableColumns = Sets.newHashSet(); filterPushDown = GTUtil.convertFilterUnevaluatable(filterPushDown, info, unevaluableColumns); @@ -147,6 +148,10 @@ public class GTScanRequest { return range.pkEnd; } + public List<GTRecord> getFuzzyKeys() { + return range.fuzzyKeys; + } + public ImmutableBitSet getSelectedColBlocks() { return selectedColBlocks; } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java ---------------------------------------------------------------------- diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java index 684f0ef..c991e66 100644 --- a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java +++ b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java @@ -70,7 +70,7 @@ public class DictGridTableTest { private void verifyScanRangePlanner(GridTable table) { GTInfo info = table.getInfo(); - GTScanRangePlanner planner = new GTScanRangePlanner(info); + GTScanRangePlanner planner = new GTScanRangePlanner(info, null, null); CompareTupleFilter timeComp1 = compare(info.colRef(0), FilterOperatorEnum.GT, enc(info, 0, "2015-01-14")); CompareTupleFilter timeComp2 = compare(info.colRef(0), FilterOperatorEnum.LT, enc(info, 0, "2015-01-13")); @@ -87,7 +87,7 @@ public class DictGridTableTest { List<GTScanRange> r = planner.planScanRanges(filter); assertEquals(1, r.size()); assertEquals("[1421193600000, 10]-[null, null]", r.get(0).toString()); - assertEquals("[[10], [20]]", r.get(0).hbaseFuzzyKeys.toString()); + assertEquals("[[10], [20]]", r.get(0).fuzzyKeys.toString()); } // pre-evaluate ever false http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java ---------------------------------------------------------------------- diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java index fbc6d19..1e05eb8 100644 --- a/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/FuzzyValueCombination.java @@ -32,23 +32,21 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; -/** - * @author yangli9 - * - */ public class FuzzyValueCombination { - private static class Dim { + private static class Dim<E> { TblColRef col; - Set<String> values; + Set<E> values; } - private static final Set<String> SINGLE_NULL_SET = Sets.newHashSet(); + private static final Set SINGLE_NULL_SET = Sets.newHashSet(); + static { SINGLE_NULL_SET.add(null); } - public static List<Map<TblColRef, String>> calculate(Map<TblColRef, Set<String>> fuzzyValues, long cap) { + public static <E> List<Map<TblColRef, E>> calculate(Map<TblColRef, Set<E>> fuzzyValues, long cap) { + Collections.emptyMap(); Dim[] dims = toDims(fuzzyValues); // If a query has many IN clause and each IN clause has many values, then it will easily generate // thousands of fuzzy keys. When there are lots of fuzzy keys, the scan performance is bottle necked @@ -61,9 +59,9 @@ public class FuzzyValueCombination { } @SuppressWarnings("unchecked") - private static List<Map<TblColRef, String>> combination(Dim[] dims) { + private static <E> List<Map<TblColRef, E>> combination(Dim[] dims) { - List<Map<TblColRef, String>> result = Lists.newArrayList(); + List<Map<TblColRef, E>> result = Lists.newArrayList(); int emptyDims = 0; for (Dim dim : dims) { @@ -76,8 +74,8 @@ public class FuzzyValueCombination { return result; } - Map<TblColRef, String> r = Maps.newHashMap(); - Iterator<String>[] iters = new Iterator[dims.length]; + Map<TblColRef, E> r = Maps.newHashMap(); + Iterator<E>[] iters = new Iterator[dims.length]; int level = 0; while (true) { Dim dim = dims[level]; @@ -85,7 +83,7 @@ public class FuzzyValueCombination { iters[level] = dim.values.iterator(); } - Iterator<String> it = iters[level]; + Iterator<E> it = iters[level]; if (it.hasNext() == false) { if (level == 0) break; @@ -97,7 +95,7 @@ public class FuzzyValueCombination { r.put(dim.col, it.next()); if (level == dims.length - 1) { - result.add(new HashMap<TblColRef, String>(r)); + result.add(new HashMap<TblColRef, E>(r)); } else { level++; } @@ -105,10 +103,10 @@ public class FuzzyValueCombination { return result; } - private static Dim[] toDims(Map<TblColRef, Set<String>> fuzzyValues) { + private static <E> Dim[] toDims(Map<TblColRef, Set<E>> fuzzyValues) { Dim[] dims = new Dim[fuzzyValues.size()]; int i = 0; - for (Entry<TblColRef, Set<String>> entry : fuzzyValues.entrySet()) { + for (Entry<TblColRef, Set<E>> entry : fuzzyValues.entrySet()) { dims[i] = new Dim(); dims[i].col = entry.getKey(); dims[i].values = entry.getValue(); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/dev-support/test_all.sh ---------------------------------------------------------------------- diff --git a/dev-support/test_all.sh b/dev-support/test_all.sh new file mode 100644 index 0000000..6a7b887 --- /dev/null +++ b/dev-support/test_all.sh @@ -0,0 +1,11 @@ +#!/bin/bash + +dir=$(dirname ${0}) +cd ${dir} +cd .. + +mvn clean install -DskipTests | tee mci.log +mvn test -Dtest=org.apache.kylin.job.BuildCubeWithEngineTest -DfailIfNoTests=false -P sandbox | tee BuildCubeWithEngineTest.log +mvn test -Dtest=org.apache.kylin.job.BuildIIWithStreamTest -DfailIfNoTests=false -P sandbox | tee BuildIIWithStreamTest.log +mvn test -Dtest=org.apache.kylin.job.BuildCubeWithStreamTest -DfailIfNoTests=false -P sandbox | tee BuildCubeWithStreamTest.log +mvn verify -fae -P sandbox | tee mvnverify.log \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java index 7510c40..86e2f07 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MapContextGTRecordWriter.java @@ -72,7 +72,7 @@ public class MapContextGTRecordWriter implements ICuboidWriter { short cuboidShardNum = cubeSegment.getCuboidShardNum(cuboidId); short shardOffset = ShardingHash.getShard(keyBuf, header, offSet - header, cuboidShardNum); short cuboidShardBase = cubeSegment.getCuboidBaseShard(cuboidId); - short finalShard = ShardingHash.getShard(cuboidShardBase, shardOffset, cubeSegment.getTotalShards()); + short finalShard = ShardingHash.normalize(cuboidShardBase, shardOffset, cubeSegment.getTotalShards()); BytesUtil.writeShort(finalShard, keyBuf, 0, RowConstants.ROWKEY_SHARDID_LEN); //output measures http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java index e7db1fb..2180dd6 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/NDCuboidMapper.java @@ -41,8 +41,6 @@ import org.apache.kylin.metadata.model.SegmentStatusEnum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; - /** * @author George Song (ysong1) * @@ -117,7 +115,7 @@ public class NDCuboidMapper extends KylinMapper<Text, Text, Text, Text> { //fill shard short cuboidShardNum = cubeSegment.getCuboidShardNum(childCuboidId); short shardOffset = ShardingHash.getShard(keyBuf, bodyOffset, offset - bodyOffset, cuboidShardNum); - short finalShard = ShardingHash.getShard(cubeSegment.getCuboidBaseShard(childCuboidId), shardOffset, cubeSegment.getTotalShards()); + short finalShard = ShardingHash.normalize(cubeSegment.getCuboidBaseShard(childCuboidId), shardOffset, cubeSegment.getTotalShards()); BytesUtil.writeShort(finalShard, keyBuf, 0, RowConstants.ROWKEY_SHARDID_LEN); return offset; http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index e0efb10..2e58644 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.kylin.common.util.CompressionUtils; import org.apache.kylin.common.util.ImmutableBitSet; -import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.util.KryoUtils; @@ -132,9 +131,8 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { // globally shared connection, does not require close HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl()); final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier()); - final List<Pair<byte[], byte[]>> hbaseColumns = makeHBaseColumns(selectedColBlocks); - List<RawScan> rawScans = prepareRawScan(scanRequest.getPkStart(), scanRequest.getPkEnd(), hbaseColumns); + List<RawScan> rawScans = preparedHBaseScan(scanRequest.getPkStart(), scanRequest.getPkEnd(), scanRequest.getFuzzyKeys(), selectedColBlocks); byte[] scanRequestBytes = KryoUtils.serialize(scanRequest); final ByteString scanRequestBytesString = ByteString.copyFrom(scanRequestBytes); @@ -142,6 +140,10 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { ExecutorService executorService = Executors.newFixedThreadPool(rawScans.size()); final List<byte[]> rowBlocks = Collections.synchronizedList(Lists.<byte[]> newArrayList()); + for (RawScan rawScan : rawScans) { + logScan(rawScan, cubeSeg.getStorageLocationIdentifier()); + } + for (final RawScan rawScan : rawScans) { executorService.submit(new Runnable() { @Override http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java index 0a9c1d4..cc7ec4f 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java @@ -2,6 +2,7 @@ package org.apache.kylin.storage.hbase.cube.v2; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.List; import org.apache.hadoop.hbase.Cell; @@ -28,6 +29,7 @@ import org.apache.kylin.gridtable.IGTScanner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; public abstract class CubeHBaseRPC { @@ -70,38 +72,77 @@ public abstract class CubeHBaseRPC { return scan; } - protected List<RawScan> prepareRawScan(GTRecord pkStart, GTRecord pkEnd, List<Pair<byte[], byte[]>> selectedColumns) { + protected List<RawScan> preparedHBaseScan(GTRecord pkStart, GTRecord pkEnd, List<GTRecord> fuzzyKeys, ImmutableBitSet selectedColBlocks) { + final List<Pair<byte[], byte[]>> selectedColumns = makeHBaseColumns(selectedColBlocks); List<RawScan> ret = Lists.newArrayList(); - byte[] start = makeRowKeyToScan(pkStart, (byte) 0x00); - byte[] end = makeRowKeyToScan(pkEnd, (byte) 0xff); - - //TODO fuzzy match + byte[] start = makeRowKeyToScan(pkStart, RowConstants.ROWKEY_LOWER_BYTE); + byte[] end = makeRowKeyToScan(pkEnd, RowConstants.ROWKEY_UPPER_BYTE); + List<Pair<byte[], byte[]>> hbaseFuzzyKeys = translateFuzzyKeys(fuzzyKeys); short cuboidShardNum = cubeSeg.getCuboidShardNum(cuboid.getId()); for (short i = 0; i < cuboidShardNum; ++i) { - short shard = ShardingHash.getShard(cubeSeg.getCuboidBaseShard(cuboid.getId()), i, cubeSeg.getTotalShards()); + short shard = ShardingHash.normalize(cubeSeg.getCuboidBaseShard(cuboid.getId()), i, cubeSeg.getTotalShards()); byte[] shardStart = Arrays.copyOf(start, start.length); byte[] shardEnd = Arrays.copyOf(end, end.length); BytesUtil.writeShort(shard, shardStart, 0, RowConstants.ROWKEY_SHARDID_LEN); BytesUtil.writeShort(shard, shardEnd, 0, RowConstants.ROWKEY_SHARDID_LEN); - ret.add(new RawScan(shardStart, shardEnd, selectedColumns, null)); + ret.add(new RawScan(shardStart, shardEnd, selectedColumns, hbaseFuzzyKeys)); } return ret; } + /** + * translate GTRecord format fuzzy keys to hbase expected format + * @return + */ + private List<Pair<byte[], byte[]>> translateFuzzyKeys(List<GTRecord> fuzzyKeys) { + if (fuzzyKeys == null || fuzzyKeys.isEmpty()) { + return Collections.emptyList(); + } + + List<Pair<byte[], byte[]>> ret = Lists.newArrayList(); + int coreLength = fullGTInfo.getMaxColumnLength(fullGTInfo.getPrimaryKey()); + for (GTRecord gtRecordFuzzyKey : fuzzyKeys) { + byte[] hbaseFuzzyKey = new byte[coreLength + RowConstants.ROWKEY_HEADER_LEN]; + byte[] hbaseFuzzyMask = new byte[coreLength + RowConstants.ROWKEY_HEADER_LEN]; + + int pos = 0; + //shard part + Arrays.fill(hbaseFuzzyMask, pos, pos + RowConstants.ROWKEY_SHARDID_LEN, RowConstants.BYTE_ONE);//shard part should better be FIXED, for simplicity we make it non-fixed + pos += RowConstants.ROWKEY_SHARDID_LEN; + + //cuboid part + Arrays.fill(hbaseFuzzyMask, pos, pos + RowConstants.ROWKEY_CUBOIDID_LEN, RowConstants.BYTE_ZERO); + System.arraycopy(cuboid.getBytes(), 0, hbaseFuzzyKey, pos, RowConstants.ROWKEY_CUBOIDID_LEN); + pos += RowConstants.ROWKEY_CUBOIDID_LEN; + + //row key core part + ByteArray coreKey = HBaseScan.exportScanKey(gtRecordFuzzyKey, RowConstants.BYTE_ZERO); + System.arraycopy(coreKey.array(), coreKey.offset(), hbaseFuzzyKey, pos, coreKey.length()); + ByteArray coreMask = HBaseScan.exportScanMask(gtRecordFuzzyKey); + System.arraycopy(coreMask.array(), coreMask.offset(), hbaseFuzzyMask, pos, coreMask.length()); + + Preconditions.checkState(coreKey.length() == coreMask.length(), "corekey length not equal coremask length"); + pos += coreKey.length(); + Preconditions.checkState(hbaseFuzzyKey.length == pos, "HBase fuzzy key not completely populated"); + + ret.add(new Pair<byte[], byte[]>(hbaseFuzzyKey, hbaseFuzzyMask)); + } + + return ret; + } + private byte[] makeRowKeyToScan(GTRecord pkRec, byte fill) { - ByteArray pk = GTRecord.exportScanKey(pkRec); - int pkMaxLen = pkRec.getInfo().getMaxColumnLength(pkRec.getInfo().getPrimaryKey()); + ByteArray pk = HBaseScan.exportScanKey(pkRec, fill); - byte[] buf = new byte[pkMaxLen + RowConstants.ROWKEY_HEADER_LEN]; + byte[] buf = new byte[pk.length() + RowConstants.ROWKEY_HEADER_LEN]; Arrays.fill(buf, fill); - //for scanning/reading, later all possbile shard will be applied - //BytesUtil.writeShort((short) 0, buf, 0, RowConstants.ROWKEY_SHARDID_LEN); + //for scanning/reading, later all possible shard will be applied System.arraycopy(cuboid.getBytes(), 0, buf, RowConstants.ROWKEY_SHARDID_LEN, RowConstants.ROWKEY_CUBOIDID_LEN); if (pk != null && pk.array() != null) { @@ -175,4 +216,33 @@ public abstract class CubeHBaseRPC { return result; } + protected void logScan(RawScan rawScan, String tableName) { + StringBuilder info = new StringBuilder(); + info.append("\nVisiting hbase table ").append(tableName).append(": "); + if (cuboid.requirePostAggregation()) { + info.append("cuboid require post aggregation, from "); + } else { + info.append("cuboid exact match, from "); + } + info.append(cuboid.getInputID()); + info.append(" to "); + info.append(cuboid.getId()); + info.append("\nStart: "); + info.append(rawScan.getStartKeyAsString()); + info.append(" - "); + info.append(Bytes.toStringBinary(rawScan.startKey)); + info.append("\nStop: "); + info.append(rawScan.getEndKeyAsString()); + info.append(" - "); + info.append(Bytes.toStringBinary(rawScan.endKey)); + if (rawScan.fuzzyKey != null) { + info.append("\nFuzzy key counts: " + rawScan.fuzzyKey.size()); + info.append("\nFuzzy: "); + info.append(rawScan.getFuzzyKeyAsString()); + } else { + info.append("\nNo Fuzzy Key"); + } + logger.info(info.toString()); + } + } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java index a2ba39f..a31bcdf 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseScanRPC.java @@ -11,7 +11,6 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.kylin.common.util.ImmutableBitSet; -import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.gridtable.GTInfo; @@ -70,19 +69,18 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC { // primary key (also the 0th column block) is always selected final ImmutableBitSet selectedColBlocks = scanRequest.getSelectedColBlocks().set(0); - // globally shared connection, does not require close HConnection hbaseConn = HBaseConnection.get(cubeSeg.getCubeInstance().getConfig().getStorageUrl()); - final HTableInterface hbaseTable = hbaseConn.getTable(cubeSeg.getStorageLocationIdentifier()); - final List<Pair<byte[], byte[]>> hbaseColumns = makeHBaseColumns(selectedColBlocks); - List<RawScan> rawScans = prepareRawScan(scanRequest.getPkStart(), scanRequest.getPkEnd(), hbaseColumns); + List<RawScan> rawScans = preparedHBaseScan(scanRequest.getPkStart(), scanRequest.getPkEnd(), scanRequest.getFuzzyKeys(), selectedColBlocks); final List<ResultScanner> scanners = Lists.newArrayList(); final List<Iterator<Result>> resultIterators = Lists.newArrayList(); for (RawScan rawScan : rawScans) { + + logScan(rawScan, cubeSeg.getStorageLocationIdentifier()); Scan hbaseScan = buildScan(rawScan); final ResultScanner scanner = hbaseTable.getScanner(hbaseScan); @@ -119,7 +117,7 @@ public class CubeHBaseScanRPC extends CubeHBaseRPC { } }; - IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, hbaseColumns); + IGTStore store = new HBaseReadonlyStore(cellListIterator, scanRequest, rawScans.get(0).hbaseColumns); IGTScanner rawScanner = store.scan(scanRequest); final IGTScanner decorateScanner = scanRequest.decorateScanner(rawScanner); http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java index 756f8d6..d49de56 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java @@ -1,6 +1,7 @@ package org.apache.kylin.storage.hbase.cube.v2; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.BitSet; import java.util.Collection; import java.util.Collections; @@ -10,7 +11,10 @@ import java.util.List; import java.util.NoSuchElementException; import java.util.Set; +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.ImmutableBitSet; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.Cuboid; import org.apache.kylin.cube.gridtable.CubeGridTable; @@ -57,8 +61,18 @@ public class CubeSegmentScanner implements IGTScanner { ImmutableBitSet gtAggrMetrics = makeGridTableColumns(mapping, metrics); String[] gtAggrFuncs = makeAggrFuncs(mapping, metrics); - //TODO: should remove this in endpoint scenario - GTScanRangePlanner scanRangePlanner = new GTScanRangePlanner(info); + GTScanRangePlanner scanRangePlanner; + if (cubeSeg.getCubeDesc().getModel().getPartitionDesc().isPartitioned()) { + TblColRef tblColRef = cubeSeg.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef(); + Pair<ByteArray, ByteArray> segmentStartAndEnd = null; + int index = mapping.getIndexOf(tblColRef); + if (index >= 0) { + segmentStartAndEnd = getSegmentStartAndEnd(tblColRef, index); + } + scanRangePlanner = new GTScanRangePlanner(info, segmentStartAndEnd, tblColRef); + } else { + scanRangePlanner = new GTScanRangePlanner(info, null, null); + } List<GTScanRange> scanRanges = scanRangePlanner.planScanRanges(gtFilter, MAX_SCAN_RANGES); scanRequests = Lists.newArrayListWithCapacity(scanRanges.size()); @@ -73,6 +87,43 @@ public class CubeSegmentScanner implements IGTScanner { scanner = new Scanner(); } + private Pair<ByteArray, ByteArray> getSegmentStartAndEnd(TblColRef tblColRef, int index) { + + String partitionColType = tblColRef.getColumnDesc().getDatatype(); + + ByteArray start; + if (cubeSeg.getDateRangeStart() != Long.MIN_VALUE) { + start = translateTsToString(cubeSeg.getDateRangeStart(), partitionColType, index); + } else { + start = new ByteArray(); + } + + ByteArray end; + if (cubeSeg.getDateRangeEnd() != Long.MAX_VALUE) { + end = translateTsToString(cubeSeg.getDateRangeEnd(), partitionColType, index); + } else { + end = new ByteArray(); + } + return Pair.newPair(start, end); + + } + + private ByteArray translateTsToString(long ts, String partitionColType, int index) { + String value; + if ("date".equalsIgnoreCase(partitionColType)) { + value = DateFormat.formatToDateStr(ts); + } else if ("timestamp".equalsIgnoreCase(partitionColType)) { + value = DateFormat.formatToTimeWithoutMilliStr(ts); + } else { + throw new RuntimeException("Type " + partitionColType + " is not valid partition column type"); + } + + ByteBuffer buffer = ByteBuffer.allocate(info.getMaxColumnLength()); + info.getCodeSystem().encodeColumnValue(index, value, buffer); + + return ByteArray.copyOf(buffer.array(), 0, buffer.position()); + } + private Set<TblColRef> replaceDerivedColumns(Set<TblColRef> input, CubeDesc cubeDesc) { Set<TblColRef> ret = Sets.newHashSet(); for (TblColRef col : input) { @@ -150,8 +201,6 @@ public class CubeSegmentScanner implements IGTScanner { return scanner.getScannedRowCount(); } - - private class Scanner { final IGTScanner[] inputScanners = new IGTScanner[scanRequests.size()]; int cur = 0; @@ -171,10 +220,13 @@ public class CubeSegmentScanner implements IGTScanner { return false; try { - CubeHBaseRPC rpc = new CubeHBaseEndpointRPC(cubeSeg, cuboid, info); + + //CubeHBaseRPC rpc = new CubeHBaseEndpointRPC(cubeSeg, cuboid, info); + CubeHBaseRPC rpc = new CubeHBaseScanRPC(cubeSeg, cuboid, info); + //change previous line to CubeHBaseRPC rpc = new CubeHBaseScanRPC(cubeSeg, cuboid, info); //to debug locally - + inputScanners[cur] = rpc.getGTScanner(scanRequests.get(cur)); curIterator = inputScanners[cur].iterator(); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseScan.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseScan.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseScan.java new file mode 100644 index 0000000..7667830 --- /dev/null +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseScan.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kylin.storage.hbase.cube.v2; + +import java.util.Arrays; + +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.cube.kv.RowConstants; +import org.apache.kylin.gridtable.GTInfo; +import org.apache.kylin.gridtable.GTRecord; + +import com.google.common.base.Preconditions; + +public class HBaseScan { + + /** + * every column in scan key is fixed length. for empty values, 0 zero will be populated + */ + public static ByteArray exportScanKey(GTRecord rec, byte fill) { + + Preconditions.checkNotNull(rec); + + GTInfo info = rec.getInfo(); + int len = info.getMaxColumnLength(info.getPrimaryKey()); + ByteArray buf = ByteArray.allocate(len); + int pos = 0; + for (int i = 0; i < info.getPrimaryKey().trueBitCount(); i++) { + int c = info.getPrimaryKey().trueBitAt(i); + int colLength = info.getCodeSystem().maxCodeLength(c); + + if (rec.get(c).array() != null) { + Preconditions.checkArgument(colLength == rec.get(c).length(), "ColLength :" + colLength + " not equals cols[c] length: " + rec.get(c).length() + " c is " + c); + System.arraycopy(rec.get(c).array(), rec.get(c).offset(), buf.array(), buf.offset() + pos, rec.get(c).length()); + } else { + Arrays.fill(buf.array(), buf.offset() + pos, buf.offset() + pos + colLength, fill); + } + pos += colLength; + } + buf.setLength(pos); + + return buf; + } + + /** + * every column in scan key is fixed length. for fixed columns, 0 will be populated, for non-fixed columns, 1 will be populated + */ + public static ByteArray exportScanMask(GTRecord rec) { + Preconditions.checkNotNull(rec); + + GTInfo info = rec.getInfo(); + int len = info.getMaxColumnLength(info.getPrimaryKey()); + ByteArray buf = ByteArray.allocate(len); + byte fill; + + int pos = 0; + for (int i = 0; i < info.getPrimaryKey().trueBitCount(); i++) { + int c = info.getPrimaryKey().trueBitAt(i); + int colLength = info.getCodeSystem().maxCodeLength(c); + + if (rec.get(c).array() != null) { + fill = RowConstants.BYTE_ZERO; + } else { + fill = RowConstants.BYTE_ONE; + } + Arrays.fill(buf.array(), buf.offset() + pos, buf.offset() + pos + colLength, fill); + pos += colLength; + } + buf.setLength(pos); + + return buf; + } +} http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java index aa73927..0184908 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/RawScan.java @@ -20,6 +20,7 @@ package org.apache.kylin.storage.hbase.cube.v2; import java.util.List; +import org.apache.kylin.common.util.BytesUtil; import org.apache.kylin.common.util.Pair; public class RawScan { @@ -37,4 +38,23 @@ public class RawScan { this.fuzzyKey = fuzzyKey; } + public String getStartKeyAsString() { + return BytesUtil.toHex(this.startKey); + } + + public String getEndKeyAsString() { + return BytesUtil.toHex(this.endKey); + } + + public String getFuzzyKeyAsString() { + StringBuilder buf = new StringBuilder(); + for (Pair<byte[], byte[]> fuzzyKey : this.fuzzyKey) { + buf.append(BytesUtil.toHex(fuzzyKey.getFirst())); + buf.append(" "); + buf.append(BytesUtil.toHex(fuzzyKey.getSecond())); + buf.append(System.lineSeparator()); + } + return buf.toString(); + } + } http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/18e7999d/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java ---------------------------------------------------------------------- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java index bc0abc0..8f77f87 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseCuboidWriter.java @@ -112,7 +112,7 @@ public final class HBaseCuboidWriter implements ICuboidWriter { short shardOffset = ShardingHash.getShard(byteBuffer.array(), // RowConstants.ROWKEY_HEADER_LEN, byteBuffer.position() - RowConstants.ROWKEY_HEADER_LEN, cuboidShardNum); Short cuboidShardBase = cubeSegment.getCuboidBaseShard(cuboidId); - short finalShard = ShardingHash.getShard(cuboidShardBase, shardOffset, cubeSegment.getTotalShards()); + short finalShard = ShardingHash.normalize(cuboidShardBase, shardOffset, cubeSegment.getTotalShards()); BytesUtil.writeShort(finalShard, byteBuffer.array(), 0, RowConstants.ROWKEY_SHARDID_LEN); return byteBuffer;