cryptoe commented on code in PR #14370:
URL: https://github.com/apache/druid/pull/14370#discussion_r1220855468


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1393,67 +1393,69 @@ private Yielder<Object[]> getFinalResultsYielder(
 
       return Yielders.each(
           Sequences.concat(
-              
StreamSupport.stream(queryKernel.getResultPartitionsForStage(finalStageId).spliterator(),
 false)
-                           .map(
-                               readablePartition -> {
-                                 try {
-                                   return new FrameChannelSequence(
-                                       inputChannels.openChannel(
-                                           new StagePartition(
-                                               
queryKernel.getStageDefinition(finalStageId).getId(),
-                                               
readablePartition.getPartitionNumber()
-                                           )
-                                       )
-                                   );
-                                 }
-                                 catch (IOException e) {
-                                   throw new RuntimeException(e);
-                                 }
-                               }
-                           ).collect(Collectors.toList())
-          ).flatMap(
-              frame -> {
-                final Cursor cursor = FrameProcessors.makeCursor(
-                    frame,
-                    
queryKernel.getStageDefinition(finalStageId).getFrameReader()
-                );
-
-                final ColumnSelectorFactory columnSelectorFactory = 
cursor.getColumnSelectorFactory();
-                final ColumnMappings columnMappings = 
task.getQuerySpec().getColumnMappings();
-                @SuppressWarnings("rawtypes")
-                final List<ColumnValueSelector> selectors =
-                    columnMappings.getMappings()
-                                  .stream()
-                                  .map(
-                                      mapping ->
-                                          
columnSelectorFactory.makeColumnValueSelector(mapping.getQueryColumn())
-                                  ).collect(Collectors.toList());
-
-                final List<SqlTypeName> sqlTypeNames = task.getSqlTypeNames();
-                final List<Object[]> retVal = new ArrayList<>();
-                while (!cursor.isDone()) {
-                  final Object[] row = new Object[columnMappings.size()];
-                  for (int i = 0; i < row.length; i++) {
-                    final Object value = selectors.get(i).getObject();
-                    if (sqlTypeNames == null || task.getSqlResultsContext() == 
null) {
-                      // SQL type unknown, or no SQL results context: 
pass-through as is.
-                      row[i] = value;
-                    } else {
-                      row[i] = SqlResults.coerce(
-                          context.jsonMapper(),
-                          task.getSqlResultsContext(),
-                          value,
-                          sqlTypeNames.get(i)
-                      );
-                    }
-                  }
-                  retVal.add(row);
-                  cursor.advance();
-                }
-
-                return Sequences.simple(retVal);
-              }
-          ).withBaggage(resultReaderExec::shutdownNow)
+                       
StreamSupport.stream(queryKernel.getResultPartitionsForStage(finalStageId).spliterator(),
 false)
+                                    .map(
+                                        readablePartition -> {
+                                          try {
+                                            return new FrameChannelSequence(
+                                                inputChannels.openChannel(
+                                                    new StagePartition(
+                                                        
queryKernel.getStageDefinition(finalStageId).getId(),
+                                                        
readablePartition.getPartitionNumber()
+                                                    )
+                                                )
+                                            );
+                                          }
+                                          catch (IOException e) {
+                                            throw new RuntimeException(e);
+                                          }
+                                        }
+                                    ).collect(Collectors.toList())
+                   ).flatMap(
+                       frame -> {
+                         final Cursor cursor = FrameProcessors.makeCursor(
+                             frame,
+                             
queryKernel.getStageDefinition(finalStageId).getFrameReader()
+                         );
+
+                         final ColumnSelectorFactory columnSelectorFactory = 
cursor.getColumnSelectorFactory();
+                         final ColumnMappings columnMappings = 
task.getQuerySpec().getColumnMappings();
+                         @SuppressWarnings("rawtypes")
+                         final List<ColumnValueSelector> selectors =
+                             columnMappings.getMappings()
+                                           .stream()
+                                           .map(
+                                               mapping ->
+                                                   
columnSelectorFactory.makeColumnValueSelector(mapping.getQueryColumn())
+                                           ).collect(Collectors.toList());
+
+                         final List<SqlTypeName> sqlTypeNames = 
task.getSqlTypeNames();
+                         final List<Object[]> retVal = new ArrayList<>();
+                         while (!cursor.isDone()) {
+                           final Object[] row = new 
Object[columnMappings.size()];
+                           for (int i = 0; i < row.length; i++) {
+                             final Object value = selectors.get(i).getObject();
+                             if (sqlTypeNames == null || 
task.getSqlResultsContext() == null) {
+                               // SQL type unknown, or no SQL results context: 
pass-through as is.
+                               row[i] = value;
+                             } else {
+                               row[i] = SqlResults.coerce(
+                                   context.jsonMapper(),
+                                   task.getSqlResultsContext(),
+                                   value,
+                                   sqlTypeNames.get(i)
+                               );
+                             }
+                           }
+                           retVal.add(row);
+                           cursor.advance();
+                         }
+
+                         return Sequences.simple(retVal);
+                       }
+                   )
+                   .limit(Limits.MAX_SELECT_RESULT_ROWS)

Review Comment:
   For the end user, we need to set a flag in the report mentioning the results 
are truncated. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to