nuno-faria opened a new issue, #17597: URL: https://github.com/apache/datafusion/issues/17597
### Describe the bug
The `SortExec` operator with `TopK` allocates very large amounts of memory,
leading to OOM on moderately sized tables. It appears to happen when the
data is not already sorted on disk by the `ORDER BY` column.
cc: @adriangb
### To Reproduce
Using TPC-H `lineitem` as an example (sf=1, 6M rows):
```sql
CREATE EXTERNAL TABLE lineitem STORED AS PARQUET LOCATION
'data/lineitem.parquet';
```
A regular sort without a limit works well:
```sql
select *
from lineitem
order by l_comment;
-- 6001215 row(s) fetched.
-- Elapsed 2.506 seconds.
```
With a 20k limit (slow):
```sql
select *
from lineitem
order by l_comment
limit 20000;
-- 20000 row(s) fetched.
-- Elapsed 17.559 seconds.
```
With a 50k limit (OOM):
```sql
select *
from lineitem
order by l_comment
limit 50000;
-- memory allocation of 57379067209962197888 bytes failed
```
Sorting by `l_orderkey` does not lead to OOM (the data is already sorted on disk by that column).
### Expected behavior
No OOM.
### Additional context
The query does complete when using `jemalloc`, but it is still slower than the
version without `LIMIT`.
Here is an excerpt of the `massif` output, in case it is useful:
<details>
```text
100.00% (110,565,117,952B) (page allocation syscalls) mmap/mremap/brk,
--alloc-fns, etc.
->99.92% (110,478,192,640B) 0x57F231C: __mmap64 (mmap64.c:58)
| ->99.92% (110,478,192,640B) 0x57F231C: mmap (mmap64.c:46)
| ->99.06% (109,527,957,504B) 0x3B3D061: os_pages_map (pages.c:149)
| | ->99.06% (109,527,957,504B) 0x3B3D061: _rjem_je_pages_map (pages.c:296)
| | ->99.06% (109,527,957,504B) 0x3B32F18: _rjem_je_extent_alloc_mmap
(extent_mmap.c:24)
| | ->99.01% (109,473,431,552B) 0x3B2B0DF: extent_alloc_core
(ehooks.c:36)
| | | ->99.01% (109,473,431,552B) 0x3B2B0DF:
_rjem_je_ehooks_default_alloc_impl (ehooks.c:59)
| | | ->99.01% (109,473,431,552B) 0x3B327D4: ehooks_alloc
(ehooks.h:197)
| | | ->99.01% (109,473,431,552B) 0x3B327D4: ehooks_alloc
(ehooks.h:191)
| | | ->99.01% (109,473,431,552B) 0x3B327D4:
extent_grow_retained (extent.c:672)
| | | ->99.01% (109,473,431,552B) 0x3B327D4:
extent_alloc_retained (extent.c:791)
| | | ->99.01% (109,473,431,552B) 0x3B327D4:
_rjem_je_ecache_alloc_grow (extent.c:104)
| | | ->99.01% (109,473,431,552B) 0x3B3C30D:
pac_alloc_real (pac.c:124)
| | | ->99.01% (109,473,431,552B) 0x3B3C5B2:
pac_alloc_impl (pac.c:178)
| | | ->99.01% (109,473,431,552B) 0x3B3BA23: pai_alloc
(pai.h:43)
| | | ->99.01% (109,473,431,552B) 0x3B3BA23:
_rjem_je_pa_alloc (pa.c:139)
| | | ->99.00% (109,454,032,896B) 0x3B0CD90:
_rjem_je_arena_extent_alloc_large (arena.c:338)
| | | | ->99.00% (109,454,032,896B) 0x3B39B04:
_rjem_je_large_palloc (large.c:37)
| | | | ->98.10% (108,468,895,744B) 0x3B07901:
arena_malloc (arena_inlines_b.h:162)
| | | | | ->98.10% (108,468,895,744B) 0x3B07901:
iallocztm (jemalloc_internal_inlines_c.h:55)
| | | | | ->98.10% (108,468,895,744B)
0x3B07901: imalloc_no_sample (jemalloc.c:2398)
| | | | | ->98.10% (108,468,895,744B)
0x3B07901: imalloc_body (jemalloc.c:2573)
| | | | | ->98.10% (108,468,895,744B)
0x3B07901: imalloc (jemalloc.c:2687)
| | | | | ->98.10% (108,468,895,744B)
0x3B07901: _rjem_calloc (jemalloc.c:2852)
| | | | | ->98.10% (108,468,895,744B)
0xF545BC: UnknownInlinedFun (lib.rs:117)
| | | | | ->98.10%
(108,468,895,744B) 0xF545BC: UnknownInlinedFun (datafusion-cli/src/main.rs:51)
| | | | | ->98.10%
(108,468,895,744B) 0xF545BC: alloc_zeroed (alloc.rs:177)
| | | | | ->98.10%
(108,468,895,744B) 0xF545BC: UnknownInlinedFun (alloc.rs:189)
| | | | | ->98.10%
(108,468,895,744B) 0xF545BC: allocate_zeroed (alloc.rs:256)
| | | | | ->98.10%
(108,468,895,744B) 0xF545BC: try_allocate_in<alloc::alloc::Global> (mod.rs:478)
| | | | | ->98.10%
(108,468,895,744B) 0xF545BC: with_capacity_zeroed_in<alloc::alloc::Global>
(mod.rs:447)
| | | | | ->98.10%
(108,468,895,744B) 0xF545BC: with_capacity_zeroed_in<core::option::Option<u32>,
alloc::alloc::Global> (mod.rs:212)
| | | | | ->98.10%
(108,468,895,744B) 0xF545BC: from_elem<core::option::Option<u32>,
alloc::alloc::Global> (spec_from_elem.rs:26)
| | | | | ->98.10%
(108,468,895,744B) 0xF545BC: from_elem<core::option::Option<u32>> (mod.rs:3252)
| | | | | ->98.10%
(108,468,895,744B) 0xF545BC: arrow_select::interleave::interleave_views
(interleave.rs:253)
| | | | |
->98.10% (108,468,895,744B) 0xFCF57A: {closure
| | | | |
->98.10% (108,468,895,744B) 0xFCF57A: {closure
| | | | |
->98.10% (108,468,895,744B) 0xFCF57A: try_fold<core::ops::range::Range<usize>,
(), core::iter::adapters::map::map_try_fold::{closure_env
| | | | |
->98.10% (108,468,895,744B) 0xFCF57A:
try_fold<core::result::Result<alloc::sync::Arc<dyn arrow_array::array::Array,
alloc::alloc::Global>, arrow_schema::error::ArrowError>,
core::ops::range::Range<usize>,
arrow_select::interleave::interleave_record_batch::{closure_env
| | | | |
->98.10% (108,468,895,744B) 0xFCF57A:
try_fold<core::iter::adapters::map::Map<core::ops::range::Range<usize>,
arrow_select::interleave::interleave_record_batch::{closure_env
| | | | |
->98.10% (108,468,895,744B) 0xFCF57A:
try_for_each<core::iter::adapters::GenericShunt<core::iter::adapters::map::Map<core::ops::range::Range<usize>,
arrow_select::interleave::interleave_record_batch::{closure_env
| | | | |
->98.10% (108,468,895,744B) 0xFCF57A:
<core::iter::adapters::GenericShunt<I,R> as
core::iter::traits::iterator::Iterator>::next (mod.rs:174)
| | | | |
->98.10% (108,468,895,744B) 0x2E8F361: UnknownInlinedFun (mod.rs:3578)
| | | | |
->98.10% (108,468,895,744B) 0x2E8F361: UnknownInlinedFun
(spec_extend.rs:19)
| | | | |
->98.10% (108,468,895,744B) 0x2E8F361: UnknownInlinedFun
(spec_from_iter_nested.rs:42)
| | | | |
->98.10% (108,468,895,744B) 0x2E8F361: UnknownInlinedFun
(spec_from_iter.rs:34)
| | | | |
->98.10% (108,468,895,744B) 0x2E8F361: UnknownInlinedFun
(mod.rs:3470)
| | | | |
->98.10% (108,468,895,744B) 0x2E8F361: UnknownInlinedFun
(iterator.rs:2027)
| | | | |
->98.10% (108,468,895,744B) 0x2E8F361: UnknownInlinedFun
(result.rs:2050)
| | | | |
->98.10% (108,468,895,744B) 0x2E8F361: UnknownInlinedFun
(mod.rs:160)
| | | | |
->98.10% (108,468,895,744B) 0x2E8F361: UnknownInlinedFun
(result.rs:2050)
| | | | |
->98.10% (108,468,895,744B) 0x2E8F361:
UnknownInlinedFun (iterator.rs:2027)
| | | | |
->98.10% (108,468,895,744B) 0x2E8F361:
UnknownInlinedFun (src/interleave.rs:398)
| | | | |
->98.10% (108,468,895,744B) 0x2E8F361:
datafusion_physical_plan::topk::TopKHeap::emit_with_state (mod.rs:776)
| | | | |
->74.80% (82,699,091,968B) 0x2E69CD8:
datafusion_physical_plan::topk::TopKHeap::maybe_compact (mod.rs:802)
| | | | |
| ->74.80% (82,699,091,968B) 0x30D04C1:
insert_batch (mod.rs:296)
| | | | |
| ->74.80% (82,699,091,968B) 0x30D04C1:
{async_block
| | | | |
| ->74.80% (82,699,091,968B) 0x30D04C1:
poll_next<datafusion_physical_plan::sorts::sort::{impl
| | | | |
| ->74.80% (82,699,091,968B) 0x30D04C1:
try_poll_next<futures_util::stream::once::Once<datafusion_physical_plan::sorts::sort::{impl
| | | | |
| ->74.80% (82,699,091,968B) 0x30D04C1:
poll_next<futures_util::stream::once::Once<datafusion_physical_plan::sorts::sort::{impl
| | | | |
| ->74.80% (82,699,091,968B)
0x30D04C1: <datafusion_physical_plan::stream::RecordBatchStreamAdapter<S> as
futures_core::stream::Stream>::poll_next (stream.rs:451)
| | | | |
| ->74.80% (82,699,091,968B)
0x2F42715: poll_next<alloc::boxed::Box<(dyn
datafusion_execution::stream::RecordBatchStream<Item=core::result::Result<arrow_array::record_batch::RecordBatch,
datafusion_common::error::DataFusionError>> + core::marker::Send),
alloc::alloc::Global>> (stream.rs:130)
| | | | |
| ->74.80% (82,699,091,968B)
0x2F42715: poll_next_unpin<core::pin::Pin<alloc::boxed::Box<(dyn
datafusion_execution::stream::RecordBatchStream<Item=core::result::Result<arrow_array::record_batch::RecordBatch,
datafusion_common::error::DataFusionError>> + core::marker::Send),
alloc::alloc::Global>>> (mod.rs:1638)
| | | | |
| ->74.80% (82,699,091,968B)
0x2F42715: poll<core::pin::Pin<alloc::boxed::Box<(dyn
datafusion_execution::stream::RecordBatchStream<Item=core::result::Result<arrow_array::record_batch::RecordBatch,
datafusion_common::error::DataFusionError>> + core::marker::Send),
alloc::alloc::Global>>> (next.rs:32)
| | | | |
| ->74.80% (82,699,091,968B)
0x2F42715: {async_block
| | | | |
| ->74.80% (82,699,091,968B)
0x2F42715: datafusion_common_runtime::trace_utils::trace_future::{{closure}}
(trace_utils.rs:137)
| | | | |
| ->74.80%
(82,699,091,968B) 0x30C0CEC: poll<alloc::boxed::Box<(dyn
core::future::future::Future<Output=alloc::boxed::Box<(dyn core::any::Any +
core::marker::Send), alloc::alloc::Global>> + core::marker::Send),
alloc::alloc::Global>> (future.rs:124)
| | | | |
| ->74.80%
(82,699,091,968B) 0x30C0CEC: poll<core::pin::Pin<alloc::boxed::Box<(dyn
core::future::future::Future<Output=alloc::boxed::Box<(dyn core::any::Any +
core::marker::Send), alloc::alloc::Global>> + core::marker::Send),
alloc::alloc::Global>>,
datafusion_common_runtime::trace_utils::trace_future::{closure_env
| | | | |
| ->74.80%
(82,699,091,968B) 0x30C0CEC: <futures_util::future::future::Map<Fut,F> as
core::future::future::Future>::poll (lib.rs:86)
| | | | |
| ->74.80%
(82,699,091,968B) 0x2F41445: poll<alloc::boxed::Box<(dyn
core::future::future::Future<Output=core::result::Result<(),
datafusion_common::error::DataFusionError>> + core::marker::Send),
alloc::alloc::Global>> (future.rs:124)
| | | | |
| ->74.80%
(82,699,091,968B) 0x2F41445: {closure
| | | | |
| ->74.80%
(82,699,091,968B) 0x2F41445:
with_mut<tokio::runtime::task::core::Stage<core::pin::Pin<alloc::boxed::Box<(dyn
core::future::future::Future<Output=core::result::Result<(),
datafusion_common::error::DataFusionError>> + core::marker::Send),
alloc::alloc::Global>>>, core::task::poll::Poll<core::result::Result<(),
datafusion_common::error::DataFusionError>>, tokio::runtime::task::core::{impl
| | | | |
| ->74.80%
(82,699,091,968B) 0x2F41445: poll<core::pin::Pin<alloc::boxed::Box<(dyn
core::future::future::Future<Output=core::result::Result<(),
datafusion_common::error::DataFusionError>> + core::marker::Send),
alloc::alloc::Global>>,
alloc::sync::Arc<tokio::runtime::scheduler::multi_thread::handle::Handle,
alloc::alloc::Global>> (core.rs:354)
| | | | |
| ->74.80%
(82,699,091,968B) 0x2F41445: {closure
| | | | |
| ->74.80%
(82,699,091,968B) 0x2F41445:
call_once<core::task::poll::Poll<core::result::Result<(),
datafusion_common::error::DataFusionError>>,
tokio::runtime::task::harness::poll_future::{closure_env
| | | | |
|
->74.80% (82,699,091,968B) 0x2F41445:
do_call<core::panic::unwind_safe::AssertUnwindSafe<tokio::runtime::task::harness::poll_future::{closure_env
| | | | |
|
->74.80% (82,699,091,968B) 0x2F41445:
catch_unwind<core::task::poll::Poll<core::result::Result<(),
datafusion_common::error::DataFusionError>>,
core::panic::unwind_safe::AssertUnwindSafe<tokio::runtime::task::harness::poll_future::{closure_env
| | | | |
|
->74.80% (82,699,091,968B) 0x2F41445:
catch_unwind<core::panic::unwind_safe::AssertUnwindSafe<tokio::runtime::task::harness::poll_future::{closure_env
| | | | |
|
->74.80% (82,699,091,968B) 0x2F41445:
poll_future<core::pin::Pin<alloc::boxed::Box<(dyn
core::future::future::Future<Output=core::result::Result<(),
datafusion_common::error::DataFusionError>> + core::marker::Send),
alloc::alloc::Global>>,
alloc::sync::Arc<tokio::runtime::scheduler::multi_thread::handle::Handle,
alloc::alloc::Global>> (harness.rs:523)
| | | | |
|
->74.80% (82,699,091,968B) 0x2F41445:
poll_inner<core::pin::Pin<alloc::boxed::Box<(dyn
core::future::future::Future<Output=core::result::Result<(),
datafusion_common::error::DataFusionError>> + core::marker::Send),
alloc::alloc::Global>>,
alloc::sync::Arc<tokio::runtime::scheduler::multi_thread::handle::Handle,
alloc::alloc::Global>> (harness.rs:210)
| | | | |
|
->74.80% (82,699,091,968B) 0x2F41445:
poll<core::pin::Pin<alloc::boxed::Box<(dyn
core::future::future::Future<Output=core::result::Result<(),
datafusion_common::error::DataFusionError>> + core::marker::Send),
alloc::alloc::Global>>,
alloc::sync::Arc<tokio::runtime::scheduler::multi_thread::handle::Handle,
alloc::alloc::Global>> (harness.rs:155)
| | | | |
|
->74.80% (82,699,091,968B) 0x2F41445: tokio::runtime::task::raw::poll
(raw.rs:325)
| | | | |
|
->74.80% (82,699,091,968B) 0x3AB8B83: poll (raw.rs:255)
| | | | |
|
->74.80% (82,699,091,968B) 0x3AB8B83:
run<alloc::sync::Arc<tokio::runtime::scheduler::multi_thread::handle::Handle,
alloc::alloc::Global>> (mod.rs:509)
| | | | |
|
->74.80% (82,699,091,968B) 0x3AB8B83: {closure
| | | | |
|
->74.80% (82,699,091,968B) 0x3AB8B83:
with_budget<core::result::Result<alloc::boxed::Box<tokio::runtime::scheduler::multi_thread::worker::Core,
alloc::alloc::Global>, ()>,
tokio::runtime::scheduler::multi_thread::worker::{impl
| | | | |
|
->74.80% (82,699,091,968B) 0x3AB8B83:
budget<core::result::Result<alloc::boxed::Box<tokio::runtime::scheduler::multi_thread::worker::Core,
alloc::alloc::Global>, ()>,
tokio::runtime::scheduler::multi_thread::worker::{impl
| | | | |
|
->74.80% (82,699,091,968B) 0x3AB8B83:
tokio::runtime::scheduler::multi_thread::worker::Context::run_task
(worker.rs:591)
| | | | |
|
->74.80% (82,699,091,968B) 0x3ABCDC1: run (worker.rs:539)
| | | | |
|
->74.80% (82,699,091,968B) 0x3ABCDC1: {closure
| | | | |
|
->74.80% (82,699,091,968B) 0x3ABCDC1:
set<tokio::runtime::scheduler::Context,
tokio::runtime::scheduler::multi_thread::worker::run::{closure
| | | | |
|
->74.80% (82,699,091,968B) 0x3ABCDC1: {closure
| | | | |
|
->74.80% (82,699,091,968B) 0x3ABCDC1:
try_with<tokio::runtime::context::Context,
tokio::runtime::context::set_scheduler::{closure_env
| | | | |
|
->74.80% (82,699,091,968B) 0x3ABCDC1:
with<tokio::runtime::context::Context,
tokio::runtime::context::set_scheduler::{closure_env
| | | | |
|
->74.80% (82,699,091,968B) 0x3ABCDC1:
set_scheduler<(), tokio::runtime::scheduler::multi_thread::worker::run::{closure
| | | | |
|
->74.80% (82,699,091,968B) 0x3ABCDC1: {closure
| | | | |
|
->74.80% (82,699,091,968B) 0x3ABCDC1:
enter_runtime<tokio::runtime::scheduler::multi_thread::worker::run::{closure_env
| | | | |
|
->74.80% (82,699,091,968B) 0x3ABCDC1: run
(worker.rs:491)
| | | | |
|
->74.80% (82,699,091,968B) 0x3ABCDC1:
{closure
| | | | |
|
->74.80% (82,699,091,968B) 0x3ABCDC1:
poll<tokio::runtime::scheduler::multi_thread::worker::{impl
| | | | |
|
->74.80% (82,699,091,968B)
0x3ABCDC1: {closure
| | | | |
|
->74.80% (82,699,091,968B)
0x3ABCDC1:
with_mut<tokio::runtime::task::core::Stage<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::{impl
| | | | |
|
->74.80% (82,699,091,968B)
0x3ABCDC1:
poll<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::{impl
| | | | |
|
->74.80% (82,699,091,968B)
0x3ABCDC1: {closure
| | | | |
|
->74.80% (82,699,091,968B)
0x3ABCDC1: call_once<core::task::poll::Poll<()>,
tokio::runtime::task::harness::poll_future::{closure_env
| | | | |
|
->74.80% (82,699,091,968B)
0x3ABCDC1:
do_call<core::panic::unwind_safe::AssertUnwindSafe<tokio::runtime::task::harness::poll_future::{closure_env
| | | | |
|
->74.80%
(82,699,091,968B) 0x3ABCDC1: catch_unwind<core::task::poll::Poll<()>,
core::panic::unwind_safe::AssertUnwindSafe<tokio::runtime::task::harness::poll_future::{closure_env
| | | | |
|
->74.80%
(82,699,091,968B) 0x3ABCDC1:
catch_unwind<core::panic::unwind_safe::AssertUnwindSafe<tokio::runtime::task::harness::poll_future::{closure_env
| | | | |
|
->74.80%
(82,699,091,968B) 0x3ABCDC1:
poll_future<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::{impl
| | | | |
|
->74.80%
(82,699,091,968B) 0x3ABCDC1:
poll_inner<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::{impl
| | | | |
|
->74.80%
(82,699,091,968B) 0x3ABCDC1:
poll<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::{impl
| | | | |
|
->74.80%
(82,699,091,968B) 0x3ABCDC1: tokio::runtime::task::raw::poll (raw.rs:325)
| | | | |
|
->74.80%
(82,699,091,968B) 0x3AA4BCA: poll (raw.rs:255)
| | | | |
|
->74.80%
(82,699,091,968B) 0x3AA4BCA:
run<tokio::runtime::blocking::schedule::BlockingSchedule> (mod.rs:546)
| | | | |
|
->74.80%
(82,699,091,968B) 0x3AA4BCA: run (pool.rs:161)
| | | | |
|
->74.80% (82,699,091,968B) 0x3AA4BCA: run (pool.rs:516)
| | | | |
|
->74.80% (82,699,091,968B) 0x3AA4BCA: {closure
| | | | |
|
->74.80% (82,699,091,968B) 0x3AA4BCA:
std::sys::backtrace::__rust_begin_short_backtrace (backtrace.rs:152)
| | | | |
|
->74.80% (82,699,091,968B) 0x3AAA659: {closure
| | | | |
|
->74.80% (82,699,091,968B) 0x3AAA659: call_once<(), std::thread::{impl
| | | | |
|
->74.80% (82,699,091,968B) 0x3AAA659:
do_call<core::panic::unwind_safe::AssertUnwindSafe<std::thread::{impl
| | | | |
|
->74.80% (82,699,091,968B) 0x3AAA659: catch_unwind<(),
core::panic::unwind_safe::AssertUnwindSafe<std::thread::{impl
| | | | |
|
->74.80% (82,699,091,968B) 0x3AAA659:
catch_unwind<core::panic::unwind_safe::AssertUnwindSafe<std::thread::{impl
| | | | |
|
->74.80% (82,699,091,968B) 0x3AAA659: {closure
| | | | |
|
->74.80% (82,699,091,968B) 0x3AAA659:
core::ops::function::FnOnce::call_once{{vtable.shim}} (function.rs:250)
| | | | |
|
->74.80% (82,699,091,968B) 0x3A9F8FE: call_once<(), dyn
core::ops::function::FnOnce<(), Output=()>, alloc::alloc::Global>
(boxed.rs:1966)
| | | | |
|
->74.80% (82,699,091,968B) 0x3A9F8FE:
std::sys::pal::unix::thread::Thread::new::thread_start (thread.rs:107)
| | | | |
|
->74.80% (82,699,091,968B) 0x5769AA3: start_thread
(pthread_create.c:447)
| | | | |
|
->74.80% (82,699,091,968B) 0x57F6A33: clone (clone.S:100)
| | | | |
|
| | | | |
->23.31% (25,769,803,776B) 0x30CEFCE: emit
(mod.rs:743)
| | | | |
->23.31% (25,769,803,776B) 0x30CEFCE: emit
(mod.rs:598)
| | | | |
->23.31% (25,769,803,776B) 0x30CEFCE:
{async_block
| | | | |
->23.31% (25,769,803,776B) 0x30CEFCE:
poll_next<datafusion_physical_plan::sorts::sort::{impl
| | | | |
->23.31% (25,769,803,776B) 0x30CEFCE:
try_poll_next<futures_util::stream::once::Once<datafusion_physical_plan::sorts::sort::{impl
| | | | |
->23.31% (25,769,803,776B) 0x30CEFCE:
poll_next<futures_util::stream::once::Once<datafusion_physical_plan::sorts::sort::{impl
| | | | |
->23.31% (25,769,803,776B)
0x30CEFCE: <datafusion_physical_plan::stream::RecordBatchStreamAdapter<S> as
futures_core::stream::Stream>::poll_next (stream.rs:451)
| | | | |
->23.31% (25,769,803,776B)
0x2F42715: poll_next<alloc::boxed::Box<(dyn
datafusion_execution::stream::RecordBatchStream<Item=core::result::Result<arrow_array::record_batch::RecordBatch,
datafusion_common::error::DataFusionError>> + core::marker::Send),
alloc::alloc::Global>> (stream.rs:130)
| | | | |
->23.31% (25,769,803,776B)
0x2F42715: poll_next_unpin<core::pin::Pin<alloc::boxed::Box<(dyn
datafusion_execution::stream::RecordBatchStream<Item=core::result::Result<arrow_array::record_batch::RecordBatch,
datafusion_common::error::DataFusionError>> + core::marker::Send),
alloc::alloc::Global>>> (mod.rs:1638)
| | | | |
->23.31% (25,769,803,776B)
0x2F42715: poll<core::pin::Pin<alloc::boxed::Box<(dyn
datafusion_execution::stream::RecordBatchStream<Item=core::result::Result<arrow_array::record_batch::RecordBatch,
datafusion_common::error::DataFusionError>> + core::marker::Send),
alloc::alloc::Global>>> (next.rs:32)
| | | | |
->23.31% (25,769,803,776B)
0x2F42715: {async_block
| | | | |
->23.31% (25,769,803,776B)
0x2F42715: datafusion_common_runtime::trace_utils::trace_future::{{closure}}
(trace_utils.rs:137)
| | | | |
->23.31%
(25,769,803,776B) 0x30C0CEC: poll<alloc::boxed::Box<(dyn
core::future::future::Future<Output=alloc::boxed::Box<(dyn core::any::Any +
core::marker::Send), alloc::alloc::Global>> + core::marker::Send),
alloc::alloc::Global>> (future.rs:124)
| | | | |
->23.31%
(25,769,803,776B) 0x30C0CEC: poll<core::pin::Pin<alloc::boxed::Box<(dyn
core::future::future::Future<Output=alloc::boxed::Box<(dyn core::any::Any +
core::marker::Send), alloc::alloc::Global>> + core::marker::Send),
alloc::alloc::Global>>,
datafusion_common_runtime::trace_utils::trace_future::{closure_env
| | | | |
->23.31%
(25,769,803,776B) 0x30C0CEC: <futures_util::future::future::Map<Fut,F> as
core::future::future::Future>::poll (lib.rs:86)
| | | | |
->23.31%
(25,769,803,776B) 0x2F41445: poll<alloc::boxed::Box<(dyn
core::future::future::Future<Output=core::result::Result<(),
datafusion_common::error::DataFusionError>> + core::marker::Send),
alloc::alloc::Global>> (future.rs:124)
| | | | |
->23.31%
(25,769,803,776B) 0x2F41445: {closure
| | | | |
->23.31%
(25,769,803,776B) 0x2F41445:
with_mut<tokio::runtime::task::core::Stage<core::pin::Pin<alloc::boxed::Box<(dyn
core::future::future::Future<Output=core::result::Result<(),
datafusion_common::error::DataFusionError>> + core::marker::Send),
alloc::alloc::Global>>>, core::task::poll::Poll<core::result::Result<(),
datafusion_common::error::DataFusionError>>, tokio::runtime::task::core::{impl
| | | | |
->23.31%
(25,769,803,776B) 0x2F41445: poll<core::pin::Pin<alloc::boxed::Box<(dyn
core::future::future::Future<Output=core::result::Result<(),
datafusion_common::error::DataFusionError>> + core::marker::Send),
alloc::alloc::Global>>,
alloc::sync::Arc<tokio::runtime::scheduler::multi_thread::handle::Handle,
alloc::alloc::Global>> (core.rs:354)
| | | | |
->23.31%
(25,769,803,776B) 0x2F41445: {closure
| | | | |
->23.31%
(25,769,803,776B) 0x2F41445:
call_once<core::task::poll::Poll<core::result::Result<(),
datafusion_common::error::DataFusionError>>,
tokio::runtime::task::harness::poll_future::{closure_env
| | | | |
->23.31% (25,769,803,776B) 0x2F41445:
do_call<core::panic::unwind_safe::AssertUnwindSafe<tokio::runtime::task::harness::poll_future::{closure_env
| | | | |
->23.31% (25,769,803,776B) 0x2F41445:
catch_unwind<core::task::poll::Poll<core::result::Result<(),
datafusion_common::error::DataFusionError>>,
core::panic::unwind_safe::AssertUnwindSafe<tokio::runtime::task::harness::poll_future::{closure_env
| | | | |
->23.31% (25,769,803,776B) 0x2F41445:
catch_unwind<core::panic::unwind_safe::AssertUnwindSafe<tokio::runtime::task::harness::poll_future::{closure_env
| | | | |
->23.31% (25,769,803,776B) 0x2F41445:
poll_future<core::pin::Pin<alloc::boxed::Box<(dyn
core::future::future::Future<Output=core::result::Result<(),
datafusion_common::error::DataFusionError>> + core::marker::Send),
alloc::alloc::Global>>,
alloc::sync::Arc<tokio::runtime::scheduler::multi_thread::handle::Handle,
alloc::alloc::Global>> (harness.rs:523)
| | | | |
->23.31% (25,769,803,776B) 0x2F41445:
poll_inner<core::pin::Pin<alloc::boxed::Box<(dyn
core::future::future::Future<Output=core::result::Result<(),
datafusion_common::error::DataFusionError>> + core::marker::Send),
alloc::alloc::Global>>,
alloc::sync::Arc<tokio::runtime::scheduler::multi_thread::handle::Handle,
alloc::alloc::Global>> (harness.rs:210)
| | | | |
->23.31% (25,769,803,776B) 0x2F41445:
poll<core::pin::Pin<alloc::boxed::Box<(dyn
core::future::future::Future<Output=core::result::Result<(),
datafusion_common::error::DataFusionError>> + core::marker::Send),
alloc::alloc::Global>>,
alloc::sync::Arc<tokio::runtime::scheduler::multi_thread::handle::Handle,
alloc::alloc::Global>> (harness.rs:155)
| | | | |
->23.31% (25,769,803,776B) 0x2F41445: tokio::runtime::task::raw::poll
(raw.rs:325)
| | | | |
->23.31% (25,769,803,776B) 0x3AB8B83: poll (raw.rs:255)
| | | | |
->23.31% (25,769,803,776B) 0x3AB8B83:
run<alloc::sync::Arc<tokio::runtime::scheduler::multi_thread::handle::Handle,
alloc::alloc::Global>> (mod.rs:509)
| | | | |
->23.31% (25,769,803,776B) 0x3AB8B83: {closure
| | | | |
->23.31% (25,769,803,776B) 0x3AB8B83:
with_budget<core::result::Result<alloc::boxed::Box<tokio::runtime::scheduler::multi_thread::worker::Core,
alloc::alloc::Global>, ()>,
tokio::runtime::scheduler::multi_thread::worker::{impl
| | | | |
->23.31% (25,769,803,776B) 0x3AB8B83:
budget<core::result::Result<alloc::boxed::Box<tokio::runtime::scheduler::multi_thread::worker::Core,
alloc::alloc::Global>, ()>,
tokio::runtime::scheduler::multi_thread::worker::{impl
| | | | |
->23.31% (25,769,803,776B) 0x3AB8B83:
tokio::runtime::scheduler::multi_thread::worker::Context::run_task
(worker.rs:591)
| | | | |
->23.31% (25,769,803,776B) 0x3ABCDC1: run (worker.rs:539)
| | | | |
->23.31% (25,769,803,776B) 0x3ABCDC1: {closure
| | | | |
->23.31% (25,769,803,776B) 0x3ABCDC1:
set<tokio::runtime::scheduler::Context,
tokio::runtime::scheduler::multi_thread::worker::run::{closure
| | | | |
->23.31% (25,769,803,776B) 0x3ABCDC1: {closure
| | | | |
->23.31% (25,769,803,776B) 0x3ABCDC1:
try_with<tokio::runtime::context::Context,
tokio::runtime::context::set_scheduler::{closure_env
| | | | |
->23.31% (25,769,803,776B) 0x3ABCDC1:
with<tokio::runtime::context::Context,
tokio::runtime::context::set_scheduler::{closure_env
| | | | |
->23.31% (25,769,803,776B) 0x3ABCDC1:
set_scheduler<(), tokio::runtime::scheduler::multi_thread::worker::run::{closure
| | | | |
->23.31% (25,769,803,776B) 0x3ABCDC1: {closure
| | | | |
->23.31% (25,769,803,776B) 0x3ABCDC1:
enter_runtime<tokio::runtime::scheduler::multi_thread::worker::run::{closure_env
| | | | |
->23.31% (25,769,803,776B) 0x3ABCDC1: run
(worker.rs:491)
| | | | |
->23.31% (25,769,803,776B) 0x3ABCDC1:
{closure
| | | | |
->23.31% (25,769,803,776B) 0x3ABCDC1:
poll<tokio::runtime::scheduler::multi_thread::worker::{impl
| | | | |
->23.31% (25,769,803,776B)
0x3ABCDC1: {closure
| | | | |
->23.31% (25,769,803,776B)
0x3ABCDC1:
with_mut<tokio::runtime::task::core::Stage<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::{impl
| | | | |
->23.31% (25,769,803,776B)
0x3ABCDC1:
poll<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::{impl
| | | | |
->23.31% (25,769,803,776B)
0x3ABCDC1: {closure
| | | | |
->23.31% (25,769,803,776B)
0x3ABCDC1: call_once<core::task::poll::Poll<()>,
tokio::runtime::task::harness::poll_future::{closure_env
| | | | |
->23.31% (25,769,803,776B)
0x3ABCDC1:
do_call<core::panic::unwind_safe::AssertUnwindSafe<tokio::runtime::task::harness::poll_future::{closure_env
| | | | |
->23.31%
(25,769,803,776B) 0x3ABCDC1: catch_unwind<core::task::poll::Poll<()>,
core::panic::unwind_safe::AssertUnwindSafe<tokio::runtime::task::harness::poll_future::{closure_env
| | | | |
->23.31%
(25,769,803,776B) 0x3ABCDC1:
catch_unwind<core::panic::unwind_safe::AssertUnwindSafe<tokio::runtime::task::harness::poll_future::{closure_env
| | | | |
->23.31%
(25,769,803,776B) 0x3ABCDC1:
poll_future<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::{impl
| | | | |
->23.31%
(25,769,803,776B) 0x3ABCDC1:
poll_inner<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::{impl
| | | | |
->23.31%
(25,769,803,776B) 0x3ABCDC1:
poll<tokio::runtime::blocking::task::BlockingTask<tokio::runtime::scheduler::multi_thread::worker::{impl
| | | | |
->23.31%
(25,769,803,776B) 0x3ABCDC1: tokio::runtime::task::raw::poll (raw.rs:325)
| | | | |
->23.31%
(25,769,803,776B) 0x3AA4BCA: poll (raw.rs:255)
| | | | |
->23.31%
(25,769,803,776B) 0x3AA4BCA:
run<tokio::runtime::blocking::schedule::BlockingSchedule> (mod.rs:546)
| | | | |
->23.31%
(25,769,803,776B) 0x3AA4BCA: run (pool.rs:161)
| | | | |
->23.31% (25,769,803,776B) 0x3AA4BCA: run (pool.rs:516)
| | | | |
->23.31% (25,769,803,776B) 0x3AA4BCA: {closure
| | | | |
->23.31% (25,769,803,776B) 0x3AA4BCA:
std::sys::backtrace::__rust_begin_short_backtrace (backtrace.rs:152)
| | | | |
->23.31% (25,769,803,776B) 0x3AAA659: {closure
| | | | |
->23.31% (25,769,803,776B) 0x3AAA659: call_once<(), std::thread::{impl
| | | | |
->23.31% (25,769,803,776B) 0x3AAA659:
do_call<core::panic::unwind_safe::AssertUnwindSafe<std::thread::{impl
| | | | |
->23.31% (25,769,803,776B) 0x3AAA659: catch_unwind<(),
core::panic::unwind_safe::AssertUnwindSafe<std::thread::{impl
| | | | |
->23.31% (25,769,803,776B) 0x3AAA659:
catch_unwind<core::panic::unwind_safe::AssertUnwindSafe<std::thread::{impl
| | | | |
->23.31% (25,769,803,776B) 0x3AAA659: {closure
| | | | |
->23.31% (25,769,803,776B) 0x3AAA659:
core::ops::function::FnOnce::call_once{{vtable.shim}} (function.rs:250)
| | | | |
->23.31% (25,769,803,776B) 0x3A9F8FE: call_once<(), dyn
core::ops::function::FnOnce<(), Output=()>, alloc::alloc::Global>
(boxed.rs:1966)
| | | | |
->23.31% (25,769,803,776B) 0x3A9F8FE:
std::sys::pal::unix::thread::Thread::new::thread_start (thread.rs:107)
| | | | |
->23.31% (25,769,803,776B) 0x5769AA3: start_thread
(pthread_create.c:447)
| | | | |
->23.31% (25,769,803,776B) 0x57F6A33: clone (clone.S:100)
```
</details>
The last DataFusion calls in the trace are
`datafusion_physical_plan::topk::TopKHeap::maybe_compact` and
`datafusion_physical_plan::topk::TopKHeap::emit_with_state`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
