Github user Ben-Zvi commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r119975302 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -400,114 +782,411 @@ public IterOutcome getOutcome() { @Override public int getOutputCount() { - // return outputCount; return lastBatchOutputCount; } @Override public void cleanup() { - if (htable != null) { - htable.clear(); - htable = null; - } + if ( schema == null ) { return; } // not set up; nothing to clean + for ( int i = 0; i < numPartitions; i++) { + if (htables[i] != null) { + htables[i].clear(); + htables[i] = null; + } + if ( batchHolders[i] != null) { + for (BatchHolder bh : batchHolders[i]) { + bh.clear(); + } + batchHolders[i].clear(); + batchHolders[i] = null; + } + + // delete any (still active) output spill file + if ( outputStream[i] != null && spillFiles[i] != null) { + try { + spillSet.delete(spillFiles[i]); + } catch(IOException e) { + logger.warn("Cleanup: Failed to delete spill file {}",spillFiles[i]); + } + } + } + // delete any spill file left in unread spilled partitions + while ( ! 
spilledPartitionsList.isEmpty() ) { + SpilledPartition sp = spilledPartitionsList.remove(0); + try { + spillSet.delete(sp.spillFile); + } catch(IOException e) { + logger.warn("Cleanup: Failed to delete spill file {}",sp.spillFile); + } + } + spillSet.close(); // delete the spill directory(ies) htIdxHolder = null; materializedValueFields = null; outStartIdxHolder = null; outNumRecordsHolder = null; + } - if (batchHolders != null) { - for (BatchHolder bh : batchHolders) { + // First free the memory used by the given (spilled) partition (i.e., hash table plus batches) + // then reallocate them in pristine state to allow the partition to continue receiving rows + private void reinitPartition(int part) throws SchemaChangeException, ClassTransformationException, IOException { + assert htables[part] != null; + htables[part].reset(); + if ( batchHolders[part] != null) { + for (BatchHolder bh : batchHolders[part]) { bh.clear(); } - batchHolders.clear(); - batchHolders = null; + batchHolders[part].clear(); } + batchHolders[part] = new ArrayList<BatchHolder>(); // First BatchHolder is created when the first put request is received. } -// private final AggOutcome setOkAndReturn() { -// this.outcome = IterOutcome.OK; -// for (VectorWrapper<?> v : outgoing) { -// v.getValueVector().getMutator().setValueCount(outputCount); -// } -// return AggOutcome.RETURN_OUTCOME; -// } private final void incIndex() { underlyingIndex++; if (underlyingIndex >= incoming.getRecordCount()) { currentIndex = Integer.MAX_VALUE; return; } - currentIndex = getVectorIndex(underlyingIndex); + try { currentIndex = getVectorIndex(underlyingIndex); } + catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc);} } private final void resetIndex() { underlyingIndex = -1; incIndex(); } - private void addBatchHolder() { + private boolean isSpilled(int part) { + return outputStream[part] != null; + } + /** + * Which partition to choose for flushing out (i.e. spill or return) ? 
+ * - The current partition (to which a new bach holder is added) has a priority, + * because its last batch holder is full. + * - Also the largest prior spilled partition has some priority, as it is already spilled; + * but spilling too few rows (e.g. a single batch) gets us nothing. + * - So the largest non-spilled partition has some priority, to get more memory freed. + * Need to weigh the above three options. + * + * @param currPart - The partition that hit the memory limit (gets a priority) + * @return The partition (number) chosen to be spilled + */ + private int chooseAPartitionToFlush(int currPart) { + if ( ! is2ndPhase ) { return currPart; } // 1st phase: just use the current partition + int currPartSize = batchHolders[currPart].size(); + if ( currPartSize == 1 ) { currPartSize = -1; } // don't pick current if size is 1 + // first find the largest spilled partition + int maxSizeSpilled = -1; + int indexMaxSpilled = -1; + for (int isp = 0; isp < numPartitions; isp++ ) { + if ( isSpilled(isp) && maxSizeSpilled < batchHolders[isp].size() ) { + maxSizeSpilled = batchHolders[isp].size(); + indexMaxSpilled = isp; + } + } + // Give the current (if already spilled) some priority + if ( isSpilled(currPart) && ( currPartSize + 1 >= maxSizeSpilled )) { + maxSizeSpilled = currPartSize ; + indexMaxSpilled = currPart; + } + // now find the largest non-spilled partition + int maxSize = -1; + int indexMax = -1; + // Use the largest spilled (if found) as a base line, with a factor of 4 + if ( indexMaxSpilled > -1 && maxSizeSpilled > 1 ) { + indexMax = indexMaxSpilled; + maxSize = 4 * maxSizeSpilled ; + } + for ( int insp = 0; insp < numPartitions; insp++) { + if ( ! isSpilled(insp) && maxSize < batchHolders[insp].size() ) { + indexMax = insp; + maxSize = batchHolders[insp].size(); + } + } + // again - priority to the current partition + if ( ! 
isSpilled(currPart) && (currPartSize + 1 >= maxSize) ) { + return currPart; + } + if ( maxSize <= 1 ) { // Can not make progress by spilling a single batch! + return -1; // try skipping this spill + } + return indexMax; + } + + /** + * Iterate through the batches of the given partition, writing them to a file + * + * @param part The partition (number) to spill + */ + private void spillAPartition(int part) { + + ArrayList<BatchHolder> currPartition = batchHolders[part]; + rowsInPartition = 0; + if ( EXTRA_DEBUG_SPILL ) { + logger.debug("HashAggregate: Spilling partition {} current cycle {} part size {}", part, cycleNum, currPartition.size()); + } + + if ( currPartition.size() == 0 ) { return; } // in case empty - nothing to spill + + // If this is the first spill for this partition, create an output stream + if ( ! isSpilled(part) ) { + + spillFiles[part] = spillSet.getNextSpillFile(cycleNum > 0 ? Integer.toString(cycleNum) : null); + + try { + outputStream[part] = spillSet.openForOutput(spillFiles[part]); + } catch (IOException e) { + throw new DrillRuntimeException("Hash Aggregation failed to open spill file: " + spillFiles[part]); + } + } + + for (int currOutBatchIndex = 0; currOutBatchIndex < currPartition.size(); currOutBatchIndex++ ) { + + // get the number of records in the batch holder that are pending output + int numPendingOutput = currPartition.get(currOutBatchIndex).getNumPendingOutput(); + + rowsInPartition += numPendingOutput; // for logging + rowsSpilled += numPendingOutput; + + allocateOutgoing(numPendingOutput); + + currPartition.get(currOutBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder); + int numOutputRecords = outNumRecordsHolder.value; + + this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value); + + // set the value count for outgoing batch value vectors + /* int i = 0; */ + for (VectorWrapper<?> v : outgoing) { + 
v.getValueVector().getMutator().setValueCount(numOutputRecords); + /* + // print out the first row to be spilled ( varchar, varchar, bigint ) + try { + if (i++ < 2) { + NullableVarCharVector vv = ((NullableVarCharVector) v.getValueVector()); + logger.info("FIRST ROW = {}", vv.getAccessor().get(0)); + } else { + NullableBigIntVector vv = ((NullableBigIntVector) v.getValueVector()); + logger.info("FIRST ROW = {}", vv.getAccessor().get(0)); + } + } catch (Exception e) { logger.info("While printing the first row - Got an exception = {}",e); } + */ + } + + outContainer.setRecordCount(numPendingOutput); + WritableBatch batch = WritableBatch.getBatchNoHVWrap(numPendingOutput, outContainer, false); + VectorAccessibleSerializable outputBatch = new VectorAccessibleSerializable(batch, allocator); + Stopwatch watch = Stopwatch.createStarted(); + try { + outputBatch.writeToStream(outputStream[part]); + } catch (IOException e) { + throw new DrillRuntimeException("Hash Aggregation failed to write to output stream: " + outputStream[part].toString()); + } + outContainer.zeroVectors(); + logger.trace("HASH AGG: Took {} us to spill {} records", watch.elapsed(TimeUnit.MICROSECONDS), numPendingOutput); + } + + spilledBatchesCount[part] += currPartition.size(); // update count of spilled batches + + logger.trace("HASH AGG: Spilled {} rows from {} batches of partition {}", rowsInPartition, currPartition.size(), part); + } + + private void addBatchHolder(int part) { + BatchHolder bh = newBatchHolder(); - batchHolders.add(bh); + batchHolders[part].add(bh); if (EXTRA_DEBUG_1) { - logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders.size()); + logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders[part].size()); } bh.setup(); } - // Overridden in the generated class when created as plain Java code. - + // These methods are overridden in the generated class when created as plain Java code. 
protected BatchHolder newBatchHolder() { return new BatchHolder(); } + protected SpilledRecordbatch newSpilledRecordBatch(String arg1, int arg2, FragmentContext arg4, BatchSchema arg5, OperatorContext arg6) { + return new SpilledRecordbatch(arg1, arg2, arg4, arg5, arg6); + } + /** + * Output the next batch from partition "nextPartitionToReturn" + * + * @return iteration outcome (e.g., OK, NONE ...) + */ @Override public IterOutcome outputCurrentBatch() { - if (outBatchIndex >= batchHolders.size()) { - this.outcome = IterOutcome.NONE; - return outcome; + + // when incoming was an empty batch, just finish up + if ( schema == null ) { + logger.trace("Incoming was empty; output is an empty batch."); + this.outcome = IterOutcome.NONE; // no records were read + allFlushed = true; + return this.outcome; --- End diff -- This code was changed anyway, as part of eliminating RESTART.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---