imply-cheddar commented on code in PR #16800:
URL: https://github.com/apache/druid/pull/16800#discussion_r1758992568


##########
processing/src/main/java/org/apache/druid/query/operator/WindowOperatorQueryQueryToolChest.java:
##########
@@ -158,42 +177,77 @@ public Sequence run(
             
queryPlus.withQuery(query.withOverriddenContext(ImmutableMap.of("unravel", 
false))),
             responseContext
         );
+        final ResultSerializationMode serializationMode = 
query.context().getEnum(
+            ResultSerializationMode.CTX_SERIALIZATION_PARAMETER,
+            ResultSerializationMode.class,
+            ResultSerializationMode.ROWS
+        );
+        switch (serializationMode) {
+          case ROWS:
+            return asRows(baseSequence, query);
+          case FRAMES:
+            return asFrames(baseSequence);
+          default:
+            throw DruidException.defensive("Serialization mode[%s] not 
supported", serializationMode);
+        }
+      }
 
-        final RowSignature rowSignature = query.getRowSignature();
-        return baseSequence.flatMap(
-            rac -> {
-              List<Object[]> results = new ArrayList<>(rac.numRows());
-
-              ColumnAccessor[] accessors = new 
ColumnAccessor[rowSignature.size()];
-              int index = 0;
-              for (String columnName : rowSignature.getColumnNames()) {
-                final Column column = rac.findColumn(columnName);
-                if (column == null) {
-                  final ColumnType columnType = rowSignature
-                      .getColumnType(columnName)
-                      .orElse(ColumnType.UNKNOWN_COMPLEX);
-
-                  accessors[index] = new NullColumn.Accessor(columnType, 
rac.numRows());
-                } else {
-                  accessors[index] = column.toAccessor();
-                }
-                ++index;
-              }
+      return baseQueryRunner.run(queryPlus, responseContext);
+    }
 
-              for (int i = 0; i < rac.numRows(); ++i) {
-                Object[] objArr = new Object[accessors.length];
-                for (int j = 0; j < accessors.length; j++) {
-                  objArr[j] = accessors[j].getObject(i);
-                }
-                results.add(objArr);
+    /**
+     * Translates Sequence of RACs to a Sequence of Object[]
+     */
+    private static Sequence asRows(final Sequence<RowsAndColumns> 
baseSequence, final WindowOperatorQuery query)
+    {
+      final RowSignature rowSignature = query.getRowSignature();
+      return baseSequence.flatMap(
+          rac -> {
+            List<Object[]> results = new ArrayList<>(rac.numRows());
+
+            ColumnAccessor[] accessors = new 
ColumnAccessor[rowSignature.size()];
+            int index = 0;
+            for (String columnName : rowSignature.getColumnNames()) {
+              final Column column = rac.findColumn(columnName);
+              if (column == null) {
+                final ColumnType columnType = rowSignature
+                    .getColumnType(columnName)
+                    .orElse(ColumnType.UNKNOWN_COMPLEX);
+
+                accessors[index] = new NullColumn.Accessor(columnType, 
rac.numRows());
+              } else {
+                accessors[index] = column.toAccessor();
               }
+              ++index;
+            }
 
-              return Sequences.simple(results);
+            for (int i = 0; i < rac.numRows(); ++i) {
+              Object[] objArr = new Object[accessors.length];
+              for (int j = 0; j < accessors.length; j++) {
+                objArr[j] = accessors[j].getObject(i);
+              }
+              results.add(objArr);
             }
-        );
-      }
 
-      return baseQueryRunner.run(queryPlus, responseContext);
+            return Sequences.simple(results);
+          }
+      );
+    }
+
+    /**
+     * Translates a sequence of RACs to a Sequence of Frames
+     */
+    private static Sequence asFrames(final Sequence<RowsAndColumns> 
baseSequence)
+    {
+      return baseSequence.map(
+          rac -> {
+            FrameMaker frameMaker = FrameMaker.fromRAC(rac);
+            return new FrameSignaturePair(
+                frameMaker.toColumnBasedFrame(),
+                frameMaker.computeSignature()
+            );

Review Comment:
   Yeah, we could have a recombining operator at the end that basically exists 
to recombine things based on whatever input boundaries would have existed.  We 
should be able to make this "magically" happen; if it isn't happening already, 
then it's just a question of what the stack trace looks like and which concrete 
RAC classes are involved.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to