This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a0437b6c931 MSQ window functions: Fix partition boundary issues for arrays (#16780)
a0437b6c931 is described below

commit a0437b6c9319e81a41a6d408b745e364696a51bb
Author: Akshat Jain <[email protected]>
AuthorDate: Wed Jul 24 18:47:04 2024 +0530

    MSQ window functions: Fix partition boundary issues for arrays (#16780)
    
    * MSQ window functions: Fix partition boundary issues for arrays
    
    * Address review comments
    
    * Cache type strategies
    
    * Trigger Build
    
    * Convert typeStrategies from list to array
---
 .../querykit/WindowOperatorQueryFrameProcessor.java | 15 ++++++++++++---
 .../druid/sql/calcite/DrillWindowQueryTest.java     | 21 +++++++++++++++++++++
 .../partition_by_array/wikipedia_query_1.e          | 13 +++++++++++++
 .../partition_by_array/wikipedia_query_1.q          |  6 ++++++
 .../partition_by_array/wikipedia_query_2.e          | 13 +++++++++++++
 .../partition_by_array/wikipedia_query_2.q          |  6 ++++++
 .../partition_by_array/wikipedia_query_3.e          | 13 +++++++++++++
 .../partition_by_array/wikipedia_query_3.q          |  6 ++++++
 8 files changed, 90 insertions(+), 3 deletions(-)

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
index e21cae36d0f..5fbfd3119d0 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
@@ -51,6 +51,7 @@ import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.ColumnValueSelector;
 import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.column.NullableTypeStrategy;
 import org.apache.druid.segment.column.RowSignature;
 
 import javax.annotation.Nullable;
@@ -59,7 +60,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 
@@ -85,6 +85,10 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
   private ResultRow outputRow = null;
   private FrameWriter frameWriter = null;
 
+  // List of type strategies to compare the partition columns across rows.
+  // Type strategies are pushed in the same order as column types in frameReader.signature()
+  private final NullableTypeStrategy[] typeStrategies;
+
   public WindowOperatorQueryFrameProcessor(
       WindowOperatorQuery query,
       ReadableFrameChannel inputChannel,
@@ -103,13 +107,18 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
     this.frameWriterFactory = frameWriterFactory;
     this.operatorFactoryList = operatorFactoryList;
     this.jsonMapper = jsonMapper;
-    this.frameReader = frameReader;
     this.query = query;
     this.frameRowsAndCols = new ArrayList<>();
     this.resultRowAndCols = new ArrayList<>();
     this.objectsOfASingleRac = new ArrayList<>();
     this.maxRowsMaterialized = maxRowsMaterializedInWindow;
     this.partitionColumnNames = partitionColumnNames;
+
+    this.frameReader = frameReader;
+    this.typeStrategies = new NullableTypeStrategy[frameReader.signature().size()];
+    for (int i = 0; i < frameReader.signature().size(); i++) {
+      typeStrategies[i] = frameReader.signature().getColumnType(i).get().getNullableStrategy();
+    }
   }
 
   @Override
@@ -499,7 +508,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
     int match = 0;
     for (String columnName : partitionColumnNames) {
       int i = frameReader.signature().indexOf(columnName);
-      if (Objects.equals(row1.get(i), row2.get(i))) {
+      if (typeStrategies[i].compare(row1.get(i), row2.get(i)) == 0) {
         match++;
       }
     }
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java
index 1451d2495c9..4a2f0945087 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java
@@ -7750,4 +7750,25 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
   {
     windowQueryTest();
   }
+
+  @DrillTest("druid_queries/partition_by_array/wikipedia_query_1")
+  @Test
+  public void test_partition_by_array_wikipedia_query_1()
+  {
+    windowQueryTest();
+  }
+
+  @DrillTest("druid_queries/partition_by_array/wikipedia_query_2")
+  @Test
+  public void test_partition_by_array_wikipedia_query_2()
+  {
+    windowQueryTest();
+  }
+
+  @DrillTest("druid_queries/partition_by_array/wikipedia_query_3")
+  @Test
+  public void test_partition_by_array_wikipedia_query_3()
+  {
+    windowQueryTest();
+  }
 }
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_1.e b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_1.e
new file mode 100644
index 00000000000..26c251a35fb
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_1.e
@@ -0,0 +1,13 @@
+Austria        null    #de.wikipedia   1
+Republic of Korea      null    #en.wikipedia   2
+Republic of Korea      null    #ja.wikipedia   3
+Republic of Korea      null    #ko.wikipedia   4
+Republic of Korea      Seoul   #ko.wikipedia   1
+Austria        Vienna  #de.wikipedia   1
+Austria        Vienna  #es.wikipedia   2
+Austria        Vienna  #tr.wikipedia   3
+Republic of Korea      Jeonju  #ko.wikipedia   4
+Republic of Korea      Suwon-si        #ko.wikipedia   1
+Austria        Horsching       #de.wikipedia   1
+Republic of Korea      Seongnam-si     #ko.wikipedia   1
+Republic of Korea      Yongsan-dong    #ko.wikipedia   1
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_1.q b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_1.q
new file mode 100644
index 00000000000..b10b52af389
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_1.q
@@ -0,0 +1,6 @@
+select
+countryName, cityName, channel,
+row_number() over (partition by array[1,2,length(cityName)] order by countryName) as c
+from wikipedia
+where countryName in ('Austria', 'Republic of Korea')
+group by countryName, cityName, channel
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_2.e b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_2.e
new file mode 100644
index 00000000000..a1b116035c1
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_2.e
@@ -0,0 +1,13 @@
+Austria        null    #de.wikipedia   1
+Austria        Horsching       #de.wikipedia   2
+Austria        Vienna  #de.wikipedia   3
+Austria        Vienna  #es.wikipedia   4
+Austria        Vienna  #tr.wikipedia   5
+Republic of Korea      null    #en.wikipedia   6
+Republic of Korea      null    #ja.wikipedia   7
+Republic of Korea      null    #ko.wikipedia   8
+Republic of Korea      Jeonju  #ko.wikipedia   9
+Republic of Korea      Seongnam-si     #ko.wikipedia   10
+Republic of Korea      Seoul   #ko.wikipedia   11
+Republic of Korea      Suwon-si        #ko.wikipedia   12
+Republic of Korea      Yongsan-dong    #ko.wikipedia   13
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_2.q b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_2.q
new file mode 100644
index 00000000000..99245d7f953
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_2.q
@@ -0,0 +1,6 @@
+select
+countryName, cityName, channel,
+row_number() over (partition by array[1,2,3] order by countryName) as c
+from wikipedia
+where countryName in ('Austria', 'Republic of Korea')
+group by countryName, cityName, channel
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_3.e b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_3.e
new file mode 100644
index 00000000000..ebd91f9f893
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_3.e
@@ -0,0 +1,13 @@
+Austria        null    #de.wikipedia   1
+Austria        Vienna  #de.wikipedia   1
+Austria        Vienna  #es.wikipedia   2
+Austria        Vienna  #tr.wikipedia   3
+Austria        Horsching       #de.wikipedia   1
+Republic of Korea      null    #en.wikipedia   1
+Republic of Korea      null    #ja.wikipedia   2
+Republic of Korea      null    #ko.wikipedia   3
+Republic of Korea      Seoul   #ko.wikipedia   1
+Republic of Korea      Jeonju  #ko.wikipedia   1
+Republic of Korea      Suwon-si        #ko.wikipedia   1
+Republic of Korea      Seongnam-si     #ko.wikipedia   1
+Republic of Korea      Yongsan-dong    #ko.wikipedia   1
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_3.q b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_3.q
new file mode 100644
index 00000000000..9241f2ee94e
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_3.q
@@ -0,0 +1,6 @@
+select
+countryName, cityName, channel,
+row_number() over (partition by array[1,length(countryName),length(cityName)] order by countryName) as c
+from wikipedia
+where countryName in ('Austria', 'Republic of Korea')
+group by countryName, cityName, channel


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to