This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a0437b6c931 MSQ window functions: Fix partition boundary issues for arrays (#16780)
a0437b6c931 is described below
commit a0437b6c9319e81a41a6d408b745e364696a51bb
Author: Akshat Jain <[email protected]>
AuthorDate: Wed Jul 24 18:47:04 2024 +0530
MSQ window functions: Fix partition boundary issues for arrays (#16780)
* MSQ window functions: Fix partition boundary issues for arrays
* Address review comments
* Cache type strategies
* Trigger Build
* Convert typeStrategies from list to array
---
.../querykit/WindowOperatorQueryFrameProcessor.java | 15 ++++++++++++---
.../druid/sql/calcite/DrillWindowQueryTest.java | 21 +++++++++++++++++++++
.../partition_by_array/wikipedia_query_1.e | 13 +++++++++++++
.../partition_by_array/wikipedia_query_1.q | 6 ++++++
.../partition_by_array/wikipedia_query_2.e | 13 +++++++++++++
.../partition_by_array/wikipedia_query_2.q | 6 ++++++
.../partition_by_array/wikipedia_query_3.e | 13 +++++++++++++
.../partition_by_array/wikipedia_query_3.q | 6 ++++++
8 files changed, 90 insertions(+), 3 deletions(-)
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
index e21cae36d0f..5fbfd3119d0 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
@@ -51,6 +51,7 @@ import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.ColumnValueSelector;
import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.column.NullableTypeStrategy;
import org.apache.druid.segment.column.RowSignature;
import javax.annotation.Nullable;
@@ -59,7 +60,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
@@ -85,6 +85,10 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
private ResultRow outputRow = null;
private FrameWriter frameWriter = null;
+ // List of type strategies to compare the partition columns across rows.
+ // Type strategies are pushed in the same order as column types in frameReader.signature()
+ private final NullableTypeStrategy[] typeStrategies;
+
public WindowOperatorQueryFrameProcessor(
WindowOperatorQuery query,
ReadableFrameChannel inputChannel,
@@ -103,13 +107,18 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
this.frameWriterFactory = frameWriterFactory;
this.operatorFactoryList = operatorFactoryList;
this.jsonMapper = jsonMapper;
- this.frameReader = frameReader;
this.query = query;
this.frameRowsAndCols = new ArrayList<>();
this.resultRowAndCols = new ArrayList<>();
this.objectsOfASingleRac = new ArrayList<>();
this.maxRowsMaterialized = maxRowsMaterializedInWindow;
this.partitionColumnNames = partitionColumnNames;
+
+ this.frameReader = frameReader;
+ this.typeStrategies = new
NullableTypeStrategy[frameReader.signature().size()];
+ for (int i = 0; i < frameReader.signature().size(); i++) {
+ typeStrategies[i] =
frameReader.signature().getColumnType(i).get().getNullableStrategy();
+ }
}
@Override
@@ -499,7 +508,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
int match = 0;
for (String columnName : partitionColumnNames) {
int i = frameReader.signature().indexOf(columnName);
- if (Objects.equals(row1.get(i), row2.get(i))) {
+ if (typeStrategies[i].compare(row1.get(i), row2.get(i)) == 0) {
match++;
}
}
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java
index 1451d2495c9..4a2f0945087 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java
@@ -7750,4 +7750,25 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
{
windowQueryTest();
}
+
+ @DrillTest("druid_queries/partition_by_array/wikipedia_query_1")
+ @Test
+ public void test_partition_by_array_wikipedia_query_1()
+ {
+ windowQueryTest();
+ }
+
+ @DrillTest("druid_queries/partition_by_array/wikipedia_query_2")
+ @Test
+ public void test_partition_by_array_wikipedia_query_2()
+ {
+ windowQueryTest();
+ }
+
+ @DrillTest("druid_queries/partition_by_array/wikipedia_query_3")
+ @Test
+ public void test_partition_by_array_wikipedia_query_3()
+ {
+ windowQueryTest();
+ }
}
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_1.e b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_1.e
new file mode 100644
index 00000000000..26c251a35fb
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_1.e
@@ -0,0 +1,13 @@
+Austria null #de.wikipedia 1
+Republic of Korea null #en.wikipedia 2
+Republic of Korea null #ja.wikipedia 3
+Republic of Korea null #ko.wikipedia 4
+Republic of Korea Seoul #ko.wikipedia 1
+Austria Vienna #de.wikipedia 1
+Austria Vienna #es.wikipedia 2
+Austria Vienna #tr.wikipedia 3
+Republic of Korea Jeonju #ko.wikipedia 4
+Republic of Korea Suwon-si #ko.wikipedia 1
+Austria Horsching #de.wikipedia 1
+Republic of Korea Seongnam-si #ko.wikipedia 1
+Republic of Korea Yongsan-dong #ko.wikipedia 1
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_1.q b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_1.q
new file mode 100644
index 00000000000..b10b52af389
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_1.q
@@ -0,0 +1,6 @@
+select
+countryName, cityName, channel,
+row_number() over (partition by array[1,2,length(cityName)] order by
countryName) as c
+from wikipedia
+where countryName in ('Austria', 'Republic of Korea')
+group by countryName, cityName, channel
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_2.e b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_2.e
new file mode 100644
index 00000000000..a1b116035c1
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_2.e
@@ -0,0 +1,13 @@
+Austria null #de.wikipedia 1
+Austria Horsching #de.wikipedia 2
+Austria Vienna #de.wikipedia 3
+Austria Vienna #es.wikipedia 4
+Austria Vienna #tr.wikipedia 5
+Republic of Korea null #en.wikipedia 6
+Republic of Korea null #ja.wikipedia 7
+Republic of Korea null #ko.wikipedia 8
+Republic of Korea Jeonju #ko.wikipedia 9
+Republic of Korea Seongnam-si #ko.wikipedia 10
+Republic of Korea Seoul #ko.wikipedia 11
+Republic of Korea Suwon-si #ko.wikipedia 12
+Republic of Korea Yongsan-dong #ko.wikipedia 13
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_2.q b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_2.q
new file mode 100644
index 00000000000..99245d7f953
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_2.q
@@ -0,0 +1,6 @@
+select
+countryName, cityName, channel,
+row_number() over (partition by array[1,2,3] order by countryName) as c
+from wikipedia
+where countryName in ('Austria', 'Republic of Korea')
+group by countryName, cityName, channel
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_3.e b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_3.e
new file mode 100644
index 00000000000..ebd91f9f893
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_3.e
@@ -0,0 +1,13 @@
+Austria null #de.wikipedia 1
+Austria Vienna #de.wikipedia 1
+Austria Vienna #es.wikipedia 2
+Austria Vienna #tr.wikipedia 3
+Austria Horsching #de.wikipedia 1
+Republic of Korea null #en.wikipedia 1
+Republic of Korea null #ja.wikipedia 2
+Republic of Korea null #ko.wikipedia 3
+Republic of Korea Seoul #ko.wikipedia 1
+Republic of Korea Jeonju #ko.wikipedia 1
+Republic of Korea Suwon-si #ko.wikipedia 1
+Republic of Korea Seongnam-si #ko.wikipedia 1
+Republic of Korea Yongsan-dong #ko.wikipedia 1
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_3.q b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_3.q
new file mode 100644
index 00000000000..9241f2ee94e
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_array/wikipedia_query_3.q
@@ -0,0 +1,6 @@
+select
+countryName, cityName, channel,
+row_number() over (partition by array[1,length(countryName),length(cityName)]
order by countryName) as c
+from wikipedia
+where countryName in ('Austria', 'Republic of Korea')
+group by countryName, cityName, channel
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]