kgyrtkirk commented on code in PR #16781:
URL: https://github.com/apache/druid/pull/16781#discussion_r1687783900


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java:
##########
@@ -302,26 +308,19 @@ private ShuffleSpec findShuffleSpecForNextWindow(List<OperatorFactory> operatorF
       }
     }
 
-    Map<String, ColumnWithDirection.Direction> sortColumnsMap = new HashMap<>();
-    if (sort != null) {
-      for (ColumnWithDirection sortColumn : sort.getSortColumns()) {
-        sortColumnsMap.put(sortColumn.getColumn(), sortColumn.getDirection());
-      }
-    }
-
-    if (partition == null || partition.getPartitionColumns().isEmpty()) {
-      // If operatorFactories doesn't have any partitioning factory, then we should keep the shuffle spec from previous stage.
-      // This indicates that we already have the data partitioned correctly, and hence we don't need to do any shuffling.
+    if (partition == null || partition.getPartitionColumns().isEmpty() || sort == null || sort.getSortColumns().isEmpty()) {
+      // If operatorFactories doesn't have any partitioning or sorting factory, then we should keep the shuffle spec from previous stage.
+      // This indicates that we already have the data partitioned and sorted correctly, and hence we don't need to do any shuffling.
       return null;
     }
 
     List<KeyColumn> keyColsOfWindow = new ArrayList<>();
-    for (String partitionColumn : partition.getPartitionColumns()) {
+    for (ColumnWithDirection sortColumn : sort.getSortColumns()) {
       KeyColumn kc;
-      if (sortColumnsMap.get(partitionColumn) == ColumnWithDirection.Direction.DESC) {
-        kc = new KeyColumn(partitionColumn, KeyOrder.DESCENDING);
+      if (sortColumn.getDirection() == ColumnWithDirection.Direction.DESC) {
+        kc = new KeyColumn(sortColumn.getColumn(), KeyOrder.DESCENDING);
       } else {
-        kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING);
+        kc = new KeyColumn(sortColumn.getColumn(), KeyOrder.ASCENDING);
       }
       keyColsOfWindow.add(kc);
     }

Review Comment:
   It seems to me that the data will be `ClusterBy` on `keyColsOfWindow`, which is based on the `sortColumns`; wouldn't that mean that rows with different values of the sort columns (even within the same window partition) may land on different workers?
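To illustrate the concern, here is a minimal sketch in plain Java (not Druid code) that models range-based clustering on a composite key. It assumes workers receive contiguous key ranges split at boundary values; `Row`, `worker`, `workersPerPartition`, the hand-picked boundary and the sample data are all hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical model, not Druid code: each row has a window partition column "p"
// and an ORDER BY column "o"; workers receive contiguous key ranges.
final class ClusterSketch {
  static final class Row {
    final String p;
    final int o;

    Row(String p, int o) {
      this.p = p;
      this.o = o;
    }
  }

  // Compares on "p" only, or on ("p", "o") when clustering on the full sort key.
  static int compare(Row a, Row b, boolean clusterOnSortKey) {
    int c = a.p.compareTo(b.p);
    if (c != 0 || !clusterOnSortKey) {
      return c;
    }
    return Integer.compare(a.o, b.o);
  }

  // Worker i receives keys below boundaries[i]; the last worker receives the rest.
  // Assumes the boundaries are sorted.
  static int worker(Row row, List<Row> boundaries, boolean clusterOnSortKey) {
    for (int i = 0; i < boundaries.size(); i++) {
      if (compare(row, boundaries.get(i), clusterOnSortKey) < 0) {
        return i;
      }
    }
    return boundaries.size();
  }

  // For each partition value, collects the set of workers its rows were sent to.
  static Map<String, Set<Integer>> workersPerPartition(
      List<Row> rows, List<Row> boundaries, boolean clusterOnSortKey) {
    Map<String, Set<Integer>> result = new TreeMap<>();
    for (Row row : rows) {
      result.computeIfAbsent(row.p, k -> new TreeSet<>())
          .add(worker(row, boundaries, clusterOnSortKey));
    }
    return result;
  }

  public static void main(String[] args) {
    List<Row> rows = Arrays.asList(new Row("a", 1), new Row("a", 5), new Row("b", 2), new Row("b", 7));
    // One range boundary that falls inside partition "a" on the composite key.
    List<Row> boundaries = Arrays.asList(new Row("a", 3));
    System.out.println(workersPerPartition(rows, boundaries, true));  // prints {a=[0, 1], b=[1]}
    System.out.println(workersPerPartition(rows, boundaries, false)); // prints {a=[1], b=[1]}
  }
}
```

With the partition-only key, rows sharing a value of `p` compare identically against every boundary, so a partition can never straddle two workers; with the composite key, a boundary such as `(a, 3)` can fall inside partition `a` and split it.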
   
   It's so hard to work with this without seeing the plan... I don't even know which class is the API to configure sorting/partitioning in MSQ between stages; it seems to me that would be `ShuffleSpec`?
   
   From what I see of the results for `SortOperatorFactory` in the `WindowOperatorQueryKit` class, I think you should not remove any sort operators, as that's not handled correctly between stages.
   
   I believe the old approach of setting this according to the `partitionColumns` was the right one.
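For comparison, the removed logic can be sketched in plain Java. This is a minimal, self-contained sketch: `SortColumn`, `KeyCol` and `Direction` are hypothetical, simplified stand-ins for Druid's `ColumnWithDirection`, `KeyColumn` and `KeyOrder`, and the column names are made up. It shows that the old key contains only the partition columns, each borrowing its direction from the sort spec.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical stand-ins for Druid's ColumnWithDirection, KeyColumn and
// KeyOrder; this is not the real Druid API.
final class PartitionKeySketch {
  enum Direction { ASC, DESC }

  static final class SortColumn {
    final String column;
    final Direction direction;

    SortColumn(String column, Direction direction) {
      this.column = column;
      this.direction = direction;
    }
  }

  static final class KeyCol {
    final String column;
    final boolean descending;

    KeyCol(String column, boolean descending) {
      this.column = column;
      this.descending = descending;
    }

    @Override
    public String toString() {
      return column + (descending ? " DESC" : " ASC");
    }
  }

  // Mirrors the removed code: the shuffle key holds only the partition columns, each
  // taking its direction from the sort spec (ascending when the column is not sorted).
  static List<KeyCol> keyColsOfWindow(List<String> partitionColumns, List<SortColumn> sortColumns) {
    Map<String, Direction> sortColumnsMap = new HashMap<>();
    if (sortColumns != null) {
      for (SortColumn sortColumn : sortColumns) {
        sortColumnsMap.put(sortColumn.column, sortColumn.direction);
      }
    }
    List<KeyCol> keyCols = new ArrayList<>();
    for (String partitionColumn : partitionColumns) {
      keyCols.add(new KeyCol(partitionColumn, sortColumnsMap.get(partitionColumn) == Direction.DESC));
    }
    return keyCols;
  }

  public static void main(String[] args) {
    // Hypothetical sort spec (country DESC, ts ASC) with partition column "country".
    List<SortColumn> sort = Arrays.asList(
        new SortColumn("country", Direction.DESC), new SortColumn("ts", Direction.ASC));
    System.out.println(keyColsOfWindow(Arrays.asList("country"), sort)); // prints [country DESC]
  }
}
```

Since every row of a window partition shares the same partition-column values, clustering on this key should keep each partition on one worker; ordering inside a partition (by `ts` here) would still have to come from a sort step, which is presumably why the sort operators need to stay.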



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

