Kontinuation commented on issue #17334:
URL: https://github.com/apache/datafusion/issues/17334#issuecomment-3570965986

   I still prefer the `MemoryConsumer` abstraction in both Apache Spark and the [initial MemoryConsumer in DataFusion](https://github.com/apache/datafusion/pull/1526/files#diff-0b624369e03a3ddcb9af37638d94eac75cce2f5bc0a5aab5d5492c23175610a5). The memory pool coordinates between spillable data structures instead of execution plans. One execution plan instance can create multiple `SendableRecordBatchStream`s, and each stream may hold several instances of spillable data structures. We want the memory manager to coordinate the memory usage of these consumers directly, rather than indirectly through plans.
   
   The memory consumer must be able to differentiate between self-initiated 
spills (triggered by its own `try_grow` request) and passive spills (triggered 
cooperatively by other consumers), unless we ensure that the consumer calling 
`try_grow` will never have its `spill` method called in response to memory 
shortage.
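
   One way to make that differentiation explicit is to pass the triggering consumer into `spill`, loosely following Spark's `MemoryConsumer.spill(size, trigger)` signature. The sketch below is hypothetical (`ConsumerId`, `SpillableState`, and the fields are illustrative, not DataFusion APIs); it only shows the self-vs-passive check:

   ```rust
   // Hypothetical sketch, loosely modeled on Spark's MemoryConsumer.spill(size,
   // trigger): the pool passes the consumer that triggered the spill, so the
   // callee can tell self-initiated from passive requests.
   #[derive(PartialEq, Clone, Copy)]
   struct ConsumerId(usize);

   struct SpillableState {
       id: ConsumerId,
       producing: bool, // true once the operator has entered the produce stage
       in_memory_bytes: usize,
   }

   impl SpillableState {
       /// Returns the number of bytes freed.
       fn spill(&mut self, trigger: ConsumerId) -> usize {
           let self_initiated = trigger == self.id;
           // Passive spills are only honored in the produce stage; the build
           // stage mutates state too heavily for concurrent spill requests.
           if !self_initiated && !self.producing {
               return 0;
           }
           std::mem::take(&mut self.in_memory_bytes)
       }
   }
   ```

   With an identity check like this, a consumer that is still building state can safely refuse a cooperative spill while always honoring its own.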
   
   There are some common rules for implementing the `MemoryConsumer` interface in Apache Spark. The description of https://github.com/apache/spark/pull/9241 lists the rules and considerations for avoiding deadlocks, but I'd like to elaborate on them based on my own understanding.
   
   **1. Two stages of spillable operators**
   
   Spillable operators are pipeline breakers (sort, grouped aggregation, etc.). These operators work in two stages:
   
   1. **Build stage**: ingest input batches and build internal state. For the sort operator, the internal state consists of multiple sorted runs. Operators cannot produce batches until they have ingested all the input batches.
   2. **Produce stage** (the Spark PR calls it the "scanning stage"): produce batches from the internal state. For instance, the sort operator merges its sorted runs into the final ordered output.
   
   **2. Cooperative spill only applies to the produce stage**
   
   Operators usually mutate their internal state intensively during the build stage; it is no fun to handle concurrent passive `spill` requests there. In the build stage, operators only self-spill and ignore any spill requests initiated by other consumers.
   
   The internal state becomes mostly read-only in the produce stage, so handling passive spill requests is much easier. Take [Spark's sorting implementation](https://github.com/apache/spark/blob/336ca8c12a163d31519a491a28d26ff091626986/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java#L217-L222) as an example: it checks whether the spill was triggered by itself (self-spill). If not, it handles the spill request only when `readingIterator` is available, which indicates that the sorter is in the produce stage.
   
   Here is an illustration of a fragment of a query plan involving multiple 
spillable operators, one is in produce stage and the other is in build stage:
   
   ```
            ...
             |
             v
   +-------------------+
   | GroupedAggregate  | in produce stage, can respond to spill request when Sort fails to reserve memory.
   +-------------------+
             |
             v
   +-------------------+
   |       Sort        | in build stage, can self-spill, or ask GroupedAggregate to spill.
   +-------------------+
             |
             v
            ...
   ```
   
   **3. Don't allocate memory when spilling**
   
   [The documentation of `MemoryConsumer.spill`](https://github.com/apache/spark/blob/e3533a62ca0abb43ca16e1fc767374f54298a203/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java#L70-L84) explicitly requires that consumers not allocate memory while spilling. This prevents deadlocks: the `spill` method can be called while someone else is holding the memory manager's lock, especially when a memory reservation has failed and cooperative spilling is triggered. Allocating memory inside `spill` would try to acquire the memory manager's lock again, causing a deadlock. A re-entrant lock only helps when the memory manager manages consumers on a single thread; however, managing consumers across multiple threads/partitions is a must for production readiness.
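
   The deadlock shape can be demonstrated with a plain `std::sync::Mutex` standing in for the memory manager's internal lock (the function name and structure are illustrative). Because `std::sync::Mutex` is non-reentrant, re-acquiring it from inside `spill` would block forever; `try_lock` makes the failure observable instead of hanging:

   ```rust
   use std::sync::Mutex;

   // Returns true if re-acquiring the already-held pool lock would block,
   // i.e. a blocking lock() here would be a deadlock.
   fn spill_would_deadlock() -> bool {
       let pool_lock = Mutex::new(());
       // try_grow: the pool takes its lock, then invokes a victim's spill().
       let _guard = pool_lock.lock().unwrap();
       // Inside spill(): allocating memory would re-acquire the pool lock.
       // try_lock reports the contention that lock() would turn into a hang.
       pool_lock.try_lock().is_err()
   }
   ```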
   
   **4. Don't free memory in `spill` while holding your own lock**
   
   We usually need to protect the internal state with a mutex to handle concurrent cooperative spill requests while producing batches, and we usually hold that lock inside the `spill` method while mutating the internal state to free memory. One caveat is that we should only call memory-management methods such as `free` or `shrink` after the lock is released. There is an example in Spark's sort operator: it deliberately [postpones calls to `freePage`](https://github.com/apache/spark/blob/336ca8c12a163d31519a491a28d26ff091626986/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java#L669-L674) until [after the synchronized block](https://github.com/apache/spark/blob/336ca8c12a163d31519a491a28d26ff091626986/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java#L701-L706).
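
   The same pattern can be sketched in Rust: detach the pages from the internal state under the operator's own lock, and only "free" them after the guard is dropped. `Sorter` and the page representation (a list of page sizes) are illustrative, not DataFusion types:

   ```rust
   use std::sync::Mutex;

   struct Sorter {
       // Sizes of the in-memory pages held by this operator.
       pages: Mutex<Vec<usize>>,
   }

   impl Sorter {
       /// Returns the number of bytes freed.
       fn spill(&self) -> usize {
           let detached: Vec<usize> = {
               let mut pages = self.pages.lock().unwrap();
               pages.drain(..).collect()
               // The MutexGuard is dropped at the end of this block,
               // before we touch the memory manager.
           };
           // Safe point: call the pool's free/shrink (modeled here by just
           // summing the sizes) without holding our own lock, so a concurrent
           // spill request cannot deadlock against us.
           detached.into_iter().sum()
       }
   }
   ```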

