cryptoe commented on code in PR #16381: URL: https://github.com/apache/druid/pull/16381#discussion_r1590572866
########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java: ########## @@ -131,69 +133,76 @@ public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs) throws IOEx if (inputChannel.isFinished()) { return ReturnOrAwait.returnObject(exportFilePath); } else { + if (exportWriter == null) { + createExportWriter(); + } exportFrame(inputChannel.read()); return ReturnOrAwait.awaitAll(1); } } - private void exportFrame(final Frame frame) throws IOException + private void exportFrame(final Frame frame) { final Sequence<Cursor> cursorSequence = new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY) .makeCursors(null, Intervals.ETERNITY, VirtualColumns.EMPTY, Granularities.ALL, false, null); - // Add headers if we are writing to a new file. - final boolean writeHeader = !storageConnector.pathExists(exportFilePath); - - try (OutputStream stream = storageConnector.write(exportFilePath)) { - ResultFormat.Writer formatter = exportFormat.createFormatter(stream, jsonMapper); - formatter.writeResponseStart(); - - if (writeHeader) { - formatter.writeHeaderFromRowSignature(exportRowSignature, false); - } - - SequenceUtils.forEach( - cursorSequence, - cursor -> { - try { - final ColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory(); - - //noinspection rawtypes - @SuppressWarnings("rawtypes") - final List<BaseObjectColumnValueSelector> selectors = - frameReader.signature() - .getColumnNames() - .stream() - .map(columnSelectorFactory::makeColumnValueSelector) - .collect(Collectors.toList()); - - while (!cursor.isDone()) { - formatter.writeRowStart(); - for (int j = 0; j < exportRowSignature.size(); j++) { - String columnName = exportRowSignature.getColumnName(j); - BaseObjectColumnValueSelector<?> selector = selectors.get(outputColumnNameToFrameColumnNumberMap.getInt(columnName)); - formatter.writeRowField(columnName, selector.getObject()); - } - 
channelCounter.incrementRowCount(); - formatter.writeRowEnd(); - cursor.advance(); + SequenceUtils.forEach( + cursorSequence, + cursor -> { + try { + final ColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory(); + + //noinspection rawtypes + final List<BaseObjectColumnValueSelector> selectors = + frameReader.signature() + .getColumnNames() + .stream() + .map(columnSelectorFactory::makeColumnValueSelector) + .collect(Collectors.toList()); + + while (!cursor.isDone()) { + exportWriter.writeRowStart(); + for (int j = 0; j < exportRowSignature.size(); j++) { + String columnName = exportRowSignature.getColumnName(j); + BaseObjectColumnValueSelector<?> selector = selectors.get(outputColumnNameToFrameColumnNumberMap.getInt(columnName)); + exportWriter.writeRowField(columnName, selector.getObject()); } - } - catch (IOException e) { - throw DruidException.forPersona(DruidException.Persona.USER) - .ofCategory(DruidException.Category.RUNTIME_FAILURE) - .build(e, "Exception occurred while writing file to the export location [%s].", exportFilePath); + channelCounter.incrementRowCount(); + exportWriter.writeRowEnd(); + cursor.advance(); } } - ); - formatter.writeResponseEnd(); + catch (IOException e) { + throw DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build(e, "Exception occurred while writing file to the export location [%s].", exportFilePath); + } + } + ); + } + + private void createExportWriter() + { + try { + OutputStream stream = storageConnector.write(exportFilePath); + exportWriter = exportFormat.createFormatter(stream, jsonMapper); + exportWriter.writeResponseStart(); + exportWriter.writeHeaderFromRowSignature(exportRowSignature, false); + } + catch (IOException e) { + throw DruidException.forPersona(DruidException.Persona.USER) + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build(e, "Exception occurred while opening a stream to the export location [%s].", 
exportFilePath); } } @Override public void cleanup() throws IOException { FrameProcessors.closeAll(inputChannels(), outputChannels()); + Review Comment: I think a null check is required here. Can we have some unit tests for this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org