This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 384fd8122c DRILL-8434: Add Median Function (#2803)
384fd8122c is described below

commit 384fd8122c045ced378308d54fc41107cbea2b5e
Author: Charles S. Givre <[email protected]>
AuthorDate: Fri May 19 09:46:09 2023 -0400

    DRILL-8434: Add Median Function (#2803)
---
 .../drill/exec/udfs/DistributionFunctions.java     |   2 +-
 contrib/udfs/src/test/resources/test_data.csvh     |  10 +-
 exec/java-exec/src/main/codegen/config.fmpp        |   1 +
 .../java-exec/src/main/codegen/data/AggrTypes4.tdd |  33 +++++
 .../main/codegen/templates/MedianFunctions.java    | 106 ++++++++++++++++
 .../exec/expr/fn/impl/StreamingMedianHelpers.java  | 133 +++++++++++++++++++++
 .../drill/exec/fn/impl/TestAggregateFunctions.java |  42 +++++++
 exec/java-exec/src/test/resources/orgs.csvh        |  11 ++
 exec/java-exec/src/test/resources/test_data.csvh   |   5 +
 9 files changed, 337 insertions(+), 6 deletions(-)

diff --git 
a/contrib/udfs/src/main/java/org/apache/drill/exec/udfs/DistributionFunctions.java
 
b/contrib/udfs/src/main/java/org/apache/drill/exec/udfs/DistributionFunctions.java
index fd18fd4117..60126c463b 100644
--- 
a/contrib/udfs/src/main/java/org/apache/drill/exec/udfs/DistributionFunctions.java
+++ 
b/contrib/udfs/src/main/java/org/apache/drill/exec/udfs/DistributionFunctions.java
@@ -29,6 +29,7 @@ import org.apache.drill.exec.expr.annotations.Workspace;
 import org.apache.drill.exec.expr.holders.Float8Holder;
 import org.apache.drill.exec.expr.holders.IntHolder;
 
+@SuppressWarnings("unused")
 public class DistributionFunctions {
 
   @FunctionTemplate(names = {"width_bucket", "widthBucket"},
@@ -150,7 +151,6 @@ public class DistributionFunctions {
       tau.value = result;
     }
   }
-
   @FunctionTemplate(names = {"regr_slope", "regrSlope"},
       scope = FunctionScope.POINT_AGGREGATE,
       nulls = NullHandling.INTERNAL)
diff --git a/contrib/udfs/src/test/resources/test_data.csvh 
b/contrib/udfs/src/test/resources/test_data.csvh
index adfcc9764b..d8271c93de 100644
--- a/contrib/udfs/src/test/resources/test_data.csvh
+++ b/contrib/udfs/src/test/resources/test_data.csvh
@@ -1,5 +1,5 @@
-col1,col2
-2,25
-3,32
-4,49
-5,32
\ No newline at end of file
+col1,col2,col3,col4
+2,25,2.0,0
+3,32,3.0,0
+4,49,4.5,
+5,32,9,9
\ No newline at end of file
diff --git a/exec/java-exec/src/main/codegen/config.fmpp 
b/exec/java-exec/src/main/codegen/config.fmpp
index c6f86f3392..9c3567c82e 100644
--- a/exec/java-exec/src/main/codegen/config.fmpp
+++ b/exec/java-exec/src/main/codegen/config.fmpp
@@ -36,6 +36,7 @@ data: {
     decimalaggrtypes3:        tdd(../data/DecimalAggrTypes3.tdd),
     aggrtypes2:               tdd(../data/AggrTypes2.tdd),
     aggrtypes3:               tdd(../data/AggrTypes3.tdd),
+    aggrtypes4:               tdd(../data/AggrTypes4.tdd),
     covarTypes:               tdd(../data/CovarTypes.tdd),
     corrTypes:                tdd(../data/CorrelationTypes.tdd),
     logicalTypes:             tdd(../data/AggrBitwiseLogicalTypes.tdd),
diff --git a/exec/java-exec/src/main/codegen/data/AggrTypes4.tdd 
b/exec/java-exec/src/main/codegen/data/AggrTypes4.tdd
new file mode 100644
index 0000000000..cc9cc90471
--- /dev/null
+++ b/exec/java-exec/src/main/codegen/data/AggrTypes4.tdd
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+{
+  aggrtypes: [
+    {className: "Median", funcName: "median",  aliasName: "", types: [
+      {inputType: "Int", outputType: "NullableBigInt", major: "Numeric", java: 
"Long", medianHelper: "StreamingIntMedianHelper"},
+      {inputType: "BigInt", outputType: "NullableBigInt",  major: "Numeric", 
java: "Long", medianHelper: "StreamingIntMedianHelper"},
+      {inputType: "NullableInt", outputType: "NullableBigInt", major: 
"Numeric", java: "Long", medianHelper: "StreamingIntMedianHelper"},
+      {inputType: "NullableBigInt", outputType: "NullableBigInt", major: 
"Numeric", java: "Long", medianHelper: "StreamingIntMedianHelper"},
+      {inputType: "Float4", outputType: "NullableFloat8", major: "Numeric", 
java: "Double", medianHelper: "StreamingDoubleMedianHelper"},
+      {inputType: "Float8", outputType: "NullableFloat8", major: "Numeric", 
java: "Double", medianHelper: "StreamingDoubleMedianHelper"},
+      {inputType: "NullableFloat4", outputType: "NullableFloat8", major: 
"Numeric", java: "Double", medianHelper: "StreamingDoubleMedianHelper"},
+      {inputType: "NullableFloat8", outputType: "NullableFloat8", major: 
"Numeric", java: "Double", medianHelper: "StreamingDoubleMedianHelper"},
+     ]
+   }
+  ]
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/main/codegen/templates/MedianFunctions.java 
b/exec/java-exec/src/main/codegen/templates/MedianFunctions.java
new file mode 100644
index 0000000000..fec9b9fe4d
--- /dev/null
+++ b/exec/java-exec/src/main/codegen/templates/MedianFunctions.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+<@pp.dropOutputFile />
+
+<#-- Generates one <className>Functions class per entry of AggrTypes4.tdd, containing one
+     streaming median aggregate per input type listed for that entry. -->
+<#list aggrtypes4.aggrtypes as aggrtype>
+<@pp.changeOutputFile name="/org/apache/drill/exec/expr/fn/impl/gaggr/${aggrtype.className}Functions.java" />
+
+<#include "/@includes/license.ftl" />
+
+/*
+ * This class is generated using freemarker and the ${.template_name} template.
+ */
+
+<#-- A utility class that is used to generate java code for median functions -->
+
+package org.apache.drill.exec.expr.fn.impl.gaggr;
+
+import org.apache.drill.exec.expr.DrillAggFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.annotations.Workspace;
+import org.apache.drill.exec.expr.holders.*;
+import org.apache.drill.exec.expr.fn.impl.StreamingMedianHelpers.StreamingMedianHelper;
+import org.apache.drill.exec.expr.fn.impl.StreamingMedianHelpers.StreamingIntMedianHelper;
+import org.apache.drill.exec.expr.fn.impl.StreamingMedianHelpers.StreamingDoubleMedianHelper;
+
+@SuppressWarnings("unused")
+public class ${aggrtype.className}Functions {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(${aggrtype.className}Functions.class);
+
+<#list aggrtype.types as type>
+
+<#if aggrtype.aliasName == "">
+  @FunctionTemplate(name = "${aggrtype.funcName}", scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+<#else>
+  @FunctionTemplate(names = {"${aggrtype.funcName}", "${aggrtype.aliasName}"}, scope = FunctionTemplate.FunctionScope.POINT_AGGREGATE)
+</#if>
+  public static class ${type.inputType}${aggrtype.className} implements DrillAggFunc {
+
+    @Param ${type.inputType}Holder input;
+    @Output ${type.outputType}Holder median;
+    // Holds the ${type.medianHelper} that accumulates the values added to this aggregate.
+    @Workspace ObjectHolder utils;
+    // Number of non-null inputs added since the last reset(); 0 means the median is NULL.
+    @Workspace BigIntHolder nonNullCount;
+
+    public void setup() {
+      utils = new ObjectHolder();
+      utils.obj = new org.apache.drill.exec.expr.fn.impl.StreamingMedianHelpers.${type.medianHelper}();
+      nonNullCount = new BigIntHolder();
+      nonNullCount.value = 0;
+    }
+
+    @Override
+    public void add() {
+      org.apache.drill.exec.expr.fn.impl.StreamingMedianHelpers.${type.medianHelper} medianHelper =
+          (org.apache.drill.exec.expr.fn.impl.StreamingMedianHelpers.${type.medianHelper}) utils.obj;
+    <#if type.inputType?starts_with("Nullable")>
+      sout: {
+      if (input.isSet == 0) {
+        // Null inputs do not contribute to the median.
+        break sout;
+      }
+    </#if>
+      medianHelper.addNextNumber(input.value);
+      nonNullCount.value++;
+    <#if type.inputType?starts_with("Nullable")>
+      } // end of sout block
+    </#if>
+    }
+
+    @Override
+    public void output() {
+      if (nonNullCount.value > 0) {
+        org.apache.drill.exec.expr.fn.impl.StreamingMedianHelpers.${type.medianHelper} medianHelper =
+            (org.apache.drill.exec.expr.fn.impl.StreamingMedianHelpers.${type.medianHelper}) utils.obj;
+        median.value = medianHelper.getMedian();
+        median.isSet = 1;
+      } else {
+        // No non-null input was added (e.g. an all-NULL group): emit NULL rather than 0.
+        median.isSet = 0;
+      }
+    }
+
+    @Override
+    public void reset() {
+      org.apache.drill.exec.expr.fn.impl.StreamingMedianHelpers.StreamingMedianHelper medianHelper =
+          (org.apache.drill.exec.expr.fn.impl.StreamingMedianHelpers.${type.medianHelper}) utils.obj;
+      medianHelper.reset();
+      nonNullCount.value = 0;
+    }
+  }
+
+</#list>
+}
+</#list>
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StreamingMedianHelpers.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StreamingMedianHelpers.java
new file mode 100644
index 0000000000..d6c2739b7d
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StreamingMedianHelpers.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.expr.fn.impl;
+
+import java.util.Comparator;
+import java.util.PriorityQueue;
+
+/**
+ * Heap-based streaming median helpers used by the generated {@code median} aggregate functions.
+ *
+ * <p>
+ * Each helper keeps the lower half of the values added so far in a max-heap and the upper half
+ * in a min-heap, so a value is added with a few heap operations and the median is read from the
+ * heap heads.
+ * Reference: <a href="https://www.baeldung.com/java-stream-integers-median-using-heap">Stream
+ * Integers Median using Heap</a>
+ * </p>
+ */
+public class StreamingMedianHelpers {
+
+  /** Operations shared by every streaming median helper, independent of the value type. */
+  public interface StreamingMedianHelper {
+
+    /** Discards every value added so far so the helper can be reused. */
+    void reset();
+  }
+
+  /**
+   * Streaming median of integral values. For an even number of values the median is the mean of
+   * the two middle values, rounded toward zero.
+   */
+  public static class StreamingIntMedianHelper implements StreamingMedianHelper {
+    // Upper half of the values added so far; the head is its smallest element.
+    private final PriorityQueue<Long> minHeap;
+    // Lower half of the values added so far; the head is its largest element.
+    private final PriorityQueue<Long> maxHeap;
+
+    public StreamingIntMedianHelper() {
+      this.minHeap = new PriorityQueue<>();
+      this.maxHeap = new PriorityQueue<>(Comparator.reverseOrder());
+    }
+
+    /**
+     * Adds a value and rebalances the heaps so their sizes differ by at most one.
+     *
+     * @param n the value to add; must not be {@code null}
+     */
+    public void addNextNumber(Long n) {
+      if (!minHeap.isEmpty() && n < minHeap.peek()) {
+        maxHeap.offer(n);
+        if (maxHeap.size() > minHeap.size() + 1) {
+          minHeap.offer(maxHeap.poll());
+        }
+      } else {
+        minHeap.offer(n);
+        if (minHeap.size() > maxHeap.size() + 1) {
+          maxHeap.offer(minHeap.poll());
+        }
+      }
+    }
+
+    /**
+     * Adds an {@code int} value, widened to {@code long}.
+     *
+     * @param n the value to add
+     */
+    public void addNextNumber(int n) {
+      addNextNumber((long) n);
+    }
+
+    /**
+     * Returns the median of the values added so far.
+     *
+     * @return the middle value for an odd count, the mean of the two middle values rounded toward
+     *     zero for an even count, or {@code 0} when no value has been added
+     */
+    public Long getMedian() {
+      if (minHeap.size() < maxHeap.size()) {
+        return maxHeap.peek();
+      }
+      if (minHeap.size() > maxHeap.size()) {
+        return minHeap.peek();
+      }
+      if (minHeap.isEmpty()) {
+        return 0L;
+      }
+      return truncatedMean(minHeap.peek(), maxHeap.peek());
+    }
+
+    /**
+     * Returns {@code (a + b) / 2} with Java's round-toward-zero long division, but without
+     * overflowing when {@code a + b} does not fit in a {@code long}.
+     */
+    private static long truncatedMean(long a, long b) {
+      if ((a < 0) != (b < 0)) {
+        // Operands of opposite sign: their sum always fits in a long.
+        return (a + b) / 2;
+      }
+      // Same sign: halve each operand first, then add the carry from the two remainders.
+      return a / 2 + b / 2 + (a % 2 + b % 2) / 2;
+    }
+
+    @Override
+    public void reset() {
+      minHeap.clear();
+      maxHeap.clear();
+    }
+  }
+
+  /**
+   * Streaming median of floating-point values. For an even number of values the median is the
+   * mean of the two middle values.
+   */
+  public static class StreamingDoubleMedianHelper implements StreamingMedianHelper {
+    // Upper half of the values added so far; the head is its smallest element.
+    private final PriorityQueue<Double> minHeap;
+    // Lower half of the values added so far; the head is its largest element.
+    private final PriorityQueue<Double> maxHeap;
+
+    public StreamingDoubleMedianHelper() {
+      this.minHeap = new PriorityQueue<>();
+      this.maxHeap = new PriorityQueue<>(Comparator.reverseOrder());
+    }
+
+    /**
+     * Adds a value and rebalances the heaps so their sizes differ by at most one.
+     *
+     * @param n the value to add; must not be {@code null}
+     */
+    public void addNextNumber(Double n) {
+      if (!minHeap.isEmpty() && n < minHeap.peek()) {
+        maxHeap.offer(n);
+        if (maxHeap.size() > minHeap.size() + 1) {
+          minHeap.offer(maxHeap.poll());
+        }
+      } else {
+        minHeap.offer(n);
+        if (minHeap.size() > maxHeap.size() + 1) {
+          maxHeap.offer(minHeap.poll());
+        }
+      }
+    }
+
+    /**
+     * Adds a {@code float} value, widened to {@code double}.
+     *
+     * @param n the value to add
+     */
+    public void addNextNumber(float n) {
+      addNextNumber((double) n);
+    }
+
+    /**
+     * Returns the median of the values added so far.
+     *
+     * @return the middle value for an odd count, the mean of the two middle values for an even
+     *     count, or {@code 0.0} when no value has been added
+     */
+    public Double getMedian() {
+      if (minHeap.size() < maxHeap.size()) {
+        return maxHeap.peek();
+      }
+      if (minHeap.size() > maxHeap.size()) {
+        return minHeap.peek();
+      }
+      if (minHeap.isEmpty()) {
+        return 0.0;
+      }
+      return midpoint(minHeap.peek(), maxHeap.peek());
+    }
+
+    /**
+     * Returns {@code (a + b) / 2}, falling back to {@code a / 2 + b / 2} when the sum overflows
+     * to infinity even though both operands are finite.
+     */
+    private static double midpoint(double a, double b) {
+      double mean = (a + b) / 2;
+      if (Double.isInfinite(mean) && !Double.isInfinite(a) && !Double.isInfinite(b)) {
+        mean = a / 2 + b / 2;
+      }
+      return mean;
+    }
+
+    @Override
+    public void reset() {
+      minHeap.clear();
+      maxHeap.clear();
+    }
+  }
+}
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
index 804aea35d1..3477d4122b 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/fn/impl/TestAggregateFunctions.java
@@ -1289,4 +1289,46 @@ public class TestAggregateFunctions extends ClusterTest {
       .baselineValues("Partial High School", 3555.8064516129034, 
101.14516129032258, 3469.7014925373132, 103.3731343283582)
       .go();
   }
+
+  @Test
+  public void testMedian() throws Exception {
+    // `value` is read from the CSV as text; the baseline expects an integral (Long) median.
+    final String fromText =
+        "SELECT category, median(`value`) AS med FROM cp.`orgs.csvh` GROUP BY category";
+    testBuilder()
+        .sqlQuery(fromText)
+        .unOrdered()
+        .baselineColumns("category", "med")
+        .baselineValues("cat1", 5L)
+        .baselineValues("cat2", 5L)
+        .go();
+
+    // The same column cast to DOUBLE is expected to produce a floating-point median.
+    final String fromDouble =
+        "SELECT category, median(CAST(`value` AS DOUBLE)) AS med FROM cp.`orgs.csvh` GROUP BY category";
+    testBuilder()
+        .sqlQuery(fromDouble)
+        .unOrdered()
+        .baselineColumns("category", "med")
+        .baselineValues("cat1", 5.0)
+        .baselineValues("cat2", 5.0)
+        .go();
+  }
+
+  @Test
+  public void testMedianWithPivot() throws Exception {
+    // Median salary and age per education level, pivoted into married ('M') and single ('S') columns.
+    String pivotQuery = String.join("\n",
+        "SELECT * FROM (",
+        "SELECT education_level, salary, marital_status, extract(year from age('2023-02-23', birth_date)) age",
+        "FROM cp.`employee.json`)",
+        "PIVOT (median(salary) med_salary, median(age) med_age FOR marital_status IN ('M' married, 'S' single))");
+    testBuilder()
+        .sqlQuery(pivotQuery)
+        .unOrdered()
+        .baselineColumns("education_level",
+            "married_med_salary", "married_med_age",
+            "single_med_salary", "single_med_age")
+        .baselineValues("Graduate Degree", 4400.0, 109L, 20.0, 109L)
+        .baselineValues("Bachelors Degree", 4400.0, 109L, 4400.0, 109L)
+        .baselineValues("Partial College", 4500.0, 109L, 4400.0, 109L)
+        .baselineValues("High School Degree", 3700.0, 109L, 4400.0, 109L)
+        .baselineValues("Partial High School", 4400.0, 109L, 20.0, 109L)
+        .go();
+  }
 }
diff --git a/exec/java-exec/src/test/resources/orgs.csvh 
b/exec/java-exec/src/test/resources/orgs.csvh
new file mode 100644
index 0000000000..0ca6b074a3
--- /dev/null
+++ b/exec/java-exec/src/test/resources/orgs.csvh
@@ -0,0 +1,11 @@
+category,value
+cat1,5
+cat1,3
+cat1,2
+cat1,7
+cat1,10
+cat2,1
+cat2,5
+cat2,9
+cat2,2
+cat2,11
\ No newline at end of file
diff --git a/exec/java-exec/src/test/resources/test_data.csvh 
b/exec/java-exec/src/test/resources/test_data.csvh
new file mode 100644
index 0000000000..d8271c93de
--- /dev/null
+++ b/exec/java-exec/src/test/resources/test_data.csvh
@@ -0,0 +1,5 @@
+col1,col2,col3,col4
+2,25,2.0,0
+3,32,3.0,0
+4,49,4.5,
+5,32,9,9
\ No newline at end of file

Reply via email to