This is an automated email from the ASF dual-hosted git repository.

asdf2014 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new b53c26f5c59 Fix issues with partitioning boundaries for MSQ window functions (#16729)
b53c26f5c59 is described below

commit b53c26f5c5972d51f712fa4a2c4b906006fbb392
Author: Akshat Jain <akj...@gmail.com>
AuthorDate: Thu Jul 18 07:35:09 2024 +0530

    Fix issues with partitioning boundaries for MSQ window functions (#16729)
    
    * Fix issues with partitioning boundaries for MSQ window functions
    
    * Address review comments
    
    * Address review comments
    
    * Add test for coverage check failure
    
    * Address review comment
    
    * Remove DruidWindowQueryTest and WindowQueryTestBase, move those tests to DrillWindowQueryTest
    
    * Update extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
    
    * Address review comments
    
    * Add test for equals and hashcode for WindowOperatorQueryFrameProcessorFactory
    
    * Address review comment
    
    * Fix checkstyle
    
    ---------
    
    Co-authored-by: Benedict Jin <asdf2...@apache.org>
---
 .../WindowOperatorQueryFrameProcessor.java         |  61 ++++----
 .../WindowOperatorQueryFrameProcessorFactory.java  |  18 ++-
 .../druid/msq/querykit/WindowOperatorQueryKit.java | 157 +++++++++++++++------
 ...ndowOperatorQueryFrameProcessorFactoryTest.java |  35 +++++
 .../druid/msq/test/CalciteMSQTestsHelper.java      |  16 +++
 .../org/apache/druid/query/operator/Operator.java  |   2 +-
 .../query/operator/window/ComposingProcessor.java  |  12 ++
 .../druid/query/operator/window/Processor.java     |   7 +
 .../window/WindowFramedAggregateProcessor.java     |  12 ++
 .../window/ranking/WindowPercentileProcessor.java  |   8 ++
 .../window/ranking/WindowRankingProcessorBase.java |   6 +
 .../window/ranking/WindowRowNumberProcessor.java   |   9 ++
 .../window/value/WindowValueProcessorBase.java     |   8 ++
 .../operator/WindowProcessorOperatorTest.java      |   9 ++
 .../operator/window/ComposingProcessorTest.java    |  10 ++
 .../window/WindowFramedAggregateProcessorTest.java |   2 +
 .../ranking/WindowCumeDistProcessorTest.java       |   2 +
 .../ranking/WindowDenseRankProcessorTest.java      |   2 +
 .../ranking/WindowPercentileProcessorTest.java     |   7 +
 .../window/ranking/WindowRankProcessorTest.java    |   4 +
 .../ranking/WindowRowNumberProcessorTest.java      |   3 +
 .../window/value/WindowFirstProcessorTest.java     |   7 +
 .../window/value/WindowLastProcessorTest.java      |   6 +
 .../druid/sql/calcite/DrillWindowQueryTest.java    |  74 ++++++++++
 .../multiple_windows/wikipedia_query_1.e           |  13 ++
 .../multiple_windows/wikipedia_query_1.q           |   6 +
 .../wikipedia_query_1_named_windows.e              |  13 ++
 .../wikipedia_query_1_named_windows.q              |   9 ++
 .../wikipedia_query_1.e                            |  15 ++
 .../wikipedia_query_1.q                            |   7 +
 .../wikipedia_query_2.e                            |  15 ++
 .../wikipedia_query_2.q                            |   9 ++
 .../same_window_across_columns/wikipedia_query_1.e |  15 ++
 .../same_window_across_columns/wikipedia_query_1.q |   6 +
 .../wikipedia_query_1_named_window.e               |  15 ++
 .../wikipedia_query_1_named_window.q               |   7 +
 .../shuffle_columns/wikipedia_query_1.e            |  15 ++
 .../shuffle_columns/wikipedia_query_1.q            |   5 +
 .../shuffle_columns/wikipedia_query_1_shuffle_1.e  |  15 ++
 .../shuffle_columns/wikipedia_query_1_shuffle_1.q  |   5 +
 .../shuffle_columns/wikipedia_query_2.e            |  16 +++
 .../shuffle_columns/wikipedia_query_2.q            |   9 ++
 .../shuffle_columns/wikipedia_query_2_shuffle_1.e  |  16 +++
 .../shuffle_columns/wikipedia_query_2_shuffle_1.q  |   9 ++
 44 files changed, 614 insertions(+), 83 deletions(-)

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
index 6d8cfdfd277..2bf21397ffb 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java
@@ -38,7 +38,6 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.msq.indexing.error.MSQException;
 import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault;
 import org.apache.druid.query.groupby.ResultRow;
-import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
 import org.apache.druid.query.operator.OffsetLimit;
 import org.apache.druid.query.operator.Operator;
 import org.apache.druid.query.operator.OperatorFactory;
@@ -70,6 +69,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
   private final WindowOperatorQuery query;
 
   private final List<OperatorFactory> operatorFactoryList;
+  private final List<String> partitionColumnNames;
   private final ObjectMapper jsonMapper;
   private final ArrayList<RowsAndColumns> frameRowsAndCols;
   private final ArrayList<RowsAndColumns> resultRowAndCols;
@@ -79,7 +79,6 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
   private final FrameReader frameReader;
   private final ArrayList<ResultRow> objectsOfASingleRac;
   private final int maxRowsMaterialized;
-  List<Integer> partitionColsIndex;
   private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed
   private Cursor frameCursor = null;
   private Supplier<ResultRow> rowSupplierFromFrameCursor;
@@ -97,7 +96,8 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
       final List<OperatorFactory> operatorFactoryList,
       final RowSignature rowSignature,
       final boolean isOverEmpty,
-      final int maxRowsMaterializedInWindow
+      final int maxRowsMaterializedInWindow,
+      final List<String> partitionColumnNames
   )
   {
     this.inputChannel = inputChannel;
@@ -110,9 +110,9 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
     this.frameRowsAndCols = new ArrayList<>();
     this.resultRowAndCols = new ArrayList<>();
     this.objectsOfASingleRac = new ArrayList<>();
-    this.partitionColsIndex = new ArrayList<>();
     this.isOverEmpty = isOverEmpty;
     this.maxRowsMaterialized = maxRowsMaterializedInWindow;
+    this.partitionColumnNames = partitionColumnNames;
   }
 
   @Override
@@ -177,12 +177,12 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
      *
      *  Future thoughts: {@link https://github.com/apache/druid/issues/16126}
      *
-     *  1. We are writing 1 partition to each frame in this way. In case of low cardinality data
-     *      we will me making a large number of small frames. We can have a check to keep size of frame to a value
+     *  1. We are writing 1 partition to each frame in this way. In case of high cardinality data
+     *      we will be making a large number of small frames. We can have a check to keep size of frame to a value
      *      say 20k rows and keep on adding to the same pending frame and not create a new frame
      *
      *  2. Current approach with R&C and operators materialize a single R&C for processing. In case of data
-     *     with high cardinality a single R&C might be too big to consume. Same for the case of empty OVER() clause
+     *     with low cardinality a single R&C might be too big to consume. Same for the case of empty OVER() clause
      *     Most of the window operations like SUM(), RANK(), RANGE() etc. can be made with 2 passes of the data.
      *     We might think to reimplement them in the MSQ way so that we do not have to materialize so much data
      */
@@ -218,7 +218,6 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
           final Frame frame = inputChannel.read();
           frameCursor = FrameProcessors.makeCursor(frame, frameReader);
           final ColumnSelectorFactory frameColumnSelectorFactory = frameCursor.getColumnSelectorFactory();
-          partitionColsIndex = findPartitionColumns(frameReader.signature());
           final Supplier<Object>[] fieldSuppliers = new Supplier[frameReader.signature().size()];
           for (int i = 0; i < fieldSuppliers.length; i++) {
             final ColumnValueSelector<?> selector =
@@ -259,18 +258,17 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
         if (outputRow == null) {
           outputRow = currentRow;
           objectsOfASingleRac.add(currentRow);
-        } else if (comparePartitionKeys(outputRow, currentRow, partitionColsIndex)) {
+        } else if (comparePartitionKeys(outputRow, currentRow, partitionColumnNames)) {
           // if they have the same partition key
           // keep adding them after checking
           // guardrails
+          objectsOfASingleRac.add(currentRow);
           if (objectsOfASingleRac.size() > maxRowsMaterialized) {
             throw new MSQException(new TooManyRowsInAWindowFault(
                 objectsOfASingleRac.size(),
                 maxRowsMaterialized
             ));
           }
-          objectsOfASingleRac.add(currentRow);
-
         } else {
           // key change noted
           // create rac from the rows seen before
@@ -484,37 +482,36 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
     frameRowsAndCols.add(ldrc);
   }
 
-  private List<Integer> findPartitionColumns(RowSignature rowSignature)
-  {
-    List<Integer> indexList = new ArrayList<>();
-    for (OperatorFactory of : operatorFactoryList) {
-      if (of instanceof NaivePartitioningOperatorFactory) {
-        for (String s : ((NaivePartitioningOperatorFactory) of).getPartitionColumns()) {
-          indexList.add(rowSignature.indexOf(s));
-        }
-      }
-    }
-    return indexList;
-  }
-
   /**
-   *
-   * Compare two rows based only the columns in the partitionIndices
-   * In case the parition indices is empty or null compare entire row
-   *
+   * Compare two rows based on the columns in partitionColumnNames.
+   * If the partitionColumnNames is empty or null, compare entire row.
+   * <p>
+   * For example, say:
+   * <ul>
+   *   <li>partitionColumnNames = ["d1", "d2"]</li>
+   *   <li>frameReader's row signature = {d1:STRING, d2:STRING, p0:STRING}</li>
+   *   <li>frameReader.signature.indexOf("d1") = 0</li>
+   *   <li>frameReader.signature.indexOf("d2") = 1</li>
+   *   <li>row1 = [d1_row1, d2_row1, p0_row1]</li>
+   *   <li>row2 = [d1_row2, d2_row2, p0_row2]</li>
+   * </ul>
+   * <p>
+   * Then this method will return true if d1_row1==d1_row2 && d2_row1==d2_row2, false otherwise.
+   * Returning true would indicate that these 2 rows can be put into the same partition for window function processing.
    */
-  private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List<Integer> partitionIndices)
+  private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List<String> partitionColumnNames)
   {
-    if (partitionIndices == null || partitionIndices.isEmpty()) {
+    if (partitionColumnNames == null || partitionColumnNames.isEmpty()) {
       return row1.equals(row2);
     } else {
       int match = 0;
-      for (int i : partitionIndices) {
+      for (String columnName : partitionColumnNames) {
+        int i = frameReader.signature().indexOf(columnName);
         if (Objects.equals(row1.get(i), row2.get(i))) {
           match++;
         }
       }
-      return match == partitionIndices.size();
+      return match == partitionColumnNames.size();
     }
   }
 }
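For readers following the javadoc above, here is a minimal standalone sketch of the same partition-boundary check, using plain Java lists in place of Druid's ResultRow and RowSignature types (the class and helper names below are illustrative only and are not part of this patch):

    import java.util.Arrays;
    import java.util.List;
    import java.util.Objects;

    public class PartitionBoundarySketch
    {
      // True when the two rows agree on every partition column, i.e. they belong to the
      // same window partition; false marks a partition boundary.
      static boolean samePartition(List<String> signature, List<String> partitionColumnNames,
                                   List<Object> row1, List<Object> row2)
      {
        if (partitionColumnNames == null || partitionColumnNames.isEmpty()) {
          return row1.equals(row2);
        }
        for (String columnName : partitionColumnNames) {
          int i = signature.indexOf(columnName);
          if (!Objects.equals(row1.get(i), row2.get(i))) {
            return false;
          }
        }
        return true;
      }

      public static void main(String[] args)
      {
        List<String> signature = Arrays.asList("d1", "d2", "p0");
        List<String> partitionColumns = Arrays.asList("d1", "d2");
        // Same d1/d2 values => same partition; a change in d2 => new partition boundary.
        System.out.println(samePartition(signature, partitionColumns,
            Arrays.asList("a", "b", 1), Arrays.asList("a", "b", 2))); // true
        System.out.println(samePartition(signature, partitionColumns,
            Arrays.asList("a", "b", 2), Arrays.asList("a", "c", 3))); // false
      }
    }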
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java
index fbbc0a0fc3e..d9c14390736 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java
@@ -61,6 +61,7 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
   private final RowSignature stageRowSignature;
   private final boolean isEmptyOver;
   private final int maxRowsMaterializedInWindow;
+  private final List<String> partitionColumnNames;
 
   @JsonCreator
   public WindowOperatorQueryFrameProcessorFactory(
@@ -68,7 +69,8 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
       @JsonProperty("operatorList") List<OperatorFactory> operatorFactoryList,
       @JsonProperty("stageRowSignature") RowSignature stageRowSignature,
       @JsonProperty("emptyOver") boolean emptyOver,
-      @JsonProperty("maxRowsMaterializedInWindow") int 
maxRowsMaterializedInWindow
+      @JsonProperty("maxRowsMaterializedInWindow") int 
maxRowsMaterializedInWindow,
+      @JsonProperty("partitionColumnNames") List<String> partitionColumnNames
   )
   {
     this.query = Preconditions.checkNotNull(query, "query");
@@ -76,6 +78,7 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
     this.stageRowSignature = Preconditions.checkNotNull(stageRowSignature, "stageSignature");
     this.isEmptyOver = emptyOver;
     this.maxRowsMaterializedInWindow = maxRowsMaterializedInWindow;
+    this.partitionColumnNames = partitionColumnNames;
   }
 
   @JsonProperty("query")
@@ -90,6 +93,12 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
     return operatorList;
   }
 
+  @JsonProperty("partitionColumnNames")
+  public List<String> getPartitionColumnNames()
+  {
+    return partitionColumnNames;
+  }
+
   @JsonProperty("stageRowSignature")
   public RowSignature getSignature()
   {
@@ -148,7 +157,6 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
         readableInput -> {
            final OutputChannel outputChannel =
                outputChannels.get(readableInput.getStagePartition().getPartitionNumber());
-
           return new WindowOperatorQueryFrameProcessor(
               query,
               readableInput.getChannel(),
@@ -159,7 +167,8 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
               operatorList,
               stageRowSignature,
               isEmptyOver,
-              maxRowsMaterializedInWindow
+              maxRowsMaterializedInWindow,
+              partitionColumnNames
           );
         }
     );
@@ -185,12 +194,13 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
            && maxRowsMaterializedInWindow == that.maxRowsMaterializedInWindow
            && Objects.equals(query, that.query)
            && Objects.equals(operatorList, that.operatorList)
+           && Objects.equals(partitionColumnNames, that.partitionColumnNames)
            && Objects.equals(stageRowSignature, that.stageRowSignature);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(query, operatorList, stageRowSignature, isEmptyOver, maxRowsMaterializedInWindow);
+    return Objects.hash(query, operatorList, partitionColumnNames, stageRowSignature, isEmptyOver, maxRowsMaterializedInWindow);
   }
 }
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
index d08d78ef791..3754f081a27 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java
@@ -24,9 +24,12 @@ import com.google.common.collect.ImmutableMap;
 import org.apache.druid.frame.key.ClusterBy;
 import org.apache.druid.frame.key.KeyColumn;
 import org.apache.druid.frame.key.KeyOrder;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.msq.exec.Limits;
 import org.apache.druid.msq.input.stage.StageInputSpec;
 import org.apache.druid.msq.kernel.HashShuffleSpec;
+import org.apache.druid.msq.kernel.MixShuffleSpec;
 import org.apache.druid.msq.kernel.QueryDefinition;
 import org.apache.druid.msq.kernel.QueryDefinitionBuilder;
 import org.apache.druid.msq.kernel.ShuffleSpec;
@@ -39,6 +42,7 @@ import org.apache.druid.query.operator.NaiveSortOperatorFactory;
 import org.apache.druid.query.operator.OperatorFactory;
 import org.apache.druid.query.operator.WindowOperatorQuery;
 import org.apache.druid.query.operator.window.WindowOperatorFactory;
+import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
 
 import java.util.ArrayList;
@@ -48,6 +52,7 @@ import java.util.Map;
 
 public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
 {
+  private static final Logger log = new Logger(WindowOperatorQueryKit.class);
   private final ObjectMapper jsonMapper;
 
   public WindowOperatorQueryKit(ObjectMapper jsonMapper)
@@ -65,13 +70,22 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
       int minStageNumber
   )
   {
-    // need to validate query first
-    // populate the group of operators to be processed as each stage
-    // the size of the operators is the number of serialized stages
-    // later we should also check if these can be parallelized
-    // check there is an empty over clause or not
-    List<List<OperatorFactory>> operatorList = new ArrayList<>();
-    boolean isEmptyOverFound = ifEmptyOverPresentInWindowOperstors(originalQuery, operatorList);
+    // Need to validate query first.
+    // Populate the group of operators to be processed at each stage.
+    // The size of the operators is the number of serialized stages.
+    // Later we should also check if these can be parallelized.
+    // Check if there is an empty OVER() clause or not.
+    RowSignature rowSignature = originalQuery.getRowSignature();
+    log.info("Row signature received for query is [%s].", rowSignature);
+
+    boolean isEmptyOverPresent = originalQuery.getOperators()
+                                            .stream()
+                                            .filter(of -> of instanceof NaivePartitioningOperatorFactory)
+                                            .map(of -> (NaivePartitioningOperatorFactory) of)
+                                            .anyMatch(of -> of.getPartitionColumns().isEmpty());
+
+    List<List<OperatorFactory>> operatorList = getOperatorListFromQuery(originalQuery);
+    log.info("Created operatorList with operator factories: [%s]", operatorList);
 
     ShuffleSpec nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(0), maxWorkerCount);
     // add this shuffle spec to the last stage of the inner query
@@ -102,16 +116,14 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
     final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber());
     final WindowOperatorQuery queryToRun = (WindowOperatorQuery) originalQuery.withDataSource(dataSourcePlan.getNewDataSource());
     final int maxRowsMaterialized;
-    RowSignature rowSignature = queryToRun.getRowSignature();
+
     if (originalQuery.context() != null && originalQuery.context().containsKey(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW)) {
-      maxRowsMaterialized = (int) originalQuery.context()
-                                                         .get(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW);
+      maxRowsMaterialized = (int) originalQuery.context().get(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW);
     } else {
       maxRowsMaterialized = Limits.MAX_ROWS_MATERIALIZED_IN_WINDOW;
     }
 
-
-    if (isEmptyOverFound) {
+    if (isEmptyOverPresent) {
       // empty over clause found
       // moving everything to a single partition
       queryDefBuilder.add(
@@ -125,28 +137,59 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
                              queryToRun.getOperators(),
                              rowSignature,
                              true,
-                             maxRowsMaterialized
+                             maxRowsMaterialized,
+                             new ArrayList<>()
                          ))
       );
     } else {
-      // there are multiple windows present in the query
-      // Create stages for each window in the query
-      // These stages will be serialized
-      // the partition by clause of the next window will be the shuffle key for the previous window
+      // There are multiple windows present in the query.
+      // Create stages for each window in the query.
+      // These stages will be serialized.
+      // The partition by clause of the next window will be the shuffle key for the previous window.
       RowSignature.Builder bob = RowSignature.builder();
-      final int numberOfWindows = operatorList.size();
-      final int baseSize = rowSignature.size() - numberOfWindows;
-      for (int i = 0; i < baseSize; i++) {
-        bob.add(rowSignature.getColumnName(i), rowSignature.getColumnType(i).get());
+      RowSignature signatureFromInput = dataSourcePlan.getSubQueryDefBuilder().get().build().getFinalStageDefinition().getSignature();
+      log.info("Row signature received from last stage is [%s].", signatureFromInput);
+
+      for (int i = 0; i < signatureFromInput.getColumnNames().size(); i++) {
+        bob.add(signatureFromInput.getColumnName(i), signatureFromInput.getColumnType(i).get());
       }
 
-      for (int i = 0; i < numberOfWindows; i++) {
-        bob.add(rowSignature.getColumnName(baseSize + i), rowSignature.getColumnType(baseSize + i).get()).build();
+      List<String> partitionColumnNames = new ArrayList<>();
+
+      /*
+      operatorList is a List<List<OperatorFactory>>, where each List<OperatorFactory> corresponds to the operator factories
+       to be used for a different window stage.
+
+       We iterate over operatorList, and add the definition for a window stage to QueryDefinitionBuilder.
+       */
+      for (int i = 0; i < operatorList.size(); i++) {
+        for (OperatorFactory operatorFactory : operatorList.get(i)) {
+          if (operatorFactory instanceof WindowOperatorFactory) {
+            List<String> outputColumnNames = ((WindowOperatorFactory) operatorFactory).getProcessor().getOutputColumnNames();
+
+            // Need to add column names which are present in outputColumnNames and rowSignature but not in bob,
+            // since they need to be present in the row signature for this window stage.
+            for (String columnName : outputColumnNames) {
+              int indexInRowSignature = rowSignature.indexOf(columnName);
+              if (indexInRowSignature != -1 && bob.build().indexOf(columnName) == -1) {
+                ColumnType columnType = rowSignature.getColumnType(indexInRowSignature).get();
+                bob.add(columnName, columnType);
+                log.info("Added column [%s] of type [%s] to row signature for window stage.", columnName, columnType);
+              } else {
+                throw new ISE(
+                    "Found unexpected column [%s] already present in row signature [%s].",
+                    columnName,
+                    rowSignature
+                );
+              }
+            }
+          }
+        }
+
         // find the shuffle spec of the next stage
         // if it is the last stage set the next shuffle spec to single partition
-        if (i + 1 == numberOfWindows) {
-          nextShuffleSpec = ShuffleSpecFactories.singlePartition()
-                                                .build(ClusterBy.none(), false);
+        if (i + 1 == operatorList.size()) {
+          nextShuffleSpec = MixShuffleSpec.instance();
         } else {
           nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(i + 1), maxWorkerCount);
         }
@@ -162,6 +205,28 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
           );
         }
 
+        log.info("Using row signature [%s] for window stage.", 
stageRowSignature);
+
+        boolean partitionOperatorExists = false;
+        List<String> currentPartitionColumns = new ArrayList<>();
+        for (OperatorFactory of : operatorList.get(i)) {
+          if (of instanceof NaivePartitioningOperatorFactory) {
+            for (String s : ((NaivePartitioningOperatorFactory) of).getPartitionColumns()) {
+              currentPartitionColumns.add(s);
+              partitionOperatorExists = true;
+            }
+          }
+        }
+
+        if (partitionOperatorExists) {
+          partitionColumnNames = currentPartitionColumns;
+        }
+
+        log.info(
+            "Columns which would be used to define partitioning boundaries for 
this window stage are [%s]",
+            partitionColumnNames
+        );
+
         queryDefBuilder.add(
             StageDefinition.builder(firstStageNumber + i)
                            .inputs(new StageInputSpec(firstStageNumber + i - 1))
@@ -173,7 +238,8 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
                                operatorList.get(i),
                                stageRowSignature,
                                false,
-                               maxRowsMaterialized
+                               maxRowsMaterialized,
+                               partitionColumnNames
                            ))
         );
       }
@@ -184,14 +250,12 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
   /**
    *
    * @param originalQuery
-   * @param operatorList
-   * @return true if the operator List has a partitioning operator with an empty OVER clause, false otherwise
+   * @return A list of list of operator factories, where each list represents the operator factories for a particular
+   * window stage.
    */
-  private boolean ifEmptyOverPresentInWindowOperstors(
-      WindowOperatorQuery originalQuery,
-      List<List<OperatorFactory>> operatorList
-  )
+  private List<List<OperatorFactory>> getOperatorListFromQuery(WindowOperatorQuery originalQuery)
   {
+    List<List<OperatorFactory>> operatorList = new ArrayList<>();
     final List<OperatorFactory> operators = originalQuery.getOperators();
     List<OperatorFactory> operatorFactoryList = new ArrayList<>();
     for (OperatorFactory of : operators) {
@@ -203,18 +267,17 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
         if (((NaivePartitioningOperatorFactory) of).getPartitionColumns().isEmpty()) {
           operatorList.clear();
           operatorList.add(originalQuery.getOperators());
-          return true;
+          return operatorList;
         }
       }
     }
-    return false;
+    return operatorList;
   }
 
   private ShuffleSpec findShuffleSpecForNextWindow(List<OperatorFactory> operatorFactories, int maxWorkerCount)
   {
     NaivePartitioningOperatorFactory partition = null;
     NaiveSortOperatorFactory sort = null;
-    List<KeyColumn> keyColsOfWindow = new ArrayList<>();
     for (OperatorFactory of : operatorFactories) {
       if (of instanceof NaivePartitioningOperatorFactory) {
         partition = (NaivePartitioningOperatorFactory) of;
@@ -222,29 +285,31 @@ public class WindowOperatorQueryKit implements QueryKit<WindowOperatorQuery>
         sort = (NaiveSortOperatorFactory) of;
       }
     }
-    Map<String, ColumnWithDirection.Direction> colMap = new HashMap<>();
+
+    Map<String, ColumnWithDirection.Direction> sortColumnsMap = new HashMap<>();
     if (sort != null) {
       for (ColumnWithDirection sortColumn : sort.getSortColumns()) {
-        colMap.put(sortColumn.getColumn(), sortColumn.getDirection());
+        sortColumnsMap.put(sortColumn.getColumn(), sortColumn.getDirection());
       }
     }
-    assert partition != null;
-    if (partition.getPartitionColumns().isEmpty()) {
+
+    if (partition == null || partition.getPartitionColumns().isEmpty()) {
+      // If operatorFactories doesn't have any partitioning factory, then we should keep the shuffle spec from previous stage.
+      // This indicates that we already have the data partitioned correctly, and hence we don't need to do any shuffling.
       return null;
     }
+
+    List<KeyColumn> keyColsOfWindow = new ArrayList<>();
     for (String partitionColumn : partition.getPartitionColumns()) {
       KeyColumn kc;
-      if (colMap.containsKey(partitionColumn)) {
-        if (colMap.get(partitionColumn) == ColumnWithDirection.Direction.ASC) {
-          kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING);
-        } else {
-          kc = new KeyColumn(partitionColumn, KeyOrder.DESCENDING);
-        }
+      if (sortColumnsMap.get(partitionColumn) == ColumnWithDirection.Direction.DESC) {
+        kc = new KeyColumn(partitionColumn, KeyOrder.DESCENDING);
       } else {
         kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING);
       }
       keyColsOfWindow.add(kc);
     }
+
     return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), maxWorkerCount);
   }
 }
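The reworked findShuffleSpecForNextWindow() above decides both whether a shuffle is needed at all and how each key column is ordered. A rough standalone sketch of that decision, with plain strings standing in for Druid's KeyColumn and shuffle-spec types (the names below are illustrative, not the actual MSQ classes):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class WindowShuffleKeySketch
    {
      enum Direction { ASC, DESC }

      // No partition columns => return null, meaning "keep the partitioning from the previous
      // stage". Otherwise every partition column becomes part of the shuffle key, ordered
      // descending only when the preceding sort operator asked for DESC.
      static List<String> shuffleKey(List<String> partitionColumns, Map<String, Direction> sortDirections)
      {
        if (partitionColumns == null || partitionColumns.isEmpty()) {
          return null;
        }
        List<String> key = new ArrayList<>();
        for (String column : partitionColumns) {
          key.add(column + (sortDirections.get(column) == Direction.DESC ? " DESC" : " ASC"));
        }
        return key;
      }

      public static void main(String[] args)
      {
        Map<String, Direction> sortDirections = new HashMap<>();
        sortDirections.put("d2", Direction.DESC);
        System.out.println(shuffleKey(Arrays.asList("d1", "d2"), sortDirections)); // [d1 ASC, d2 DESC]
        System.out.println(shuffleKey(new ArrayList<>(), sortDirections));         // null (no shuffle)
      }
    }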
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactoryTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactoryTest.java
new file mode 100644
index 00000000000..2049c0194ed
--- /dev/null
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactoryTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.Test;
+
+public class WindowOperatorQueryFrameProcessorFactoryTest
+{
+  @Test
+  public void testEqualsAndHashcode()
+  {
+    EqualsVerifier.forClass(WindowOperatorQueryFrameProcessorFactory.class)
+                  .withNonnullFields("query", "operatorList", "stageRowSignature", "isEmptyOver", "maxRowsMaterializedInWindow", "partitionColumnNames")
+                  .usingGetClass()
+                  .verify();
+  }
+}
diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
index 54552e5d5b0..eaa2a9efe5a 100644
--- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
+++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java
@@ -39,6 +39,7 @@ import org.apache.druid.guice.JoinableFactoryModule;
 import org.apache.druid.guice.annotations.Self;
 import org.apache.druid.indexing.common.SegmentCacheManagerFactory;
 import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.io.Closer;
 import org.apache.druid.math.expr.ExprMacroTable;
@@ -62,12 +63,14 @@ import org.apache.druid.query.groupby.GroupingEngine;
 import org.apache.druid.query.groupby.TestGroupByBuffers;
 import org.apache.druid.segment.IndexBuilder;
 import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexStorageAdapter;
 import org.apache.druid.segment.Segment;
 import org.apache.druid.segment.StorageAdapter;
 import org.apache.druid.segment.TestIndex;
 import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.loading.DataSegmentPusher;
 import org.apache.druid.segment.loading.LocalDataSegmentPusher;
@@ -91,6 +94,7 @@ import javax.annotation.Nullable;
 import java.io.File;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
@@ -99,6 +103,7 @@ import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE1;
 import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE2;
 import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE3;
 import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE5;
+import static org.apache.druid.sql.calcite.util.CalciteTests.WIKIPEDIA;
 import static org.apache.druid.sql.calcite.util.TestDataBuilder.INDEX_SCHEMA_LOTS_O_COLUMNS;
 import static org.apache.druid.sql.calcite.util.TestDataBuilder.INDEX_SCHEMA_NUMERIC_DIMS;
 import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS1;
@@ -205,6 +210,17 @@ public class CalciteMSQTestsHelper
   {
     final QueryableIndex index;
     switch (segmentId.getDataSource()) {
+      case WIKIPEDIA:
+        try {
+          final File directory = new File(tempFolderProducer.apply("tmpDir"), StringUtils.format("wikipedia-index-%s", UUID.randomUUID()));
+          final IncrementalIndex incrementalIndex = TestIndex.makeWikipediaIncrementalIndex();
+          TestIndex.INDEX_MERGER.persist(incrementalIndex, directory, IndexSpec.DEFAULT, null);
+          index = TestIndex.INDEX_IO.loadIndex(directory);
+        }
+        catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+        break;
       case DATASOURCE1:
         IncrementalIndexSchema foo1Schema = new 
IncrementalIndexSchema.Builder()
             .withMetrics(
diff --git a/processing/src/main/java/org/apache/druid/query/operator/Operator.java b/processing/src/main/java/org/apache/druid/query/operator/Operator.java
index a9a18c36d54..57bc1013fc4 100644
--- a/processing/src/main/java/org/apache/druid/query/operator/Operator.java
+++ b/processing/src/main/java/org/apache/druid/query/operator/Operator.java
@@ -126,7 +126,7 @@ public interface Operator
      */
     STOP,
     /**
-     * Inidcates that the downstream processing should pause its pushing of results and instead return a
+     * Indicates that the downstream processing should pause its pushing of results and instead return a
      * continuation object that encapsulates whatever state is required to resume processing.  When this signal is
      * received, Operators that are generating data might choose to exert backpressure or otherwise pause their
      * processing efforts until called again with the returned continuation object.
diff --git a/processing/src/main/java/org/apache/druid/query/operator/window/ComposingProcessor.java b/processing/src/main/java/org/apache/druid/query/operator/window/ComposingProcessor.java
index a4fa74967f6..0e0fc59498c 100644
--- a/processing/src/main/java/org/apache/druid/query/operator/window/ComposingProcessor.java
+++ b/processing/src/main/java/org/apache/druid/query/operator/window/ComposingProcessor.java
@@ -23,7 +23,9 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
 
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 public class ComposingProcessor implements Processor
 {
@@ -37,6 +39,16 @@ public class ComposingProcessor implements Processor
     this.processors = processors;
   }
 
+  @Override
+  public List<String> getOutputColumnNames()
+  {
+    List<String> outputColumnNames = new ArrayList<>();
+    for (Processor processor : processors) {
+      outputColumnNames.addAll(processor.getOutputColumnNames());
+    }
+    return outputColumnNames;
+  }
+
   @JsonProperty("processors")
   public Processor[] getProcessors()
   {
diff --git a/processing/src/main/java/org/apache/druid/query/operator/window/Processor.java b/processing/src/main/java/org/apache/druid/query/operator/window/Processor.java
index fe8d125cbdf..b271d3064ef 100644
--- a/processing/src/main/java/org/apache/druid/query/operator/window/Processor.java
+++ b/processing/src/main/java/org/apache/druid/query/operator/window/Processor.java
@@ -31,6 +31,8 @@ import org.apache.druid.query.operator.window.value.WindowLastProcessor;
 import org.apache.druid.query.operator.window.value.WindowOffsetProcessor;
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
 
+import java.util.List;
+
 /**
  * A Processor is a bit of logic that processes a single RowsAndColumns object to produce a new RowsAndColumns
  * object.  Generally speaking, it is used to add or alter columns in a batch-oriented fashion.
@@ -80,4 +82,9 @@ public interface Processor
    * @return boolean identifying if these processors should be considered equivalent to each other.
    */
   boolean validateEquivalent(Processor otherProcessor);
+
+  /**
+   * @return List of output column names for the Processor.
+   */
+  List<String> getOutputColumnNames();
 }
diff --git a/processing/src/main/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessor.java b/processing/src/main/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessor.java
index 3545c3740f4..41baced4e61 100644
--- a/processing/src/main/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessor.java
+++ b/processing/src/main/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessor.java
@@ -27,7 +27,9 @@ import org.apache.druid.query.rowsandcols.semantic.DefaultFramedOnHeapAggregatab
 import org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable;
 
 import javax.annotation.Nullable;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Objects;
 
 public class WindowFramedAggregateProcessor implements Processor
@@ -45,6 +47,16 @@ public class WindowFramedAggregateProcessor implements Processor
   private final WindowFrame frame;
   private final AggregatorFactory[] aggregations;
 
+  @Override
+  public List<String> getOutputColumnNames()
+  {
+    List<String> outputColumnNames = new ArrayList<>();
+    for (AggregatorFactory aggregation : aggregations) {
+      outputColumnNames.add(aggregation.getName());
+    }
+    return outputColumnNames;
+  }
+
   @JsonCreator
   public WindowFramedAggregateProcessor(
       @JsonProperty("frame") WindowFrame frame,
diff --git a/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowPercentileProcessor.java b/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowPercentileProcessor.java
index 541c1399e36..b7f77d50969 100644
--- a/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowPercentileProcessor.java
+++ b/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowPercentileProcessor.java
@@ -28,12 +28,20 @@ import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
 import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
 
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 
 public class WindowPercentileProcessor implements Processor
 {
   private final int numBuckets;
   private final String outputColumn;
 
+  @Override
+  public List<String> getOutputColumnNames()
+  {
+    return Collections.singletonList(outputColumn);
+  }
+
   @JsonCreator
   public WindowPercentileProcessor(
       @JsonProperty("outputColumn") String outputColumn,
diff --git a/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowRankingProcessorBase.java b/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowRankingProcessorBase.java
index fb5bedf9519..4e026cbdd3d 100644
--- a/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowRankingProcessorBase.java
+++ b/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowRankingProcessorBase.java
@@ -27,6 +27,7 @@ import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
 import org.apache.druid.query.rowsandcols.semantic.ClusteredGroupPartitioner;
 import org.apache.druid.query.rowsandcols.semantic.DefaultClusteredGroupPartitioner;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.function.Function;
@@ -124,4 +125,9 @@ public abstract class WindowRankingProcessorBase implements Processor
     return Objects.equals(groupingCols, other.groupingCols) && Objects.equals(outputColumn, other.outputColumn);
   }
 
+  @Override
+  public List<String> getOutputColumnNames()
+  {
+    return Collections.singletonList(outputColumn);
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowRowNumberProcessor.java b/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowRowNumberProcessor.java
index 7821e3fd53b..98b09b6f80d 100644
--- a/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowRowNumberProcessor.java
+++ b/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowRowNumberProcessor.java
@@ -28,6 +28,9 @@ import org.apache.druid.query.rowsandcols.column.ColumnAccessorBasedColumn;
 import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
 import org.apache.druid.segment.column.ColumnType;
 
+import java.util.Collections;
+import java.util.List;
+
 public class WindowRowNumberProcessor implements Processor
 {
   private final String outputColumn;
@@ -128,4 +131,10 @@ public class WindowRowNumberProcessor implements Processor
            "outputColumn='" + outputColumn + '\'' +
            '}';
   }
+
+  @Override
+  public List<String> getOutputColumnNames()
+  {
+    return Collections.singletonList(outputColumn);
+  }
 }
diff --git a/processing/src/main/java/org/apache/druid/query/operator/window/value/WindowValueProcessorBase.java b/processing/src/main/java/org/apache/druid/query/operator/window/value/WindowValueProcessorBase.java
index 2e084ae983a..93a7ccd9a5b 100644
--- a/processing/src/main/java/org/apache/druid/query/operator/window/value/WindowValueProcessorBase.java
+++ b/processing/src/main/java/org/apache/druid/query/operator/window/value/WindowValueProcessorBase.java
@@ -26,6 +26,8 @@ import org.apache.druid.query.rowsandcols.RowsAndColumns;
 import org.apache.druid.query.rowsandcols.column.Column;
 import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.function.Function;
 
 public abstract class WindowValueProcessorBase implements Processor
@@ -100,4 +102,10 @@ public abstract class WindowValueProcessorBase implements Processor
     return "inputColumn=" + inputColumn +
            ", outputColumn='" + outputColumn + '\'';
   }
+
+  @Override
+  public List<String> getOutputColumnNames()
+  {
+    return Collections.singletonList(outputColumn);
+  }
 }
diff --git a/processing/src/test/java/org/apache/druid/query/operator/WindowProcessorOperatorTest.java b/processing/src/test/java/org/apache/druid/query/operator/WindowProcessorOperatorTest.java
index 9cce74cb98c..c11a50cf5cb 100644
--- a/processing/src/test/java/org/apache/druid/query/operator/WindowProcessorOperatorTest.java
+++ b/processing/src/test/java/org/apache/druid/query/operator/WindowProcessorOperatorTest.java
@@ -27,6 +27,9 @@ import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Collections;
+import java.util.List;
+
 public class WindowProcessorOperatorTest
 {
   @Test
@@ -53,6 +56,12 @@ public class WindowProcessorOperatorTest
           {
             return true;
           }
+
+          @Override
+          public List<String> getOutputColumnNames()
+          {
+            return Collections.emptyList();
+          }
         },
         InlineScanOperator.make(rac)
     );
diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/ComposingProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/ComposingProcessorTest.java
index 570cba65d92..d8f4599eb1a 100644
--- a/processing/src/test/java/org/apache/druid/query/operator/window/ComposingProcessorTest.java
+++ b/processing/src/test/java/org/apache/druid/query/operator/window/ComposingProcessorTest.java
@@ -23,6 +23,9 @@ import org.apache.druid.query.rowsandcols.RowsAndColumns;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Collections;
+import java.util.List;
+
 public class ComposingProcessorTest
 {
   @Test
@@ -32,6 +35,7 @@ public class ComposingProcessorTest
     final ProcessorForTesting secondProcessor = new ProcessorForTesting();
 
     ComposingProcessor proc = new ComposingProcessor(firstProcessor, secondProcessor);
+    Assert.assertTrue(proc.getOutputColumnNames().isEmpty());
 
     proc.process(null);
     Assert.assertEquals(1, firstProcessor.processCounter);
@@ -70,5 +74,11 @@ public class ComposingProcessorTest
       ++validateCounter;
       return validationResult;
     }
+
+    @Override
+    public List<String> getOutputColumnNames()
+    {
+      return Collections.emptyList();
+    }
   }
 }
diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java
index 88d79c87cdb..5af321b53c8 100644
--- a/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java
+++ b/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.query.operator.window;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.common.config.NullHandling;
@@ -51,6 +52,7 @@ public class WindowFramedAggregateProcessorTest
         new DoubleSumAggregatorFactory("cummSum", "doubleCol")
     };
     WindowFramedAggregateProcessor proc = new WindowFramedAggregateProcessor(theFrame, theAggs);
+    Assert.assertEquals(ImmutableList.of("cummMax", "cummSum"), proc.getOutputColumnNames());
 
     final MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(ImmutableMap.of(
         "yay", new IntArrayColumn(new int[]{1, 2, 3})
diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowCumeDistProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowCumeDistProcessorTest.java
index f5914e4f5db..877c7841549 100644
--- a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowCumeDistProcessorTest.java
+++ b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowCumeDistProcessorTest.java
@@ -25,6 +25,7 @@ import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
 import org.apache.druid.query.rowsandcols.column.Column;
 import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -42,6 +43,7 @@ public class WindowCumeDistProcessorTest
     MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(map);
 
     Processor processor = new WindowCumeDistProcessor(Collections.singletonList("vals"), "CumeDist");
+    Assert.assertEquals(Collections.singletonList("CumeDist"), processor.getOutputColumnNames());
 
     final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
         .expectColumn("vals", new int[]{7, 18, 18, 30, 120, 121, 122, 122, 8290, 8290})
diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowDenseRankProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowDenseRankProcessorTest.java
index e165f46f074..86580e5bd2f 100644
--- a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowDenseRankProcessorTest.java
+++ b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowDenseRankProcessorTest.java
@@ -25,6 +25,7 @@ import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
 import org.apache.druid.query.rowsandcols.column.Column;
 import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -42,6 +43,7 @@ public class WindowDenseRankProcessorTest
     MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(map);
 
     Processor processor = new WindowDenseRankProcessor(Collections.singletonList("vals"), "DenseRank");
+    Assert.assertEquals(Collections.singletonList("DenseRank"), processor.getOutputColumnNames());
 
     final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
         .expectColumn("vals", new int[]{7, 18, 18, 30, 120, 121, 122, 122, 8290, 8290})
diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowPercentileProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowPercentileProcessorTest.java
index c38cd2a245c..bf5bb727b0a 100644
--- a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowPercentileProcessorTest.java
+++ b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowPercentileProcessorTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.query.operator.window.ranking;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.druid.query.operator.window.ComposingProcessor;
 import org.apache.druid.query.operator.window.Processor;
 import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
@@ -29,6 +30,7 @@ import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
 import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
 import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
 import org.apache.druid.segment.column.ColumnType;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.LinkedHashMap;
@@ -63,6 +65,11 @@ public class WindowPercentileProcessorTest
         new WindowPercentileProcessor("10292", 10292)
     );
 
+    Assert.assertEquals(
+        ImmutableList.of("1", "2", "3", "4", "5", "6", "7", "8", "9", "10", 
"10292"),
+        processor.getOutputColumnNames()
+    );
+
     final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
         .expectColumn("intCol", new int[]{88, 1, 2, 3, 4, 5, 6, 7, 8, 9})
         .expectColumn("doubleCol", new double[]{0.4728, 1, 2, 3, 4, 5, 6, 7, 
8, 9})
diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowRankProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowRankProcessorTest.java
index 59c7dd6df36..b7f281c423e 100644
--- a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowRankProcessorTest.java
+++ b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowRankProcessorTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.query.operator.window.ranking;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.druid.query.operator.window.ComposingProcessor;
 import org.apache.druid.query.operator.window.Processor;
 import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
@@ -26,6 +27,7 @@ import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
 import org.apache.druid.query.rowsandcols.RowsAndColumns;
 import org.apache.druid.query.rowsandcols.column.Column;
 import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.Collections;
@@ -49,6 +51,8 @@ public class WindowRankProcessorTest
         new WindowRankProcessor(orderingCols, "rankAsPercent", true)
     );
 
+    Assert.assertEquals(ImmutableList.of("rank", "rankAsPercent"), 
processor.getOutputColumnNames());
+
     final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
         .expectColumn("vals", new int[]{7, 18, 18, 30, 120, 121, 122, 122, 
8290, 8290})
         .expectColumn("rank", new int[]{1, 2, 2, 4, 5, 6, 7, 7, 9, 9})
diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowRowNumberProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowRowNumberProcessorTest.java
index 937fea7c360..f4f9b5bfeee 100644
--- a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowRowNumberProcessorTest.java
+++ b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowRowNumberProcessorTest.java
@@ -28,8 +28,10 @@ import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
 import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
 import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
 import org.apache.druid.segment.column.ColumnType;
+import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
@@ -49,6 +51,7 @@ public class WindowRowNumberProcessorTest
     MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(map);
 
     Processor processor = new WindowRowNumberProcessor("rowRow");
+    Assert.assertEquals(Collections.singletonList("rowRow"), 
processor.getOutputColumnNames());
 
     final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
         .expectColumn("intCol", new int[]{88, 1, 2, 3, 4, 5, 6, 7, 8, 9})
diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/value/WindowFirstProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/value/WindowFirstProcessorTest.java
index 67242f05503..eb6caa10a0b 100644
--- a/processing/src/test/java/org/apache/druid/query/operator/window/value/WindowFirstProcessorTest.java
+++ b/processing/src/test/java/org/apache/druid/query/operator/window/value/WindowFirstProcessorTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.query.operator.window.value;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.druid.query.operator.window.ComposingProcessor;
 import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
 import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
@@ -28,6 +29,7 @@ import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
 import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
 import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
 import org.apache.druid.segment.column.ColumnType;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.LinkedHashMap;
@@ -59,6 +61,11 @@ public class WindowFirstProcessorTest
         new WindowFirstProcessor("nullFirstCol", "NullFirstCol")
     );
 
+    Assert.assertEquals(
+        ImmutableList.of("FirstIntCol", "FirstDoubleCol", "FirstObjectCol", 
"NullFirstCol"),
+        processor.getOutputColumnNames()
+    );
+
     final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
         .expectColumn("intCol", new int[]{88, 1, 2, 3, 4, 5, 6, 7, 8, 9})
         .expectColumn("doubleCol", new double[]{0.4728, 1, 2, 3, 4, 5, 6, 7, 
8, 9})
diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/value/WindowLastProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/value/WindowLastProcessorTest.java
index 5aa212b6acb..1910401f34a 100644
--- a/processing/src/test/java/org/apache/druid/query/operator/window/value/WindowLastProcessorTest.java
+++ b/processing/src/test/java/org/apache/druid/query/operator/window/value/WindowLastProcessorTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.query.operator.window.value;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.druid.query.operator.window.ComposingProcessor;
 import org.apache.druid.query.operator.window.RowsAndColumnsHelper;
 import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
@@ -28,6 +29,7 @@ import org.apache.druid.query.rowsandcols.column.DoubleArrayColumn;
 import org.apache.druid.query.rowsandcols.column.IntArrayColumn;
 import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn;
 import org.apache.druid.segment.column.ColumnType;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.LinkedHashMap;
@@ -58,6 +60,10 @@ public class WindowLastProcessorTest
         new WindowLastProcessor("objectCol", "LastObjectCol"),
         new WindowLastProcessor("nullLastCol", "NullLastCol")
     );
+    Assert.assertEquals(
+        ImmutableList.of("LastIntCol", "LastDoubleCol", "LastObjectCol", 
"NullLastCol"),
+        processor.getOutputColumnNames()
+    );
 
 
     final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper()
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java
index cb7bed7e041..4e958383945 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java
@@ -7533,4 +7533,78 @@ public class DrillWindowQueryTest extends BaseCalciteQueryTest
   {
     windowQueryTest();
   }
+
+  /*
+  Druid query tests
+   */
+
+  @DrillTest("druid_queries/same_window_across_columns/wikipedia_query_1")
+  @Test
+  public void test_same_window_wikipedia_query_1()
+  {
+    windowQueryTest();
+  }
+
+  @DrillTest("druid_queries/same_window_across_columns/wikipedia_query_1_named_window")
+  @Test
+  public void test_same_window_wikipedia_query_1_named_window()
+  {
+    windowQueryTest();
+  }
+
+  @DrillTest("druid_queries/multiple_windows/wikipedia_query_1")
+  @Test
+  public void test_multiple_windows_wikipedia_query_1()
+  {
+    windowQueryTest();
+  }
+
+  @DrillTest("druid_queries/multiple_windows/wikipedia_query_1_named_windows")
+  @Test
+  public void test_multiple_windows_wikipedia_query_1_named_windows()
+  {
+    windowQueryTest();
+  }
+
+  @DrillTest("druid_queries/shuffle_columns/wikipedia_query_1")
+  @Test
+  public void test_shuffle_columns_wikipedia_query_1()
+  {
+    windowQueryTest();
+  }
+
+  @DrillTest("druid_queries/shuffle_columns/wikipedia_query_1_shuffle_1")
+  @Test
+  public void test_shuffle_columns_wikipedia_query_1_shuffle_1()
+  {
+    windowQueryTest();
+  }
+
+  @DrillTest("druid_queries/shuffle_columns/wikipedia_query_2")
+  @Test
+  public void test_shuffle_columns_wikipedia_query_2()
+  {
+    windowQueryTest();
+  }
+
+  @DrillTest("druid_queries/shuffle_columns/wikipedia_query_2_shuffle_1")
+  @Test
+  public void test_shuffle_columns_wikipedia_query_2_shuffle_1()
+  {
+    windowQueryTest();
+  }
+
+  @DrillTest("druid_queries/partition_by_multiple_columns/wikipedia_query_1")
+  @Test
+  public void test_partition_by_multiple_columns_wikipedia_query_1()
+  {
+    windowQueryTest();
+  }
+
+  @DrillTest("druid_queries/partition_by_multiple_columns/wikipedia_query_2")
+  @Test
+  public void test_partition_by_multiple_columns_wikipedia_query_2()
+  {
+    windowQueryTest();
+  }
 }
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1.e b/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1.e
new file mode 100644
index 00000000000..3625be892e2
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1.e
@@ -0,0 +1,13 @@
+null   Austria 1       1
+null   Republic of Korea       1       2
+null   Republic of Korea       2       3
+null   Republic of Korea       3       4
+Horsching      Austria 2       1
+Jeonju Republic of Korea       4       1
+Seongnam-si    Republic of Korea       5       1
+Seoul  Republic of Korea       6       1
+Suwon-si       Republic of Korea       7       1
+Vienna Austria 3       1
+Vienna Austria 4       2
+Vienna Austria 5       3
+Yongsan-dong   Republic of Korea       8       1
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1.q b/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1.q
new file mode 100644
index 00000000000..d61a33e401f
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1.q
@@ -0,0 +1,6 @@
+select cityName, countryName,
+row_number() over (partition by countryName order by countryName, cityName, channel) as c1,
+count(channel) over (partition by cityName order by countryName, cityName, channel) as c2
+from wikipedia
+where countryName in ('Austria', 'Republic of Korea')
+group by countryName, cityName, channel
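
The two windows above partition by different columns (countryName for c1, cityName for c2), so their partition boundaries differ. Since the c2 window has an ORDER BY but no explicit frame, it gets the SQL-standard default frame RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, i.e. a running count within each cityName partition; that is why the Vienna rows in the expectations carry 1, 2, 3. A sketch of c2 with that implicit frame spelled out (illustration only, not part of the committed .q file):

    -- c2 from the query above with its default frame written explicitly
    count(channel) over (
      partition by cityName
      order by countryName, cityName, channel
      range between unbounded preceding and current row
    ) as c2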
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1_named_windows.e b/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1_named_windows.e
new file mode 100644
index 00000000000..3625be892e2
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1_named_windows.e
@@ -0,0 +1,13 @@
+null   Austria 1       1
+null   Republic of Korea       1       2
+null   Republic of Korea       2       3
+null   Republic of Korea       3       4
+Horsching      Austria 2       1
+Jeonju Republic of Korea       4       1
+Seongnam-si    Republic of Korea       5       1
+Seoul  Republic of Korea       6       1
+Suwon-si       Republic of Korea       7       1
+Vienna Austria 3       1
+Vienna Austria 4       2
+Vienna Austria 5       3
+Yongsan-dong   Republic of Korea       8       1
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1_named_windows.q b/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1_named_windows.q
new file mode 100644
index 00000000000..12739d58ceb
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1_named_windows.q
@@ -0,0 +1,9 @@
+select cityName, countryName,
+row_number() over w1 as c1,
+count(channel) over w2 as c2
+from wikipedia
+where countryName in ('Austria', 'Republic of Korea')
+group by countryName, cityName, channel
+WINDOW
+       w1 AS (partition by countryName order by countryName, cityName, channel),
+       w2 AS (partition by cityName order by countryName, cityName, channel)
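
Named windows declared in the WINDOW clause are resolved by expanding each OVER w1 / OVER w2 reference to its named specification, which is why this case shares its expectations file with wikipedia_query_1. For illustration only, the expansion of c1 is:

    -- what "row_number() over w1" resolves to (illustration only)
    row_number() over (partition by countryName order by countryName, cityName, channel) as c1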
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_1.e b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_1.e
new file mode 100644
index 00000000000..36812a418ae
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_1.e
@@ -0,0 +1,15 @@
+Austria        null    94      7
+Austria        null    4685    7
+Austria        null    14      7
+Austria        null    0       7
+Austria        null    272     7
+Austria        null    0       7
+Austria        null    6979    7
+Guatemala      null    0       1
+Guatemala      El Salvador     1       1
+Guatemala      Guatemala City  173     1
+Austria        Horsching       0       1
+Austria        Vienna  93      4
+Austria        Vienna  72      4
+Austria        Vienna  0       4
+Austria        Vienna  0       4
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_1.q b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_1.q
new file mode 100644
index 00000000000..5d0dd075678
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_1.q
@@ -0,0 +1,7 @@
+SELECT
+countryName,
+cityName,
+added,
+count(added) OVER (PARTITION BY countryName, cityName)
+FROM "wikipedia"
+where countryName in ('Guatemala', 'Austria')
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_2.e b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_2.e
new file mode 100644
index 00000000000..a1b94f5a865
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_2.e
@@ -0,0 +1,15 @@
+Austria        null    0       7       12044   1
+Austria        null    0       7       12044   2
+Austria        null    14      7       12044   1
+Austria        null    94      7       12044   1
+Austria        null    272     7       12044   1
+Austria        null    4685    7       12044   1
+Austria        null    6979    7       12044   1
+Guatemala      null    0       1       0       1
+Guatemala      El Salvador     1       1       1       1
+Guatemala      Guatemala City  173     1       173     1
+Austria        Horsching       0       1       0       1
+Austria        Vienna  0       4       165     1
+Austria        Vienna  0       4       165     2
+Austria        Vienna  72      4       165     1
+Austria        Vienna  93      4       165     1
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_2.q b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_2.q
new file mode 100644
index 00000000000..b1a594beeda
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_2.q
@@ -0,0 +1,9 @@
+SELECT
+countryName,
+cityName,
+added,
+count(added) OVER (PARTITION BY countryName, cityName),
+sum(added) OVER (PARTITION BY countryName, cityName),
+ROW_NUMBER() OVER (PARTITION BY countryName, cityName, added)
+FROM "wikipedia"
+where countryName in ('Guatemala', 'Austria')
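
None of the three window functions above has an ORDER BY, so each frame is effectively the whole partition: COUNT and SUM repeat one value for every row of a (countryName, cityName) partition, while ROW_NUMBER, partitioned additionally by added, restarts at 1 for each distinct added value, matching the last column of the expectations. An equivalent spelling of the SUM with the frame made explicit (illustration only, not part of the committed query):

    -- sum(added) over the whole (countryName, cityName) partition, frame written out
    sum(added) OVER (
      PARTITION BY countryName, cityName
      ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    )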
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1.e b/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1.e
new file mode 100644
index 00000000000..0dfb6a832b8
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1.e
@@ -0,0 +1,15 @@
+Austria        162     12047   12209
+Austria        162     12047   12209
+Austria        162     12047   12209
+Austria        162     12047   12209
+Austria        162     12047   12209
+Austria        162     12047   12209
+Austria        162     12047   12209
+Austria        162     12047   12209
+Austria        162     12047   12209
+Austria        162     12047   12209
+Austria        162     12047   12209
+Austria        162     12047   12209
+Guatemala      167     7       174
+Guatemala      167     7       174
+Guatemala      167     7       174
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1.q b/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1.q
new file mode 100644
index 00000000000..dcb83c09c23
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1.q
@@ -0,0 +1,6 @@
+SELECT countryName,
+sum("deleted")  OVER (PARTITION BY countryName) as count_c3,
+sum(delta)  OVER (PARTITION BY countryName) as count_c1,
+sum(added)  OVER (PARTITION BY countryName) as count_c2
+FROM "wikipedia"
+where countryName in ('Guatemala', 'Austria')
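
All three aggregates above share the same partition-only window, so every row of a country repeats the same per-country totals, which is why the expectations contain twelve identical Austria rows and three identical Guatemala rows. For intuition only (not part of the patch), the same totals collapsed to one row per country would be:

    -- one row per country with the totals the window version repeats on every row
    SELECT countryName,
      sum("deleted") AS count_c3,
      sum(delta)     AS count_c1,
      sum(added)     AS count_c2
    FROM "wikipedia"
    WHERE countryName in ('Guatemala', 'Austria')
    GROUP BY countryName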
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1_named_window.e b/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1_named_window.e
new file mode 100644
index 00000000000..0dfb6a832b8
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1_named_window.e
@@ -0,0 +1,15 @@
+Austria        162     12047   12209
+Austria        162     12047   12209
+Austria        162     12047   12209
+Austria        162     12047   12209
+Austria        162     12047   12209
+Austria        162     12047   12209
+Austria        162     12047   12209
+Austria        162     12047   12209
+Austria        162     12047   12209
+Austria        162     12047   12209
+Austria        162     12047   12209
+Austria        162     12047   12209
+Guatemala      167     7       174
+Guatemala      167     7       174
+Guatemala      167     7       174
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1_named_window.q b/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1_named_window.q
new file mode 100644
index 00000000000..adb9287d378
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1_named_window.q
@@ -0,0 +1,7 @@
+SELECT countryName,
+sum("deleted")  OVER w as count_c3,
+sum(delta)  OVER w as count_c1,
+sum(added)  OVER w as count_c2
+FROM "wikipedia"
+where countryName in ('Guatemala', 'Austria')
+WINDOW w AS (PARTITION BY countryName)
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1.e b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1.e
new file mode 100644
index 00000000000..e934bc8fc27
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1.e
@@ -0,0 +1,15 @@
+Austria        1017.4166666666666
+Austria        1017.4166666666666
+Austria        1017.4166666666666
+Austria        1017.4166666666666
+Austria        1017.4166666666666
+Austria        1017.4166666666666
+Austria        1017.4166666666666
+Austria        1017.4166666666666
+Austria        1017.4166666666666
+Austria        1017.4166666666666
+Austria        1017.4166666666666
+Austria        1017.4166666666666
+Guatemala      58
+Guatemala      58
+Guatemala      58
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1.q b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1.q
new file mode 100644
index 00000000000..f1a7bcb09b1
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1.q
@@ -0,0 +1,5 @@
+SELECT
+countryName,
+AVG(added) OVER(PARTITION BY countryName)
+FROM wikipedia
+where countryName in ('Guatemala', 'Austria')
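
The expected averages follow from the totals in the same_window_across_columns expectations above: Austria has 12 matching rows with sum(added) = 12209, so 12209 / 12 = 1017.4166666666666, and Guatemala has 3 rows with sum(added) = 174, so 174 / 3 = 58. A sketch that computes the same per-country means directly, one row per country (illustration only; the avg_added alias is hypothetical):

    -- per-country AVG(added) over the same filter
    -- Austria:   12209 / 12 = 1017.4166666666666
    -- Guatemala:   174 /  3 = 58
    SELECT countryName, AVG(added) AS avg_added
    FROM "wikipedia"
    WHERE countryName in ('Guatemala', 'Austria')
    GROUP BY countryName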
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1_shuffle_1.e b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1_shuffle_1.e
new file mode 100644
index 00000000000..e74706be009
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1_shuffle_1.e
@@ -0,0 +1,15 @@
+1017.4166666666666     Austria
+1017.4166666666666     Austria
+1017.4166666666666     Austria
+1017.4166666666666     Austria
+1017.4166666666666     Austria
+1017.4166666666666     Austria
+1017.4166666666666     Austria
+1017.4166666666666     Austria
+1017.4166666666666     Austria
+1017.4166666666666     Austria
+1017.4166666666666     Austria
+1017.4166666666666     Austria
+58     Guatemala
+58     Guatemala
+58     Guatemala
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1_shuffle_1.q b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1_shuffle_1.q
new file mode 100644
index 00000000000..c2dc11546a9
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1_shuffle_1.q
@@ -0,0 +1,5 @@
+SELECT
+AVG(added) OVER(PARTITION BY countryName),
+countryName
+FROM wikipedia
+where countryName in ('Guatemala', 'Austria')
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2.e b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2.e
new file mode 100644
index 00000000000..daf6eff61ba
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2.e
@@ -0,0 +1,16 @@
+Austria        null    1       #de.wikipedia   1
+Guatemala      null    1       #es.wikipedia   2
+Republic of Korea      null    1       #en.wikipedia   3
+Republic of Korea      null    2       #ja.wikipedia   4
+Republic of Korea      null    3       #ko.wikipedia   5
+Guatemala      El Salvador     2       #es.wikipedia   1
+Guatemala      Guatemala City  3       #es.wikipedia   1
+Austria        Horsching       2       #de.wikipedia   1
+Republic of Korea      Jeonju  4       #ko.wikipedia   1
+Republic of Korea      Seongnam-si     5       #ko.wikipedia   1
+Republic of Korea      Seoul   6       #ko.wikipedia   1
+Republic of Korea      Suwon-si        7       #ko.wikipedia   1
+Austria        Vienna  3       #de.wikipedia   1
+Austria        Vienna  4       #es.wikipedia   2
+Austria        Vienna  5       #tr.wikipedia   3
+Republic of Korea      Yongsan-dong    8       #ko.wikipedia   1
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2.q b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2.q
new file mode 100644
index 00000000000..d3ea2dfc729
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2.q
@@ -0,0 +1,9 @@
+SELECT
+countryName,
+cityName,
+ROW_NUMBER() OVER(PARTITION BY countryName),
+channel,
+COUNT(channel) over (PARTITION BY cityName order by countryName, cityName, channel)
+FROM wikipedia
+where countryName in ('Guatemala', 'Austria', 'Republic of Korea')
+group by countryName, cityName, channel
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2_shuffle_1.e b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2_shuffle_1.e
new file mode 100644
index 00000000000..813ccdbf6aa
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2_shuffle_1.e
@@ -0,0 +1,16 @@
+1      Austria null    1       #de.wikipedia
+1      Guatemala       null    2       #es.wikipedia
+1      Republic of Korea       null    3       #en.wikipedia
+2      Republic of Korea       null    4       #ja.wikipedia
+3      Republic of Korea       null    5       #ko.wikipedia
+2      Guatemala       El Salvador     1       #es.wikipedia
+3      Guatemala       Guatemala City  1       #es.wikipedia
+2      Austria Horsching       1       #de.wikipedia
+4      Republic of Korea       Jeonju  1       #ko.wikipedia
+5      Republic of Korea       Seongnam-si     1       #ko.wikipedia
+6      Republic of Korea       Seoul   1       #ko.wikipedia
+7      Republic of Korea       Suwon-si        1       #ko.wikipedia
+3      Austria Vienna  1       #de.wikipedia
+4      Austria Vienna  2       #es.wikipedia
+5      Austria Vienna  3       #tr.wikipedia
+8      Republic of Korea       Yongsan-dong    1       #ko.wikipedia
diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2_shuffle_1.q b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2_shuffle_1.q
new file mode 100644
index 00000000000..779aaf3a86f
--- /dev/null
+++ b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2_shuffle_1.q
@@ -0,0 +1,9 @@
+SELECT
+ROW_NUMBER() OVER(PARTITION BY countryName),
+countryName,
+cityName,
+COUNT(channel) over (PARTITION BY cityName order by countryName, cityName, channel),
+channel
+FROM wikipedia
+where countryName in ('Guatemala', 'Austria', 'Republic of Korea')
+group by countryName, cityName, channel


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org