Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21738#discussion_r201208356
  
    --- Diff: 
sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
 ---
    @@ -82,27 +82,34 @@ public static boolean 
supportsAggregationBufferSchema(StructType schema) {
        * @param emptyAggregationBuffer the default value for new keys (a 
"zero" of the agg. function)
        * @param aggregationBufferSchema the schema of the aggregation buffer, 
used for row conversion.
        * @param groupingKeySchema the schema of the grouping key, used for row 
conversion.
    -   * @param taskMemoryManager the memory manager used to allocate our 
Unsafe memory structures.
    +   * @param taskContext the current task context.
        * @param initialCapacity the initial capacity of the map (a sizing hint 
to avoid re-hashing).
        * @param pageSizeBytes the data page size, in bytes; limits the maximum 
record size.
        */
       public UnsafeFixedWidthAggregationMap(
           InternalRow emptyAggregationBuffer,
           StructType aggregationBufferSchema,
           StructType groupingKeySchema,
    -      TaskMemoryManager taskMemoryManager,
    +      TaskContext taskContext,
           int initialCapacity,
           long pageSizeBytes) {
         this.aggregationBufferSchema = aggregationBufferSchema;
         this.currentAggregationBuffer = new 
UnsafeRow(aggregationBufferSchema.length());
         this.groupingKeyProjection = 
UnsafeProjection.create(groupingKeySchema);
         this.groupingKeySchema = groupingKeySchema;
    -    this.map =
    -      new BytesToBytesMap(taskMemoryManager, initialCapacity, 
pageSizeBytes, true);
    +    this.map = new BytesToBytesMap(
    +      taskContext.taskMemoryManager(), initialCapacity, pageSizeBytes, 
true);
     
         // Initialize the buffer for aggregation value
         final UnsafeProjection valueProjection = 
UnsafeProjection.create(aggregationBufferSchema);
         this.emptyAggregationBuffer = 
valueProjection.apply(emptyAggregationBuffer).getBytes();
    +
    +    // Register a cleanup task with TaskContext to ensure that memory is 
guaranteed to be freed at
    +    // the end of the task. This is necessary to avoid memory leaks in 
when the downstream operator
    +    // does not fully consume the sorter's output (e.g. sort followed by 
limit).
    --- End diff --
    
    super nit: the sorter -> the aggregation, sort followed by limit -> 
aggregation followed by limit.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to