Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-1.2 3f4e72324 -> 4a81d9277


PHOENIX-418 Support approximate COUNT DISTINCT (Ethan Wang)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/4a81d927
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/4a81d927
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/4a81d927

Branch: refs/heads/4.x-HBase-1.2
Commit: 4a81d9277ba93dc9d3969ac416ed2215ef73810d
Parents: 3f4e723
Author: James Taylor <jtay...@salesforce.com>
Authored: Mon Aug 28 15:17:37 2017 -0700
Committer: James Taylor <jtay...@salesforce.com>
Committed: Mon Aug 28 15:22:04 2017 -0700

----------------------------------------------------------------------
 dev/release_files/NOTICE                        |   8 +
 phoenix-core/pom.xml                            |   5 +
 .../CountDistinctApproximateHyperLogLogIT.java  | 154 +++++++++++++++
 .../phoenix/expression/ExpressionType.java      |   4 +-
 ...stinctCountHyperLogLogAggregateFunction.java | 192 +++++++++++++++++++
 ...tinctCountHyperLogLogAggregateParseNode.java |  39 ++++
 6 files changed, 401 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/4a81d927/dev/release_files/NOTICE
----------------------------------------------------------------------
diff --git a/dev/release_files/NOTICE b/dev/release_files/NOTICE
index eed1edc..b7b40c0 100644
--- a/dev/release_files/NOTICE
+++ b/dev/release_files/NOTICE
@@ -277,3 +277,11 @@ jcip-annotations
 findbugs-annotations
 
   Copyright Stephen Connolly.
