cryptoe commented on code in PR #16381:
URL: https://github.com/apache/druid/pull/16381#discussion_r1590572866


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java:
##########
@@ -131,69 +133,76 @@ public ReturnOrAwait<Object> runIncrementally(IntSet 
readableInputs) throws IOEx
     if (inputChannel.isFinished()) {
       return ReturnOrAwait.returnObject(exportFilePath);
     } else {
+      if (exportWriter == null) {
+        createExportWriter();
+      }
       exportFrame(inputChannel.read());
       return ReturnOrAwait.awaitAll(1);
     }
   }
 
-  private void exportFrame(final Frame frame) throws IOException
+  private void exportFrame(final Frame frame)
   {
     final Sequence<Cursor> cursorSequence =
         new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY)
             .makeCursors(null, Intervals.ETERNITY, VirtualColumns.EMPTY, 
Granularities.ALL, false, null);
 
-    // Add headers if we are writing to a new file.
-    final boolean writeHeader = !storageConnector.pathExists(exportFilePath);
-
-    try (OutputStream stream = storageConnector.write(exportFilePath)) {
-      ResultFormat.Writer formatter = exportFormat.createFormatter(stream, 
jsonMapper);
-      formatter.writeResponseStart();
-
-      if (writeHeader) {
-        formatter.writeHeaderFromRowSignature(exportRowSignature, false);
-      }
-
-      SequenceUtils.forEach(
-          cursorSequence,
-          cursor -> {
-            try {
-              final ColumnSelectorFactory columnSelectorFactory = 
cursor.getColumnSelectorFactory();
-
-              //noinspection rawtypes
-              @SuppressWarnings("rawtypes")
-              final List<BaseObjectColumnValueSelector> selectors =
-                  frameReader.signature()
-                             .getColumnNames()
-                             .stream()
-                             
.map(columnSelectorFactory::makeColumnValueSelector)
-                             .collect(Collectors.toList());
-
-              while (!cursor.isDone()) {
-                formatter.writeRowStart();
-                for (int j = 0; j < exportRowSignature.size(); j++) {
-                  String columnName = exportRowSignature.getColumnName(j);
-                  BaseObjectColumnValueSelector<?> selector = 
selectors.get(outputColumnNameToFrameColumnNumberMap.getInt(columnName));
-                  formatter.writeRowField(columnName, selector.getObject());
-                }
-                channelCounter.incrementRowCount();
-                formatter.writeRowEnd();
-                cursor.advance();
+    SequenceUtils.forEach(
+        cursorSequence,
+        cursor -> {
+          try {
+            final ColumnSelectorFactory columnSelectorFactory = 
cursor.getColumnSelectorFactory();
+
+            //noinspection rawtypes
+            final List<BaseObjectColumnValueSelector> selectors =
+                frameReader.signature()
+                           .getColumnNames()
+                           .stream()
+                           .map(columnSelectorFactory::makeColumnValueSelector)
+                           .collect(Collectors.toList());
+
+            while (!cursor.isDone()) {
+              exportWriter.writeRowStart();
+              for (int j = 0; j < exportRowSignature.size(); j++) {
+                String columnName = exportRowSignature.getColumnName(j);
+                BaseObjectColumnValueSelector<?> selector = 
selectors.get(outputColumnNameToFrameColumnNumberMap.getInt(columnName));
+                exportWriter.writeRowField(columnName, selector.getObject());
               }
-            }
-            catch (IOException e) {
-              throw DruidException.forPersona(DruidException.Persona.USER)
-                                  
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
-                                  .build(e, "Exception occurred while writing 
file to the export location [%s].", exportFilePath);
+              channelCounter.incrementRowCount();
+              exportWriter.writeRowEnd();
+              cursor.advance();
             }
           }
-      );
-      formatter.writeResponseEnd();
+          catch (IOException e) {
+            throw DruidException.forPersona(DruidException.Persona.USER)
+                                
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
+                                .build(e, "Exception occurred while writing 
file to the export location [%s].", exportFilePath);
+          }
+        }
+    );
+  }
+
+  private void createExportWriter()
+  {
+    try {
+      OutputStream stream = storageConnector.write(exportFilePath);
+      exportWriter = exportFormat.createFormatter(stream, jsonMapper);
+      exportWriter.writeResponseStart();
+      exportWriter.writeHeaderFromRowSignature(exportRowSignature, false);
+    }
+    catch (IOException e) {
+      throw DruidException.forPersona(DruidException.Persona.USER)
+                          .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+                          .build(e, "Exception occurred while opening a stream 
to the export location [%s].", exportFilePath);
     }
   }
 
   @Override
   public void cleanup() throws IOException
   {
     FrameProcessors.closeAll(inputChannels(), outputChannels());
+

Review Comment:
   I think a null check is required here. 
   Can we have some Ut's for this ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to