http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryBuilder.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryBuilder.java index 916c369..598865b 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryBuilder.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionaryBuilder.java @@ -1,512 +1,512 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.dict; - -import java.io.ByteArrayOutputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.PrintStream; -import java.io.UnsupportedEncodingException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.IdentityHashMap; -import java.util.LinkedList; - -import org.apache.kylin.common.util.BytesUtil; - -/** - * Builds a dictionary using Trie structure. All values are taken in byte[] form - * and organized in a Trie with ordering. Then numeric IDs are assigned in - * sequence. - * - * @author yangli9 - */ -public class TrieDictionaryBuilder<T> { - - private static final int _2GB = 2000000000; - - public static class Node { - public byte[] part; - public boolean isEndOfValue; - public ArrayList<Node> children; - - public int nValuesBeneath; // only present after stats() - - Node(byte[] value, boolean isEndOfValue) { - reset(value, isEndOfValue); - } - - Node(byte[] value, boolean isEndOfValue, ArrayList<Node> children) { - reset(value, isEndOfValue, children); - } - - void reset(byte[] value, boolean isEndOfValue) { - reset(value, isEndOfValue, new ArrayList<Node>()); - } - - void reset(byte[] value, boolean isEndOfValue, ArrayList<Node> children) { - this.part = value; - this.isEndOfValue = isEndOfValue; - this.children = children; - } - } - - public static interface Visitor { - void visit(Node n, int level); - } - - // ============================================================================ - - private Node root; - private BytesConverter<T> bytesConverter; - - public TrieDictionaryBuilder(BytesConverter<T> bytesConverter) { - this.root = new Node(new byte[0], false); - this.bytesConverter = bytesConverter; - } - - public void addValue(T value) { - addValue(bytesConverter.convertToBytes(value)); - } - - // add a converted value (given in byte[] format), use with care, for internal only - void addValue(byte[] value) { - addValueR(root, value, 0); - } - - private void addValueR(Node node, byte[] value, int start) { - // match the value part of current node - int i = 0, j = start; - int n = node.part.length, nn = value.length; - int comp = 0; - for (; i < n && j < nn; i++, j++) { - comp = BytesUtil.compareByteUnsigned(node.part[i], value[j]); - if (comp != 0) - break; - } - - if (j == nn) { - // if value fully matched within the current node - if (i == n) { - // if equals to current node, just mark end of value - node.isEndOfValue = true; - } else { - // otherwise, split the current node into two - Node c = new Node(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children); - node.reset(BytesUtil.subarray(node.part, 0, i), true); - node.children.add(c); - } - return; - } - - // if partially matched the current, split the current node, add the new value, make a 3-way - if (i < n) { - Node c1 = new Node(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children); - Node c2 = new Node(BytesUtil.subarray(value, j, nn), true); - node.reset(BytesUtil.subarray(node.part, 0, i), false); - if (comp < 0) { - node.children.add(c1); - node.children.add(c2); - } else { - node.children.add(c2); - node.children.add(c1); - } - return; - } - - // out matched the current, binary search the next byte for a child node to continue - byte lookfor = value[j]; - int lo = 0; - int hi = node.children.size() - 1; - int mid = 0; - boolean found = false; - comp = 0; - while (!found && lo <= hi) { - mid = lo + (hi - lo) / 2; - comp = BytesUtil.compareByteUnsigned(lookfor, node.children.get(mid).part[0]); - if (comp < 0) - hi = mid - 1; - else if (comp > 0) - lo = mid + 1; - else - found = true; - } - if (found) { - // found a child node matching the first byte, continue in that child - addValueR(node.children.get(mid), value, j); - } else { - // otherwise, make the value a new child - Node c = new Node(BytesUtil.subarray(value, j, nn), true); - node.children.add(comp <= 0 ? mid : mid + 1, c); - } - } - - public void traverse(Visitor visitor) { - traverseR(root, visitor, 0); - } - - private void traverseR(Node node, Visitor visitor, int level) { - visitor.visit(node, level); - for (Node c : node.children) - traverseR(c, visitor, level + 1); - } - - public void traversePostOrder(Visitor visitor) { - traversePostOrderR(root, visitor, 0); - } - - private void traversePostOrderR(Node node, Visitor visitor, int level) { - for (Node c : node.children) - traversePostOrderR(c, visitor, level + 1); - visitor.visit(node, level); - } - - public static class Stats { - public int nValues; // number of values in total - public int nValueBytesPlain; // number of bytes for all values - // uncompressed - public int nValueBytesCompressed; // number of values bytes in Trie - // (compressed) - public int maxValueLength; // size of longest value in bytes - - // the trie is multi-byte-per-node - public int mbpn_nNodes; // number of nodes in trie - public int mbpn_trieDepth; // depth of trie - public int mbpn_maxFanOut; // the maximum no. children - public long mbpn_nChildLookups; // number of child lookups during lookup every value once - public long mbpn_nTotalFanOut; // the sum of fan outs during lookup every value once - public int mbpn_sizeValueTotal; // the sum of value space in all nodes - public int mbpn_sizeNoValueBytes; // size of field noValueBytes - public int mbpn_sizeNoValueBeneath; // size of field noValuesBeneath, depends on cardinality - public int mbpn_sizeChildOffset; // size of field childOffset, points to first child in flattened array - public long mbpn_footprint; // MBPN footprint in bytes - - // stats for one-byte-per-node as well, so there's comparison - public int obpn_sizeValue; // size of value per node, always 1 - public int obpn_sizeNoValuesBeneath; // size of field noValuesBeneath, depends on cardinality - public int obpn_sizeChildCount; // size of field childCount, enables binary search among children - public int obpn_sizeChildOffset; // size of field childOffset, points to first child in flattened array - public int obpn_nNodes; // no. nodes in OBPN trie - public long obpn_footprint; // OBPN footprint in bytes - - public void print() { - PrintStream out = System.out; - out.println("============================================================================"); - out.println("No. values: " + nValues); - out.println("No. bytes raw: " + nValueBytesPlain); - out.println("No. bytes in trie: " + nValueBytesCompressed); - out.println("Longest value length: " + maxValueLength); - - // flatten trie footprint calculation, case of One-Byte-Per-Node - out.println("----------------------------------------------------------------------------"); - out.println("OBPN node size: " + (obpn_sizeValue + obpn_sizeNoValuesBeneath + obpn_sizeChildCount + obpn_sizeChildOffset) + " = " + obpn_sizeValue + " + " + obpn_sizeNoValuesBeneath + " + " + obpn_sizeChildCount + " + " + obpn_sizeChildOffset); - out.println("OBPN no. nodes: " + obpn_nNodes); - out.println("OBPN trie depth: " + maxValueLength); - out.println("OBPN footprint: " + obpn_footprint + " in bytes"); - - // flatten trie footprint calculation, case of Multi-Byte-Per-Node - out.println("----------------------------------------------------------------------------"); - out.println("MBPN max fan out: " + mbpn_maxFanOut); - out.println("MBPN no. child lookups: " + mbpn_nChildLookups); - out.println("MBPN total fan out: " + mbpn_nTotalFanOut); - out.println("MBPN average fan out: " + (double) mbpn_nTotalFanOut / mbpn_nChildLookups); - out.println("MBPN values size total: " + mbpn_sizeValueTotal); - out.println("MBPN node size: " + (mbpn_sizeNoValueBytes + mbpn_sizeNoValueBeneath + mbpn_sizeChildOffset) + " = " + mbpn_sizeNoValueBytes + " + " + mbpn_sizeNoValueBeneath + " + " + mbpn_sizeChildOffset); - out.println("MBPN no. nodes: " + mbpn_nNodes); - out.println("MBPN trie depth: " + mbpn_trieDepth); - out.println("MBPN footprint: " + mbpn_footprint + " in bytes"); - } - } - - /** out print some statistics of the trie and the dictionary built from it */ - public Stats stats() { - // calculate nEndValueBeneath - traversePostOrder(new Visitor() { - @Override - public void visit(Node n, int level) { - n.nValuesBeneath = n.isEndOfValue ? 1 : 0; - for (Node c : n.children) - n.nValuesBeneath += c.nValuesBeneath; - } - }); - - // run stats - final Stats s = new Stats(); - final ArrayList<Integer> lenAtLvl = new ArrayList<Integer>(); - traverse(new Visitor() { - @Override - public void visit(Node n, int level) { - if (n.isEndOfValue) - s.nValues++; - s.nValueBytesPlain += n.part.length * n.nValuesBeneath; - s.nValueBytesCompressed += n.part.length; - s.mbpn_nNodes++; - if (s.mbpn_trieDepth < level + 1) - s.mbpn_trieDepth = level + 1; - if (n.children.size() > 0) { - if (s.mbpn_maxFanOut < n.children.size()) - s.mbpn_maxFanOut = n.children.size(); - int childLookups = n.nValuesBeneath - (n.isEndOfValue ? 1 : 0); - s.mbpn_nChildLookups += childLookups; - s.mbpn_nTotalFanOut += childLookups * n.children.size(); - } - - if (level < lenAtLvl.size()) - lenAtLvl.set(level, n.part.length); - else - lenAtLvl.add(n.part.length); - int lenSoFar = 0; - for (int i = 0; i <= level; i++) - lenSoFar += lenAtLvl.get(i); - if (lenSoFar > s.maxValueLength) - s.maxValueLength = lenSoFar; - } - }); - - // flatten trie footprint calculation, case of One-Byte-Per-Node - s.obpn_sizeValue = 1; - s.obpn_sizeNoValuesBeneath = BytesUtil.sizeForValue(s.nValues); - s.obpn_sizeChildCount = 1; - s.obpn_sizeChildOffset = 5; // MSB used as isEndOfValue flag - s.obpn_nNodes = s.nValueBytesCompressed; // no. nodes is the total number of compressed bytes in OBPN - s.obpn_footprint = s.obpn_nNodes * (long) (s.obpn_sizeValue + s.obpn_sizeNoValuesBeneath + s.obpn_sizeChildCount + s.obpn_sizeChildOffset); - while (true) { // minimize the offset size to match the footprint - long t = s.obpn_nNodes * (long) (s.obpn_sizeValue + s.obpn_sizeNoValuesBeneath + s.obpn_sizeChildCount + s.obpn_sizeChildOffset - 1); - if (BytesUtil.sizeForValue(t * 2) <= s.obpn_sizeChildOffset - 1) { // *2 because MSB of offset is used for isEndOfValue flag - s.obpn_sizeChildOffset--; - s.obpn_footprint = t; - } else - break; - } - - // flatten trie footprint calculation, case of Multi-Byte-Per-Node - s.mbpn_sizeValueTotal = s.nValueBytesCompressed; - s.mbpn_sizeNoValueBytes = 1; - s.mbpn_sizeNoValueBeneath = BytesUtil.sizeForValue(s.nValues); - s.mbpn_sizeChildOffset = 5; - s.mbpn_footprint = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (long) (s.mbpn_sizeNoValueBytes + s.mbpn_sizeNoValueBeneath + s.mbpn_sizeChildOffset); - while (true) { // minimize the offset size to match the footprint - long t = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (long) (s.mbpn_sizeNoValueBytes + s.mbpn_sizeNoValueBeneath + s.mbpn_sizeChildOffset - 1); - if (BytesUtil.sizeForValue(t * 4) <= s.mbpn_sizeChildOffset - 1) { // *4 because 2 MSB of offset is used for isEndOfValue & isEndChild flag - s.mbpn_sizeChildOffset--; - s.mbpn_footprint = t; - } else - break; - } - - return s; - } - - /** out print trie for debug */ - public void print() { - print(System.out); - } - - public void print(final PrintStream out) { - traverse(new Visitor() { - @Override - public void visit(Node n, int level) { - try { - for (int i = 0; i < level; i++) - out.print(" "); - out.print(new String(n.part, "UTF-8")); - out.print(" - "); - if (n.nValuesBeneath > 0) - out.print(n.nValuesBeneath); - if (n.isEndOfValue) - out.print("*"); - out.print("\n"); - } catch (UnsupportedEncodingException e) { - e.printStackTrace(); - } - } - }); - } - - private CompleteParts completeParts = new CompleteParts(); - - private class CompleteParts { - byte[] data = new byte[4096]; - int current = 0; - - public void append(byte[] part) { - while (current + part.length > data.length) - expand(); - - System.arraycopy(part, 0, data, current, part.length); - current += part.length; - } - - public void withdraw(int size) { - current -= size; - } - - public byte[] retrieve() { - return Arrays.copyOf(data, current); - } - - private void expand() { - byte[] temp = new byte[2 * data.length]; - System.arraycopy(data, 0, temp, 0, data.length); - data = temp; - } - } - - // there is a 255 limitation of length for each node's part. - // we interpolate nodes to satisfy this when a node's part becomes - // too long(overflow) - private void checkOverflowParts(Node node) { - LinkedList<Node> childrenCopy = new LinkedList<Node>(node.children); - for (Node child : childrenCopy) { - if (child.part.length > 255) { - byte[] first255 = Arrays.copyOf(child.part, 255); - - completeParts.append(node.part); - completeParts.append(first255); - byte[] visited = completeParts.retrieve(); - this.addValue(visited); - completeParts.withdraw(255); - completeParts.withdraw(node.part.length); - } - } - - completeParts.append(node.part); // by here the node.children may have been changed - for (Node child : node.children) { - checkOverflowParts(child); - } - completeParts.withdraw(node.part.length); - } - - /** - * Flatten the trie into a byte array for a minimized memory footprint. - * Lookup remains fast. Cost is inflexibility to modify (becomes immutable). - * - * Flattened node structure is HEAD + NODEs, for each node: - * - o byte, offset to child node, o = stats.mbpn_sizeChildOffset - * - 1 bit, isLastChild flag, the 1st MSB of o - * - 1 bit, isEndOfValue flag, the 2nd MSB of o - * - c byte, number of values beneath, c = stats.mbpn_sizeNoValueBeneath - * - 1 byte, number of value bytes - * - n byte, value bytes - */ - public TrieDictionary<T> build(int baseId) { - byte[] trieBytes = buildTrieBytes(baseId); - TrieDictionary<T> r = new TrieDictionary<T>(trieBytes); - return r; - } - - protected byte[] buildTrieBytes(int baseId) { - checkOverflowParts(this.root); - - Stats stats = stats(); - int sizeNoValuesBeneath = stats.mbpn_sizeNoValueBeneath; - int sizeChildOffset = stats.mbpn_sizeChildOffset; - +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.dict; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.IdentityHashMap; +import java.util.LinkedList; + +import org.apache.kylin.common.util.BytesUtil; + +/** + * Builds a dictionary using Trie structure. All values are taken in byte[] form + * and organized in a Trie with ordering. Then numeric IDs are assigned in + * sequence. + * + * @author yangli9 + */ +public class TrieDictionaryBuilder<T> { + + private static final int _2GB = 2000000000; + + public static class Node { + public byte[] part; + public boolean isEndOfValue; + public ArrayList<Node> children; + + public int nValuesBeneath; // only present after stats() + + Node(byte[] value, boolean isEndOfValue) { + reset(value, isEndOfValue); + } + + Node(byte[] value, boolean isEndOfValue, ArrayList<Node> children) { + reset(value, isEndOfValue, children); + } + + void reset(byte[] value, boolean isEndOfValue) { + reset(value, isEndOfValue, new ArrayList<Node>()); + } + + void reset(byte[] value, boolean isEndOfValue, ArrayList<Node> children) { + this.part = value; + this.isEndOfValue = isEndOfValue; + this.children = children; + } + } + + public static interface Visitor { + void visit(Node n, int level); + } + + // ============================================================================ + + private Node root; + private BytesConverter<T> bytesConverter; + + public TrieDictionaryBuilder(BytesConverter<T> bytesConverter) { + this.root = new Node(new byte[0], false); + this.bytesConverter = bytesConverter; + } + + public void addValue(T value) { + addValue(bytesConverter.convertToBytes(value)); + } + + // add a converted value (given in byte[] format), use with care, for internal only + void addValue(byte[] value) { + addValueR(root, value, 0); + } + + private void addValueR(Node node, byte[] value, int start) { + // match the value part of current node + int i = 0, j = start; + int n = node.part.length, nn = value.length; + int comp = 0; + for (; i < n && j < nn; i++, j++) { + comp = BytesUtil.compareByteUnsigned(node.part[i], value[j]); + if (comp != 0) + break; + } + + if (j == nn) { + // if value fully matched within the current node + if (i == n) { + // if equals to current node, just mark end of value + node.isEndOfValue = true; + } else { + // otherwise, split the current node into two + Node c = new Node(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children); + node.reset(BytesUtil.subarray(node.part, 0, i), true); + node.children.add(c); + } + return; + } + + // if partially matched the current, split the current node, add the new value, make a 3-way + if (i < n) { + Node c1 = new Node(BytesUtil.subarray(node.part, i, n), node.isEndOfValue, node.children); + Node c2 = new Node(BytesUtil.subarray(value, j, nn), true); + node.reset(BytesUtil.subarray(node.part, 0, i), false); + if (comp < 0) { + node.children.add(c1); + node.children.add(c2); + } else { + node.children.add(c2); + node.children.add(c1); + } + return; + } + + // out matched the current, binary search the next byte for a child node to continue + byte lookfor = value[j]; + int lo = 0; + int hi = node.children.size() - 1; + int mid = 0; + boolean found = false; + comp = 0; + while (!found && lo <= hi) { + mid = lo + (hi - lo) / 2; + comp = BytesUtil.compareByteUnsigned(lookfor, node.children.get(mid).part[0]); + if (comp < 0) + hi = mid - 1; + else if (comp > 0) + lo = mid + 1; + else + found = true; + } + if (found) { + // found a child node matching the first byte, continue in that child + addValueR(node.children.get(mid), value, j); + } else { + // otherwise, make the value a new child + Node c = new Node(BytesUtil.subarray(value, j, nn), true); + node.children.add(comp <= 0 ? mid : mid + 1, c); + } + } + + public void traverse(Visitor visitor) { + traverseR(root, visitor, 0); + } + + private void traverseR(Node node, Visitor visitor, int level) { + visitor.visit(node, level); + for (Node c : node.children) + traverseR(c, visitor, level + 1); + } + + public void traversePostOrder(Visitor visitor) { + traversePostOrderR(root, visitor, 0); + } + + private void traversePostOrderR(Node node, Visitor visitor, int level) { + for (Node c : node.children) + traversePostOrderR(c, visitor, level + 1); + visitor.visit(node, level); + } + + public static class Stats { + public int nValues; // number of values in total + public int nValueBytesPlain; // number of bytes for all values + // uncompressed + public int nValueBytesCompressed; // number of values bytes in Trie + // (compressed) + public int maxValueLength; // size of longest value in bytes + + // the trie is multi-byte-per-node + public int mbpn_nNodes; // number of nodes in trie + public int mbpn_trieDepth; // depth of trie + public int mbpn_maxFanOut; // the maximum no. children + public long mbpn_nChildLookups; // number of child lookups during lookup every value once + public long mbpn_nTotalFanOut; // the sum of fan outs during lookup every value once + public int mbpn_sizeValueTotal; // the sum of value space in all nodes + public int mbpn_sizeNoValueBytes; // size of field noValueBytes + public int mbpn_sizeNoValueBeneath; // size of field noValuesBeneath, depends on cardinality + public int mbpn_sizeChildOffset; // size of field childOffset, points to first child in flattened array + public long mbpn_footprint; // MBPN footprint in bytes + + // stats for one-byte-per-node as well, so there's comparison + public int obpn_sizeValue; // size of value per node, always 1 + public int obpn_sizeNoValuesBeneath; // size of field noValuesBeneath, depends on cardinality + public int obpn_sizeChildCount; // size of field childCount, enables binary search among children + public int obpn_sizeChildOffset; // size of field childOffset, points to first child in flattened array + public int obpn_nNodes; // no. nodes in OBPN trie + public long obpn_footprint; // OBPN footprint in bytes + + public void print() { + PrintStream out = System.out; + out.println("============================================================================"); + out.println("No. values: " + nValues); + out.println("No. bytes raw: " + nValueBytesPlain); + out.println("No. bytes in trie: " + nValueBytesCompressed); + out.println("Longest value length: " + maxValueLength); + + // flatten trie footprint calculation, case of One-Byte-Per-Node + out.println("----------------------------------------------------------------------------"); + out.println("OBPN node size: " + (obpn_sizeValue + obpn_sizeNoValuesBeneath + obpn_sizeChildCount + obpn_sizeChildOffset) + " = " + obpn_sizeValue + " + " + obpn_sizeNoValuesBeneath + " + " + obpn_sizeChildCount + " + " + obpn_sizeChildOffset); + out.println("OBPN no. nodes: " + obpn_nNodes); + out.println("OBPN trie depth: " + maxValueLength); + out.println("OBPN footprint: " + obpn_footprint + " in bytes"); + + // flatten trie footprint calculation, case of Multi-Byte-Per-Node + out.println("----------------------------------------------------------------------------"); + out.println("MBPN max fan out: " + mbpn_maxFanOut); + out.println("MBPN no. child lookups: " + mbpn_nChildLookups); + out.println("MBPN total fan out: " + mbpn_nTotalFanOut); + out.println("MBPN average fan out: " + (double) mbpn_nTotalFanOut / mbpn_nChildLookups); + out.println("MBPN values size total: " + mbpn_sizeValueTotal); + out.println("MBPN node size: " + (mbpn_sizeNoValueBytes + mbpn_sizeNoValueBeneath + mbpn_sizeChildOffset) + " = " + mbpn_sizeNoValueBytes + " + " + mbpn_sizeNoValueBeneath + " + " + mbpn_sizeChildOffset); + out.println("MBPN no. nodes: " + mbpn_nNodes); + out.println("MBPN trie depth: " + mbpn_trieDepth); + out.println("MBPN footprint: " + mbpn_footprint + " in bytes"); + } + } + + /** out print some statistics of the trie and the dictionary built from it */ + public Stats stats() { + // calculate nEndValueBeneath + traversePostOrder(new Visitor() { + @Override + public void visit(Node n, int level) { + n.nValuesBeneath = n.isEndOfValue ? 1 : 0; + for (Node c : n.children) + n.nValuesBeneath += c.nValuesBeneath; + } + }); + + // run stats + final Stats s = new Stats(); + final ArrayList<Integer> lenAtLvl = new ArrayList<Integer>(); + traverse(new Visitor() { + @Override + public void visit(Node n, int level) { + if (n.isEndOfValue) + s.nValues++; + s.nValueBytesPlain += n.part.length * n.nValuesBeneath; + s.nValueBytesCompressed += n.part.length; + s.mbpn_nNodes++; + if (s.mbpn_trieDepth < level + 1) + s.mbpn_trieDepth = level + 1; + if (n.children.size() > 0) { + if (s.mbpn_maxFanOut < n.children.size()) + s.mbpn_maxFanOut = n.children.size(); + int childLookups = n.nValuesBeneath - (n.isEndOfValue ? 1 : 0); + s.mbpn_nChildLookups += childLookups; + s.mbpn_nTotalFanOut += childLookups * n.children.size(); + } + + if (level < lenAtLvl.size()) + lenAtLvl.set(level, n.part.length); + else + lenAtLvl.add(n.part.length); + int lenSoFar = 0; + for (int i = 0; i <= level; i++) + lenSoFar += lenAtLvl.get(i); + if (lenSoFar > s.maxValueLength) + s.maxValueLength = lenSoFar; + } + }); + + // flatten trie footprint calculation, case of One-Byte-Per-Node + s.obpn_sizeValue = 1; + s.obpn_sizeNoValuesBeneath = BytesUtil.sizeForValue(s.nValues); + s.obpn_sizeChildCount = 1; + s.obpn_sizeChildOffset = 5; // MSB used as isEndOfValue flag + s.obpn_nNodes = s.nValueBytesCompressed; // no. nodes is the total number of compressed bytes in OBPN + s.obpn_footprint = s.obpn_nNodes * (long) (s.obpn_sizeValue + s.obpn_sizeNoValuesBeneath + s.obpn_sizeChildCount + s.obpn_sizeChildOffset); + while (true) { // minimize the offset size to match the footprint + long t = s.obpn_nNodes * (long) (s.obpn_sizeValue + s.obpn_sizeNoValuesBeneath + s.obpn_sizeChildCount + s.obpn_sizeChildOffset - 1); + if (BytesUtil.sizeForValue(t * 2) <= s.obpn_sizeChildOffset - 1) { // *2 because MSB of offset is used for isEndOfValue flag + s.obpn_sizeChildOffset--; + s.obpn_footprint = t; + } else + break; + } + + // flatten trie footprint calculation, case of Multi-Byte-Per-Node + s.mbpn_sizeValueTotal = s.nValueBytesCompressed; + s.mbpn_sizeNoValueBytes = 1; + s.mbpn_sizeNoValueBeneath = BytesUtil.sizeForValue(s.nValues); + s.mbpn_sizeChildOffset = 5; + s.mbpn_footprint = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (long) (s.mbpn_sizeNoValueBytes + s.mbpn_sizeNoValueBeneath + s.mbpn_sizeChildOffset); + while (true) { // minimize the offset size to match the footprint + long t = s.mbpn_sizeValueTotal + s.mbpn_nNodes * (long) (s.mbpn_sizeNoValueBytes + s.mbpn_sizeNoValueBeneath + s.mbpn_sizeChildOffset - 1); + if (BytesUtil.sizeForValue(t * 4) <= s.mbpn_sizeChildOffset - 1) { // *4 because 2 MSB of offset is used for isEndOfValue & isEndChild flag + s.mbpn_sizeChildOffset--; + s.mbpn_footprint = t; + } else + break; + } + + return s; + } + + /** out print trie for debug */ + public void print() { + print(System.out); + } + + public void print(final PrintStream out) { + traverse(new Visitor() { + @Override + public void visit(Node n, int level) { + try { + for (int i = 0; i < level; i++) + out.print(" "); + out.print(new String(n.part, "UTF-8")); + out.print(" - "); + if (n.nValuesBeneath > 0) + out.print(n.nValuesBeneath); + if (n.isEndOfValue) + out.print("*"); + out.print("\n"); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } + } + }); + } + + private CompleteParts completeParts = new CompleteParts(); + + private class CompleteParts { + byte[] data = new byte[4096]; + int current = 0; + + public void append(byte[] part) { + while (current + part.length > data.length) + expand(); + + System.arraycopy(part, 0, data, current, part.length); + current += part.length; + } + + public void withdraw(int size) { + current -= size; + } + + public byte[] retrieve() { + return Arrays.copyOf(data, current); + } + + private void expand() { + byte[] temp = new byte[2 * data.length]; + System.arraycopy(data, 0, temp, 0, data.length); + data = temp; + } + } + + // there is a 255 limitation of length for each node's part. + // we interpolate nodes to satisfy this when a node's part becomes + // too long(overflow) + private void checkOverflowParts(Node node) { + LinkedList<Node> childrenCopy = new LinkedList<Node>(node.children); + for (Node child : childrenCopy) { + if (child.part.length > 255) { + byte[] first255 = Arrays.copyOf(child.part, 255); + + completeParts.append(node.part); + completeParts.append(first255); + byte[] visited = completeParts.retrieve(); + this.addValue(visited); + completeParts.withdraw(255); + completeParts.withdraw(node.part.length); + } + } + + completeParts.append(node.part); // by here the node.children may have been changed + for (Node child : node.children) { + checkOverflowParts(child); + } + completeParts.withdraw(node.part.length); + } + + /** + * Flatten the trie into a byte array for a minimized memory footprint. + * Lookup remains fast. Cost is inflexibility to modify (becomes immutable). + * + * Flattened node structure is HEAD + NODEs, for each node: + * - o byte, offset to child node, o = stats.mbpn_sizeChildOffset + * - 1 bit, isLastChild flag, the 1st MSB of o + * - 1 bit, isEndOfValue flag, the 2nd MSB of o + * - c byte, number of values beneath, c = stats.mbpn_sizeNoValueBeneath + * - 1 byte, number of value bytes + * - n byte, value bytes + */ + public TrieDictionary<T> build(int baseId) { + byte[] trieBytes = buildTrieBytes(baseId); + TrieDictionary<T> r = new TrieDictionary<T>(trieBytes); + return r; + } + + protected byte[] buildTrieBytes(int baseId) { + checkOverflowParts(this.root); + + Stats stats = stats(); + int sizeNoValuesBeneath = stats.mbpn_sizeNoValueBeneath; + int sizeChildOffset = stats.mbpn_sizeChildOffset; + if (stats.mbpn_footprint <= 0) // must never happen, but let us be cautious throw new IllegalStateException("Too big dictionary, dictionary cannot be bigger than 2GB"); - if (stats.mbpn_footprint > _2GB) - throw new RuntimeException("Too big dictionary, dictionary cannot be bigger than 2GB"); - - // write head - byte[] head; - try { - ByteArrayOutputStream byteBuf = new ByteArrayOutputStream(); - DataOutputStream headOut = new DataOutputStream(byteBuf); - headOut.write(TrieDictionary.MAGIC); - headOut.writeShort(0); // head size, will back fill - headOut.writeInt((int) stats.mbpn_footprint); // body size - headOut.write(sizeChildOffset); - headOut.write(sizeNoValuesBeneath); - headOut.writeShort(baseId); - headOut.writeShort(stats.maxValueLength); - headOut.writeUTF(bytesConverter == null ? "" : bytesConverter.getClass().getName()); - headOut.close(); - head = byteBuf.toByteArray(); - BytesUtil.writeUnsigned(head.length, head, TrieDictionary.MAGIC_SIZE_I, 2); - } catch (IOException e) { - throw new RuntimeException(e); // shall not happen, as we are writing in memory - } - - byte[] trieBytes = new byte[(int) stats.mbpn_footprint + head.length]; - System.arraycopy(head, 0, trieBytes, 0, head.length); - - LinkedList<Node> open = new LinkedList<Node>(); - IdentityHashMap<Node, Integer> offsetMap = new IdentityHashMap<Node, Integer>(); - - // write body - int o = head.length; - offsetMap.put(root, o); - o = build_writeNode(root, o, true, sizeNoValuesBeneath, sizeChildOffset, trieBytes); - if (root.children.isEmpty() == false) - open.addLast(root); - - while (open.isEmpty() == false) { - Node parent = open.removeFirst(); - build_overwriteChildOffset(offsetMap.get(parent), o - head.length, sizeChildOffset, trieBytes); - for (int i = 0; i < parent.children.size(); i++) { - Node c = parent.children.get(i); - boolean isLastChild = (i == parent.children.size() - 1); - offsetMap.put(c, o); - o = build_writeNode(c, o, isLastChild, sizeNoValuesBeneath, sizeChildOffset, trieBytes); - if (c.children.isEmpty() == false) - open.addLast(c); - } - } - - if (o != trieBytes.length) - throw new RuntimeException(); - return trieBytes; - } - - private void build_overwriteChildOffset(int parentOffset, int childOffset, int sizeChildOffset, byte[] trieBytes) { - int flags = (int) trieBytes[parentOffset] & (TrieDictionary.BIT_IS_LAST_CHILD | TrieDictionary.BIT_IS_END_OF_VALUE); - BytesUtil.writeUnsigned(childOffset, trieBytes, parentOffset, sizeChildOffset); - trieBytes[parentOffset] |= flags; - } - - private int build_writeNode(Node n, int offset, boolean isLastChild, int sizeNoValuesBeneath, int sizeChildOffset, byte[] trieBytes) { - int o = offset; - if (o > _2GB) - throw new IllegalStateException(); - - // childOffset - if (isLastChild) - trieBytes[o] |= TrieDictionary.BIT_IS_LAST_CHILD; - if (n.isEndOfValue) - trieBytes[o] |= TrieDictionary.BIT_IS_END_OF_VALUE; - o += sizeChildOffset; - - // nValuesBeneath - BytesUtil.writeUnsigned(n.nValuesBeneath, trieBytes, o, sizeNoValuesBeneath); - o += sizeNoValuesBeneath; - - // nValueBytes - if (n.part.length > 255) - throw new RuntimeException(); - BytesUtil.writeUnsigned(n.part.length, trieBytes, o, 1); - o++; - - // valueBytes - System.arraycopy(n.part, 0, trieBytes, o, n.part.length); - o += n.part.length; - - return o; - } - -} + if (stats.mbpn_footprint > _2GB) + throw new RuntimeException("Too big dictionary, dictionary cannot be bigger than 2GB"); + + // write head + byte[] head; + try { + ByteArrayOutputStream byteBuf = new ByteArrayOutputStream(); + DataOutputStream headOut = new DataOutputStream(byteBuf); + headOut.write(TrieDictionary.MAGIC); + headOut.writeShort(0); // head size, will back fill + headOut.writeInt((int) stats.mbpn_footprint); // body size + headOut.write(sizeChildOffset); + headOut.write(sizeNoValuesBeneath); + headOut.writeShort(baseId); + headOut.writeShort(stats.maxValueLength); + headOut.writeUTF(bytesConverter == null ? "" : bytesConverter.getClass().getName()); + headOut.close(); + head = byteBuf.toByteArray(); + BytesUtil.writeUnsigned(head.length, head, TrieDictionary.MAGIC_SIZE_I, 2); + } catch (IOException e) { + throw new RuntimeException(e); // shall not happen, as we are writing in memory + } + + byte[] trieBytes = new byte[(int) stats.mbpn_footprint + head.length]; + System.arraycopy(head, 0, trieBytes, 0, head.length); + + LinkedList<Node> open = new LinkedList<Node>(); + IdentityHashMap<Node, Integer> offsetMap = new IdentityHashMap<Node, Integer>(); + + // write body + int o = head.length; + offsetMap.put(root, o); + o = build_writeNode(root, o, true, sizeNoValuesBeneath, sizeChildOffset, trieBytes); + if (root.children.isEmpty() == false) + open.addLast(root); + + while (open.isEmpty() == false) { + Node parent = open.removeFirst(); + build_overwriteChildOffset(offsetMap.get(parent), o - head.length, sizeChildOffset, trieBytes); + for (int i = 0; i < parent.children.size(); i++) { + Node c = parent.children.get(i); + boolean isLastChild = (i == parent.children.size() - 1); + offsetMap.put(c, o); + o = build_writeNode(c, o, isLastChild, sizeNoValuesBeneath, sizeChildOffset, trieBytes); + if (c.children.isEmpty() == false) + open.addLast(c); + } + } + + if (o != trieBytes.length) + throw new RuntimeException(); + return trieBytes; + } + + private void build_overwriteChildOffset(int parentOffset, int childOffset, int sizeChildOffset, byte[] trieBytes) { + int flags = (int) trieBytes[parentOffset] & (TrieDictionary.BIT_IS_LAST_CHILD | TrieDictionary.BIT_IS_END_OF_VALUE); + BytesUtil.writeUnsigned(childOffset, trieBytes, parentOffset, sizeChildOffset); + trieBytes[parentOffset] |= flags; + } + + private int build_writeNode(Node n, int offset, boolean isLastChild, int sizeNoValuesBeneath, int sizeChildOffset, byte[] trieBytes) { + int o = offset; + if (o > _2GB) + throw new IllegalStateException(); + + // childOffset + if (isLastChild) + trieBytes[o] |= TrieDictionary.BIT_IS_LAST_CHILD; + if (n.isEndOfValue) + trieBytes[o] |= TrieDictionary.BIT_IS_END_OF_VALUE; + o += sizeChildOffset; + + // nValuesBeneath + BytesUtil.writeUnsigned(n.nValuesBeneath, trieBytes, o, sizeNoValuesBeneath); + o += sizeNoValuesBeneath; + + // nValueBytes + if (n.part.length > 255) + throw new RuntimeException(); + BytesUtil.writeUnsigned(n.part.length, trieBytes, o, 1); + o++; + + // valueBytes + System.arraycopy(n.part, 0, trieBytes, o, n.part.length); + o += n.part.length; + + return o; + } + +}
http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java index 56c1c98..4b96622 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupStringTable.java @@ -1,112 +1,112 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.dict.lookup; - -import java.io.IOException; -import java.util.Comparator; - -import org.apache.kylin.common.util.DateFormat; -import org.apache.kylin.metadata.datatype.DataType; -import org.apache.kylin.metadata.model.ColumnDesc; -import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.source.ReadableTable; - -/** - * @author yangli9 - * - */ -public class LookupStringTable extends LookupTable<String> { - - private static final Comparator<String> dateStrComparator = new Comparator<String>() { - @Override - public int compare(String o1, String o2) { - long l1 = Long.parseLong(o1); - long l2 = Long.parseLong(o2); - return Long.compare(l1, l2); - } - }; - - private static final Comparator<String> numStrComparator = new Comparator<String>() { - @Override - public int compare(String o1, String o2) { - double d1 = Double.parseDouble(o1); - double d2 = Double.parseDouble(o2); - return Double.compare(d1, d2); - } - }; - - private static final Comparator<String> defaultStrComparator = new Comparator<String>() { - @Override - public int compare(String o1, String o2) { - return o1.compareTo(o2); - } - }; - - boolean[] colIsDateTime; - boolean[] colIsNumber; - - public LookupStringTable(TableDesc tableDesc, String[] keyColumns, ReadableTable table) throws IOException { - super(tableDesc, keyColumns, table); - } - - @Override - protected void init() throws IOException { - ColumnDesc[] cols = tableDesc.getColumns(); - colIsDateTime = new boolean[cols.length]; - colIsNumber = new boolean[cols.length]; - for (int i = 0; i < cols.length; i++) { - DataType t = cols[i].getType(); - colIsDateTime[i] = t.isDateTimeFamily(); - colIsNumber[i] = t.isNumberFamily(); - } - - super.init(); - } - - @Override - protected String[] convertRow(String[] cols) { - for (int i = 0; i < cols.length; i++) { - if (colIsDateTime[i]) { +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.dict.lookup; + +import java.io.IOException; +import java.util.Comparator; + +import org.apache.kylin.common.util.DateFormat; +import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.model.ColumnDesc; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.source.ReadableTable; + +/** + * @author yangli9 + * + */ +public class LookupStringTable extends LookupTable<String> { + + private static final Comparator<String> dateStrComparator = new Comparator<String>() { + @Override + public int compare(String o1, String o2) { + long l1 = Long.parseLong(o1); + long l2 = Long.parseLong(o2); + return Long.compare(l1, l2); + } + }; + + private static final Comparator<String> numStrComparator = new Comparator<String>() { + @Override + public int compare(String o1, String o2) { + double d1 = Double.parseDouble(o1); + double d2 = Double.parseDouble(o2); + return Double.compare(d1, d2); + } + }; + + private static final Comparator<String> defaultStrComparator = new Comparator<String>() { + @Override + public int compare(String o1, String o2) { + return o1.compareTo(o2); + } + }; + + boolean[] colIsDateTime; + boolean[] colIsNumber; + + public LookupStringTable(TableDesc tableDesc, String[] keyColumns, ReadableTable table) throws IOException { + super(tableDesc, keyColumns, table); + } + + @Override + protected void init() throws IOException { + ColumnDesc[] cols = tableDesc.getColumns(); + colIsDateTime = new boolean[cols.length]; + colIsNumber = new boolean[cols.length]; + for (int i = 0; i < cols.length; i++) { + DataType t = cols[i].getType(); + colIsDateTime[i] = t.isDateTimeFamily(); + colIsNumber[i] = t.isNumberFamily(); + } + + super.init(); + } + + @Override + protected String[] convertRow(String[] cols) { + for (int i = 0; i < cols.length; i++) { + if (colIsDateTime[i]) { if (cols[i] != null) cols[i] = String.valueOf(DateFormat.stringToMillis(cols[i])); - } - } - return cols; - } - - @Override - protected Comparator<String> getComparator(int idx) { - if (colIsDateTime[idx]) - return dateStrComparator; - else if (colIsNumber[idx]) - return numStrComparator; - else - return defaultStrComparator; - } - - @Override - protected String toString(String cell) { - return cell; - } - - public Class<?> getType() { - return String.class; - } - -} + } + } + return cols; + } + + @Override + protected Comparator<String> getComparator(int idx) { + if (colIsDateTime[idx]) + return dateStrComparator; + else if (colIsNumber[idx]) + return numStrComparator; + else + return defaultStrComparator; + } + + @Override + protected String toString(String cell) { + return cell; + } + + public Class<?> getType() { + return String.class; + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java index 9aae755..cd700e9 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/LookupTable.java @@ -1,180 +1,180 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.dict.lookup; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Comparator; -import java.util.List; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.dict.lookup; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + import org.apache.commons.io.IOUtils; -import org.apache.kylin.common.util.Array; -import org.apache.kylin.common.util.Pair; -import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.source.ReadableTable; -import org.apache.kylin.source.ReadableTable.TableReader; - -import com.google.common.collect.Sets; - -/** - * An in-memory lookup table, in which each cell is an object of type T. The - * table is indexed by specified PK for fast lookup. - * - * @author yangli9 - */ -abstract public class LookupTable<T> { - - protected TableDesc tableDesc; - protected String[] keyColumns; - protected ReadableTable table; - protected ConcurrentHashMap<Array<T>, T[]> data; - - public LookupTable(TableDesc tableDesc, String[] keyColumns, ReadableTable table) throws IOException { - this.tableDesc = tableDesc; - this.keyColumns = keyColumns; - this.table = table; - this.data = new ConcurrentHashMap<Array<T>, T[]>(); - init(); - } - - protected void init() throws IOException { - int[] keyIndex = new int[keyColumns.length]; - for (int i = 0; i < keyColumns.length; i++) { - keyIndex[i] = tableDesc.findColumnByName(keyColumns[i]).getZeroBasedIndex(); - } - - TableReader reader = table.getReader(); - try { - while (reader.next()) { - initRow(reader.getRow(), keyIndex); - } - } finally { +import org.apache.kylin.common.util.Array; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.source.ReadableTable; +import org.apache.kylin.source.ReadableTable.TableReader; + +import com.google.common.collect.Sets; + +/** + * An in-memory lookup table, in which each cell is an object of type T. The + * table is indexed by specified PK for fast lookup. + * + * @author yangli9 + */ +abstract public class LookupTable<T> { + + protected TableDesc tableDesc; + protected String[] keyColumns; + protected ReadableTable table; + protected ConcurrentHashMap<Array<T>, T[]> data; + + public LookupTable(TableDesc tableDesc, String[] keyColumns, ReadableTable table) throws IOException { + this.tableDesc = tableDesc; + this.keyColumns = keyColumns; + this.table = table; + this.data = new ConcurrentHashMap<Array<T>, T[]>(); + init(); + } + + protected void init() throws IOException { + int[] keyIndex = new int[keyColumns.length]; + for (int i = 0; i < keyColumns.length; i++) { + keyIndex[i] = tableDesc.findColumnByName(keyColumns[i]).getZeroBasedIndex(); + } + + TableReader reader = table.getReader(); + try { + while (reader.next()) { + initRow(reader.getRow(), keyIndex); + } + } finally { IOUtils.closeQuietly(reader); - } - } - - @SuppressWarnings("unchecked") - private void initRow(String[] cols, int[] keyIndex) { - T[] value = convertRow(cols); - T[] keyCols = (T[]) java.lang.reflect.Array.newInstance(getType(), keyIndex.length); - for (int i = 0; i < keyCols.length; i++) - keyCols[i] = value[keyIndex[i]]; - - Array<T> key = new Array<T>(keyCols); - - if (data.containsKey(key)) - throw new IllegalStateException("Dup key found, key=" + toString(keyCols) + ", value1=" + toString(data.get(key)) + ", value2=" + toString(value)); - - data.put(key, value); - } - - abstract protected T[] convertRow(String[] cols); - - public T[] getRow(Array<T> key) { - return data.get(key); - } - - public Collection<T[]> getAllRows() { - return data.values(); - } - - public List<T> scan(String col, List<T> values, String returnCol) { - ArrayList<T> result = new ArrayList<T>(); - int colIdx = tableDesc.findColumnByName(col).getZeroBasedIndex(); - int returnIdx = tableDesc.findColumnByName(returnCol).getZeroBasedIndex(); - for (T[] row : data.values()) { - if (values.contains(row[colIdx])) - result.add(row[returnIdx]); - } - return result; - } - - public Pair<T, T> mapRange(String col, T beginValue, T endValue, String returnCol) { - int colIdx = tableDesc.findColumnByName(col).getZeroBasedIndex(); - int returnIdx = tableDesc.findColumnByName(returnCol).getZeroBasedIndex(); - Comparator<T> colComp = getComparator(colIdx); - Comparator<T> returnComp = getComparator(returnIdx); - - T returnBegin = null; - T returnEnd = null; - for (T[] row : data.values()) { - if (between(beginValue, row[colIdx], endValue, colComp)) { - T returnValue = row[returnIdx]; - if (returnBegin == null || returnComp.compare(returnValue, returnBegin) < 0) { - returnBegin = returnValue; - } - if (returnEnd == null || returnComp.compare(returnValue, returnEnd) > 0) { - returnEnd = returnValue; - } - } - } - if (returnBegin == null && returnEnd == null) - return null; - else - return Pair.newPair(returnBegin, returnEnd); - } - - public Set<T> mapValues(String col, Set<T> values, String returnCol) { - int colIdx = tableDesc.findColumnByName(col).getZeroBasedIndex(); - int returnIdx = tableDesc.findColumnByName(returnCol).getZeroBasedIndex(); - Set<T> result = Sets.newHashSetWithExpectedSize(values.size()); - for (T[] row : data.values()) { - if (values.contains(row[colIdx])) { - result.add(row[returnIdx]); - } - } - return result; - } - - private boolean between(T beginValue, T v, T endValue, Comparator<T> comp) { - return (beginValue == null || comp.compare(beginValue, v) <= 0) && (endValue == null || comp.compare(v, endValue) <= 0); - } - - abstract protected Comparator<T> getComparator(int colIdx); - - public String toString() { - return "LookupTable [path=" + table + "]"; - } - - protected String toString(T[] cols) { - StringBuilder b = new StringBuilder(); - b.append("["); - for (int i = 0; i < cols.length; i++) { - if (i > 0) - b.append(","); - b.append(toString(cols[i])); - } - b.append("]"); - return b.toString(); - } - - abstract protected String toString(T cell); - - abstract public Class<?> getType(); - - public void dump() { - for (Array<T> key : data.keySet()) { - System.out.println(toString(key.data) + " => " + toString(data.get(key))); - } - } - -} + } + } + + @SuppressWarnings("unchecked") + private void initRow(String[] cols, int[] keyIndex) { + T[] value = convertRow(cols); + T[] keyCols = (T[]) java.lang.reflect.Array.newInstance(getType(), keyIndex.length); + for (int i = 0; i < keyCols.length; i++) + keyCols[i] = value[keyIndex[i]]; + + Array<T> key = new Array<T>(keyCols); + + if (data.containsKey(key)) + throw new IllegalStateException("Dup key found, key=" + toString(keyCols) + ", value1=" + toString(data.get(key)) + ", value2=" + toString(value)); + + data.put(key, value); + } + + abstract protected T[] convertRow(String[] cols); + + public T[] getRow(Array<T> key) { + return data.get(key); + } + + public Collection<T[]> getAllRows() { + return data.values(); + } + + public List<T> scan(String col, List<T> values, String returnCol) { + ArrayList<T> result = new ArrayList<T>(); + int colIdx = tableDesc.findColumnByName(col).getZeroBasedIndex(); + int returnIdx = tableDesc.findColumnByName(returnCol).getZeroBasedIndex(); + for (T[] row : data.values()) { + if (values.contains(row[colIdx])) + result.add(row[returnIdx]); + } + return result; + } + + public Pair<T, T> mapRange(String col, T beginValue, T endValue, String returnCol) { + int colIdx = tableDesc.findColumnByName(col).getZeroBasedIndex(); + int returnIdx = tableDesc.findColumnByName(returnCol).getZeroBasedIndex(); + Comparator<T> colComp = getComparator(colIdx); + Comparator<T> returnComp = getComparator(returnIdx); + + T returnBegin = null; + T returnEnd = null; + for (T[] row : data.values()) { + if (between(beginValue, row[colIdx], endValue, colComp)) { + T returnValue = row[returnIdx]; + if (returnBegin == null || returnComp.compare(returnValue, returnBegin) < 0) { + returnBegin = returnValue; + } + if (returnEnd == null || returnComp.compare(returnValue, returnEnd) > 0) { + returnEnd = returnValue; + } + } + } + if (returnBegin == null && returnEnd == null) + return null; + else + return Pair.newPair(returnBegin, returnEnd); + } + + public Set<T> mapValues(String col, Set<T> values, String returnCol) { + int colIdx = tableDesc.findColumnByName(col).getZeroBasedIndex(); + int returnIdx = tableDesc.findColumnByName(returnCol).getZeroBasedIndex(); + Set<T> result = Sets.newHashSetWithExpectedSize(values.size()); + for (T[] row : data.values()) { + if (values.contains(row[colIdx])) { + result.add(row[returnIdx]); + } + } + return result; + } + + private boolean between(T beginValue, T v, T endValue, Comparator<T> comp) { + return (beginValue == null || comp.compare(beginValue, v) <= 0) && (endValue == null || comp.compare(v, endValue) <= 0); + } + + abstract protected Comparator<T> getComparator(int colIdx); + + public String toString() { + return "LookupTable [path=" + table + "]"; + } + + protected String toString(T[] cols) { + StringBuilder b = new StringBuilder(); + b.append("["); + for (int i = 0; i < cols.length; i++) { + if (i > 0) + b.append(","); + b.append(toString(cols[i])); + } + b.append("]"); + return b.toString(); + } + + abstract protected String toString(T cell); + + abstract public Class<?> getType(); + + public void dump() { + for (Array<T> key : data.keySet()) { + System.out.println(toString(key.data) + " => " + toString(data.get(key))); + } + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/826f23f1/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java ---------------------------------------------------------------------- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java index 0d3848c..085158a 100644 --- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotManager.java @@ -1,81 +1,81 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.dict.lookup; - -import java.io.IOException; -import java.util.NavigableSet; -import java.util.concurrent.ConcurrentHashMap; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.dict.lookup; + +import java.io.IOException; +import java.util.NavigableSet; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; - -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.persistence.ResourceStore; -import org.apache.kylin.metadata.MetadataManager; -import org.apache.kylin.metadata.model.TableDesc; -import org.apache.kylin.source.ReadableTable; -import org.apache.kylin.source.ReadableTable.TableSignature; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.ResourceStore; +import org.apache.kylin.metadata.MetadataManager; +import org.apache.kylin.metadata.model.TableDesc; +import org.apache.kylin.source.ReadableTable; +import org.apache.kylin.source.ReadableTable.TableSignature; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalNotification; -/** - * @author yangli9 - */ -public class SnapshotManager { - - private static final Logger logger = LoggerFactory.getLogger(SnapshotManager.class); - - // static cached instances - private static final ConcurrentHashMap<KylinConfig, SnapshotManager> SERVICE_CACHE = new ConcurrentHashMap<KylinConfig, SnapshotManager>(); - - public static SnapshotManager getInstance(KylinConfig config) { - SnapshotManager r = SERVICE_CACHE.get(config); - if (r == null) { - synchronized (SnapshotManager.class) { - r = SERVICE_CACHE.get(config); - if (r == null) { - r = new SnapshotManager(config); - SERVICE_CACHE.put(config, r); - if (SERVICE_CACHE.size() > 1) { - logger.warn("More than one singleton exist"); - } - } - } - } - return r; - } - - // ============================================================================ - - private KylinConfig config; +/** + * @author yangli9 + */ +public class SnapshotManager { + + private static final Logger logger = LoggerFactory.getLogger(SnapshotManager.class); + + // static cached instances + private static final ConcurrentHashMap<KylinConfig, SnapshotManager> SERVICE_CACHE = new ConcurrentHashMap<KylinConfig, SnapshotManager>(); + + public static SnapshotManager getInstance(KylinConfig config) { + SnapshotManager r = SERVICE_CACHE.get(config); + if (r == null) { + synchronized (SnapshotManager.class) { + r = SERVICE_CACHE.get(config); + if (r == null) { + r = new SnapshotManager(config); + SERVICE_CACHE.put(config, r); + if (SERVICE_CACHE.size() > 1) { + logger.warn("More than one singleton exist"); + } + } + } + } + return r; + } + + // ============================================================================ + + private KylinConfig config; private LoadingCache<String, SnapshotTable> snapshotCache; // resource - - // path ==> - // SnapshotTable - - private SnapshotManager(KylinConfig config) { - this.config = config; + + // path ==> + // SnapshotTable + + private SnapshotManager(KylinConfig config) { + this.config = config; this.snapshotCache = CacheBuilder.newBuilder().removalListener(new RemovalListener<String, SnapshotTable>() { @Override public void onRemoval(RemovalNotification<String, SnapshotTable> notification) { @@ -89,13 +89,13 @@ public class SnapshotManager { return snapshotTable; } }); - } - - public void wipeoutCache() { + } + + public void wipeoutCache() { snapshotCache.invalidateAll(); - } - - public SnapshotTable getSnapshotTable(String resourcePath) throws IOException { + } + + public SnapshotTable getSnapshotTable(String resourcePath) throws IOException { try { SnapshotTable r = snapshotCache.get(resourcePath); if (r == null) { @@ -105,114 +105,114 @@ public class SnapshotManager { return r; } catch (ExecutionException e) { throw new RuntimeException(e.getCause()); - } - } - - public void removeSnapshot(String resourcePath) throws IOException { - ResourceStore store = MetadataManager.getInstance(this.config).getStore(); - store.deleteResource(resourcePath); + } + } + + public void removeSnapshot(String resourcePath) throws IOException { + ResourceStore store = MetadataManager.getInstance(this.config).getStore(); + store.deleteResource(resourcePath); snapshotCache.invalidate(resourcePath); - } - - public SnapshotTable buildSnapshot(ReadableTable table, TableDesc tableDesc) throws IOException { - SnapshotTable snapshot = new SnapshotTable(table, tableDesc.getIdentity()); - snapshot.updateRandomUuid(); - - String dup = checkDupByInfo(snapshot); - if (dup != null) { - logger.info("Identical input " + table.getSignature() + ", reuse existing snapshot at " + dup); - return getSnapshotTable(dup); - } - - if (snapshot.getSignature().getSize() / 1024 / 1024 > config.getTableSnapshotMaxMB()) { - throw new IllegalStateException("Table snapshot should be no greater than " + config.getTableSnapshotMaxMB() // - + " MB, but " + tableDesc + " size is " + snapshot.getSignature().getSize()); - } - - snapshot.takeSnapshot(table, tableDesc); - - return trySaveNewSnapshot(snapshot); - } - - public SnapshotTable rebuildSnapshot(ReadableTable table, TableDesc tableDesc, String overwriteUUID) throws IOException { - SnapshotTable snapshot = new SnapshotTable(table, tableDesc.getIdentity()); - snapshot.setUuid(overwriteUUID); - - snapshot.takeSnapshot(table, tableDesc); - - SnapshotTable existing = getSnapshotTable(snapshot.getResourcePath()); - snapshot.setLastModified(existing.getLastModified()); - - save(snapshot); - snapshotCache.put(snapshot.getResourcePath(), snapshot); - - return snapshot; - } - - public SnapshotTable trySaveNewSnapshot(SnapshotTable snapshotTable) throws IOException { - - String dupTable = checkDupByContent(snapshotTable); - if (dupTable != null) { - logger.info("Identical snapshot content " + snapshotTable + ", reuse existing snapshot at " + dupTable); - return getSnapshotTable(dupTable); - } - - save(snapshotTable); - snapshotCache.put(snapshotTable.getResourcePath(), snapshotTable); - - return snapshotTable; - } - - private String checkDupByInfo(SnapshotTable snapshot) throws IOException { - ResourceStore store = MetadataManager.getInstance(this.config).getStore(); - String resourceDir = snapshot.getResourceDir(); - NavigableSet<String> existings = store.listResources(resourceDir); - if (existings == null) - return null; - - TableSignature sig = snapshot.getSignature(); - for (String existing : existings) { - SnapshotTable existingTable = load(existing, false); // skip cache, - // direct load from store - if (existingTable != null && sig.equals(existingTable.getSignature())) - return existing; - } - - return null; - } - - private String checkDupByContent(SnapshotTable snapshot) throws IOException { - ResourceStore store = MetadataManager.getInstance(this.config).getStore(); - String resourceDir = snapshot.getResourceDir(); - NavigableSet<String> existings = store.listResources(resourceDir); - if (existings == null) - return null; - - for (String existing : existings) { - SnapshotTable existingTable = load(existing, true); // skip cache, direct load from store - if (existingTable != null && existingTable.equals(snapshot)) - return existing; - } - - return null; - } - - private void save(SnapshotTable snapshot) throws IOException { - ResourceStore store = MetadataManager.getInstance(this.config).getStore(); - String path = snapshot.getResourcePath(); - store.putResource(path, snapshot, SnapshotTableSerializer.FULL_SERIALIZER); - } - - private SnapshotTable load(String resourcePath, boolean loadData) throws IOException { - logger.info("Loading snapshotTable from " + resourcePath + ", with loadData: " + loadData); - ResourceStore store = MetadataManager.getInstance(this.config).getStore(); - - SnapshotTable table = store.getResource(resourcePath, SnapshotTable.class, loadData ? SnapshotTableSerializer.FULL_SERIALIZER : SnapshotTableSerializer.INFO_SERIALIZER); - - if (loadData) - logger.debug("Loaded snapshot at " + resourcePath); - - return table; - } - -} + } + + public SnapshotTable buildSnapshot(ReadableTable table, TableDesc tableDesc) throws IOException { + SnapshotTable snapshot = new SnapshotTable(table, tableDesc.getIdentity()); + snapshot.updateRandomUuid(); + + String dup = checkDupByInfo(snapshot); + if (dup != null) { + logger.info("Identical input " + table.getSignature() + ", reuse existing snapshot at " + dup); + return getSnapshotTable(dup); + } + + if (snapshot.getSignature().getSize() / 1024 / 1024 > config.getTableSnapshotMaxMB()) { + throw new IllegalStateException("Table snapshot should be no greater than " + config.getTableSnapshotMaxMB() // + + " MB, but " + tableDesc + " size is " + snapshot.getSignature().getSize()); + } + + snapshot.takeSnapshot(table, tableDesc); + + return trySaveNewSnapshot(snapshot); + } + + public SnapshotTable rebuildSnapshot(ReadableTable table, TableDesc tableDesc, String overwriteUUID) throws IOException { + SnapshotTable snapshot = new SnapshotTable(table, tableDesc.getIdentity()); + snapshot.setUuid(overwriteUUID); + + snapshot.takeSnapshot(table, tableDesc); + + SnapshotTable existing = getSnapshotTable(snapshot.getResourcePath()); + snapshot.setLastModified(existing.getLastModified()); + + save(snapshot); + snapshotCache.put(snapshot.getResourcePath(), snapshot); + + return snapshot; + } + + public SnapshotTable trySaveNewSnapshot(SnapshotTable snapshotTable) throws IOException { + + String dupTable = checkDupByContent(snapshotTable); + if (dupTable != null) { + logger.info("Identical snapshot content " + snapshotTable + ", reuse existing snapshot at " + dupTable); + return getSnapshotTable(dupTable); + } + + save(snapshotTable); + snapshotCache.put(snapshotTable.getResourcePath(), snapshotTable); + + return snapshotTable; + } + + private String checkDupByInfo(SnapshotTable snapshot) throws IOException { + ResourceStore store = MetadataManager.getInstance(this.config).getStore(); + String resourceDir = snapshot.getResourceDir(); + NavigableSet<String> existings = store.listResources(resourceDir); + if (existings == null) + return null; + + TableSignature sig = snapshot.getSignature(); + for (String existing : existings) { + SnapshotTable existingTable = load(existing, false); // skip cache, + // direct load from store + if (existingTable != null && sig.equals(existingTable.getSignature())) + return existing; + } + + return null; + } + + private String checkDupByContent(SnapshotTable snapshot) throws IOException { + ResourceStore store = MetadataManager.getInstance(this.config).getStore(); + String resourceDir = snapshot.getResourceDir(); + NavigableSet<String> existings = store.listResources(resourceDir); + if (existings == null) + return null; + + for (String existing : existings) { + SnapshotTable existingTable = load(existing, true); // skip cache, direct load from store + if (existingTable != null && existingTable.equals(snapshot)) + return existing; + } + + return null; + } + + private void save(SnapshotTable snapshot) throws IOException { + ResourceStore store = MetadataManager.getInstance(this.config).getStore(); + String path = snapshot.getResourcePath(); + store.putResource(path, snapshot, SnapshotTableSerializer.FULL_SERIALIZER); + } + + private SnapshotTable load(String resourcePath, boolean loadData) throws IOException { + logger.info("Loading snapshotTable from " + resourcePath + ", with loadData: " + loadData); + ResourceStore store = MetadataManager.getInstance(this.config).getStore(); + + SnapshotTable table = store.getResource(resourcePath, SnapshotTable.class, loadData ? SnapshotTableSerializer.FULL_SERIALIZER : SnapshotTableSerializer.INFO_SERIALIZER); + + if (loadData) + logger.debug("Loaded snapshot at " + resourcePath); + + return table; + } + +}