+
+stream-lib
+
+  Copyright 2016 AddThis.
+  This product includes software developed by AddThis.
+  This product also includes code adapted from:
+    - software copyright (c) 2014 The Apache Software Foundation., http://lucene.apache.org/solr/
+    - software copyright (c) 2014 The Apache Software Foundation., http://mahout.apache.org/

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4a81d927/phoenix-core/pom.xml
----------------------------------------------------------------------
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 275b72f..ce44171 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -470,5 +470,10 @@
         <artifactId>joni</artifactId>
         <version>${joni.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.clearspring.analytics</groupId>
+      <artifactId>stream</artifactId>
+      <version>2.9.5</version>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4a81d927/phoenix-core/src/it/java/org/apache/phoenix/end2end/CountDistinctApproximateHyperLogLogIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CountDistinctApproximateHyperLogLogIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CountDistinctApproximateHyperLogLogIT.java
new file mode 100644
index 0000000..3de1509
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CountDistinctApproximateHyperLogLogIT.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.phoenix.schema.ColumnNotFoundException;
+import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.sql.*;
+import java.util.Properties;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.*;
+
+/**
+ * Integration tests for APPROX_COUNT_DISTINCT, the HyperLogLog-based approximate
+ * distinct count. Each test creates and populates its own uniquely named table.
+ */
+public class CountDistinctApproximateHyperLogLogIT extends ParallelStatsDisabledIT {
+    private String tableName;
+
+    @Before
+    public void generateTableNames() {
+        tableName = "T_" + generateUniqueName();
+    }
+
+    /** Referencing a column that does not exist must fail with ColumnNotFoundException. */
+    @Test(expected = ColumnNotFoundException.class)
+    public void testDistinctCountException() throws Exception {
+        String query = "SELECT APPROX_COUNT_DISTINCT(x) FROM " + tableName;
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement statement = conn.prepareStatement(query)) {
+            prepareTableWithValues(conn, 100);
+            // Expected to throw; there is no result to inspect.
+            statement.executeQuery();
+        }
+    }
+
+    /** A constant argument has exactly one distinct value. */
+    @Test
+    public void testDistinctCountOnConstant() throws Exception {
+        String query = "SELECT APPROX_COUNT_DISTINCT(20) FROM " + tableName;
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement statement = conn.prepareStatement(query)) {
+            prepareTableWithValues(conn, 100);
+            ResultSet rs = statement.executeQuery();
+
+            assertTrue(rs.next());
+            assertEquals(1, rs.getLong(1));
+            assertFalse(rs.next());
+        }
+    }
+
+    /** i2 cycles through 0, 10, ..., 90, so it has 10 distinct values. */
+    @Test
+    public void testDistinctCountOnSingleColumn() throws Exception {
+        String query = "SELECT APPROX_COUNT_DISTINCT(i2) FROM " + tableName;
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement statement = conn.prepareStatement(query)) {
+            prepareTableWithValues(conn, 100);
+            ResultSet rs = statement.executeQuery();
+
+            assertTrue(rs.next());
+            assertEquals(10, rs.getLong(1));
+            assertFalse(rs.next());
+        }
+    }
+
+    /** Concatenating the two key columns yields one distinct value per row. */
+    @Test
+    public void testDistinctCountOnMutlipleColumns() throws Exception {
+        String query = "SELECT APPROX_COUNT_DISTINCT(i1||i2) FROM " + tableName;
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement statement = conn.prepareStatement(query)) {
+            prepareTableWithValues(conn, 100);
+
+            ResultSet rs = statement.executeQuery();
+
+            assertTrue(rs.next());
+            assertEquals(100, rs.getLong(1));
+            assertFalse(rs.next());
+        }
+    }
+
+    /** A self-join on the full primary key keeps one row per original row. */
+    @Test
+    public void testDistinctCountOnjoining() throws Exception {
+        String query = "SELECT APPROX_COUNT_DISTINCT(a.i1||a.i2||b.i2) FROM " + tableName + " a, " + tableName
+                + " b where a.i1=b.i1 and a.i2 = b.i2";
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement statement = conn.prepareStatement(query)) {
+            prepareTableWithValues(conn, 100);
+            ResultSet rs = statement.executeQuery();
+
+            assertTrue(rs.next());
+            assertEquals(100, rs.getLong(1));
+            assertFalse(rs.next());
+        }
+    }
+
+    /** The aggregation is pushed to the server as a single-row aggregate. */
+    @Test
+    public void testDistinctCountPlanExlain() throws Exception {
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        String query = "explain SELECT APPROX_COUNT_DISTINCT(i1||i2) FROM " + tableName;
+
+        try (Connection conn = DriverManager.getConnection(getUrl(), props);
+                PreparedStatement statement = conn.prepareStatement(query)) {
+            prepareTableWithValues(conn, 100);
+            ResultSet rs = statement.executeQuery();
+
+            assertEquals("CLIENT PARALLEL 1-WAY FULL SCAN OVER " + tableName + "\n"
+                    + "    SERVER FILTER BY FIRST KEY ONLY\n" + "    SERVER AGGREGATE INTO SINGLE ROW",
+                    QueryUtil.getExplainPlan(rs));
+        }
+    }
+
+    /**
+     * Creates {@code tableName} with primary key (i1, i2) and upserts {@code nRows} rows
+     * where i1 = i and i2 = (i * 10) % 100, i.e. (0, 0), (1, 10), ..., (9, 90), (10, 0), ...
+     * With 100 rows, i2 has 10 distinct values and (i1, i2) has 100 distinct values.
+     *
+     * @param conn connection used for the DDL and upserts; committed before returning
+     * @param nRows number of rows to upsert
+     * @throws Exception if table creation or any upsert fails
+     */
+    private void prepareTableWithValues(final Connection conn, final int nRows) throws Exception {
+        try (Statement ddl = conn.createStatement()) {
+            ddl.execute("create table " + tableName + "\n"
+                    + "   (i1 integer not null, i2 integer not null\n" + "    CONSTRAINT pk PRIMARY KEY (i1,i2))");
+        }
+
+        try (PreparedStatement stmt = conn.prepareStatement("upsert into " + tableName + " VALUES (?, ?)")) {
+            for (int i = 0; i < nRows; i++) {
+                stmt.setInt(1, i);
+                stmt.setInt(2, (i * 10) % 100);
+                stmt.execute();
+            }
+        }
+        conn.commit();
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4a81d927/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
index b8f68da..4f26e87 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/ExpressionType.java
@@ -130,6 +130,7 @@ import org.apache.phoenix.expression.function.WeekFunction;
 import org.apache.phoenix.expression.function.YearFunction;
 import org.apache.phoenix.expression.function.DayOfWeekFunction;
 import org.apache.phoenix.expression.function.DayOfYearFunction;
+import 
org.apache.phoenix.expression.function.DistinctCountHyperLogLogAggregateFunction;
 
 import com.google.common.collect.Maps;
 
@@ -292,7 +293,8 @@ public enum ExpressionType {
     DefaultValueExpression(DefaultValueExpression.class),
     ArrayColumnExpression(SingleCellColumnExpression.class),
     FirstValuesFunction(FirstValuesFunction.class),
-    LastValuesFunction(LastValuesFunction.class);
+    LastValuesFunction(LastValuesFunction.class),
+    
DistinctCountHyperLogLogAggregateFunction(DistinctCountHyperLogLogAggregateFunction.class);
 
     ExpressionType(Class<? extends Expression> clazz) {
         this.clazz = clazz;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4a81d927/phoenix-core/src/main/java/org/apache/phoenix/expression/function/DistinctCountHyperLogLogAggregateFunction.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/function/DistinctCountHyperLogLogAggregateFunction.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/DistinctCountHyperLogLogAggregateFunction.java
new file mode 100644
index 0000000..7e18afd
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/function/DistinctCountHyperLogLogAggregateFunction.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.function;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.aggregator.Aggregator;
+import org.apache.phoenix.expression.aggregator.BaseAggregator;
+import org.apache.phoenix.expression.aggregator.DistinctCountClientAggregator;
+import org.apache.phoenix.parse.FunctionParseNode.Argument;
+import org.apache.phoenix.parse.FunctionParseNode.BuiltInFunction;
+import org.apache.phoenix.parse.DistinctCountHyperLogLogAggregateParseNode;
+import org.apache.phoenix.schema.types.PLong;
+import org.apache.phoenix.schema.types.PVarbinary;
+import org.apache.phoenix.util.ByteUtil;
+
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+
+import org.apache.phoenix.schema.SortOrder;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * Built-in function for Distinct Count Aggregation 
+ * function in approximation. 
+ * This aggregator is implemented using HyperLogLog.
+ * Please refer to PHOENIX-418
+ * https://issues.apache.org/jira/browse/PHOENIX-418
+ * 
+ * 
+ * 1, Accuracy input is not a customizeable. In HyperLogLog
+ * accuracy is propertional to 1/sqrt(m), m is the size of
+ * the hll hash. Also, this process is irrelavent to runtime
+ * or space complexity.
+ * 
+ * 2, The two parameters that requires during HLL initialization. 
+ * i.e., the precision value for the normal set and the precision 
+ * value for the sparse set, is hard coded as static final 
+ * variable. Any change of them requires re-deployment of the 
+ * phoenix server coprocessors.
+ * 
+ */
+@BuiltInFunction(name=DistinctCountHyperLogLogAggregateFunction.NAME, 
nodeClass=DistinctCountHyperLogLogAggregateParseNode.class, args= {@Argument()} 
)
+public class DistinctCountHyperLogLogAggregateFunction extends 
DistinctCountAggregateFunction {
+    public static final String NAME = "APPROX_COUNT_DISTINCT";
+    public static final int NormalSetPrecision = 16;
+    public static final int SparseSetPrecision = 25;
+    
+    public DistinctCountHyperLogLogAggregateFunction() {
+    }
+    
+    public DistinctCountHyperLogLogAggregateFunction(List<Expression> 
childExpressions){
+        super(childExpressions, null);
+    }
+    
+    public DistinctCountHyperLogLogAggregateFunction(List<Expression> 
childExpressions, CountAggregateFunction delegate){
+        super(childExpressions, delegate);
+    }
+
+    @Override
+    public DistinctCountClientAggregator newClientAggregator() {
+       return new HyperLogLogClientAggregator(SortOrder.getDefault());
+    }
+    
+    @Override
+    public Aggregator newServerAggregator(Configuration conf) {
+        final Expression child = getAggregatorExpression();
+        return new HyperLogLogServerAggregator(child.getSortOrder()){
+                       @Override
+                       protected PDataType getInputDataType() {
+                               return child.getDataType();
+                       }
+        };
+    }
+    
+    @Override
+    public Aggregator newServerAggregator(Configuration conf, 
ImmutableBytesWritable ptr) {
+        final Expression child = getAggregatorExpression();
+        return new HyperLogLogServerAggregator(child.getSortOrder(), ptr) {
+          @Override
+          protected PDataType getInputDataType() {
+            return child.getDataType();
+          }
+        };
+    }
+   
+    @Override
+    public String getName() {
+        return NAME;
+    }
+}
+
+
+/**
+ * Client-side aggregator for APPROX_COUNT_DISTINCT.
+ * <p>
+ * {@link #aggregate} is called once for each partial result returned by a server-side
+ * aggregator. Each partial result is a serialized {@link HyperLogLogPlus} sketch, which
+ * is merged into this aggregator's sketch. {@link #evaluate} encodes the estimated
+ * cardinality of the merged sketch as a PLong and returns it as the function result.
+ * <p>
+ * {@link #reset()} replaces the sketch with an empty one. Without this, an instance that
+ * is reset and reused for another group would keep counting the previous group's values.
+ */
+class HyperLogLogClientAggregator extends DistinctCountClientAggregator {
+    private HyperLogLogPlus hll = newSketch();
+
+    public HyperLogLogClientAggregator(SortOrder sortOrder) {
+        super(sortOrder);
+    }
+
+    /** Creates an empty sketch using the precisions shared with the server-side aggregator. */
+    private static HyperLogLogPlus newSketch() {
+        return new HyperLogLogPlus(DistinctCountHyperLogLogAggregateFunction.NormalSetPrecision,
+                DistinctCountHyperLogLogAggregateFunction.SparseSetPrecision);
+    }
+
+    @Override
+    public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+        try {
+            // Builder.build takes a whole byte[], so copy when ptr is only a slice of its array.
+            hll.addAll(HyperLogLogPlus.Builder.build(ByteUtil.copyKeyBytesIfNecessary(ptr)));
+        } catch (Exception e) {
+            // Deserializing and merging may throw checked exceptions; surface them unchecked.
+            throw new RuntimeException("Unable to merge HyperLogLog sketch from server aggregator", e);
+        }
+    }
+
+    @Override
+    public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {
+        byte[] buffer = new byte[PLong.INSTANCE.getByteSize()];
+        PLong.INSTANCE.getCodec().encodeLong(hll.cardinality(), buffer, 0);
+        ptr.set(buffer);
+        return true;
+    }
+
+    @Override
+    public void reset() {
+        hll = newSketch();
+        super.reset();
+    }
+}
+
+
+/**
+ * ServerSide HyperLogLogAggregator
+ * It will be serialized and dispatched to region server
+ * Method aggregate is called for every new row scanned
+ * Method evaluate is called when this remote scan is over.
+ * the return of evaluate will be send back to ClientSideAggregator.aggregate 
+ */
+abstract class HyperLogLogServerAggregator extends BaseAggregator{
+       private HyperLogLogPlus hll = new 
HyperLogLogPlus(DistinctCountHyperLogLogAggregateFunction.NormalSetPrecision, 
DistinctCountHyperLogLogAggregateFunction.SparseSetPrecision);
+       protected final ImmutableBytesWritable valueByteArray = new 
ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
+
+       public HyperLogLogServerAggregator(SortOrder sortOrder) {
+               super(sortOrder);
+       }
+       
+       public HyperLogLogServerAggregator(SortOrder sortOrder, 
ImmutableBytesWritable ptr) {
+               this(sortOrder);
+               if(ptr !=null){
+                       hll.offer(ptr);
+               }
+       }
+
+       @Override
+       public void aggregate(Tuple tuple, ImmutableBytesWritable ptr) {
+               hll.offer(ptr);
+       }
+
+       @Override
+       public boolean evaluate(Tuple tuple, ImmutableBytesWritable ptr) {      
+               try {
+                       valueByteArray.set(hll.getBytes(), 0, 
hll.getBytes().length);
+                       
ptr.set(ByteUtil.copyKeyBytesIfNecessary(valueByteArray));
+               } catch (IOException e) {
+                       throw new RuntimeException(e);
+               }
+               return true;
+       }
+
+       @Override
+       public final PDataType getDataType() {
+               return PVarbinary.INSTANCE;
+       }
+       
+       abstract protected PDataType getInputDataType();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/4a81d927/phoenix-core/src/main/java/org/apache/phoenix/parse/DistinctCountHyperLogLogAggregateParseNode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/parse/DistinctCountHyperLogLogAggregateParseNode.java b/phoenix-core/src/main/java/org/apache/phoenix/parse/DistinctCountHyperLogLogAggregateParseNode.java
new file mode 100644
index 0000000..2fa6e10
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/parse/DistinctCountHyperLogLogAggregateParseNode.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.parse;
+
+import java.sql.SQLException;
+import java.util.List;
+
+import org.apache.phoenix.compile.StatementContext;
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.function.FunctionExpression;
+import 
org.apache.phoenix.expression.function.DistinctCountHyperLogLogAggregateFunction;
+import org.apache.phoenix.expression.function.SumAggregateFunction;
+
+
+public class DistinctCountHyperLogLogAggregateParseNode extends 
DelegateConstantToCountParseNode {
+    public DistinctCountHyperLogLogAggregateParseNode(String name, 
List<ParseNode> children, BuiltInFunctionInfo info) {
+        super(name, children, info);
+    }
+    
+    @Override
+    public FunctionExpression create(List<Expression> children, 
StatementContext context) throws SQLException {
+        return new DistinctCountHyperLogLogAggregateFunction(children, 
getDelegateFunction(children,context));
+    }
+}

Reply via email to