[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16027251#comment-16027251 ]
ASF GitHub Bot commented on DRILL-5457:
---------------------------------------

Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r118814014

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -400,114 +782,411 @@ public IterOutcome getOutcome() {
   @Override
   public int getOutputCount() {
-    // return outputCount;
     return lastBatchOutputCount;
   }

   @Override
   public void cleanup() {
-    if (htable != null) {
-      htable.clear();
-      htable = null;
-    }
+    if ( schema == null ) { return; } // not set up; nothing to clean
+    for ( int i = 0; i < numPartitions; i++) {
+      if (htables[i] != null) {
+        htables[i].clear();
+        htables[i] = null;
+      }
+      if ( batchHolders[i] != null) {
+        for (BatchHolder bh : batchHolders[i]) {
+          bh.clear();
+        }
+        batchHolders[i].clear();
+        batchHolders[i] = null;
+      }
+
+      // delete any (still active) output spill file
+      if ( outputStream[i] != null && spillFiles[i] != null) {
+        try {
+          spillSet.delete(spillFiles[i]);
+        } catch (IOException e) {
+          logger.warn("Cleanup: Failed to delete spill file {}", spillFiles[i]);
+        }
+      }
+    }
+    // delete any spill file left in unread spilled partitions
+    while ( ! spilledPartitionsList.isEmpty() ) {
+      SpilledPartition sp = spilledPartitionsList.remove(0);
+      try {
+        spillSet.delete(sp.spillFile);
+      } catch (IOException e) {
+        logger.warn("Cleanup: Failed to delete spill file {}", sp.spillFile);
+      }
+    }
+    spillSet.close(); // delete the spill directory(ies)
     htIdxHolder = null;
     materializedValueFields = null;
     outStartIdxHolder = null;
     outNumRecordsHolder = null;
+  }

-    if (batchHolders != null) {
-      for (BatchHolder bh : batchHolders) {
+  // First free the memory used by the given (spilled) partition (i.e., hash table plus batches),
+  // then reallocate them in a pristine state to allow the partition to continue receiving rows.
+  private void reinitPartition(int part) throws SchemaChangeException, ClassTransformationException, IOException {
+    assert htables[part] != null;
+    htables[part].reset();
+    if ( batchHolders[part] != null) {
+      for (BatchHolder bh : batchHolders[part]) {
         bh.clear();
       }
-      batchHolders.clear();
-      batchHolders = null;
+      batchHolders[part].clear();
     }
+    batchHolders[part] = new ArrayList<BatchHolder>(); // First BatchHolder is created when the first put request is received.
   }

-//  private final AggOutcome setOkAndReturn() {
-//    this.outcome = IterOutcome.OK;
-//    for (VectorWrapper<?> v : outgoing) {
-//      v.getValueVector().getMutator().setValueCount(outputCount);
-//    }
-//    return AggOutcome.RETURN_OUTCOME;
-//  }

   private final void incIndex() {
     underlyingIndex++;
     if (underlyingIndex >= incoming.getRecordCount()) {
       currentIndex = Integer.MAX_VALUE;
       return;
     }
-    currentIndex = getVectorIndex(underlyingIndex);
+    try { currentIndex = getVectorIndex(underlyingIndex); }
+    catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc); }
   }

   private final void resetIndex() {
     underlyingIndex = -1;
     incIndex();
   }

-  private void addBatchHolder() {
+  private boolean isSpilled(int part) {
+    return outputStream[part] != null;
+  }
+
+  /**
+   * Which partition to choose for flushing out (i.e., spill or return)?
+   * - The current partition (to which a new batch holder is added) has priority,
+   *   because its last batch holder is full.
+   * - The largest prior spilled partition also has some priority, as it is already spilled;
+   *   but spilling too few rows (e.g., a single batch) gains us nothing.
+   * - So the largest non-spilled partition also has some priority, to get more memory freed.
+   * These three options need to be weighed against one another.
+   *
+   * @param currPart - The partition that hit the memory limit (gets priority)
+   * @return The partition (number) chosen to be spilled
+   */
+  private int chooseAPartitionToFlush(int currPart) {
+    if ( ! is2ndPhase ) { return currPart; } // 1st phase: just use the current partition
+    int currPartSize = batchHolders[currPart].size();
+    if ( currPartSize == 1 ) { currPartSize = -1; } // don't pick the current partition if its size is 1
+    // first find the largest spilled partition
+    int maxSizeSpilled = -1;
+    int indexMaxSpilled = -1;
+    for (int isp = 0; isp < numPartitions; isp++ ) {
+      if ( isSpilled(isp) && maxSizeSpilled < batchHolders[isp].size() ) {
+        maxSizeSpilled = batchHolders[isp].size();
+        indexMaxSpilled = isp;
+      }
+    }
+    // Give the current partition (if already spilled) some priority
+    if ( isSpilled(currPart) && ( currPartSize + 1 >= maxSizeSpilled )) {
+      maxSizeSpilled = currPartSize;
+      indexMaxSpilled = currPart;
+    }
+    // now find the largest non-spilled partition
+    int maxSize = -1;
+    int indexMax = -1;
+    // Use the largest spilled partition (if found) as a baseline, with a factor of 4
+    if ( indexMaxSpilled > -1 && maxSizeSpilled > 1 ) {
+      indexMax = indexMaxSpilled;
+      maxSize = 4 * maxSizeSpilled;
+    }
+    for ( int insp = 0; insp < numPartitions; insp++) {
+      if ( ! isSpilled(insp) && maxSize < batchHolders[insp].size() ) {
+        indexMax = insp;
+        maxSize = batchHolders[insp].size();
+      }
+    }
+    // again - priority to the current partition
+    if ( ! isSpilled(currPart) && (currPartSize + 1 >= maxSize) ) {
+      return currPart;
+    }
+    if ( maxSize <= 1 ) { // Cannot make progress by spilling a single batch!
+      return -1; // try skipping this spill
+    }
+    return indexMax;
+  }
+
+  /**
+   * Iterate through the batches of the given partition, writing them to a file
+   *
+   * @param part The partition (number) to spill
+   */
+  private void spillAPartition(int part) {
+
+    ArrayList<BatchHolder> currPartition = batchHolders[part];
+    rowsInPartition = 0;
+    if ( EXTRA_DEBUG_SPILL ) {
+      logger.debug("HashAggregate: Spilling partition {} current cycle {} part size {}", part, cycleNum, currPartition.size());
+    }
+
+    if ( currPartition.size() == 0 ) { return; } // in case empty - nothing to spill
+
+    // If this is the first spill for this partition, create an output stream
+    if ( ! isSpilled(part) ) {
+
+      spillFiles[part] = spillSet.getNextSpillFile(cycleNum > 0 ?
+          Integer.toString(cycleNum) : null);
+
+      try {
+        outputStream[part] = spillSet.openForOutput(spillFiles[part]);
+      } catch (IOException e) {
+        throw new DrillRuntimeException("Hash Aggregation failed to open spill file: " + spillFiles[part]);
+      }
+    }
+
+    for (int currOutBatchIndex = 0; currOutBatchIndex < currPartition.size(); currOutBatchIndex++ ) {
+
+      // get the number of records in the batch holder that are pending output
+      int numPendingOutput = currPartition.get(currOutBatchIndex).getNumPendingOutput();
+
+      rowsInPartition += numPendingOutput; // for logging
+      rowsSpilled += numPendingOutput;
+
+      allocateOutgoing(numPendingOutput);
+
+      currPartition.get(currOutBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder);
+      int numOutputRecords = outNumRecordsHolder.value;
+
+      this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value);
+
+      // set the value count for outgoing batch value vectors
+      /* int i = 0; */
+      for (VectorWrapper<?> v : outgoing) {
+        v.getValueVector().getMutator().setValueCount(numOutputRecords);
+        /*
+        // print out the first row to be spilled ( varchar, varchar, bigint )
+        try {
+          if (i++ < 2) {
+            NullableVarCharVector vv = ((NullableVarCharVector) v.getValueVector());
+            logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
+          } else {
+            NullableBigIntVector vv = ((NullableBigIntVector) v.getValueVector());
+            logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
+          }
+        } catch (Exception e) { logger.info("While printing the first row - Got an exception = {}", e); }
+        */
+      }
+
+      outContainer.setRecordCount(numPendingOutput);
+      WritableBatch batch = WritableBatch.getBatchNoHVWrap(numPendingOutput, outContainer, false);
+      VectorAccessibleSerializable outputBatch = new VectorAccessibleSerializable(batch, allocator);
+      Stopwatch watch = Stopwatch.createStarted();
+      try {
+        outputBatch.writeToStream(outputStream[part]);
+      } catch (IOException e) {
+        throw new DrillRuntimeException("Hash Aggregation failed to write to output stream: " + outputStream[part].toString());
+      }
+      outContainer.zeroVectors();
+      logger.trace("HASH AGG: Took {} us to spill {} records", watch.elapsed(TimeUnit.MICROSECONDS), numPendingOutput);
+    }
+
+    spilledBatchesCount[part] += currPartition.size(); // update count of spilled batches
+
+    logger.trace("HASH AGG: Spilled {} rows from {} batches of partition {}", rowsInPartition, currPartition.size(), part);
+  }
+
+  private void addBatchHolder(int part) {
+
     BatchHolder bh = newBatchHolder();
-    batchHolders.add(bh);
+    batchHolders[part].add(bh);
     if (EXTRA_DEBUG_1) {
-      logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders.size());
+      logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders[part].size());
     }
     bh.setup();
   }

-  // Overridden in the generated class when created as plain Java code.
-
+  // These methods are overridden in the generated class when created as plain Java code.
   protected BatchHolder newBatchHolder() {
     return new BatchHolder();
   }

+  protected SpilledRecordbatch newSpilledRecordBatch(String arg1, int arg2, FragmentContext arg4, BatchSchema arg5, OperatorContext arg6) {
+    return new SpilledRecordbatch(arg1, arg2, arg4, arg5, arg6);
+  }
+
+  /**
+   * Output the next batch from partition "nextPartitionToReturn"
+   *
+   * @return iteration outcome (e.g., OK, NONE ...)
+   */
   @Override
   public IterOutcome outputCurrentBatch() {
-    if (outBatchIndex >= batchHolders.size()) {
-      this.outcome = IterOutcome.NONE;
-      return outcome;
+
+    // when incoming was an empty batch, just finish up
+    if ( schema == null ) {
+      logger.trace("Incoming was empty; output is an empty batch.");
+      this.outcome = IterOutcome.NONE; // no records were read
+      allFlushed = true;
+      return this.outcome;
     }

-    // get the number of records in the batch holder that are pending output
-    int numPendingOutput = batchHolders.get(outBatchIndex).getNumPendingOutput();
+    // Initialization (covers the case of early output)
+    ArrayList<BatchHolder> currPartition = batchHolders[earlyPartition];
+    int currOutBatchIndex = outBatchIndex[earlyPartition];
+    int partitionToReturn = earlyPartition;
+
+    if ( ! earlyOutput ) {
+      // Update the next partition to return (if needed)
+      // skip fully returned (or spilled) partitions
+      while (nextPartitionToReturn < numPartitions) {
+        //
+        // If this partition was spilled - spill the rest of it and skip it
+        //
+        if ( isSpilled(nextPartitionToReturn) ) {
+          spillAPartition(nextPartitionToReturn); // spill the rest
+          SpilledPartition sp = new SpilledPartition();
--- End diff --

Can `SpilledPartition` take on some of the behavior that is inlined here? Can it, for example, take a partition state instance and do the spilling? Can it handle the reading later? It is not yet entirely clear how this works, so take this suggestion with a grain of salt...
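To make the suggestion concrete, here is a minimal sketch of such an encapsulation. Every type below (`SpillFileManager`, `PartitionState`, `Batch`, `BatchWriter`, `BatchReader`) is a hypothetical stand-in, not an actual Drill API: they only model the roles that `SpillSet`, the `batchHolders` lists, and `VectorAccessibleSerializable` play in the patch. The point is to show `SpilledPartition` owning the per-partition file, stream, and batch count that the patch currently tracks in the parallel arrays `spillFiles[]`, `outputStream[]`, and `spilledBatchesCount[]`.

import java.io.Closeable;
import java.io.IOException;
import java.util.List;

// Hypothetical collaborators, standing in for Drill's real batch/spill machinery.
interface Batch { }

interface BatchWriter extends Closeable {
  void write(Batch batch) throws IOException;
}

interface BatchReader extends Closeable {
  Batch next() throws IOException; // returns null when exhausted
}

interface PartitionState {
  List<Batch> takePendingBatches(); // drain the in-memory batches, leaving the partition empty
}

interface SpillFileManager {
  BatchWriter openForWrite(String file) throws IOException;
  BatchReader openForRead(String file) throws IOException;
  void delete(String file) throws IOException;
}

// One spilled partition that owns its file, its stream, and its counters.
class SpilledPartition implements Closeable {
  private final SpillFileManager spillSet;
  private final String spillFile;
  private BatchWriter writer; // non-null once the first batch has been spilled
  private int spilledBatches;

  SpilledPartition(SpillFileManager spillSet, String spillFile) {
    this.spillSet = spillSet;
    this.spillFile = spillFile;
  }

  boolean isSpilled() { return writer != null; }

  int spilledBatchCount() { return spilledBatches; }

  // Drain the partition's in-memory batches into this spill file,
  // opening the output stream lazily on the first spill.
  void spill(PartitionState state) throws IOException {
    if (writer == null) {
      writer = spillSet.openForWrite(spillFile);
    }
    for (Batch b : state.takePendingBatches()) {
      writer.write(b);
      spilledBatches++;
    }
  }

  // Finish writing and hand back a reader for the re-aggregation pass.
  BatchReader startReading() throws IOException {
    if (writer != null) {
      writer.close();
      writer = null;
    }
    return spillSet.openForRead(spillFile);
  }

  // Cleanup path: close any open stream and delete the file.
  @Override
  public void close() throws IOException {
    if (writer != null) {
      writer.close();
      writer = null;
    }
    spillSet.delete(spillFile);
  }
}

With something along these lines, `cleanup()` would reduce to closing each `SpilledPartition`, and the read-back loop in `outputCurrentBatch()` could ask the same object for its reader instead of juggling parallel arrays.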
> Support Spill to Disk for the Hash Aggregate Operator
> -----------------------------------------------------
>
>                 Key: DRILL-5457
>                 URL: https://issues.apache.org/jira/browse/DRILL-5457
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Execution - Relational Operators
>    Affects Versions: 1.10.0
>            Reporter: Boaz Ben-Zvi
>            Assignee: Boaz Ben-Zvi
>             Fix For: 1.11.0
>
>
> Support gradual spilling of memory to disk as the available memory becomes too small to allow in-memory work for the Hash Aggregate Operator.
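A note on the weighing described in the `chooseAPartitionToFlush` javadoc above: it is easier to follow when restated over plain arrays. The class below is a simplified model of that heuristic, not the patch's code; `sizes[i]` stands in for `batchHolders[i].size()` and `spilled[i]` for `isSpilled(i)`.

// Simplified model of the partition-choice heuristic (not the patch's code).
final class FlushChoiceModel {

  // Returns the partition to flush, or -1 to skip spilling this time.
  static int choosePartitionToFlush(int currPart, int[] sizes, boolean[] spilled, boolean is2ndPhase) {
    if (!is2ndPhase) { return currPart; } // 1st phase: always the current partition
    int currPartSize = (sizes[currPart] == 1) ? -1 : sizes[currPart]; // a 1-batch partition is a poor pick

    // Find the largest already-spilled partition; spilling it again needs no new file.
    int maxSizeSpilled = -1, indexMaxSpilled = -1;
    for (int i = 0; i < sizes.length; i++) {
      if (spilled[i] && sizes[i] > maxSizeSpilled) {
        maxSizeSpilled = sizes[i];
        indexMaxSpilled = i;
      }
    }
    // The current partition, if already spilled, wins near-ties (its last batch is full).
    if (spilled[currPart] && currPartSize + 1 >= maxSizeSpilled) {
      maxSizeSpilled = currPartSize;
      indexMaxSpilled = currPart;
    }

    // A non-spilled partition is picked only if it exceeds 4x the spilled baseline
    // (the "factor of 4" weighing from the javadoc above).
    int maxSize = -1, indexMax = -1;
    if (indexMaxSpilled > -1 && maxSizeSpilled > 1) {
      indexMax = indexMaxSpilled;
      maxSize = 4 * maxSizeSpilled;
    }
    for (int i = 0; i < sizes.length; i++) {
      if (!spilled[i] && sizes[i] > maxSize) {
        indexMax = i;
        maxSize = sizes[i];
      }
    }
    // Again, prefer the current partition on a near-tie.
    if (!spilled[currPart] && currPartSize + 1 >= maxSize) { return currPart; }
    if (maxSize <= 1) { return -1; } // a single batch frees too little to make progress
    return indexMax;
  }

  public static void main(String[] args) {
    // Four partitions sized {5, 2, 9, 3}; only partition 1 already spilled; current is 0.
    // Baseline = 4 * 2 = 8; partition 2 (size 9) beats it; current loses the tie (5 + 1 < 9).
    int[] sizes = {5, 2, 9, 3};
    boolean[] spilled = {false, true, false, false};
    System.out.println(choosePartitionToFlush(0, sizes, spilled, true)); // prints 2
  }
}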