Hi,

The following code should reproduce the problem.

```python
import pyarrow as pa
import pyarrow.dataset
import pyarrow.fs

schema = pa.schema([("id", pa.utf8()), ("bucket", pa.uint8())])


def rb_generator(buckets, rows, batches):
    # Build one batch of `rows` rows whose "bucket" column cycles through
    # 0..buckets-1, then yield that same batch `batches` times, printing a
    # progress line each time the consumer asks for the next one.
    batch = pa.record_batch(
        [[f"id-{i}" for i in range(rows)], [i % buckets for i in range(rows)]],
        schema=schema,
    )
    for i in range(batches):
        yield batch
        print(f"yielding {i}")


if __name__ == "__main__":
    pa.set_io_thread_count(1)
    # 64 buckets, 32768 rows per batch, 1,000,000 batches.
    reader = pa.RecordBatchReader.from_batches(
        schema, rb_generator(64, 32768, 1000000)
    )
    local_fs = pa.fs.LocalFileSystem()
    # Partitioning on "bucket" keeps 64 partition files open at the same time.
    pa.dataset.write_dataset(
        reader,
        "/tmp/data_f",
        format="feather",
        partitioning=["bucket"],
        filesystem=local_fs,
        existing_data_behavior="overwrite_or_ignore",
    )
```

On Sun, Jul 30, 2023 at 15:30 Wenbo Hu <huwenbo1...@gmail.com> wrote:
>
> Hi,
> Another question, then: why does backpressure not work on the input
> stream of the write_dataset API? If backpressure happens at the end of
> the Acero stream for some reason (in the queue stage or the write
> stage), shouldn't the input stream be backpressured as well? That
> should keep memory at a stable level, so that the input speed matches
> the output speed.
>
> I then ran some other experiments with various io_thread_count values
> and bucket_size values (partitions / open files).
>
> 1. With bucket_size set to 64 and io_thread_count/cpu_count set to 1,
> the CPU goes up to 100% after the transfer is done, but there is a very
> interesting pattern.
> * The flow from client to server is slow for the very first few
> batches, less than 0.01M rows/s, then it speeds up to over 4M rows/s
> very quickly.
> * I think that at the very beginning the backpressure works fine, until
> at some point, as in the previous experiments, the backpressure turns
> the stream into a black hole, and the I/O thread's input stream gets
> stuck at some slow speed. (It's still writing, but it spends a lot of
> time waiting for the upstream CPU partitioning threads to push batches?)
> * From iotop, the total disk write rate drops very slowly after the
> transfer is done.
> But this may change between experiments with the
> same configuration. I think the upstream backpressure is not working as
> expected, which makes the downstream writer keep querying. Maybe this
> reveals something: at some point the slow writing amplifies the
> backpressure on the whole process (the write speed drops slowly), but
> the writing is slow because the upstream has already slowed down.
>
> 2. Then I set cpu_count to 64.
> 2.1 io_thread_count set to 4:
> 2.1.1 For bucket_size 2/4/6, the system works fine. CPU is below 100%.
> Backpressure works fine; memory does not accumulate much before the
> flow speed becomes stable.
> 2.1.2 With bucket_size 8, the bug comes back. After the transfer is
> done, CPU is about 350% (only the I/O threads are running?) and the
> write rate in iotop is about 40M/s, then it drops very slowly.
>
> 2.2 io_thread_count set to 6:
> 2.2.1 For bucket_size 6/8/16, the system works fine. CPU is about 100%,
> like 2.1.1.
> 2.2.2 For bucket_size 32, the bug comes back. CPU stays stuck at 550%.
>
> 2.3 io_thread_count set to 8:
> 2.3.1 For bucket_size 16, it fails somehow. After the transfer is done,
> the accumulated memory is over 3G, but the write speed is about 60M/s,
> so it is possible to just wait it out. CPU is about 600-700%. Once the
> accumulated data has been written, CPU returns to normal.
> 2.3.2 For bucket_size 32, it still fails. CPU stays stuck at 800%. The
> transfer is very fast (over 14M rows/s). Backpressure is not working
> at all.
>
>
> On Sat, Jul 29, 2023 at 01:08 Weston Pace <weston.p...@gmail.com> wrote:
> >
> > > How many io threads are writing concurrently in a single
> > > write_dataset call?
> >
> > With the default options, and no partitioning, it will only use 1 I/O
> > thread. This is because we do not write to a single file in parallel.
> > If you change FileSystemDatasetWriteOptions::max_rows_per_file then
> > you may see more than 1 I/O thread, because we will start new files
> > and write to each one in parallel.
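
As a rough illustration of why max_rows_per_file can add I/O parallelism, here is a minimal Python model of the splitting described in this reply: rows are appended to the current file until it reaches the limit, a batch that crosses the limit is split, and the remainder starts a new file. The function name `assign_rows_to_files` is hypothetical, and the model ignores partitioning, row groups, and open-file limits; it is a sketch, not Arrow's C++ implementation. (The Python `pyarrow.dataset.write_dataset` function exposes the same option as its `max_rows_per_file` argument.)

```python
def assign_rows_to_files(batch_sizes, max_rows_per_file):
    """Hypothetical, simplified model of max_rows_per_file splitting.

    Returns one list per output file; each entry is (batch_index, row_count),
    meaning that many rows of that batch land in that file.
    """
    if max_rows_per_file <= 0:
        raise ValueError("max_rows_per_file must be positive")
    files = []
    room = 0  # rows the current file can still accept; 0 means "open a new file"
    for index, size in enumerate(batch_sizes):
        while size > 0:
            if room == 0:
                files.append([])  # current file is full (or none yet): start one
                room = max_rows_per_file
            take = min(size, room)  # split the batch if it crosses the limit
            files[-1].append((index, take))
            size -= take
            room -= take
    return files
```

With four 32768-row batches and a 50000-row limit, the model produces three files holding 50000, 50000, and 31072 rows. Per the explanation above, each extra file gives the writer another file it can write to in parallel, whereas without the limit (and without partitioning) everything goes to a single file and a single I/O thread.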
> > If you have partitioning then you may see more than 1 I/O thread
> > because we will be writing to multiple files.
> >
> > We use, at most, 1 I/O thread per file being written.
> >
> > > How are they scheduled?
> >
> > There are 3 stages.
> >
> > Arrival stage: Data arrives on an Acero worker thread (CPU thread). We
> > partition the data at this point. For each batch we schedule a Queue
> > Batch task.
> > Queue stage: This stage runs on the CPU thread. It finds the correct
> > file queue for the batch and adds the batch to that file queue. It may
> > split the batch if max_rows_per_file is set. It may trigger
> > backpressure if there are too many rows queued on files. This stage
> > runs serially, on the CPU thread. There is never more than one queue
> > task running.
> > Write stage: Each file has a number of write tasks. These run in
> > parallel across files but serially within a file. These are I/O tasks.
> >
> > > The throttle code seems to let only one task run at a time?
> >
> > Yes, there is a throttled scheduler used for the queue stage (we only
> > run one queue task at a time). There is a throttled scheduler per file
> > used for the write stage. All of these are configured to allow only
> > one task at a time to run.
> >
> > > What else can I do to inspect the problem?
> >
> > I think we need to find out why the CPU is still at 800% after the
> > transfer is done when partitioning is enabled. I would expect the CPU
> > to drop to 0% even if it takes several seconds (or longer) for the
> > cached data to flush to the disk. The stack trace you shared is
> > helpful, but I don't know the root cause yet. All of the threads are
> > stuck on locking / unlocking in FutureImpl::TryAddCallback, but that
> > critical section is very small. So it seems like there is some kind of
> > task storm. I think this is similar to a condition_variable that has
> > thousands of waiters and is constantly doing a notify_all.
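
The write-stage scheduling described above (serial within a file, parallel across files, all on a shared pool of I/O threads) can be modeled in a few lines of Python. This is a minimal sketch under stated assumptions: `PerFileSerialScheduler` is a hypothetical name, and a standard `concurrent.futures.ThreadPoolExecutor` stands in for Arrow's C++ `ThrottledAsyncTaskScheduler`. A single-key instance would behave like the queue stage, where only one queue task ever runs.

```python
import threading
from collections import defaultdict, deque
from concurrent.futures import ThreadPoolExecutor


class PerFileSerialScheduler:
    """Hypothetical model of the write stage: tasks for one file run one at a
    time in submission order; tasks for different files share a thread pool
    and may run in parallel."""

    def __init__(self, pool: ThreadPoolExecutor):
        self._pool = pool
        self._lock = threading.Lock()
        self._idle = threading.Condition(self._lock)
        self._queues = defaultdict(deque)  # file key -> tasks not yet started
        self._active = set()               # files that have a task on the pool
        self._pending = 0                  # tasks submitted but not finished
        self.errors = []

    def submit(self, file_key, fn, *args):
        with self._lock:
            self._queues[file_key].append((fn, args))
            self._pending += 1
            if file_key in self._active:
                return  # this file already has a task in flight; it will chain
            self._active.add(file_key)
        self._pool.submit(self._run_next, file_key)

    def _run_next(self, file_key):
        # Runs on a pool thread and executes exactly one task for this file.
        with self._lock:
            fn, args = self._queues[file_key].popleft()
        try:
            fn(*args)
        except Exception as exc:  # record the failure, keep running later tasks
            with self._lock:
                self.errors.append(exc)
        with self._lock:
            self._pending -= 1
            if self._pending == 0:
                self._idle.notify_all()
            more = bool(self._queues[file_key])
            if not more:
                self._active.discard(file_key)
        if more:
            # Re-submit instead of looping, so other files get a turn on the pool.
            self._pool.submit(self._run_next, file_key)

    def wait(self):
        """Block until every submitted task has finished."""
        with self._lock:
            while self._pending:
                self._idle.wait()
```

Because each file has at most one task on the pool at a time, a pool with N workers writes to at most N files concurrently, and tasks for other files wait for a free worker. Re-submitting after every task (rather than draining a whole file queue in one go) is a design choice to keep one busy file from monopolizing a worker.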
> >
> > I think we will need to figure out some kind of reproducible test
> > case. I will try to find some time to run some experiments on Monday.
> > Maybe I can reproduce this by setting the backpressure limit to a very
> > small amount.
> >
> > On Fri, Jul 28, 2023 at 6:48 AM Wenbo Hu <huwenbo1...@gmail.com> wrote:
> >
> > > Hi,
> > > Thanks for your detailed explanation. I ran some experiments today.
> > >
> > > Before the experiments:
> > > 1. To limit the resources used by the server, I use Docker, which
> > > uses cgroups. But "free" does not respect the resource limit inside
> > > the container.
> > > 2. I measured the write speed on the host with "dd if=/dev/zero
> > > of=./test.img bs=1G count=45 oflag=dsync"; the output is "48318382080
> > > bytes (48 GB) copied, 132.558 s, 365 MB/s".
> > > 3. I do not limit the memory of the container (over 140GB is
> > > available), but the CPU is still limited to 8. According to your
> > > explanation, the write process should never slow down, since it
> > > writes to the "memory cache", which is accounted as used memory until
> > > the OS flushes it to storage.
> > >
> > > Additionally, the file format is "feather"; writing with and without
> > > partitioning leads to different results.
> > >
> > > ## write without partitioning
> > > Everything works fine, no matter what value I set for
> > > io_thread_count or cpu_count.
> > > The performance of the same configuration varies a lot: the initial
> > > peak speed may result in different memory usage, but the max average
> > > flow speed does not vary much.
> > > Some records are below:
> > > 1. With cpu_count and io_thread_count set to 128, CPU is less than
> > > 100% and RES is less than 1G (after the initial peak speed); the
> > > average flow speed is 6.83M rows/s (45 bytes per row).
> > > 2. With cpu_count set to 1 and io_thread_count set to 16, CPU is a
> > > little over 100%, RES is about 3G at max, and the average flow speed
> > > is 4.64M rows/s, but it takes an additional 6s to finish writing
> > > after the transfer is done.
> > > 3. With cpu_count set to 1 and io_thread_count set to 128, it
> > > performs almost the same as record 2.
> > >
> > > ## write with partitioning
> > > Writing with partitioning fails most of the time; setting a lower CPU
> > > count does not help.
> > > 1. With cpu_count and io_thread_count set to 128, CPU is at 800% from
> > > the beginning, RES grows slowly to 40.7G by the end of the transfer,
> > > and the average flow speed is 3.24M rows/s. After that, CPU is still
> > > at 800%, but RES goes down very slowly, at 200MB/minute. The write
> > > speed does not recover.
> > > 2. With cpu_count set to 1 and io_thread_count set to 16, CPU goes up
> > > to 800% more slowly than in record 1, RES grows to 44.1G by the end
> > > of the transfer, and the average flow speed is 6.75M rows/s. The same
> > > thing as in record 1 happens after the transfer is done.
> > > 3. With cpu_count set to 1 and io_thread_count set to 128, CPU goes up
> > > to 800% much more slowly than in record 2 (due to the slower flow
> > > speed?), RES grows to 30G by the end of the transfer, and the average
> > > flow speed is 1.62M rows/s. The same thing as in record 1 happens
> > > after the transfer is done.
> > >
> > > Then I tried to limit the flow speed before the write queue got full,
> > > using custom flow control (sleeping on reader iteration based on
> > > available memory). But the sleep-time curve is not accurate:
> > > sometimes the flow slows down, but the queue gets full anyway.
> > > Then the interesting thing happens: before the queue is full (the
> > > point where memory grows quickly), the CPU is not fully used. When
> > > memory grows quickly, CPU goes up as well, to 800%.
> > > 1. Sometimes the writing queue catches up, and CPU goes down once the
> > > accumulated memory has been written. The writing speed recovers and
> > > memory goes back to normal.
> > > 2. Sometimes it can't.
> > > IOBPS drops sharply, and CPU never goes down after that.
> > >
> > > How many io threads are writing concurrently in a single
> > > write_dataset call? How are they scheduled? The throttle code seems
> > > to let only one task run at a time?
> > > What else can I do to inspect the problem?
> > >
> > > On Fri, Jul 28, 2023 at 00:33 Weston Pace <weston.p...@gmail.com> wrote:
> > > >
> > > > You'll need to measure more, but generally the bottleneck for
> > > > writes is going to be the disk itself. Unfortunately, standard OS
> > > > buffered I/O has some pretty negative behaviors in this case. First
> > > > I'll describe what I generally see happen (the last time I profiled
> > > > this was a while back, but I don't think anything substantial has
> > > > changed).
> > > >
> > > > * Initially, writes are very fast. The OS `write` call is simply a
> > > > memcpy from user space into kernel space. The actual flushing of the
> > > > data from kernel space to disk happens asynchronously unless you are
> > > > using direct I/O (which is not currently supported).
> > > > * Over time, assuming the data arrival rate is faster than the data
> > > > write rate, the data will accumulate in kernel memory. For example,
> > > > if you continuously run the Linux `free` program you will see the
> > > > `free` column decrease and the `buff/cache` column increase. The
> > > > `available` column should generally stay consistent (kernel memory
> > > > that is in use but can technically be flushed to disk if needed is
> > > > still considered "available" but not "free").
> > > > * Once the `free` column reaches 0, a few things happen. First, the
> > > > calls to `write` are no longer fast (the write cannot complete until
> > > > some existing data has been flushed to disk). Second, other
> > > > processes that
Second, other processes that > > > > aren't in use might start swapping their data to disk (you will > > > > generally > > > > see the entire system, if it is interactive, grind to a halt). Third, > > > > if > > > > you have an OOM-killer active, it may start to kill running > > > > applications. > > > > It isn't supposed to do so but there are sometimes bugs[1]. > > > > * Assuming the OOM killer does not kill your application then, because > > > the > > > > `write` calls slow down, the number of rows in the dataset writer's > > > > queue > > > > will start to fill up (this is captured by the variable > > > > `rows_in_flight_throttle`. > > > > * Once the rows_in_flight_throttle is full it will pause and the dataset > > > > writer will return an unfinished future (asking the caller to back off). > > > > * Once this happens the caller will apply backpressure (if being used in > > > > Acero) which will pause the reader. This backpressure is not instant > > > > and > > > > generally each running CPU thread still delivers whatever batch it is > > > > working on. These batches essentially get added to an asynchronous > > > > condition variable waiting on the dataset writer queue to free up. This > > > is > > > > the spot where the ThrottledAsyncTaskScheduler is used. > > > > > > > > The stack dump that you reported is not exactly what I would have > > > expected > > > > but it might still match the above description. At this point I am just > > > > sort of guessing. When the dataset writer frees up enough to receive > > > > another batch it will do what is effectively a "notify all" and all of > > > the > > > > compute threads are waking up and trying to add their batch to the > > > dataset > > > > writer. One of these gets through, gets added to the dataset writer, > > > > and > > > > then backpressure is applied again and all the requests pile up once > > > > again. 
> > > > It's possible that a "resume sending" signal is sent and this
> > > > might actually lead to RAM filling up more. We could probably
> > > > mitigate this by adding a low water mark to the dataset writer's
> > > > backpressure throttle (so it doesn't send the resume signal as soon
> > > > as the queue has room, but waits until the queue has drained to half
> > > > full).
> > > >
> > > > I'd recommend watching the output of `free` (or monitoring memory
> > > > in some other way) and verifying the above. I'd also suggest
> > > > lowering the number of CPU threads and seeing how that affects
> > > > performance. If you lower the CPU threads enough, you should
> > > > eventually reach a point where your supply of data is slower than
> > > > your writer, and I wouldn't expect memory to accumulate. These
> > > > things are not solutions, but they might give us more clues into
> > > > what is happening.
> > > >
> > > > [1]
> > > > https://unix.stackexchange.com/questions/300106/why-is-the-oom-killer-killing-processes-when-swap-is-hardly-used
> > > >
> > > > On Thu, Jul 27, 2023 at 4:53 AM Wenbo Hu <huwenbo1...@gmail.com> wrote:
> > > >
> > > > > Hi,
> > > > > I'm using Flight to receive streams from a client and write them
> > > > > to storage with the Python `pa.dataset.write_dataset` API. The
> > > > > whole dataset is 1 billion rows, over 40GB, with one partition
> > > > > column ranging from 0 to 63.
> > > > > The container runs with 8 CPU cores and 4GB of RAM.
> > > > > After setting io_thread_count to 128, it finishes in about 160s
> > > > > (6M rows/s, each record batch is about 32K rows), with
> > > > > transferring and writing completing almost synchronously.
> > > > > Then I'd like to find the bottleneck of the system: CPU, RAM, or
> > > > > storage?
> > > > > 1. I extended the RAM to 32GB. The server then consumes more RAM;
> > > > > writing works at the beginning, then suddenly slows down, and the
> > > > > data accumulates in RAM until OOM.
> > > > > 2. Then I set the RAM to 64GB, so that the server would not be
> > > > > killed by OOM. The same thing happens; also, after all the data has
> > > > > been transferred (into memory), the server consumes all CPU shares
> > > > > (800%), but is still very slow at writing (not totally stopped,
> > > > > but about 100MB/minute).
> > > > > 2.1 I'm wondering whether the I/O thread is stuck or the
> > > > > computation task is stuck. After setting both io_thread_count and
> > > > > cpu_count to 32, I wrapped the input stream of write_dataset with
> > > > > a callback on each record batch, and I can tell that all the
> > > > > record batches are consumed by the write_dataset API.
> > > > > 2.2 I dumped all thread stack traces and grabbed a flamegraph. See
> > > > > https://gist.github.com/hu6360567/e21ce04e7f620fafb5500cd93d44d3fb.
> > > > >
> > > > > It seems that all threads are stuck in
> > > > > ThrottledAsyncTaskSchedulerImpl.
> > > > >
> > > > > --
> > > > > ---------------------
> > > > > Best Regards,
> > > > > Wenbo Hu,
> > > >
> > >
> > >
> > > --
> > > ---------------------
> > > Best Regards,
> > > Wenbo Hu,
> > >
> >
>
> --
> ---------------------
> Best Regards,
> Wenbo Hu,

--
---------------------
Best Regards,
Wenbo Hu,
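
The low-water-mark mitigation Weston suggests in his Jul 28 reply can be sketched as a row-count gate with hysteresis. This is a minimal Python sketch, not Arrow's `rows_in_flight_throttle`: `HysteresisThrottle`, `acquire`, `release`, and `simulate` are hypothetical names. Producers pause once in-flight rows reach a high-water mark and are resumed only after in-flight rows drain to a low-water mark, so blocked producers get woken (the "notify all" in the thread) less often.

```python
import threading


class HysteresisThrottle:
    """Hypothetical row-count backpressure gate with high and low water marks.

    Producers call acquire(rows) before queueing a batch; the writer calls
    release(rows) once that batch has been written. After in-flight rows reach
    high_water, producers block until in-flight rows fall to low_water.
    """

    def __init__(self, high_water, low_water):
        if not 0 <= low_water < high_water:
            raise ValueError("need 0 <= low_water < high_water")
        self.high_water = high_water
        self.low_water = low_water
        self.in_flight = 0
        self.paused = False
        self.resume_count = 0  # how many times blocked producers were woken
        self._cond = threading.Condition()

    def acquire(self, rows):
        with self._cond:
            while self.paused:
                self._cond.wait()
            # The caller's batch is always accepted, so in_flight can exceed
            # high_water by less than one batch.
            self.in_flight += rows
            if self.in_flight >= self.high_water:
                self.paused = True

    def release(self, rows):
        with self._cond:
            self.in_flight -= rows
            if self.paused and self.in_flight <= self.low_water:
                self.paused = False
                self.resume_count += 1
                self._cond.notify_all()  # wake every blocked producer once


def simulate(low_water, steps=1000, batch=10, high_water=100):
    """Single-threaded simulation: a fast producer refills whenever the gate
    is open, and the writer flushes one batch per step. Returns resume count."""
    throttle = HysteresisThrottle(high_water, low_water)
    for _ in range(steps):
        while not throttle.paused:
            throttle.acquire(batch)
        throttle.release(batch)
    return throttle.resume_count
```

With a 100-row high-water mark and 10-row batches, resuming as soon as one batch of room frees up (`simulate(low_water=90)`) wakes producers on every flushed batch, 1000 times in 1000 steps, while waiting until the queue has drained to half (`simulate(low_water=50)`) wakes them 200 times. Whether this alone would explain the CPU storm reported in the thread is not established; the sketch only illustrates the idea.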