Kontinuation commented on issue #17334: URL: https://github.com/apache/datafusion/issues/17334#issuecomment-3570965986
I still prefer the `MemoryConsumer` abstraction in both Apache Spark and the [initial MemoryConsumer in DataFusion](https://github.com/apache/datafusion/pull/1526/files#diff-0b624369e03a3ddcb9af37638d94eac75cce2f5bc0a5aab5d5492c23175610a5). The memory pool coordinates between spillable data structures instead of execution plans. One execution plan instance can create multiple `SendableRecordBatchStream`s, and each stream may hold several instances of spillable data structures. We want the memory manager to coordinate the memory usage of these memory consumers directly rather than indirectly through plans.

The memory consumer must be able to differentiate between self-initiated spills (triggered by its own `try_grow` request) and passive spills (triggered cooperatively by other consumers), unless we guarantee that a consumer calling `try_grow` will never have its own `spill` method invoked in response to the memory shortage.

There are some common rules for implementing the `MemoryConsumer` interface in Apache Spark. The description of https://github.com/apache/spark/pull/9241 lists the rules and considerations for avoiding deadlocks, but I'd like to elaborate on them based on my understanding.

**1. Two stages of spillable operators**

Spillable operators are pipeline breakers (sort, grouped aggregation, etc.). These operators work in two stages:

1. **Build stage**: ingest input batches and build some internal state. For the sort operator, the internal state consists of multiple sorted runs. Operators cannot produce batches until they have ingested all the input batches.
2. **Produce stage** (the Spark PR calls it the "scanning stage"): produce batches from the internal state, for instance by merging sorted runs into the final ordered output.

**2. Cooperative spill only applies to the produce stage**

Operators usually mutate their internal state intensively during the build stage; handling concurrent passive `spill` requests in this stage is no fun. So operators only self-spill in the build stage and ignore any spill requests initiated by other consumers. In the produce stage the internal state becomes mostly read-only, and handling passive spill requests is much easier. Here is [Spark's sorting implementation](https://github.com/apache/spark/blob/336ca8c12a163d31519a491a28d26ff091626986/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java#L217-L222) as an example: it checks whether the spill was triggered by itself (self-spill); if not, it handles the spill request only when `readingIterator` is available, which indicates that the sorter is in the produce stage. A sketch of how a consumer could implement the same check is shown after the diagram below.

Here is an illustration of a fragment of a query plan involving multiple spillable operators, one in the produce stage and the other in the build stage:

```
          ...
           |
           v
+-------------------+
| GroupedAggregate  |  in produce stage, can respond to spill requests when Sort fails to reserve memory.
+-------------------+
           |
           v
+-------------------+
|       Sort        |  in build stage, can self-spill, or ask GroupedAggregate to spill.
+-------------------+
           |
           v
          ...
```
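To make that check concrete, here is a minimal Rust sketch of a build/produce-aware consumer that only honors passive spill requests in the produce stage. The `SpillableConsumer` trait, `ExternalSorter`, `SortedRun`, and `write_runs_to_disk` are hypothetical names for illustration, modeled after Spark's `MemoryConsumer`; this is not the existing DataFusion memory-pool API.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

/// Hypothetical callback interface, modeled after Spark's MemoryConsumer:
/// the memory manager may ask any registered consumer to spill when memory
/// is short.
trait SpillableConsumer: Send + Sync {
    /// `self_initiated` is true when the spill was triggered by this
    /// consumer's own failed reservation, and false when another
    /// consumer's `try_grow` asked us to release memory cooperatively.
    fn spill(&self, self_initiated: bool) -> std::io::Result<usize>;
}

/// One in-memory sorted run (buffered, sorted batches).
struct SortedRun;

struct ExternalSorter {
    /// Flipped to true once all input batches have been ingested and the
    /// operator switches from the build stage to the produce stage.
    producing: AtomicBool,
    /// Internal state; mostly read-only during the produce stage.
    in_mem_runs: Mutex<Vec<SortedRun>>,
}

impl ExternalSorter {
    /// Placeholder: serialize in-memory runs to spill files and report
    /// how many bytes were released.
    fn write_runs_to_disk(runs: &mut Vec<SortedRun>) -> std::io::Result<usize> {
        runs.clear();
        Ok(0)
    }
}

impl SpillableConsumer for ExternalSorter {
    fn spill(&self, self_initiated: bool) -> std::io::Result<usize> {
        // Mirror of the check in Spark's UnsafeExternalSorter: honor a
        // passive (cooperative) spill request only in the produce stage,
        // where the internal state is no longer being mutated.
        if !self_initiated && !self.producing.load(Ordering::Acquire) {
            return Ok(0); // still in the build stage; ignore passive requests
        }
        let mut runs = self.in_mem_runs.lock().unwrap();
        Self::write_runs_to_disk(&mut runs)
    }
}
```

The important design choice is that the memory manager passes `self_initiated` explicitly, so the consumer never has to guess whether a `spill` call originated from its own `try_grow`.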
**3. Don't allocate memory when spilling**

[The documentation of `MemoryConsumer.spill`](https://github.com/apache/spark/blob/e3533a62ca0abb43ca16e1fc767374f54298a203/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java#L70-L84) explicitly requires that consumers do not allocate memory while spilling. This is to prevent deadlocks. The `spill` method can be called while someone else is holding the memory manager's lock, especially when a memory reservation has failed and cooperative spilling is triggered. Allocating memory inside `spill` would try to acquire the memory manager's lock again, which deadlocks. A re-entrant lock only helps when the memory manager manages consumers within a single thread, but managing consumers across multiple threads/partitions is a must to be production-ready.

**4. Do not free memory in `spill` while holding the consumer's own lock**

We usually need to protect the internal state with a mutex to handle concurrent cooperative spill requests while producing batches, and we usually hold that lock inside `spill` while mutating the internal state to free memory. The caveat is that memory-management calls such as `free` or `shrink` should only be made after the lock has been released. Spark's sort operator does exactly this: it deliberately [postpones calls to `freePage`](https://github.com/apache/spark/blob/336ca8c12a163d31519a491a28d26ff091626986/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java#L669-L674) until [after the synchronized block](https://github.com/apache/spark/blob/336ca8c12a163d31519a491a28d26ff091626986/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java#L701-L706). A sketch of this collect-then-free pattern follows below.
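Here is a minimal Rust sketch of rules 3 and 4 combined: the consumer detaches releasable state under its own lock, then performs the disk write and the `shrink` call only after the lock is dropped, and never allocates inside `spill`. `SpillableHashTable`, `Reservation`, and `Partition` are hypothetical names for illustration, not the existing DataFusion API.

```rust
use std::sync::Mutex;

/// Hypothetical reservation handle; `shrink` reports freed bytes back to
/// the memory manager (which may take its own internal lock).
struct Reservation;

impl Reservation {
    fn shrink(&self, _bytes: usize) {
        // Returns memory to the pool; may lock the memory manager,
        // which is exactly why it must not run under `state`'s lock.
    }
}

/// One spillable unit of internal state (e.g. a hash-table partition).
struct Partition {
    bytes: usize,
}

struct SpillableHashTable {
    reservation: Reservation,
    state: Mutex<Vec<Partition>>,
}

impl SpillableHashTable {
    fn spill(&self) -> std::io::Result<usize> {
        // Step 1: under our own lock, only *detach* what can be released.
        // No allocation and no calls back into the memory manager here.
        let spilled: Vec<Partition> = {
            let mut state = self.state.lock().unwrap();
            std::mem::take(&mut *state)
        }; // <- lock released here

        // Step 2: write to disk and release the reservation *after* the
        // lock is dropped, mirroring how Spark's UnsafeExternalSorter
        // postpones freePage until after its synchronized block.
        let mut freed = 0;
        for p in &spilled {
            // write_partition_to_disk(p)?;  // placeholder: use pre-allocated buffers
            freed += p.bytes;
        }
        self.reservation.shrink(freed);
        Ok(freed)
    }
}
```

If the write path needs scratch buffers, they should be reserved and allocated up front, before `spill` can ever be invoked, so that the spill path itself never calls back into the memory manager.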
