This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 384fd8122c DRILL-8434: Add Median Function (#2803)
384fd8122c is described below
commit 384fd8122c045ced378308d54fc41107cbea2b5e
Author: Charles S. Givre <[email protected]>
AuthorDate: Fri May 19 09:46:09 2023 -0400
DRILL-8434: Add Median Function (#2803)
---
.../drill/exec/udfs/DistributionFunctions.java | 2 +-
contrib/udfs/src/test/resources/test_data.csvh | 10 +-
exec/java-exec/src/main/codegen/config.fmpp | 1 +
.../java-exec/src/main/codegen/data/AggrTypes4.tdd | 33 +++++
.../main/codegen/templates/MedianFunctions.java | 106 ++++++++++++++++
.../exec/expr/fn/impl/StreamingMedianHelpers.java | 133 +++++++++++++++++++++
.../drill/exec/fn/impl/TestAggregateFunctions.java | 42 +++++++
exec/java-exec/src/test/resources/orgs.csvh | 11 ++
exec/java-exec/src/test/resources/test_data.csvh | 5 +
9 files changed, 337 insertions(+), 6 deletions(-)
diff --git
a/contrib/udfs/src/main/java/org/apache/drill/exec/udfs/DistributionFunctions.java
b/contrib/udfs/src/main/java/org/apache/drill/exec/udfs/DistributionFunctions.java
index fd18fd4117..60126c463b 100644
---
a/contrib/udfs/src/main/java/org/apache/drill/exec/udfs/DistributionFunctions.java
+++
b/contrib/udfs/src/main/java/org/apache/drill/exec/udfs/DistributionFunctions.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.expr.annotations.Workspace;
import org.apache.drill.exec.expr.holders.Float8Holder;
import org.apache.drill.exec.expr.holders.IntHolder;
+@SuppressWarnings("unused")
public class DistributionFunctions {
@FunctionTemplate(names = {"width_bucket", "widthBucket"},
@@ -150,7 +151,6 @@ public class DistributionFunctions {
tau.value = result;
}
}
-
@FunctionTemplate(names = {"regr_slope", "regrSlope"},
scope = FunctionScope.POINT_AGGREGATE,
nulls = NullHandling.INTERNAL)
diff --git a/contrib/udfs/src/test/resources/test_data.csvh
b/contrib/udfs/src/test/resources/test_data.csvh
index adfcc9764b..d8271c93de 100644
--- a/contrib/udfs/src/test/resources/test_data.csvh
+++ b/contrib/udfs/src/test/resources/test_data.csvh
@@ -1,5 +1,5 @@
-col1,col2
-2,25
-3,32
-4,49
-5,32
\ No newline at end of file
+col1,col2,col3,col4
+2,25,2.0,0
+3,32,3.0,0
+4,49,4.5,
+5,32,9,9
\ No newline at end of file
diff --git a/exec/java-exec/src/main/codegen/config.fmpp
b/exec/java-exec/src/main/codegen/config.fmpp
index c6f86f3392..9c3567c82e 100644
--- a/exec/java-exec/src/main/codegen/config.fmpp
+++ b/exec/java-exec/src/main/codegen/config.fmpp
@@ -36,6 +36,7 @@ data: {
decimalaggrtypes3: tdd(../data/DecimalAggrTypes3.tdd),
aggrtypes2: tdd(../data/AggrTypes2.tdd),
aggrtypes3: tdd(../data/AggrTypes3.tdd),
+ aggrtypes4: tdd(../data/AggrTypes4.tdd),
covarTypes: tdd(../data/CovarTypes.tdd),
corrTypes: tdd(../data/CorrelationTypes.tdd),
logicalTypes: tdd(../data/AggrBitwiseLogicalTypes.tdd),
diff --git a/exec/java-exec/src/main/codegen/data/AggrTypes4.tdd
b/exec/java-exec/src/main/codegen/data/AggrTypes4.tdd
new file mode 100644
index 0000000000..cc9cc90471
--- /dev/null
+++ b/exec/java-exec/src/main/codegen/data/AggrTypes4.tdd
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+{
+ aggrtypes: [
+ {className: "Median", funcName: "median", aliasName: "", types: [
+ {inputType: "Int", outputType: "NullableBigInt", major: "Numeric", java:
"Long", medianHelper: "StreamingIntMedianHelper"},
+ {inputType: "BigInt", outputType: "NullableBigInt", major: "Numeric",
java: "Long", medianHelper: "StreamingIntMedianHelper"},
+ {inputType: "NullableInt", outputType: "NullableBigInt", major:
"Numeric", java: "Long", medianHelper: "StreamingIntMedianHelper"},
+ {inputType: "NullableBigInt", outputType: "NullableBigInt", major:
"Numeric", java: "Long", medianHelper: "StreamingIntMedianHelper"},
+ {inputType: "Float4", outputType: "NullableFloat8", major: "Numeric",
java: "Double", medianHelper: "StreamingDoubleMedianHelper"},
+ {inputType: "Float8", outputType: "NullableFloat8", major: "Numeric",
java: "Double", medianHelper: "StreamingDoubleMedianHelper"},
+ {inputType: "NullableFloat4", outputType: "NullableFloat8", major:
"Numeric", java: "Double", medianHelper: "StreamingDoubleMedianHelper"},
+ {inputType: "NullableFloat8", outputType: "NullableFloat8", major:
"Numeric", java: "Double", medianHelper: "StreamingDoubleMedianHelper"},
+ ]
+ }
+ ]
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/codegen/templates/MedianFunctions.java
b/exec/java-exec/src/main/codegen/templates/MedianFunctions.java
new file mode 100644
index 0000000000..fec9b9fe4d
--- /dev/null
+++ b/exec/java-exec/src/main/codegen/templates/MedianFunctions.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
<@pp.dropOutputFile />

<#list aggrtypes4.aggrtypes as aggrtype>
<@pp.changeOutputFile name="/org/apache/drill/exec/expr/fn/impl/gaggr/${aggrtype.className}Functions.java" />

<#include "/@includes/license.ftl" />

/*
 * This class is generated using freemarker and the ${.template_name} template.
 */

<#-- Generates one streaming median aggregate per input type listed in AggrTypes4.tdd. -->

package org.apache.drill.exec.expr.fn.impl.gaggr;

import org.apache.drill.exec.expr.DrillAggFunc;
import org.apache.drill.exec.expr.annotations.FunctionTemplate;
import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
import org.apache.drill.exec.expr.annotations.Output;
import org.apache.drill.exec.expr.annotations.Param;
import org.apache.drill.exec.expr.annotations.Workspace;
import org.apache.drill.exec.expr.holders.*;
import org.apache.drill.exec.expr.fn.impl.StreamingMedianHelpers.StreamingMedianHelper;
import org.apache.drill.exec.expr.fn.impl.StreamingMedianHelpers.StreamingIntMedianHelper;
import org.apache.drill.exec.expr.fn.impl.StreamingMedianHelpers.StreamingDoubleMedianHelper;

/*
 * Median aggregate functions. Every value of a group is buffered in a heap-based helper from
 * StreamingMedianHelpers, kept in an ObjectHolder workspace. Null inputs are skipped, and a
 * group without any non-null input produces NULL rather than a placeholder value.
 */
@SuppressWarnings("unused")
public class ${aggrtype.className}Functions {
  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(${aggrtype.className}Functions.class);

<#list aggrtype.types as type>

<#if aggrtype.aliasName == "">
  @FunctionTemplate(name = "${aggrtype.funcName}", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE)
<#else>
  @FunctionTemplate(names = {"${aggrtype.funcName}", "${aggrtype.aliasName}"}, scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE)
</#if>

  public static class ${type.inputType}${aggrtype.className} implements DrillAggFunc {

    @Param ${type.inputType}Holder input;
    @Output ${type.outputType}Holder median;
    // Holds the type-specific streaming median helper for the current group.
    @Workspace ObjectHolder utils;
    // Number of non-null values added to the current group; zero means the result is NULL.
    @Workspace BigIntHolder nonNullCount;

    @Override
    public void setup() {
      utils = new ObjectHolder();
      utils.obj = new org.apache.drill.exec.expr.fn.impl.StreamingMedianHelpers.${type.medianHelper}();
      nonNullCount = new BigIntHolder();
      nonNullCount.value = 0;
    }

    @Override
    public void add() {
      org.apache.drill.exec.expr.fn.impl.StreamingMedianHelpers.${type.medianHelper} medianHelper =
          (org.apache.drill.exec.expr.fn.impl.StreamingMedianHelpers.${type.medianHelper}) utils.obj;
      <#if type.inputType?starts_with("Nullable")>
      sout: {
        if (input.isSet == 0) {
          // Null input: aggregates ignore nulls, so leave the group state untouched.
          break sout;
        }
      </#if>
      medianHelper.addNextNumber(input.value);
      nonNullCount.value++;
      <#if type.inputType?starts_with("Nullable")>
      } // end of sout block
      </#if>
    }

    @Override
    public void output() {
      if (nonNullCount.value > 0) {
        org.apache.drill.exec.expr.fn.impl.StreamingMedianHelpers.${type.medianHelper} medianHelper =
            (org.apache.drill.exec.expr.fn.impl.StreamingMedianHelpers.${type.medianHelper}) utils.obj;
        median.value = medianHelper.getMedian();
        median.isSet = 1;
      } else {
        // No non-null input in this group: the median is undefined, so report NULL
        // instead of the helper's empty-state placeholder (0).
        median.isSet = 0;
      }
    }

    @Override
    public void reset() {
      org.apache.drill.exec.expr.fn.impl.StreamingMedianHelpers.StreamingMedianHelper medianHelper =
          (org.apache.drill.exec.expr.fn.impl.StreamingMedianHelpers.${type.medianHelper}) utils.obj;
      medianHelper.reset();
      nonNullCount.value = 0;
    }
  }


</#list>
}
</#list>
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StreamingMedianHelpers.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StreamingMedianHelpers.java
new file mode 100644
index 0000000000..d6c2739b7d
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StreamingMedianHelpers.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.expr.fn.impl;
+
+import java.util.Comparator;
+import java.util.PriorityQueue;
+
/**
 * Heap-based streaming median helpers backing the generated {@code median} aggregate functions.
 *
 * <p>
 * Each helper keeps the lower half of the values added so far in a max-heap and the upper half
 * in a min-heap. After every insertion the heaps are rebalanced so their sizes never differ by
 * more than one. The median is then the head of the larger heap (odd count) or the midpoint of
 * both heads (even count). Every value is retained, so memory grows linearly with the number of
 * values added. Instances perform no synchronization.
 * </p>
 *
 * <p>
 * Reference: <a href="https://www.baeldung.com/java-stream-integers-median-using-heap">Stream
 * Integers Median using Heap</a>
 * </p>
 */
public class StreamingMedianHelpers {

  /** Type-independent view of a median helper, used to clear its state between groups. */
  public interface StreamingMedianHelper {
    /** Discards every value added so far. */
    void reset();
  }

  /**
   * Streaming median over integral values. For an even count, the midpoint of the two middle
   * values is truncated toward zero (e.g. the median of 2 and 3 is 2), matching the
   * {@code long} result type.
   */
  public static class StreamingIntMedianHelper implements StreamingMedianHelper {
    /** Upper half of the values; its head is the smallest of them. */
    private final PriorityQueue<Long> minHeap;
    /** Lower half of the values; its head is the largest of them. */
    private final PriorityQueue<Long> maxHeap;

    public StreamingIntMedianHelper() {
      this.minHeap = new PriorityQueue<>();
      this.maxHeap = new PriorityQueue<>(Comparator.reverseOrder());
    }

    /**
     * Adds a value to the running median.
     *
     * @param n the value to add; must not be {@code null}
     */
    public void addNextNumber(Long n) {
      if (!minHeap.isEmpty() && n < minHeap.peek()) {
        maxHeap.offer(n);
        if (maxHeap.size() > minHeap.size() + 1) {
          minHeap.offer(maxHeap.poll());
        }
      } else {
        minHeap.offer(n);
        if (minHeap.size() > maxHeap.size() + 1) {
          maxHeap.offer(minHeap.poll());
        }
      }
    }

    /** Convenience overload for {@code int} inputs; widens to {@code long}. */
    public void addNextNumber(int n) {
      addNextNumber((long) n);
    }

    /**
     * Returns the median of the values added so far.
     *
     * @return the median, or {@code 0} when no value has been added
     */
    public Long getMedian() {
      if (minHeap.size() < maxHeap.size()) {
        return maxHeap.peek();
      }
      if (minHeap.size() > maxHeap.size()) {
        return minHeap.peek();
      }
      if (minHeap.isEmpty()) {
        // Sizes are equal here, so both heaps are empty.
        return 0L;
      }
      return midpoint(minHeap.peek(), maxHeap.peek());
    }

    @Override
    public void reset() {
      minHeap.clear();
      maxHeap.clear();
    }

    /**
     * Returns {@code (a + b) / 2} rounded toward zero (like Java {@code long} division) without
     * overflowing when the exact sum does not fit in a {@code long}.
     */
    private static long midpoint(long a, long b) {
      if ((a ^ b) < 0) {
        // Opposite signs: the sum always fits in a long.
        return (a + b) / 2;
      }
      // Same sign: halve each operand first, then add back the carry of the two remainders.
      // Both remainders share the operands' sign, so the result still rounds toward zero.
      return a / 2 + b / 2 + (a % 2 + b % 2) / 2;
    }
  }

  /** Streaming median over floating-point values. */
  public static class StreamingDoubleMedianHelper implements StreamingMedianHelper {
    /** Upper half of the values; its head is the smallest of them. */
    private final PriorityQueue<Double> minHeap;
    /** Lower half of the values; its head is the largest of them. */
    private final PriorityQueue<Double> maxHeap;

    public StreamingDoubleMedianHelper() {
      this.minHeap = new PriorityQueue<>();
      this.maxHeap = new PriorityQueue<>(Comparator.reverseOrder());
    }

    /**
     * Adds a value to the running median.
     *
     * @param n the value to add; must not be {@code null}
     */
    public void addNextNumber(Double n) {
      if (!minHeap.isEmpty() && n < minHeap.peek()) {
        maxHeap.offer(n);
        if (maxHeap.size() > minHeap.size() + 1) {
          minHeap.offer(maxHeap.poll());
        }
      } else {
        minHeap.offer(n);
        if (minHeap.size() > maxHeap.size() + 1) {
          maxHeap.offer(minHeap.poll());
        }
      }
    }

    /** Convenience overload for {@code float} inputs; widens to {@code double}. */
    public void addNextNumber(float n) {
      addNextNumber((double) n);
    }

    /**
     * Returns the median of the values added so far.
     *
     * @return the median, or {@code 0.0} when no value has been added
     */
    public Double getMedian() {
      if (minHeap.size() < maxHeap.size()) {
        return maxHeap.peek();
      }
      if (minHeap.size() > maxHeap.size()) {
        return minHeap.peek();
      }
      if (minHeap.isEmpty()) {
        // Sizes are equal here, so both heaps are empty.
        return 0.0;
      }
      return midpoint(minHeap.peek(), maxHeap.peek());
    }

    @Override
    public void reset() {
      minHeap.clear();
      maxHeap.clear();
    }

    /**
     * Returns the mean of {@code a} and {@code b}. When both are finite but their sum overflows
     * to infinity, each operand is halved first so the result stays finite.
     */
    private static double midpoint(double a, double b) {
      double sum = a + b;
      if (Double.isInfinite(sum) && !Double.isInfinite(a) && !Double.isInfinite(b)) {
        return a / 2 + b / 2;
      }
      return sum / 2;
    }
  }
}
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
index 804aea35d1..3477d4122b 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
@@ -1289,4 +1289,46 @@ public class TestAggregateFunctions extends ClusterTest {
.baselineValues("Partial High School", 3555.8064516129034,
101.14516129032258, 3469.7014925373132, 103.3731343283582)
.go();
}
+
+ @Test
+ public void testMedian() throws Exception {
+
+ // Test as String from CSV and aggregate
+ String query = "SELECT category, median(`value`) AS med FROM
cp.`orgs.csvh` GROUP BY category";
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("category", "med")
+ .baselineValues("cat1", 5L)
+ .baselineValues("cat2", 5L)
+ .go();
+
+ query = "SELECT category, median(CAST(`value` AS DOUBLE)) AS med FROM
cp.`orgs.csvh` GROUP BY category";
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("category", "med")
+ .baselineValues("cat1", 5.0)
+ .baselineValues("cat2", 5.0)
+ .go();
+
+ }
+
+ @Test
+ public void testMedianWithPivot() throws Exception {
+ String query = "SELECT * FROM (\n" +
+ "SELECT education_level, salary, marital_status, extract(year from
age('2023-02-23', birth_date)) age\n" +
+ "FROM cp.`employee.json`)\n" +
+ "PIVOT (median(salary) med_salary, median(age) med_age FOR
marital_status IN ('M' married, 'S' single))";
+ testBuilder()
+ .sqlQuery(query)
+ .unOrdered()
+ .baselineColumns("education_level", "married_med_salary",
"married_med_age", "single_med_salary", "single_med_age")
+ .baselineValues("Graduate Degree", 4400.0, 109L, 20.0, 109L)
+ .baselineValues("Bachelors Degree", 4400.0,109L, 4400.0, 109L)
+ .baselineValues("Partial College", 4500.0, 109L, 4400.0, 109L)
+ .baselineValues("High School Degree", 3700.0, 109L, 4400.0, 109L)
+ .baselineValues("Partial High School", 4400.0, 109L, 20.0, 109L)
+ .go();
+ }
}
diff --git a/exec/java-exec/src/test/resources/orgs.csvh
b/exec/java-exec/src/test/resources/orgs.csvh
new file mode 100644
index 0000000000..0ca6b074a3
--- /dev/null
+++ b/exec/java-exec/src/test/resources/orgs.csvh
@@ -0,0 +1,11 @@
+category,value
+cat1,5
+cat1,3
+cat1,2
+cat1,7
+cat1,10
+cat2,1
+cat2,5
+cat2,9
+cat2,2
+cat2,11
\ No newline at end of file
diff --git a/exec/java-exec/src/test/resources/test_data.csvh
b/exec/java-exec/src/test/resources/test_data.csvh
new file mode 100644
index 0000000000..d8271c93de
--- /dev/null
+++ b/exec/java-exec/src/test/resources/test_data.csvh
@@ -0,0 +1,5 @@
+col1,col2,col3,col4
+2,25,2.0,0
+3,32,3.0,0
+4,49,4.5,
+5,32,9,9
\ No newline at end of file