clintropolis commented on code in PR #15470:
URL: https://github.com/apache/druid/pull/15470#discussion_r1471874310


##########
processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/FrameRowsAndColumns.java:
##########
@@ -44,6 +45,7 @@ public class FrameRowsAndColumns implements RowsAndColumns
 
   public FrameRowsAndColumns(Frame frame, RowSignature signature)
   {
+    //this.frame = frame;

Review Comment:
   nit: delete?



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
+import com.google.common.base.Preconditions;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.FrameProcessor;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.msq.input.ReadableInput;
+import org.apache.druid.msq.kernel.FrameContext;
+import org.apache.druid.query.operator.OperatorFactory;
+import org.apache.druid.query.operator.WindowOperatorQuery;
+import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.segment.column.RowSignature;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+
+@JsonTypeName("window")
+public class WindowOperatorQueryFrameProcessorFactory extends 
BaseLeafFrameProcessorFactory
+{
+  private final WindowOperatorQuery query;
+  private final List<OperatorFactory> operatorList;
+  private final RowSignature stageRowSignature;
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    WindowOperatorQueryFrameProcessorFactory that = 
(WindowOperatorQueryFrameProcessorFactory) o;
+    return Objects.equals(query, that.query)
+           && Objects.equals(operatorList, that.operatorList)
+           && Objects.equals(stageRowSignature, that.stageRowSignature);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(query, operatorList, stageRowSignature);
+  }

Review Comment:
   super nitpick: these methods aren't very interesting; please put them at the end 
of the file near toString



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java:
##########
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Iterables;
+import org.apache.druid.collections.ResourceHolder;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.channel.FrameWithPartition;
+import org.apache.druid.frame.channel.ReadableFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameChannel;
+import org.apache.druid.frame.processor.FrameProcessors;
+import org.apache.druid.frame.processor.FrameRowTooLargeException;
+import org.apache.druid.frame.processor.ReturnOrAwait;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.write.FrameWriter;
+import org.apache.druid.frame.write.FrameWriterFactory;
+import org.apache.druid.java.util.common.Unit;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.input.ReadableInput;
+import org.apache.druid.msq.input.table.SegmentWithDescriptor;
+import org.apache.druid.query.groupby.ResultRow;
+import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
+import org.apache.druid.query.operator.OffsetLimit;
+import org.apache.druid.query.operator.Operator;
+import org.apache.druid.query.operator.OperatorFactory;
+import org.apache.druid.query.operator.WindowOperatorQuery;
+import org.apache.druid.query.rowsandcols.ConcatRowsAndColumns;
+import org.apache.druid.query.rowsandcols.LazilyDecoratedRowsAndColumns;
+import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.concrete.RowBasedFrameRowAndColumns;
+import org.apache.druid.query.rowsandcols.semantic.ColumnSelectorFactoryMaker;
+import org.apache.druid.segment.ColumnSelectorFactory;
+import org.apache.druid.segment.ColumnValueSelector;
+import org.apache.druid.segment.Cursor;
+import org.apache.druid.segment.SegmentReference;
+import org.apache.druid.segment.column.RowSignature;
+
+import javax.annotation.Nullable;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+public class WindowOperatorQueryFrameProcessor extends BaseLeafFrameProcessor
+{
+
+  private static final Logger log = new 
Logger(WindowOperatorQueryFrameProcessor.class);
+  private final WindowOperatorQuery query;
+
+  private final List<OperatorFactory> operatorFactoryList;
+  private final ObjectMapper jsonMapper;
+  private final Closer closer = Closer.create();
+  private final RowSignature outputStageSignature;
+  private final ArrayList<RowsAndColumns> frameRowsAndCols;
+  private final ArrayList<RowsAndColumns> resultRowAndCols;
+  ArrayList<ResultRow> objectsOfASingleRac;
+  private long currentAllocatorCapacity; // Used for generating 
FrameRowTooLargeException if needed
+  private Cursor frameCursor = null;
+  private Supplier<ResultRow> rowSupplierFromFrameCursor;
+  private ResultRow outputRow = null;
+  private FrameWriter frameWriter = null;
+  List<Integer> partitionColsIndex;
+
+  public WindowOperatorQueryFrameProcessor(
+      final WindowOperatorQuery query,
+      final List<OperatorFactory> operatorFactoryList,
+      final ObjectMapper jsonMapper,
+      final ReadableInput baseInput,
+      final Function<SegmentReference, SegmentReference> segmentMapFn,
+      final ResourceHolder<WritableFrameChannel> outputChannelHolder,
+      final ResourceHolder<FrameWriterFactory> frameWriterFactoryHolder,
+      final RowSignature rowSignature
+  )
+  {
+    super(
+        baseInput,
+        segmentMapFn,
+        outputChannelHolder,
+        frameWriterFactoryHolder
+    );
+    this.query = query;
+    this.jsonMapper = jsonMapper;
+    this.operatorFactoryList = operatorFactoryList;
+    this.outputStageSignature = rowSignature;
+    this.frameRowsAndCols = new ArrayList<>();
+    this.resultRowAndCols = new ArrayList<>();
+    this.objectsOfASingleRac = new ArrayList<>();
+    this.partitionColsIndex = new ArrayList<>();
+  }
+
+  @Override
+  protected ReturnOrAwait<Unit> runWithSegment(SegmentWithDescriptor segment)
+  {
+    throw new RuntimeException("Window stage can run only on the output of a 
previous stage");
+  }
+
+  @Override
+  protected ReturnOrAwait<Unit> runWithLoadedSegment(SegmentWithDescriptor 
segment)
+  {
+    throw new RuntimeException("Window stage can run only on the output of a 
previous stage");
+  }
+
+  // previous stage output
+  @Override
+  protected ReturnOrAwait<Unit> runWithInputChannel(ReadableFrameChannel 
inputChannel, FrameReader inputFrameReader)
+  {
+    /**
+     *
+     * PARTITION BY A ORDER BY B
+     *
+     * Frame 1   -> rac1
+     * A  B
+     * 1, 2
+     * 1, 3
+     * 2, 1 --> key changed
+     * 2, 2
+     *
+     *
+     * Frame 2 -> rac2
+     * 3, 1 --> key changed
+     * 3, 2
+     * 3, 3
+     * 3, 4
+     *
+     * Frame 3 -> rac3
+     *
+     * 3, 5
+     * 3, 6
+     * 4, 1 --> key changed
+     * 4, 2
+     *
+     * In case of empty OVER clause, all these racs need to be added to a 
single rows and columns
+     * to be processed. The way we can do this is to use a ConcatRowsAndColumns
+     * ConcatRC [rac1, rac2, rac3]
+     * Run all ops on this
+     *
+     *
+     * The flow would look like:
+     * 1. Validate if the operator has an empty OVER clause
+     * 2. If 1 is true make a giant rows and columns (R&C) using concat as 
shown above
+     *    Let all operators run amok on that R&C
+     * 3. If 1 is false
+     *    Read a frame
+     *    keep the older row in a class variable
+     *    check row by row and compare current with older row to check if 
partition boundary is reached
+     *    when frame partition by changes
+     *    create R&C for those particular set of columns, they would have the 
same partition key
+     *    output will be a single R&C
+     *    write to output channel
+     *
+     *
+     *  Future thoughts:
+     *
+     *  1. We are writing 1 partition to each frame in this way. In case of 
high cardinality data
+      we will be making a large number of small frames. We can have a 
check to keep the size of a frame to a value
+     *      say 20k rows and keep on adding to the same pending frame and not 
create a new frame
+     *
+     *  2. Current approach with R&C and operators materialize a single R&C 
for processing. In case of data
+     *     with low cardinality a single R&C might be too big to consume. Same 
for the case of empty OVER() clause
+     *     Most of the window operations like SUM(), RANK(), RANGE() etc. can 
be made with 2 passes of the data.
+     *     We might think to reimplement them in the MSQ way so that we do not 
have to materialize so much data
+     */

Review Comment:
   this is nice, but should it be moved up to be an actual Javadoc for the 
method? (or else make it not a Javadoc-formatted comment and simply start it with `/*`)



##########
processing/src/main/java/org/apache/druid/query/rowsandcols/concrete/RowBasedFrameRowAndColumns.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query.rowsandcols.concrete;
+
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.FrameType;
+import org.apache.druid.frame.read.FrameReader;
+import org.apache.druid.frame.read.columnar.FrameColumnReaders;
+import org.apache.druid.frame.segment.FrameStorageAdapter;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.query.rowsandcols.RowsAndColumns;
+import org.apache.druid.query.rowsandcols.column.Column;
+import org.apache.druid.segment.StorageAdapter;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+
+import javax.annotation.Nullable;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+
+public class RowBasedFrameRowAndColumns implements RowsAndColumns

Review Comment:
   CI is complaining that it looks like this could use some test coverage; I 
think you can add a test similar to `FrameRowsAndColumnsTest` and add it to the 
makers list in `getMakers` of `RowsAndColumnsTestBase`.
   
   Also, I think we probably want to rename `FrameRowsAndColumns` to 
`ColumnarRowsAndColumns` or something



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java:
##########
@@ -166,34 +169,90 @@ public QueryDefinition makeQueryDefinition(
         partitionBoost
     );
 
-    queryDefBuilder.add(
-        StageDefinition.builder(firstStageNumber + 1)
-                       .inputs(new StageInputSpec(firstStageNumber))
-                       .signature(resultSignature)
-                       .maxWorkerCount(maxWorkerCount)
-                       .shuffleSpec(
-                           shuffleSpecFactoryPostAggregation != null
-                           ? 
shuffleSpecFactoryPostAggregation.build(resultClusterBy, false)
-                           : null
-                       )
-                       .processorFactory(new 
GroupByPostShuffleFrameProcessorFactory(queryToRun))
-    );
+    // note to self:
+    // the result signature might change if I add the window shuffle spec
+    // say the output signature was d0, d1
+    // But shuffle spec for window was d1
+    // example query
+    // select m1,m2,
+    // SUM(m1) OVER(PARTITION BY m2) summ1
+    // from foo
+    // GROUP BY m1, m2

Review Comment:
   nit: this should be deleted or transformed into a general info comment 
before we merge



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java:
##########
@@ -244,7 +247,9 @@ protected ReturnOrAwait<Unit> runWithLoadedSegment(final 
SegmentWithDescriptor s
   @Override
   protected ReturnOrAwait<Unit> runWithSegment(final SegmentWithDescriptor 
segment) throws IOException
   {
+    // why not remove order by and limit here ?

Review Comment:
   nit: this looks like a TODO comment (without the `todo:` tag); we should resolve 
this question and delete the comment and/or change the code before we merge this



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/DataSourcePlan.java:
##########
@@ -88,6 +88,7 @@ public class DataSourcePlan
    * of subqueries.
    */
   private static final Map<String, Object> CONTEXT_MAP_NO_SEGMENT_GRANULARITY 
= new HashMap<>();
+  public static final String NEXT_WINDOW_SHUFFLE_COL = "__windowShuffleCol";

Review Comment:
   this context key name should perhaps be added to 
https://github.com/apache/druid/blob/master/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java
 instead of here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to