sreemanamala commented on code in PR #16741:
URL: https://github.com/apache/druid/pull/16741#discussion_r1679609472


##########
processing/src/main/java/org/apache/druid/query/operator/window/WindowFrame.java:
##########
@@ -21,159 +21,230 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.druid.query.operator.ColumnWithDirection;
 
+import javax.annotation.Nullable;
+
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
-public class WindowFrame
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+    @JsonSubTypes.Type(name = "unbounded", value = 
WindowFrame.Unbounded.class),
+    @JsonSubTypes.Type(name = "rows", value = WindowFrame.Rows.class),
+    @JsonSubTypes.Type(name = "groups", value = WindowFrame.Groups.class),
+})
+public interface WindowFrame

Review Comment:
   Currently we support only UNBOUNDED and CURRENT ROW for RANGE (which would 
be the same for GROUPS), so it looks fine to use `groups` to represent RANGE 
queries as well. But later on, if we want to support both, shouldn't we have 
something to distinguish them?



##########
processing/src/main/java/org/apache/druid/query/operator/window/WindowFrame.java:
##########
@@ -21,159 +21,230 @@
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.druid.query.operator.ColumnWithDirection;
 
+import javax.annotation.Nullable;
+
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
-public class WindowFrame
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
+@JsonSubTypes(value = {
+    @JsonSubTypes.Type(name = "unbounded", value = 
WindowFrame.Unbounded.class),
+    @JsonSubTypes.Type(name = "rows", value = WindowFrame.Rows.class),
+    @JsonSubTypes.Type(name = "groups", value = WindowFrame.Groups.class),
+})
+public interface WindowFrame
 {
-  public static WindowFrame unbounded()
+  static WindowFrame unbounded()
   {
-    return new WindowFrame(PeerType.ROWS, true, 0, true, 0, null);
+    return new WindowFrame.Unbounded();
   }
 
-  @SuppressWarnings("unused")
-  public enum PeerType
+  static Rows rows(Integer lowerOffset, Integer upperOffset)
   {
-    ROWS,
-    RANGE
+    return new WindowFrame.Rows(lowerOffset, upperOffset);
   }
 
-  // Will likely need to add the order by columns to also be able to deal with 
RANGE peer type.
-  private final PeerType peerType;
-  private final boolean lowerUnbounded;
-  private final int lowerOffset;
-  private final boolean upperUnbounded;
-  private final int upperOffset;
-  private final List<ColumnWithDirection> orderBy;
-
-  @JsonCreator
-  public WindowFrame(
-      @JsonProperty("peerType") PeerType peerType,
-      @JsonProperty("lowUnbounded") boolean lowerUnbounded,
-      @JsonProperty("lowOffset") int lowerOffset,
-      @JsonProperty("uppUnbounded") boolean upperUnbounded,
-      @JsonProperty("uppOffset") int upperOffset,
-      @JsonProperty("orderBy") List<ColumnWithDirection> orderBy
-  )
+  static Groups groups(
+      final Integer lowerOffset,
+      final Integer upperOffset,
+      final List<ColumnWithDirection> orderBy)
   {
-    this.peerType = peerType;
-    this.lowerUnbounded = lowerUnbounded;
-    this.lowerOffset = lowerOffset;
-    this.upperUnbounded = upperUnbounded;
-    this.upperOffset = upperOffset;
-    this.orderBy = orderBy;
-  }
+    return new WindowFrame.Groups(lowerOffset, upperOffset, orderBy);
 
-  @JsonProperty("peerType")
-  public PeerType getPeerType()
-  {
-    return peerType;
   }
 
-  @JsonProperty("lowUnbounded")
-  public boolean isLowerUnbounded()
+  static WindowFrame forOrderBy(ColumnWithDirection... orderBy)
   {
-    return lowerUnbounded;
+    return groups(null, 0, Lists.newArrayList(orderBy));
   }
 
-  @JsonProperty("lowOffset")
-  public int getLowerOffset()
+  class Unbounded implements WindowFrame
   {
-    return lowerOffset;
-  }
+    @JsonCreator
+    public Unbounded()
+    {
+    }
 
-  @JsonProperty("uppUnbounded")
-  public boolean isUpperUnbounded()
-  {
-    return upperUnbounded;
-  }
+    @Override
+    public boolean equals(Object obj)
+    {
+      if (obj == null) {
+        return false;
+      }
+      return getClass() == obj.getClass();
+    }
 
-  @JsonProperty("uppOffset")
-  public int getUpperOffset()
-  {
-    return upperOffset;
-  }
+    @Override
+    public int hashCode()
+    {
+      return 0;
+    }
 
-  @JsonProperty("orderBy")
-  public List<ColumnWithDirection> getOrderBy()
-  {
-    return orderBy;
+    @Override
+    public String toString()
+    {
+      return "WindowFrame.Unbounded []";
+    }
   }
 
-  @Override
-  public boolean equals(Object o)
+  abstract class OffsetFrame implements WindowFrame
   {
-    if (this == o) {
-      return true;
-    }
-    if (!(o instanceof WindowFrame)) {
-      return false;
-    }
-    WindowFrame that = (WindowFrame) o;
-    return lowerUnbounded == that.lowerUnbounded
-           && lowerOffset == that.lowerOffset
-           && upperUnbounded == that.upperUnbounded
-           && upperOffset == that.upperOffset
-           && peerType == that.peerType
-           && Objects.equals(orderBy, that.orderBy);
-  }
+    @JsonProperty
+    public final Integer lowerOffset;
+    @JsonProperty
+    public final Integer upperOffset;
 
-  @Override
-  public int hashCode()
-  {
-    return Objects.hash(peerType, lowerUnbounded, lowerOffset, upperUnbounded, 
upperOffset, orderBy);
-  }
+    @JsonCreator
+    public OffsetFrame(
+        @JsonProperty("lowerOffset") Integer lowerOffset,
+        @JsonProperty("upperOffset") Integer upperOffset)
+    {
+      this.lowerOffset = lowerOffset;
+      this.upperOffset = upperOffset;
+    }
 
-  @Override
-  public String toString()
-  {
-    return "WindowFrame{" +
-           "peerType=" + peerType +
-           ", lowerUnbounded=" + lowerUnbounded +
-           ", lowerOffset=" + lowerOffset +
-           ", upperUnbounded=" + upperUnbounded +
-           ", upperOffset=" + upperOffset +
-           ", orderBy=" + orderBy +
-           '}';
-  }
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(lowerOffset, upperOffset);
+    }
 
