[ https://issues.apache.org/jira/browse/DRILL-6832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17001142#comment-17001142 ]
ASF GitHub Bot commented on DRILL-6832: --------------------------------------- ihuzenko commented on pull request #1929: DRILL-6832: Remove the old "unmanaged" external sort URL: https://github.com/apache/drill/pull/1929#discussion_r360439104 ########## File path: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java ########## @@ -286,540 +291,419 @@ public void buildSchema() throws SchemaChangeException { state = BatchState.DONE; break; default: - break; + throw new IllegalStateException("Unexpected iter outcome: " + outcome); } } + /** + * Process each request for a batch. The first request retrieves + * all the incoming batches and sorts them, optionally spilling to + * disk as needed. Subsequent calls retrieve the sorted results in + * fixed-size batches. + */ + @Override public IterOutcome innerNext() { - if (schema != null) { - if (spillCount == 0) { - return (getSelectionVector4().next()) ? IterOutcome.OK : IterOutcome.NONE; - } else { - Stopwatch w = Stopwatch.createStarted(); - int count = copier.next(targetRecordCount); - if (count > 0) { - long t = w.elapsed(TimeUnit.MICROSECONDS); - logger.debug("Took {} us to merge {} records", t, count); - container.setRecordCount(count); - return IterOutcome.OK; - } else { - logger.debug("copier returned 0 records"); - return IterOutcome.NONE; - } + switch (sortState) { + case DONE: + return NONE; + case START: + return load(); + case LOAD: + if (!this.retainInMemoryBatchesOnNone) { + resetSortState(); } + return (sortState == SortState.DONE) ? 
NONE : load(); + case DELIVER: + return nextOutputBatch(); + default: + throw new IllegalStateException("Unexpected sort state: " + sortState); } + } - int totalCount = 0; - int totalBatches = 0; // total number of batches received so far + private IterOutcome nextOutputBatch() { + // Call next on outputSV4 for it's state to progress in parallel to resultsIterator state + outputSV4.next(); - try{ - container.clear(); - outer: while (true) { - IterOutcome upstream; - if (first) { - upstream = IterOutcome.OK_NEW_SCHEMA; - } else { - upstream = next(incoming); - } - if (upstream == IterOutcome.OK && sorter == null) { - upstream = IterOutcome.OK_NEW_SCHEMA; - } - switch (upstream) { - case NONE: - if (first) { - return upstream; - } - break outer; - case NOT_YET: - throw new UnsupportedOperationException(); - case STOP: - return upstream; - case OK_NEW_SCHEMA: - case OK: - VectorContainer convertedBatch; - // only change in the case that the schema truly changes. Artificial schema changes are ignored. - if (upstream == IterOutcome.OK_NEW_SCHEMA && !incoming.getSchema().equals(schema)) { - if (schema != null) { - if (unionTypeEnabled) { - this.schema = SchemaUtil.mergeSchemas(schema, incoming.getSchema()); - } else { - throw new SchemaChangeException("Schema changes not supported in External Sort. 
Please enable Union type"); - } - } else { - schema = incoming.getSchema(); - } - convertedBatch = SchemaUtil.coerceContainer(incoming, schema, oContext); - for (BatchGroup b : batchGroups) { - b.setSchema(schema); - } - for (BatchGroup b : spilledBatchGroups) { - b.setSchema(schema); - } - this.sorter = createNewSorter(context, convertedBatch); - } else { - convertedBatch = SchemaUtil.coerceContainer(incoming, schema, oContext); - } - if (first) { - first = false; - } - if (convertedBatch.getRecordCount() == 0) { - for (VectorWrapper<?> w : convertedBatch) { - w.clear(); - } - break; - } - SelectionVector2 sv2; - if (incoming.getSchema().getSelectionVectorMode() == BatchSchema.SelectionVectorMode.TWO_BYTE) { - sv2 = incoming.getSelectionVector2().clone(); - } else { - try { - sv2 = newSV2(); - } catch(InterruptedException e) { - return IterOutcome.STOP; - } catch (OutOfMemoryException e) { - throw new OutOfMemoryException(e); - } - } + // But if results iterator next returns true that means it has more results to pass + if (resultsIterator.next()) { + container.setRecordCount(getRecordCount()); + injector.injectUnchecked(context.getExecutionControls(), INTERRUPTION_WHILE_MERGING); + } + // getFinalOutcome will take care of returning correct IterOutcome when there is no data to pass and for + // EMIT/NONE scenarios + return getFinalOutcome(); + } - int count = sv2.getCount(); - totalCount += count; - totalBatches++; - sorter.setup(context, sv2, convertedBatch); - sorter.sort(sv2); - RecordBatchData rbd = new RecordBatchData(convertedBatch, oAllocator); - boolean success = false; - try { - rbd.setSv2(sv2); - batchGroups.add(new BatchGroup(rbd.getContainer(), rbd.getSv2(), oContext)); - if (peakNumBatches < batchGroups.size()) { - peakNumBatches = batchGroups.size(); - stats.setLongStat(Metric.PEAK_BATCHES_IN_MEMORY, peakNumBatches); - } - - batchesSinceLastSpill++; - if (// If we haven't spilled so far, do we have enough memory for MSorter if this turns out to be 
the last incoming batch? - (spillCount == 0 && !hasMemoryForInMemorySort(totalCount)) || - // If we haven't spilled so far, make sure we don't exceed the maximum number of batches SV4 can address - (spillCount == 0 && totalBatches > Character.MAX_VALUE) || - // TODO(DRILL-4438) - consider setting this threshold more intelligently, - // lowering caused a failing low memory condition (test in BasicPhysicalOpUnitTest) - // to complete successfully (although it caused perf decrease as there was more spilling) - - // current memory used is more than 95% of memory usage limit of this operator - (oAllocator.getAllocatedMemory() > .95 * oAllocator.getLimit()) || - // Number of incoming batches (BatchGroups) exceed the limit and number of incoming batches accumulated - // since the last spill exceed the defined limit - (batchGroups.size() > SPILL_THRESHOLD && batchesSinceLastSpill >= SPILL_BATCH_GROUP_SIZE)) { - - if (firstSpillBatchCount == 0) { - firstSpillBatchCount = batchGroups.size(); - } - - if (spilledBatchGroups.size() > firstSpillBatchCount / 2) { - logger.info("Merging spills"); - final BatchGroup merged = mergeAndSpill(spilledBatchGroups); - if (merged != null) { - spilledBatchGroups.addFirst(merged); - } - } - final BatchGroup merged = mergeAndSpill(batchGroups); - if (merged != null) { // make sure we don't add null to spilledBatchGroups - spilledBatchGroups.add(merged); - batchesSinceLastSpill = 0; - } - } - success = true; - } finally { - if (!success) { - rbd.clear(); - } - } - break; - case OUT_OF_MEMORY: - logger.debug("received OUT_OF_MEMORY, trying to spill"); - if (batchesSinceLastSpill > 2) { - final BatchGroup merged = mergeAndSpill(batchGroups); - if (merged != null) { - spilledBatchGroups.add(merged); - batchesSinceLastSpill = 0; - } - } else { - logger.debug("not enough batches to spill, sending OUT_OF_MEMORY downstream"); - return IterOutcome.OUT_OF_MEMORY; - } - break; - default: - throw new UnsupportedOperationException(); - } - } + /** + * 
Load the results and sort them. May bail out early if an exceptional + * condition is passed up from the input batch. + * + * @return return code: OK_NEW_SCHEMA if rows were sorted, + * NONE if no rows + */ - if (totalCount == 0) { - return IterOutcome.NONE; - } - if (spillCount == 0) { + private IterOutcome load() { + logger.trace("Start of load phase"); - if (builder != null) { - builder.clear(); - builder.close(); - } - builder = new SortRecordBatchBuilder(oAllocator); + // Don't clear the temporary container created by buildSchema() after each load since across EMIT outcome we have + // to maintain the ValueVector references for downstream operators - for (BatchGroup group : batchGroups) { - RecordBatchData rbd = new RecordBatchData(group.getContainer(), oAllocator); - rbd.setSv2(group.getSv2()); - builder.add(rbd); - } + // Loop over all input batches - builder.build(container); - sv4 = builder.getSv4(); - mSorter = createNewMSorter(); - mSorter.setup(context, oAllocator, getSelectionVector4(), this.container); + IterOutcome result = OK; + for (;;) { + result = loadBatch(); - // For testing memory-leak purpose, inject exception after mSorter finishes setup - injector.injectUnchecked(context.getExecutionControls(), INTERRUPTION_AFTER_SETUP); - mSorter.sort(this.container); + // NONE/EMIT means all batches have been read at this record boundary + if (result == NONE || result == EMIT) { + break; } - // sort may have prematurely exited due to should continue returning false. - if (!context.getExecutorState().shouldContinue()) { - return IterOutcome.STOP; - } + // if result is STOP that means something went wrong. 
- // For testing memory-leak purpose, inject exception after mSorter finishes sorting - injector.injectUnchecked(context.getExecutionControls(), INTERRUPTION_AFTER_SORT); - sv4 = mSorter.getSV4(); + if (result == STOP) { + return result; } + }

Review comment:

```java
IterOutcome result;
loop:
for (;;) {
  switch (result = loadBatch()) {
    case NONE:
    case EMIT:
      break loop; // all batches have been read at this record boundary
    case STOP:
      return STOP; // something went wrong.
  }
}
```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above
to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Remove old "unmanaged" sort implementation
> ------------------------------------------
>
> Key: DRILL-6832
> URL: https://issues.apache.org/jira/browse/DRILL-6832
> Project: Apache Drill
> Issue Type: Improvement
> Affects Versions: 1.14.0
> Reporter: Paul Rogers
> Assignee: Paul Rogers
> Priority: Minor
>
> Several releases back Drill introduced a new "managed" external sort that
> enhanced the sort operator's memory management. To be safe, at the time, the
> new version was controlled by an option, with the ability to revert to the
> old version.
> The new version has proven to be stable. The time has come to remove the old
> version.
> * Remove the implementation in {{physical.impl.xsort}}.
> * Move the implementation from {{physical.impl.xsort.managed}} to the parent
> package.
> * Remove the conditional code in the batch creator.
> * Remove the option that allowed disabling the new version.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)