Repository: kylin
Updated Branches:
  refs/heads/2.x-staging 5f4c581e6 -> f4667226d


http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/core-metadata/src/main/java/org/apache/kylin/measure/topn/DoublyLinkedList.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/topn/DoublyLinkedList.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/topn/DoublyLinkedList.java
new file mode 100644
index 0000000..03e4066
--- /dev/null
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/topn/DoublyLinkedList.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.topn;
+
+
+/**
+ * Modified from DoublyLinkedList.java in https://github.com/addthis/stream-lib
+ * 
+ * @param <T>
+ */
+public class DoublyLinkedList<T> {
+
+    private int size = 0;
+    private ListNode2<T> tail;
+    private ListNode2<T> head;
+
+    /**
+     * Append to head of list
+     */
+    public ListNode2<T> add(T value) {
+        ListNode2<T> node = new ListNode2<T>(value);
+        add(node);
+
+        return node;
+    }
+
+    /**
+     * Prepend to tail of list
+     */
+    public ListNode2<T> enqueue(T value) {
+        ListNode2<T> node = new ListNode2<T>(value);
+       
+        return enqueue(node);
+    }
+
+    public ListNode2<T> enqueue(ListNode2<T> node) {
+        if (size++ == 0) {
+            head = node;
+        } else {
+            node.next = tail;
+            tail.prev = node;
+        }
+
+        tail = node;
+
+        return node;
+    }
+
+    public void add(ListNode2<T> node) {
+        node.prev = head;
+        node.next = null;
+
+        if (size++ == 0) {
+            tail = node;
+        } else {
+            head.next = node;
+        }
+
+        head = node;
+    }
+
+    public ListNode2<T> addAfter(ListNode2<T> node, T value) {
+        ListNode2<T> newNode = new ListNode2<T>(value);
+        addAfter(node, newNode);
+        return newNode;
+    }
+
+    public void addAfter(ListNode2<T> node, ListNode2<T> newNode) {
+        newNode.next = node.next;
+        newNode.prev = node;
+        node.next = newNode;
+        if (newNode.next == null) {
+            head = newNode;
+        } else {
+            newNode.next.prev = newNode;
+        }
+        size++;
+    }
+
+
+    public void addBefore(ListNode2<T> node, ListNode2<T> newNode) {
+        newNode.prev = node.prev;
+        newNode.next = node;
+        node.prev = newNode;
+        if (newNode.prev == null) {
+            tail = newNode;
+        } else {
+            newNode.prev.next = newNode;
+        }
+        size++;
+    }
+
+    public void remove(ListNode2<T> node) {
+        if (node == tail) {
+            tail = node.next;
+        } else {
+            node.prev.next = node.next;
+        }
+
+        if (node == head) {
+            head = node.prev;
+        } else {
+            node.next.prev = node.prev;
+        }
+        size--;
+    }
+
+    public int size() {
+        return size;
+    }
+    
+
+    public ListNode2<T> head() {
+        return head;
+    }
+
+    public ListNode2<T> tail() {
+        return tail;
+    }
+
+    public boolean isEmpty() {
+        return size == 0;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/core-metadata/src/main/java/org/apache/kylin/measure/topn/ListNode2.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/topn/ListNode2.java 
b/core-metadata/src/main/java/org/apache/kylin/measure/topn/ListNode2.java
new file mode 100644
index 0000000..b2f47c9
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/ListNode2.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.topn;
+
+/**
+ * Modified from ListNode2.java in https://github.com/addthis/stream-lib
+ *  
+ * @param <T>
+ */
+public class ListNode2<T> {
+
+    protected T value;
+    protected ListNode2<T> prev;
+    protected ListNode2<T> next;
+
+    public ListNode2(T value) {
+        this.value = value;
+    }
+
+    public ListNode2<T> getPrev() {
+        return prev;
+    }
+
+    public ListNode2<T> getNext() {
+        return next;
+    }
+
+    public T getValue() {
+        return value;
+    }
+
+    public void setValue(T value) {
+        this.value = value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java 
b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java
index 9b4c893..6ea4e7a 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java
@@ -20,7 +20,6 @@ package org.apache.kylin.measure.topn;
 
 import java.util.Map;
 
-import org.apache.kylin.common.topn.TopNCounter;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.measure.MeasureAggregator;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java 
b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
new file mode 100644
index 0000000..fd35309
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounter.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.topn;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.kylin.common.util.Pair;
+
+import java.util.*;
+
+/**
+ * Modified from the StreamSummary.java in 
https://github.com/addthis/stream-lib
+ * 
+ * Based on the <i>Space-Saving</i> algorithm and the <i>Stream-Summary</i>
+ * data structure as described in:
+ * <i>Efficient Computation of Frequent and Top-k Elements in Data Streams</i>
+ * by Metwally, Agrawal, and Abbadi
+ *
+ * @param <T> type of data in the stream to be summarized
+ */
+public class TopNCounter<T> implements Iterable<Counter<T>> {
+
+    public static final int EXTRA_SPACE_RATE = 50;
+
+    protected int capacity;
+    private HashMap<T, ListNode2<Counter<T>>> counterMap;
+    protected DoublyLinkedList<Counter<T>> counterList;
+
+    /**
+     * @param capacity maximum size (larger capacities improve accuracy)
+     */
+    public TopNCounter(int capacity) {
+        this.capacity = capacity;
+        counterMap = new HashMap<T, ListNode2<Counter<T>>>();
+        counterList = new DoublyLinkedList<Counter<T>>();
+    }
+
+    public int getCapacity() {
+        return capacity;
+    }
+
+    /**
+     * Algorithm: <i>Space-Saving</i>
+     *
+     * @param item stream element (<i>e</i>)
+     * @return false if item was already in the stream summary, true otherwise
+     */
+    public boolean offer(T item) {
+        return offer(item, 1.0);
+    }
+
+    /**
+     * Algorithm: <i>Space-Saving</i>
+     *
+     * @param item stream element (<i>e</i>)
+     * @return false if item was already in the stream summary, true otherwise
+     */
+    public boolean offer(T item, double incrementCount) {
+        return offerReturnAll(item, incrementCount).getFirst();
+    }
+
+    /**
+     * @param item stream element (<i>e</i>)
+     * @return item dropped from summary if an item was dropped, null otherwise
+     */
+    public T offerReturnDropped(T item, double incrementCount) {
+        return offerReturnAll(item, incrementCount).getSecond();
+    }
+
+    /**
+     * @param item stream element (<i>e</i>)
+     * @return Pair<isNewItem, itemDropped> where isNewItem is the return 
value of offer() and itemDropped is null if no item was dropped
+     */
+    public Pair<Boolean, T> offerReturnAll(T item, double incrementCount) {
+        ListNode2<Counter<T>> counterNode = counterMap.get(item);
+        boolean isNewItem = (counterNode == null);
+        T droppedItem = null;
+        if (isNewItem) {
+
+            if (size() < capacity) {
+                counterNode = counterList.enqueue(new Counter<T>(item));
+            } else {
+                counterNode = counterList.tail();
+                Counter<T> counter = counterNode.getValue();
+                droppedItem = counter.item;
+                counterMap.remove(droppedItem);
+                counter.item = item;
+                counter.count = 0.0;
+            }
+            counterMap.put(item, counterNode);
+        }
+
+        incrementCounter(counterNode, incrementCount);
+
+        return Pair.newPair(isNewItem, droppedItem);
+    }
+
+    protected void incrementCounter(ListNode2<Counter<T>> counterNode, double 
incrementCount) {
+        Counter<T> counter = counterNode.getValue();
+        counter.count += incrementCount;
+
+        ListNode2<Counter<T>> nodeNext; 
+                
+        if (incrementCount > 0) {
+            nodeNext = counterNode.getNext();
+        } else {
+            nodeNext = counterNode.getPrev();
+        }
+        counterList.remove(counterNode);
+        counterNode.prev = null;
+        counterNode.next = null;
+
+        if (incrementCount > 0) {
+            while (nodeNext != null && counter.count >= 
nodeNext.getValue().count) {
+                nodeNext = nodeNext.getNext();
+            }
+            if (nodeNext != null) {
+                counterList.addBefore(nodeNext, counterNode);
+            } else {
+                counterList.add(counterNode);
+            }
+            
+        } else {
+            while (nodeNext != null && counter.count < 
nodeNext.getValue().count) {
+                nodeNext = nodeNext.getPrev();
+            }
+            if (nodeNext != null) {
+                counterList.addAfter(nodeNext, counterNode);
+            } else {
+                counterList.enqueue(counterNode);
+            }
+        }
+
+       
+
+    }
+
+    public List<T> peek(int k) {
+        List<T> topK = new ArrayList<T>(k);
+
+        for (ListNode2<Counter<T>> bNode = counterList.head(); bNode != null; 
bNode = bNode.getPrev()) {
+            Counter<T> b = bNode.getValue();
+            if (topK.size() == k) {
+                return topK;
+            }
+            topK.add(b.item);
+        }
+
+        return topK;
+    }
+
+    public List<Counter<T>> topK(int k) {
+        List<Counter<T>> topK = new ArrayList<Counter<T>>(k);
+
+        for (ListNode2<Counter<T>> bNode = counterList.head(); bNode != null; 
bNode = bNode.getPrev()) {
+            Counter<T> b = bNode.getValue();
+            if (topK.size() == k) {
+                return topK;
+            }
+            topK.add(b);
+        }
+
+        return topK;
+    }
+
+    /**
+     * @return number of items stored
+     */
+    public int size() {
+        return counterMap.size();
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append('[');
+        for (ListNode2<Counter<T>> bNode = counterList.head(); bNode != null; 
bNode = bNode.getPrev()) {
+            Counter<T> b = bNode.getValue();
+            sb.append(b.item);
+            sb.append(':');
+            sb.append(b.count);
+        }
+        sb.append(']');
+        return sb.toString();
+    }
+
+    /**
+     * Put element to the head position;
+     * The consumer should call this method with count in ascending way; the 
item will be directly put to the head of the list, without comparison for best 
performance;
+     * @param item
+     * @param count
+     */
+    public void offerToHead(T item, double count) {
+        Counter<T> c = new Counter<T>(item);
+        c.count = count;
+        ListNode2<Counter<T>> node = counterList.add(c);
+        counterMap.put(c.item, node);
+    }
+
+    /**
+     * Merge another counter into this counter;
+     * @param another
+     * @return
+     */
+    public TopNCounter<T> merge(TopNCounter<T> another) {
+        double m1 = 0.0, m2 = 0.0;
+        if (this.size() >= this.capacity) {
+            m1 = this.counterList.tail().getValue().count;
+        }
+
+        if (another.size() >= another.capacity) {
+            m2 = another.counterList.tail().getValue().count;
+        }
+        
+        Set<T> duplicateItems = Sets.newHashSet(); 
+        List<T> notDuplicateItems = Lists.newArrayList();
+        
+        for (Map.Entry<T, ListNode2<Counter<T>>> entry : 
this.counterMap.entrySet()) {
+            T item = entry.getKey();
+            ListNode2<Counter<T>> existing = another.counterMap.get(item);
+            if (existing != null) {
+                duplicateItems.add(item);
+            } else {
+                notDuplicateItems.add(item);
+            }
+        }
+
+        for(T item : duplicateItems) {
+            this.offer(item, another.counterMap.get(item).getValue().count);
+        }
+        
+        for(T item : notDuplicateItems) {
+            this.offer(item, m2);
+        }
+
+        for (Map.Entry<T, ListNode2<Counter<T>>> entry : 
another.counterMap.entrySet()) {
+            T item = entry.getKey();
+            if (duplicateItems.contains(item) == false) {
+                double counter = entry.getValue().getValue().count;
+                this.offer(item, counter + m1);
+            }
+        }
+
+        return this;
+    }
+
+    /**
+     * Retain the capacity to the given number; The extra counters will be cut 
off
+     * @param newCapacity
+     */
+    public void retain(int newCapacity) {
+        assert newCapacity > 0;
+        this.capacity = newCapacity;
+        if (newCapacity < this.size()) {
+            ListNode2<Counter<T>> tail = counterList.tail();
+            while (tail != null && this.size() > newCapacity) {
+                Counter<T> bucket = tail.getValue();
+                this.counterMap.remove(bucket.getItem());
+                this.counterList.remove(tail);
+                tail = this.counterList.tail();
+            }
+        }
+
+    }
+
+    /**
+     * Get the counter values in ascending order
+     * @return
+     */
+    public double[] getCounters() {
+        double[] counters = new double[size()];
+        int index = 0;
+
+        for (ListNode2<Counter<T>> bNode = counterList.tail(); bNode != null; 
bNode = bNode.getNext()) {
+            Counter<T> b = bNode.getValue();
+            counters[index] = b.count;
+            index++;
+        }
+
+        assert index == size();
+        return counters;
+    }
+
+    @Override
+    public Iterator<Counter<T>> iterator() {
+        return new TopNCounterIterator();
+    }
+    
+    /**
+     * Iterator from the tail (smallest) to head (biggest);
+     */
+    private class TopNCounterIterator implements Iterator<Counter<T>> {
+
+        private ListNode2<Counter<T>> currentBNode;
+
+        private TopNCounterIterator() {
+            currentBNode = counterList.tail();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return currentBNode != null;
+
+        }
+
+        @Override
+        public Counter<T> next() {
+            Counter<T> counter = currentBNode.getValue();
+            currentBNode = currentBNode.getNext();
+            return counter;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
index 777b47f..604365c 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
@@ -22,9 +22,6 @@ import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.kylin.common.topn.Counter;
-import org.apache.kylin.common.topn.DoubleDeltaSerializer;
-import org.apache.kylin.common.topn.TopNCounter;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.datatype.DataTypeSerializer;

http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
index 06493f7..0f79c1d 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
@@ -24,8 +24,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.kylin.common.topn.Counter;
-import org.apache.kylin.common.topn.TopNCounter;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.Dictionary;

http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
 
b/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
index abe11e3..5b50241 100644
--- 
a/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
+++ 
b/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
@@ -2,7 +2,7 @@ package org.apache.kylin.aggregation.topn;
 
 import java.nio.ByteBuffer;
 
-import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.measure.topn.TopNCounter;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.measure.topn.TopNCounterSerializer;

http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/core-metadata/src/test/java/org/apache/kylin/measure/hll/HyperLogLogCounterTest.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/test/java/org/apache/kylin/measure/hll/HyperLogLogCounterTest.java
 
b/core-metadata/src/test/java/org/apache/kylin/measure/hll/HyperLogLogCounterTest.java
new file mode 100644
index 0000000..e96b9d0
--- /dev/null
+++ 
b/core-metadata/src/test/java/org/apache/kylin/measure/hll/HyperLogLogCounterTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.hll;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * @author yangli9
+ * 
+ */
+public class HyperLogLogCounterTest {
+
+    ByteBuffer buf = ByteBuffer.allocate(1024 * 1024);
+    Random rand1 = new Random(1);
+    Random rand2 = new Random(2);
+    Random rand3 = new Random(3);
+    int errorCount1 = 0;
+    int errorCount2 = 0;
+    int errorCount3 = 0;
+
+    @Test
+    public void testPeekLength() throws IOException {
+        HyperLogLogPlusCounter hllc = new HyperLogLogPlusCounter(10);
+        HyperLogLogPlusCounter copy = new HyperLogLogPlusCounter(10);
+        byte[] value = new byte[10];
+        for (int i = 0; i < 200000; i++) {
+            rand1.nextBytes(value);
+            hllc.add(value);
+
+            buf.clear();
+            hllc.writeRegisters(buf);
+
+            int len = buf.position();
+            buf.position(0);
+            assertEquals(len, hllc.peekLength(buf));
+
+            copy.readRegisters(buf);
+            assertEquals(len, buf.position());
+            assertEquals(hllc, copy);
+        }
+        buf.clear();
+    }
+
+    private Set<String> generateTestData(int n) {
+        Set<String> testData = new HashSet<String>();
+        for (int i = 0; i < n; i++) {
+            String[] samples = generateSampleData();
+            for (String sample : samples) {
+                testData.add(sample);
+            }
+        }
+        return testData;
+    }
+
+    // simulate the visit (=visitor+id)
+    private String[] generateSampleData() {
+
+        StringBuilder buf = new StringBuilder();
+        for (int i = 0; i < 19; i++) {
+            buf.append(Math.abs(rand1.nextInt()) % 10);
+        }
+        String header = buf.toString();
+
+        int size = Math.abs(rand3.nextInt()) % 9 + 1;
+        String[] samples = new String[size];
+        for (int k = 0; k < size; k++) {
+            buf = new StringBuilder(header);
+            buf.append("-");
+            for (int i = 0; i < 10; i++) {
+                buf.append(Math.abs(rand3.nextInt()) % 10);
+            }
+            samples[k] = buf.toString();
+        }
+
+        return samples;
+    }
+
+    @Test
+    public void countTest() throws IOException {
+        int n = 10;
+        for (int i = 0; i < 5; i++) {
+            count(n);
+            n *= 10;
+        }
+    }
+
+    private void count(int n) throws IOException {
+        Set<String> testSet = generateTestData(n);
+
+        HyperLogLogPlusCounter hllc = newHLLC();
+        for (String testData : testSet) {
+            hllc.add(Bytes.toBytes(testData));
+        }
+        long estimate = hllc.getCountEstimate();
+        double errorRate = hllc.getErrorRate();
+        double actualError = (double) Math.abs(testSet.size() - estimate) / 
testSet.size();
+        System.out.println(estimate);
+        System.out.println(testSet.size());
+        System.out.println(errorRate);
+        System.out.println("=" + actualError);
+        Assert.assertTrue(actualError < errorRate * 3.0);
+
+        checkSerialize(hllc);
+    }
+
+    private void checkSerialize(HyperLogLogPlusCounter hllc) throws 
IOException {
+        long estimate = hllc.getCountEstimate();
+        buf.clear();
+        hllc.writeRegisters(buf);
+        buf.flip();
+        hllc.readRegisters(buf);
+        Assert.assertEquals(estimate, hllc.getCountEstimate());
+    }
+
+    @Test
+    public void mergeTest() throws IOException {
+        double error = 0;
+        double absError = 0;
+        int n = 100;
+        for (int i = 0; i < n; i++) {
+            double e = merge();
+            error += e;
+            absError += Math.abs(e);
+        }
+        System.out.println("Total average error is " + error / n + " and 
absolute error is " + absError / n);
+        
+        System.out.println("  errorRateCount1 is " + errorCount1 + "!");
+        System.out.println("  errorRateCount2 is " + errorCount2 + "!");
+        System.out.println("  errorRateCount3 is " + errorCount3 + "!");
+
+        Assert.assertTrue(errorCount1 <= n * 0.40);
+        Assert.assertTrue(errorCount2 <= n * 0.08);
+        Assert.assertTrue(errorCount3 <= n * 0.02);
+    }
+
+    private double merge() throws IOException {
+
+        int ln = 50;
+        int dn = 300;
+        Set<String> testSet = new HashSet<String>();
+        HyperLogLogPlusCounter[] hllcs = new HyperLogLogPlusCounter[ln];
+        for (int i = 0; i < ln; i++) {
+            hllcs[i] = newHLLC();
+            for (int k = 0; k < dn; k++) {
+                String[] samples = generateSampleData();
+                for (String data : samples) {
+                    testSet.add(data);
+                    hllcs[i].add(Bytes.toBytes(data));
+                }
+            }
+        }
+        HyperLogLogPlusCounter mergeHllc = newHLLC();
+        for (HyperLogLogPlusCounter hllc : hllcs) {
+            mergeHllc.merge(hllc);
+            checkSerialize(mergeHllc);
+        }
+
+        double errorRate = mergeHllc.getErrorRate();
+        long estimate = mergeHllc.getCountEstimate();
+        double actualError = (double) (testSet.size() - estimate) / 
testSet.size();
+
+        System.out.println(testSet.size() + "-" + estimate + " ~ " + 
actualError);
+        
+        if (Math.abs(actualError) > errorRate) {
+            errorCount1++;
+        }
+        if (Math.abs(actualError) > 2 * errorRate) {
+            errorCount2++;
+        }
+        if (Math.abs(actualError) > 3 * errorRate) {
+            errorCount3++;
+        }
+
+        return actualError;
+    }
+
+    @Test
+    public void testPerformance() throws IOException {
+        int N = 3; // reduce N HLLC into one
+        int M = 1000; // for M times, use 100000 for real perf test
+
+        HyperLogLogPlusCounter samples[] = new HyperLogLogPlusCounter[N];
+        for (int i = 0; i < N; i++) {
+            samples[i] = newHLLC();
+            for (String str : generateTestData(10000))
+                samples[i].add(str);
+        }
+
+        System.out.println("Perf test running ... ");
+        long start = System.currentTimeMillis();
+        HyperLogLogPlusCounter sum = newHLLC();
+        for (int i = 0; i < M; i++) {
+            sum.clear();
+            for (int j = 0; j < N; j++) {
+                sum.merge(samples[j]);
+                checkSerialize(sum);
+            }
+        }
+        long duration = System.currentTimeMillis() - start;
+        System.out.println("Perf test result: " + duration / 1000 + " 
seconds");
+    }
+
+    @Test
+    public void testEquivalence() {
+        byte[] a = new byte[] { 0, 3, 4, 42, 2, 2 };
+        byte[] b = new byte[] { 3, 4, 42 };
+        HyperLogLogPlusCounter ha = new HyperLogLogPlusCounter();
+        HyperLogLogPlusCounter hb = new HyperLogLogPlusCounter();
+        ha.add(a, 1, 3);
+        hb.add(b);
+
+        Assert.assertTrue(ha.getCountEstimate() == hb.getCountEstimate());
+    }
+
+    private HyperLogLogPlusCounter newHLLC() {
+        return new HyperLogLogPlusCounter(16);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/core-metadata/src/test/java/org/apache/kylin/measure/topn/DoubleDeltaSerializerTest.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/test/java/org/apache/kylin/measure/topn/DoubleDeltaSerializerTest.java
 
b/core-metadata/src/test/java/org/apache/kylin/measure/topn/DoubleDeltaSerializerTest.java
new file mode 100644
index 0000000..3a387f5
--- /dev/null
+++ 
b/core-metadata/src/test/java/org/apache/kylin/measure/topn/DoubleDeltaSerializerTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.measure.topn;
+
+import static org.junit.Assert.*;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
+
+import org.junit.Test;
+
+public class DoubleDeltaSerializerTest {
+
+    ByteBuffer buf = ByteBuffer.allocate(8192);
+    DoubleDeltaSerializer dds = new DoubleDeltaSerializer();
+
+    @Test
+    public void testEmpty() {
+        buf.clear();
+        dds.serialize(new double[] {}, buf);
+        buf.flip();
+        double[] r = dds.deserialize(buf);
+        assertTrue(r.length == 0);
+    }
+
+    @Test
+    public void testSingle() {
+        buf.clear();
+        dds.serialize(new double[] { 1.2 }, buf);
+        buf.flip();
+        double[] r = dds.deserialize(buf);
+        assertArrayEquals(new double[] { 1.2 }, r);
+    }
+
+    @Test
+    public void testRounding() {
+        buf.clear();
+        dds.serialize(new double[] { 1.234, 2.345 }, buf);
+        buf.flip();
+        double[] r = dds.deserialize(buf);
+        assertArrayEquals(new double[] { 1.23, 2.35 }, r);
+    }
+
+    @Test
+    public void testRandom() {
+        Random rand = new Random();
+        int n = 1000;
+
+        double[] nums = new double[n];
+        for (int i = 0; i < n; i++) {
+            nums[i] = rand.nextDouble() * 1000000;
+        }
+        Arrays.sort(nums);
+
+        buf.clear();
+        dds.serialize(nums, buf);
+        buf.flip();
+        double[] r = dds.deserialize(buf);
+        assertArrayEquals(nums, r);
+        System.out.println("doubles size of " + (n * 8) + " bytes serialized 
to " + buf.limit() + " bytes");
+    }
+
+    private static void assertArrayEquals(double[] expected, double[] actual) {
+        assertEquals(expected.length, actual.length);
+        for (int i = 0; i < expected.length; i++) {
+            assertEquals(expected[i], actual[i], 0.01);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java
 
b/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java
new file mode 100644
index 0000000..a55b493
--- /dev/null
+++ 
b/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterBasicTest.java
@@ -0,0 +1,131 @@
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc. 
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.measure.topn;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class TopNCounterBasicTest {
+
+    @Test
+    public void testTopNCounter() {
+        TopNCounter<String> vs = new TopNCounter<String>(3);
+        String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", 
"A", "A", "Y" };
+        for (String i : stream) {
+            vs.offer(i);
+        }
+
+        List<Counter<String>> topk = vs.topK(6);
+
+        for (Counter<String> top : topk) {
+            System.out.println(top.getItem() + ":" + top.getCount());
+        }
+
+    }
+
+    @Test
+    public void testTopK() {
+        TopNCounter<String> vs = new TopNCounter<String>(3);
+        String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", 
"C", "A", "A" };
+        for (String i : stream) {
+            vs.offer(i);
+        }
+        List<Counter<String>> topK = vs.topK(3);
+        for (Counter<String> c : topK) {
+            assertTrue(Arrays.asList("A", "C", "X").contains(c.getItem()));
+        }
+    }
+
+    @Test
+    public void testTopKWithIncrement() {
+        TopNCounter<String> vs = new TopNCounter<String>(3);
+        String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", 
"C", "A", "A" };
+        for (String i : stream) {
+            vs.offer(i, 10);
+        }
+        List<Counter<String>> topK = vs.topK(3);
+        for (Counter<String> c : topK) {
+            assertTrue(Arrays.asList("A", "C", "X").contains(c.getItem()));
+        }
+    }
+
+    @Test
+    public void testTopKWithIncrementOutOfOrder() {
+        TopNCounter<String> vs_increment = new TopNCounter<String>(3);
+        TopNCounter<String> vs_single = new TopNCounter<String>(3);
+        String[] stream = { "A", "B", "C", "D", "A" };
+        Integer[] increments = { 15, 20, 25, 30, 1 };
+
+        for (int i = 0; i < stream.length; i++) {
+            vs_increment.offer(stream[i], increments[i]);
+            for (int k = 0; k < increments[i]; k++) {
+                vs_single.offer(stream[i]);
+            }
+        }
+        System.out.println("Insert with counts vs. single inserts:");
+        System.out.println(vs_increment);
+        System.out.println(vs_single);
+
+        List<Counter<String>> topK_increment = vs_increment.topK(3);
+        List<Counter<String>> topK_single = vs_single.topK(3);
+
+        for (int i = 0; i < topK_increment.size(); i++) {
+            assertEquals(topK_increment.get(i).getItem(), 
topK_single.get(i).getItem());
+        }
+    }
+
+    @Test
+    public void testRetain() {
+        TopNCounter<String> vs = new TopNCounter<String>(10);
+        String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", 
"C", "A", "A" };
+        for (String i : stream) {
+            vs.offer(i);
+        }
+
+        vs.retain(5);
+        assertTrue(vs.size() <= 5);
+        assertTrue(vs.getCapacity() <= 5);
+    }
+
+    @Test
+    public void testMerge() {
+
+        TopNCounter<String> vs = new TopNCounter<String>(10);
+        String[] stream = { "X", "X", "Y", "Z", "A", "B", "C", "X", "X", "A", 
"C", "A", "B", "A" };
+        for (String i : stream) {
+            vs.offer(i);
+        }
+
+        String[] stream2 = { "B", "B", "Z", "Z", "B", "C", "X", "X" };
+        TopNCounter<String> vs2 = new TopNCounter<String>(10);
+        for (String i : stream2) {
+            vs2.offer(i);
+        }
+        // X: 4+2, C: 2+1, A: 3+0, B: 2 +3, Y: 1+0 Z: 1 +0
+        vs.merge(vs2);
+        List<Counter<String>> topK = vs.topK(3);
+        for (Counter<String> c : topK) {
+            assertTrue(Arrays.asList("A", "B", "X").contains(c.getItem()));
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterCombinationTest.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterCombinationTest.java
 
b/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterCombinationTest.java
new file mode 100644
index 0000000..bf51112
--- /dev/null
+++ 
b/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterCombinationTest.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc. 
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.measure.topn;
+
+import org.junit.Ignore;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+@RunWith(Parameterized.class)
+@Ignore("For collecting accuracy statistics, not for functional test")
+public class TopNCounterCombinationTest extends TopNCounterTest {
+
+    @Parameterized.Parameters
+    public static Collection<Integer[]> configs() {
+        return Arrays.asList(new Integer[][] {
+                // with 20X space
+                { 10, 20 }, // top 10%
+                { 20, 20 }, // top 5%
+                { 100, 20 }, // top 1%
+                { 1000, 20 }, // top 0.1%
+
+                // with 50X space
+                { 10, 50 }, // top 10% 
+                { 20, 50 }, // top 5% 
+                { 100, 50 }, // top 1% 
+                { 1000, 50 }, // top 0.1%
+
+                // with 100X space
+                { 10, 100 }, // top 10% 
+                { 20, 100 }, // top 5% 
+                { 100, 100 }, // top 1% 
+                { 1000, 100 }, // top 0.1% 
+        });
+    }
+
+    public TopNCounterCombinationTest(int keySpaceRate, int spaceSavingRate) 
throws Exception {
+        super();
+        this.TOP_K = 100;
+        this.KEY_SPACE = TOP_K * keySpaceRate;
+        this.SPACE_SAVING_ROOM = spaceSavingRate;
+        TOTAL_RECORDS = 1000000; // 1 million
+        this.PARALLEL = 10;
+        this.verbose = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterTest.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterTest.java
 
b/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterTest.java
new file mode 100644
index 0000000..7b7031b
--- /dev/null
+++ 
b/core-metadata/src/test/java/org/apache/kylin/measure/topn/TopNCounterTest.java
@@ -0,0 +1,306 @@
+/*
+ * Copyright (C) 2011 Clearspring Technologies, Inc. 
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.measure.topn;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.math3.distribution.ZipfDistribution;
+import org.apache.kylin.common.util.Pair;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.*;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+@Ignore("For collecting accuracy statistics, not for functional test")
+public class TopNCounterTest {
+
+    protected static int TOP_K;
+
+    protected static int KEY_SPACE;
+
+    protected static int TOTAL_RECORDS;
+
+    protected static int SPACE_SAVING_ROOM;
+
+    protected static int PARALLEL = 10;
+    
+    protected static boolean verbose = true;
+
+    public TopNCounterTest() {
+        TOP_K = 100;
+        KEY_SPACE = 100 * TOP_K;
+        TOTAL_RECORDS = 1000000; // 1 million
+        SPACE_SAVING_ROOM = 100;
+    }
+
+    protected String prepareTestDate() throws IOException {
+        String[] allKeys = new String[KEY_SPACE];
+
+        for (int i = 0; i < KEY_SPACE; i++) {
+            allKeys[i] = RandomStringUtils.randomAlphabetic(10);
+        }
+
+        outputMsg("Start to create test random data...");
+        long startTime = System.currentTimeMillis();
+        ZipfDistribution zipf = new ZipfDistribution(KEY_SPACE, 0.5);
+        int keyIndex;
+
+        File tempFile = File.createTempFile("ZipfDistribution", ".txt");
+
+        if (tempFile.exists())
+            FileUtils.forceDelete(tempFile);
+        FileWriter fw = new FileWriter(tempFile);
+        try {
+            for (int i = 0; i < TOTAL_RECORDS; i++) {
+                keyIndex = zipf.sample() -1;
+                fw.write(allKeys[keyIndex]);
+                fw.write('\n');
+            }
+        } finally {
+            if (fw != null)
+                fw.close();
+        }
+
+        outputMsg("Create test data takes : " + (System.currentTimeMillis() - 
startTime) / 1000 + " seconds.");
+        outputMsg("Test data in : " + tempFile.getAbsolutePath());
+
+        return tempFile.getAbsolutePath();
+    }
+
+    //@Test
+    public void testSingleSpaceSaving() throws IOException {
+        String dataFile = prepareTestDate();
+        TopNCounterTest.SpaceSavingConsumer spaceSavingCounter = new 
TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM);
+        TopNCounterTest.HashMapConsumer accurateCounter = new 
TopNCounterTest.HashMapConsumer();
+
+        for (TopNCounterTest.TestDataConsumer consumer : new 
TopNCounterTest.TestDataConsumer[] { spaceSavingCounter, accurateCounter }) {
+            feedDataToConsumer(dataFile, consumer, 0, TOTAL_RECORDS);
+        }
+
+        FileUtils.forceDelete(new File(dataFile));
+
+        compareResult(spaceSavingCounter, accurateCounter);
+    }
+
+    private void compareResult(TopNCounterTest.TestDataConsumer firstConsumer, 
TopNCounterTest.TestDataConsumer secondConsumer) {
+        List<Pair<String, Double>> topResult1 = firstConsumer.getTopN(TOP_K);
+        outputMsg("Get topN, Space saving takes " + 
firstConsumer.getSpentTime() / 1000 + " seconds");
+        List<Pair<String, Double>> realSequence = 
secondConsumer.getTopN(TOP_K);
+        outputMsg("Get topN, Merge sort takes " + 
secondConsumer.getSpentTime() / 1000 + " seconds");
+
+        int error = 0;
+        for (int i = 0; i < topResult1.size(); i++) {
+            outputMsg("Compare " + i);
+
+            if (isClose(topResult1.get(i).getSecond().doubleValue(), 
realSequence.get(i).getSecond().doubleValue())) {
+                //            if 
(topResult1.get(i).getFirst().equals(realSequence.get(i).getFirst()) && 
topResult1.get(i).getSecond().doubleValue() == 
realSequence.get(i).getSecond().doubleValue()) {
+                outputMsg("Passed; key:" + topResult1.get(i).getFirst() + ", 
value:" + topResult1.get(i).getSecond());
+            } else {
+                outputMsg("Failed; space saving key:" + 
topResult1.get(i).getFirst() + ", value:" + topResult1.get(i).getSecond());
+                outputMsg("Failed; correct key:" + 
realSequence.get(i).getFirst() + ", value:" + realSequence.get(i).getSecond());
+                error++;
+            }
+        }
+
+        org.junit.Assert.assertEquals(0, error);
+    }
+    
+    private boolean isClose(double value1, double value2) {
+        
+        if(Math.abs(value1 - value2) < 5.0)
+            return true;
+        
+        return false;
+    }
+
+    @Test
+    public void testParallelSpaceSaving() throws IOException, 
ClassNotFoundException {
+        String dataFile = prepareTestDate();
+
+        TopNCounterTest.SpaceSavingConsumer[] parallelCounters = new 
TopNCounterTest.SpaceSavingConsumer[PARALLEL];
+
+        for (int i = 0; i < PARALLEL; i++) {
+            parallelCounters[i] = new 
TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM);
+        }
+
+        int slice = TOTAL_RECORDS / PARALLEL;
+        int startPosition = 0;
+        for (int i = 0; i < PARALLEL; i++) {
+            feedDataToConsumer(dataFile, parallelCounters[i], startPosition, 
startPosition + slice);
+            startPosition += slice;
+        }
+
+        TopNCounterTest.SpaceSavingConsumer[] mergedCounters = 
singleMerge(parallelCounters);
+
+        TopNCounterTest.HashMapConsumer accurateCounter = new 
TopNCounterTest.HashMapConsumer();
+        feedDataToConsumer(dataFile, accurateCounter, 0, TOTAL_RECORDS);
+
+        compareResult(mergedCounters[0], accurateCounter);
+        FileUtils.forceDelete(new File(dataFile));
+
+    }
+
+    private TopNCounterTest.SpaceSavingConsumer[] 
singleMerge(TopNCounterTest.SpaceSavingConsumer[] consumers) throws 
IOException, ClassNotFoundException {
+        List<TopNCounterTest.SpaceSavingConsumer> list = Lists.newArrayList();
+        if (consumers.length == 1)
+            return consumers;
+
+        TopNCounterTest.SpaceSavingConsumer merged = new 
TopNCounterTest.SpaceSavingConsumer(TOP_K * SPACE_SAVING_ROOM);
+        
+        for (int i=0, n=consumers.length; i<n; i++) {
+            merged.vs.merge(consumers[i].vs);
+        }
+
+        merged.vs.retain(TOP_K * SPACE_SAVING_ROOM); // remove extra elements;
+        return new TopNCounterTest.SpaceSavingConsumer[] {merged};
+        
+    }
+
+    private TopNCounterTest.SpaceSavingConsumer[] 
binaryMerge(TopNCounterTest.SpaceSavingConsumer[] consumers) throws 
IOException, ClassNotFoundException {
+        List<TopNCounterTest.SpaceSavingConsumer> list = Lists.newArrayList();
+        if (consumers.length == 1)
+            return consumers;
+
+        
+        for (int i = 0, n = consumers.length; i < n; i = i + 2) {
+            if (i + 1 < n) {
+                consumers[i].vs.merge(consumers[i + 1].vs);
+            }
+
+            list.add(consumers[i]);
+        }
+
+        return binaryMerge(list.toArray(new 
TopNCounterTest.SpaceSavingConsumer[list.size()]));
+    }
+    
+
+    private void feedDataToConsumer(String dataFile, 
TopNCounterTest.TestDataConsumer consumer, int startLine, int endLine) throws 
IOException {
+        long startTime = System.currentTimeMillis();
+        BufferedReader bufferedReader = new BufferedReader(new 
FileReader(dataFile));
+
+        int lineNum = 0;
+        String line = bufferedReader.readLine();
+        while (line != null) {
+            if (lineNum >= startLine && lineNum < endLine) {
+                consumer.addElement(line, 1.0);
+            }
+            line = bufferedReader.readLine();
+            lineNum++;
+        }
+
+        bufferedReader.close();
+        outputMsg("feed data to " + consumer.getClass().getCanonicalName() + " 
take time (seconds): " + (System.currentTimeMillis() - startTime) / 1000);
+    }
+
+    private void outputMsg(String msg) {
+        if (verbose)
+            System.out.println(msg);
+    }
+
+    private static interface TestDataConsumer {
+        public void addElement(String elementKey, double value);
+
+        public List<Pair<String, Double>> getTopN(int k);
+
+        public long getSpentTime();
+    }
+
+    private class SpaceSavingConsumer implements 
TopNCounterTest.TestDataConsumer {
+        private long timeSpent = 0;
+        protected TopNCounter<String> vs;
+
+        public SpaceSavingConsumer(int space) {
+            vs = new TopNCounter<String>(space);
+
+        }
+
+        public void addElement(String key, double value) {
+            //outputMsg("Adding " + key + ":" + incrementCount);
+            long startTime = System.currentTimeMillis();
+            vs.offer(key, value);
+            timeSpent += (System.currentTimeMillis() - startTime);
+        }
+
+        @Override
+        public List<Pair<String, Double>> getTopN(int k) {
+            long startTime = System.currentTimeMillis();
+            List<Counter<String>> tops = vs.topK(k);
+            List<Pair<String, Double>> allRecords = Lists.newArrayList();
+
+            for (Counter<String> counter : tops)
+                allRecords.add(Pair.newPair(counter.getItem(), 
counter.getCount()));
+            timeSpent += (System.currentTimeMillis() - startTime);
+            return allRecords;
+        }
+
+        @Override
+        public long getSpentTime() {
+            return timeSpent;
+        }
+    }
+
+    private class HashMapConsumer implements TopNCounterTest.TestDataConsumer {
+
+        private long timeSpent = 0;
+        private Map<String, Double> hashMap;
+
+        public HashMapConsumer() {
+            hashMap = Maps.newHashMap();
+        }
+
+        public void addElement(String key, double value) {
+            long startTime = System.currentTimeMillis();
+            if (hashMap.containsKey(key)) {
+                hashMap.put(key, hashMap.get(key) + value);
+            } else {
+                hashMap.put(key, value);
+            }
+            timeSpent += (System.currentTimeMillis() - startTime);
+        }
+
+        @Override
+        public List<Pair<String, Double>> getTopN(int k) {
+            long startTime = System.currentTimeMillis();
+            List<Pair<String, Double>> allRecords = Lists.newArrayList();
+
+            for (Map.Entry<String, Double> entry : hashMap.entrySet()) {
+                allRecords.add(Pair.newPair(entry.getKey(), entry.getValue()));
+            }
+
+            Collections.sort(allRecords, new Comparator<Pair<String, 
Double>>() {
+                @Override
+                public int compare(Pair<String, Double> o1, Pair<String, 
Double> o2) {
+                    return o1.getSecond() < o2.getSecond() ? 1 : 
(o1.getSecond() > o2.getSecond() ? -1 : 0);
+                }
+            });
+            timeSpent += (System.currentTimeMillis() - startTime);
+            return allRecords.subList(0, k);
+        }
+
+        @Override
+        public long getSpentTime() {
+            return timeSpent;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
index 58900e0..3aad6ae 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.io.SequenceFile.Reader;
 import org.apache.hadoop.io.SequenceFile.Reader.Option;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;

http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
index a1582cb..02fe0f0 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidStatsUtil.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.kv.RowConstants;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 65d1530..6b54f17 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.CubeInstance;

http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
index d8880dd..c72e5d6 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java
@@ -24,7 +24,7 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.hadoop.io.Text;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;

http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
index 05b56aa..0a6c123 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeStatisticsStep.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;

http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeSamplingTest.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeSamplingTest.java
 
b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeSamplingTest.java
index 410cec7..b9138fb 100644
--- 
a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeSamplingTest.java
+++ 
b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeSamplingTest.java
@@ -19,11 +19,10 @@
 package org.apache.kylin.engine.mr.steps;
 
 import java.util.ArrayList;
-import java.util.BitSet;
 import java.util.List;
 
 import org.apache.commons.lang.RandomStringUtils;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
 import org.junit.Before;

http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java
 
b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java
index 5f2f100..f46683e 100644
--- 
a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java
+++ 
b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducerTest.java
@@ -7,7 +7,7 @@ import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.engine.mr.HadoopUtil;
 import org.apache.kylin.engine.mr.common.CuboidStatsUtil;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 7f5ab6b..285729f 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -40,7 +40,7 @@ import 
org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.Dictionary;
@@ -57,7 +57,6 @@ import org.apache.kylin.cube.model.*;
 import org.apache.kylin.cube.util.CubingUtils;
 import org.apache.kylin.dict.*;
 import org.apache.kylin.engine.mr.common.CubeStatsReader;
-import org.apache.kylin.engine.mr.steps.InMemCuboidJob;
 import org.apache.kylin.engine.spark.cube.BufferedCuboidWriter;
 import org.apache.kylin.engine.spark.cube.DefaultTupleConverter;
 import org.apache.kylin.engine.spark.util.IteratorUtils;

http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java
----------------------------------------------------------------------
diff --git 
a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java
 
b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java
index 31e6086..ef726a2 100644
--- 
a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java
+++ 
b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/IStreamingOutput.java
@@ -35,7 +35,7 @@ package org.apache.kylin.engine.streaming;
 
 import java.util.Map;
 
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.cube.inmemcubing.ICuboidWriter;
 import org.apache.kylin.metadata.model.IBuildable;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
----------------------------------------------------------------------
diff --git 
a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
 
b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
index 1965f8c..3fbade2 100644
--- 
a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
+++ 
b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/OneOffStreamingBuilder.java
@@ -35,7 +35,7 @@ package org.apache.kylin.engine.streaming;
 
 import java.util.Map;
 
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.StreamingBatch;
 import org.apache.kylin.engine.streaming.util.StreamingUtils;

http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java
----------------------------------------------------------------------
diff --git 
a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java
 
b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java
index 91cdea9..7946438 100644
--- 
a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java
+++ 
b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingBatchBuilder.java
@@ -35,7 +35,7 @@ package org.apache.kylin.engine.streaming;
 
 import java.util.Map;
 
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.StreamingBatch;
 import org.apache.kylin.cube.inmemcubing.ICuboidWriter;

http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
----------------------------------------------------------------------
diff --git 
a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
 
b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
index 044dcca..d7056cf 100644
--- 
a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
+++ 
b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cube/StreamingCubeBuilder.java
@@ -45,7 +45,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import javax.annotation.Nullable;
 
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;

http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedHLLCodec.java
----------------------------------------------------------------------
diff --git 
a/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedHLLCodec.java
 
b/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedHLLCodec.java
index 8671f43..eb213b8 100644
--- 
a/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedHLLCodec.java
+++ 
b/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedHLLCodec.java
@@ -19,7 +19,7 @@ package org.apache.kylin.invertedindex.measure;
 
 import java.nio.ByteBuffer;
 
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.metadata.datatype.DataType;
 
 /**

http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
 
b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
index 885656d..cbfd8c3 100644
--- 
a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
+++ 
b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;

http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
 
b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
index 988187d..19f5759 100644
--- 
a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
+++ 
b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducer.java
@@ -30,7 +30,7 @@ import java.util.Map;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.engine.mr.KylinReducer;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java
 
b/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java
index 5e8788a..57721d6 100644
--- 
a/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java
+++ 
b/source-hive/src/test/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityReducerTest.java
@@ -33,11 +33,9 @@ import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
 import org.apache.hadoop.mrunit.types.Pair;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.source.hive.cardinality.ColumnCardinalityMapper;
-import org.apache.kylin.source.hive.cardinality.ColumnCardinalityReducer;
 import org.junit.Before;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java
index 76c6637..fa7d81e 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.invertedindex.index.RawTableRecord;

http://git-wip-us.apache.org/repos/asf/kylin/blob/daa294b6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
----------------------------------------------------------------------
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
index fdab8eb..770be3c 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.inmemcubing.ICuboidWriter;

Reply via email to