-  public static WindowFrame forOrderBy(ColumnWithDirection... orderBy)
-  {
-    return new WindowFrame(PeerType.RANGE, true, 0, false, 0, 
Lists.newArrayList(orderBy));
+    /**
+     * Calculates the applicable lower offset if the max number of rows is
+     * known.
+     */
+    public int getLowerOffsetClamped(int maxRows)
+    {
+      if (lowerOffset == null) {
+        return -maxRows;
+      }
+      return Math.max(-maxRows, lowerOffset);
+    }
+
+    /**
+     * Calculates the applicable upper offset if the max number of rows is
+     * known.
+     */
+    public int getUpperOffsetClamped(int maxRows)
+    {
+      if (upperOffset == null) {
+        return maxRows;
+      }
+      return Math.min(maxRows, upperOffset);
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+      if (this == obj) {
+        return true;
+      }
+      if (obj == null) {
+        return false;
+      }
+      if (getClass() != obj.getClass()) {
+        return false;
+      }
+      OffsetFrame other = (OffsetFrame) obj;
+      return Objects.equals(lowerOffset, other.lowerOffset) && 
Objects.equals(upperOffset, other.upperOffset);
+    }
+
+    @Override
+    public abstract String toString();
   }
 
-  public List<String> getOrderByColNames()
+  class Rows extends OffsetFrame
   {
-    if (orderBy == null) {
-      return Collections.emptyList();
+    @JsonCreator
+    public Rows(
+        @JsonProperty("lowerOffset") Integer lowerOffset,
+        @JsonProperty("upperOffset") Integer upperOffset)
+    {
+      super(lowerOffset, upperOffset);
+    }
+
+    @Override
+    public String toString()
+    {
+      return "WindowFrame.Rows ["
+          + "lowerOffset=" + lowerOffset +
+          ", upperOffset=" + upperOffset +
+          "]";
     }
-    return 
orderBy.stream().map(ColumnWithDirection::getColumn).collect(Collectors.toList());
   }
 
-  /**
-   * Calculates the applicable lower offset if the max number of rows is known.
-   */
-  public int getLowerOffsetClamped(int maxRows)
+  class Groups extends OffsetFrame
   {
-    if (lowerUnbounded) {
-      return -maxRows;
+    @JsonProperty
+    private final ImmutableList<ColumnWithDirection> orderBy;
+
+    @JsonCreator
+    public Groups(
+        @JsonProperty("lowerOffset") Integer lowerOffset,
+        @JsonProperty("upperOffset") Integer upperOffset,
+        @JsonProperty("orderBy") List<ColumnWithDirection> orderBy)
+    {
+      super(lowerOffset, upperOffset);
+      this.orderBy = ImmutableList.copyOf(orderBy);
+    }
+
+    public List<String> getOrderByColNames()

Review Comment:
   It would be nice to have `getOrderByColumns` as well, to read the data 
along with the direction.



##########
sql/src/test/resources/calcite/tests/window/no_grouping.sqlTest:
##########
@@ -12,7 +12,7 @@ expectedOperators:
   - type: "window"
     processor:
       type: "framedAgg"
-      frame: { peerType: "ROWS", lowUnbounded: true, lowOffset: 0, 
uppUnbounded: true, uppOffset: 0 }
+      frame: { type: "rows" }

Review Comment:
   Why is this not `{ type: unbounded }`, similar to the one in 
`no_grouping2.sqlTest`?



##########
processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java:
##########
@@ -106,31 +110,53 @@ public void appendTo(AppendableRowsAndColumns rac)
   public static Iterable<AggInterval> 
buildIteratorFor(AppendableRowsAndColumns rac, WindowFrame frame)
   {
     int numRows = rac.numRows();
-    if (frame.getLowerOffsetClamped(numRows) == -numRows && 
frame.getUpperOffsetClamped(numRows) == numRows) {
-      return buildUnboundedIteratorFor(rac, frame);
-    } else if (frame.getPeerType() == WindowFrame.PeerType.RANGE) {
-      return buildGroupIteratorFor(rac, frame);
-    } else {
-      return buildRowIteratorFor(rac, frame);
+    if (isEffectivelyUnbounded(frame, numRows)) {
+      return buildUnboundedIteratorFor(rac);
     }
+    Rows rowsFrame = frame.unwrap(WindowFrame.Rows.class);
+    if (rowsFrame != null) {
+      return buildRowIteratorFor(rac, rowsFrame);
+    }
+    Groups groupsFrame = frame.unwrap(WindowFrame.Groups.class);
+    if (groupsFrame != null) {
+      return buildGroupIteratorFor(rac, groupsFrame);
+    }
+    throw DruidException.defensive("Unable to handle WindowFrame [%s]!", 
frame);
   }
 
-  private static Iterable<AggInterval> 
buildUnboundedIteratorFor(AppendableRowsAndColumns rac, WindowFrame frame)
+  private static boolean isEffectivelyUnbounded(WindowFrame frame, int numRows)
   {
-    int[] groupBoundaries = new int[]{0, rac.numRows()};
-    return new GroupIteratorForWindowFrame(frame, groupBoundaries);
+    if (frame.unwrap(WindowFrame.Unbounded.class) != null) {
+      return true;
+    }
+    OffsetFrame offsetFrame = frame.unwrap(WindowFrame.OffsetFrame.class);
+    if (offsetFrame.getLowerOffsetClamped(numRows) == -numRows
+        && offsetFrame.getUpperOffsetClamped(numRows) == numRows) {
+      // regardless the actual mode; all rows will be inside the frame!
+      return true;
+    }
+    return false;
   }
 
-  private static Iterable<AggInterval> 
buildRowIteratorFor(AppendableRowsAndColumns rac, WindowFrame frame)
+  private static Iterable<AggInterval> 
buildUnboundedIteratorFor(AppendableRowsAndColumns rac)
+  {
+    int[] groupBoundaries = new int[] {0, rac.numRows()};
+    return new GroupIteratorForWindowFrame(WindowFrame.rows(0, 0), 
groupBoundaries);
+  }
+
+  private static Iterable<AggInterval> 
buildRowIteratorFor(AppendableRowsAndColumns rac, WindowFrame.Rows frame)
   {
     int[] groupBoundaries = new int[rac.numRows() + 1];
     for (int j = 0; j < groupBoundaries.length; j++) {
       groupBoundaries[j] = j;
     }
+    if (isEffectivelyUnbounded(frame, groupBoundaries.length - 1)) {
+      return buildUnboundedIteratorFor(rac);
+    }

Review Comment:
   Isn't this redundant here? We already do this check at the start of 
`buildIteratorFor`, and `isEffectivelyUnbounded` also checks by unwrapping the 
frame to `OffsetFrame`.



##########
processing/src/test/java/org/apache/druid/query/rowsandcols/semantic/FramedOnHeapAggregatableTest.java:
##########
@@ -371,7 +370,7 @@ public void testUnboundedWindowedAggregation()
     FramedOnHeapAggregatable agger = FramedOnHeapAggregatable.fromRAC(rac);
 
     final RowsAndColumns results = agger.aggregateAll(
-        new WindowFrame(WindowFrame.PeerType.ROWS, true, 0, true, 0, null),
+        WindowFrame.rows(null, null),

Review Comment:
   What is the difference between `WindowFrame.rows(null, 0)` and 
`WindowFrame.unbounded()`? Is it just that the rows frame unwraps as 
`OffsetFrame`? Can we use them interchangeably?



##########
processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java:
##########
@@ -106,31 +110,53 @@ public void appendTo(AppendableRowsAndColumns rac)
   public static Iterable<AggInterval> 
buildIteratorFor(AppendableRowsAndColumns rac, WindowFrame frame)
   {
     int numRows = rac.numRows();
-    if (frame.getLowerOffsetClamped(numRows) == -numRows && 
frame.getUpperOffsetClamped(numRows) == numRows) {
-      return buildUnboundedIteratorFor(rac, frame);
-    } else if (frame.getPeerType() == WindowFrame.PeerType.RANGE) {
-      return buildGroupIteratorFor(rac, frame);
-    } else {
-      return buildRowIteratorFor(rac, frame);
+    if (isEffectivelyUnbounded(frame, numRows)) {
+      return buildUnboundedIteratorFor(rac);
     }
+    Rows rowsFrame = frame.unwrap(WindowFrame.Rows.class);
+    if (rowsFrame != null) {
+      return buildRowIteratorFor(rac, rowsFrame);
+    }
+    Groups groupsFrame = frame.unwrap(WindowFrame.Groups.class);
+    if (groupsFrame != null) {
+      return buildGroupIteratorFor(rac, groupsFrame);
+    }
+    throw DruidException.defensive("Unable to handle WindowFrame [%s]!", 
frame);
   }
 
-  private static Iterable<AggInterval> 
buildUnboundedIteratorFor(AppendableRowsAndColumns rac, WindowFrame frame)
+  private static boolean isEffectivelyUnbounded(WindowFrame frame, int numRows)
   {
-    int[] groupBoundaries = new int[]{0, rac.numRows()};
-    return new GroupIteratorForWindowFrame(frame, groupBoundaries);
+    if (frame.unwrap(WindowFrame.Unbounded.class) != null) {
+      return true;
+    }
+    OffsetFrame offsetFrame = frame.unwrap(WindowFrame.OffsetFrame.class);
+    if (offsetFrame.getLowerOffsetClamped(numRows) == -numRows
+        && offsetFrame.getUpperOffsetClamped(numRows) == numRows) {
+      // regardless the actual mode; all rows will be inside the frame!
+      return true;
+    }
+    return false;
   }
 
-  private static Iterable<AggInterval> 
buildRowIteratorFor(AppendableRowsAndColumns rac, WindowFrame frame)
+  private static Iterable<AggInterval> 
buildUnboundedIteratorFor(AppendableRowsAndColumns rac)
+  {
+    int[] groupBoundaries = new int[] {0, rac.numRows()};
+    return new GroupIteratorForWindowFrame(WindowFrame.rows(0, 0), 
groupBoundaries);

Review Comment:
   Shouldn't this be `WindowFrame.rows(null, null)` for better readability? 
Technically, anything would work.



##########
sql/src/test/java/org/apache/druid/sql/calcite/CalciteWindowQueryTest.java:
##########
@@ -339,6 +327,9 @@ enum TestType
     @JsonProperty
     public TestType type;
 
+    @JsonProperty
+    public Map<String, String> queryContext;

Review Comment:
   Nice to have the query context extended here, providing flexibility and 
removing the need for `windowQueryTestWithCustomContextMaxSubqueryBytes`!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to