Hi,
    The following code should reproduce the problem.

```

import pyarrow as pa
import pyarrow.fs, pyarrow.dataset

schema = pa.schema([("id", pa.utf8()), ("bucket", pa.uint8())])


def rb_generator(buckets, rows, batches):
    batch = pa.record_batch(
        [[f"id-{i}" for i in range(rows)], [i % buckets for i in range(rows)]],
        schema=schema,
    )

    for i in range(batches):
        yield batch
        print(f"yielding {i}")


if __name__ == "__main__":
    pa.set_io_thread_count(1)
    reader = pa.RecordBatchReader.from_batches(schema,
rb_generator(64, 32768, 1000000))
    local_fs = pa.fs.LocalFileSystem()

    pa.dataset.write_dataset(
        reader,
        "/tmp/data_f",
        format="feather",
        partitioning=["bucket"],
        filesystem=local_fs,
        existing_data_behavior="overwrite_or_ignore"
    )

```

Wenbo Hu <huwenbo1...@gmail.com> 于2023年7月30日周日 15:30写道:
>
> Hi,
>    Then another question is that "why back pressure not working on the
> input stream of write_dataset api?". If back pressure happens on the
> end of the acero stream for some reason (on queue stage or write
> stage), then the input stream should backpressure as well? It should
> keep the memory to a stable level so that the input speed would match
> the output speed.
>
>     Then, I made some other experiments with various io_thread_count
> values and bucket_size (partitions/opening files).
>
> 1. for bucket_size to 64 and io_thread_count/cpu_count to 1, the cpu
> is up to 100% after transferring done, but there is a very interesting
> output.
>     * flow transferring from client to server at the very first few
> batches are slow, less than 0.01M rows/s, then it speeds up to over 4M
> rows/s very quickly.
>     * I think at the very beginning stage, the backpressure works
> fine, until sometime, like the previous experiments, the backpressure
> makes the stream into a blackhole, then the io thread input stream
> stuck at some slow speed. (It's still writing, but takes a lot of time
> on waiting upstream CPU partitioning threads to push batches?)
>     * from iotop, the total disk write is dropping down very slowly
> after transferring done. But it may change over different experiments
> with the same configuration. I think the upstream backpressure is not
> working as expected, which makes the downstream writing keep querying.
> I think it may reveal something, maybe at some point, the slow writing
> enlarge the backpressure on the whole process (the write speed is
> dropping slowly), but the slow reason of writing is the upstream is
> already slow down.
>
> 2. Then I set cpu_count to 64
> 2.1 io_thread_count to 4.
> 2.1.1 , for bucket_size to 2/4/6, The system works fine. CPU is less
> than 100%. Backpressure works fine, memory will not accumulated much
> before the flow speed becomes stable.
> 2.1.2  when bucket_size to 8, the bug comes back. After transferring
> done, CPU is about 350% (only io thread is running?) and write from
> iotop is about 40M/s, then dropping down very slowly.
>
> 2.2. then I set both io_thread to 6,
> 2.2.1 for bucket_size to 6/8/16, The system works fine. CPU is about
> 100%. like 2.1.1
> 2.2.2 for bucket_size to 32, the bug comes back. CPU halts at 550%.
>
> 2.3 io_thread_count to 8
> 2.3.1 for bucket_size to 16, it fails somehow. After transferring
> done, the memory accumulated over 3G, but write speed is about 60M/s,
> which makes it possible to wait. CPU is about 600~700%. After the
> accumulated memory writing, CPU becomes normal.
> 2.3.2 for bucket_size to 32, it still fails. CPU halts at 800%.
> transferring is very fast (over 14M rows/s). the backpressure is not
> working at all.
>
>
> Weston Pace <weston.p...@gmail.com> 于2023年7月29日周六 01:08写道:
> >
> > > How many io threads are writing concurrently in a single write_dataset
> > > call?
> >
> > With the default options, and no partitioning, it will only use 1 I/O
> > thread.  This is because we do not write to a single file in parallel.
> > If you change FileSystemDatasetWriteOptions::max_rows_per_file then you may
> > see more than 1 I/O thread because we will start new files and write to
> > each one in parallel.
> > If you have partitioning then you may see more than 1 I/O thread because we
> > will be writing to multiple files.
> >
> > We use, at most, 1 I/O thread per file being written.
> >
> > > How do they schedule?
> >
> > There are 3 stages.
> >
> > Arrival stage: Data arrives on an Acero worker thread (CPU thread).  We
> > will partition the data at this point.  For each batch we schedule a Queue
> > Batch task.
> > Queue stage: This stage runs on the CPU thread.  It finds the correct file
> > queue for the batch and adds the batch to the file queue.  It may split the
> > batch if max_rows_per_file is set.  It may trigger backpressure if there
> > are too many rows queued on files.  This stage runs serially, on the CPU
> > thread.  There is never more than one queue task running.
> > Write stage: Each file has a number of write tasks.  These run in parallel
> > across files but serially within a file.  These are I/O tasks.
> >
> > > The throttle code seems only one task got running?
> >
> > Yes, there is a throttled scheduler used for the queue stage (we only run
> > one queue task at a time).  There is a throttled scheduler per file used
> > for the write stage.  All of these are configured to only allow one task at
> > a time to run.
> >
> > > What else can I do to inspect the problem?
> >
> > I think we need to find out why the CPU is still 800% after the transfer is
> > done when partitioning is enabled.  I would expect the CPU to drop to 0%
> > even if it takes several seconds (or longer) for the cached data to flush
> > to the disk.  The strack trace you shared is helpful but I don't know the
> > root cause yet.  All of the threads are stuck on locking / unlocking in
> > FutureImpl::TryAddCallback but that critical section is very small.  So it
> > seems like there is some kind of task storm.  I think this is similar to a
> > condition_variable that has thousands of waiters and is constantly doing a
> > notify_all.
> >
> > I think we will need to figure out some kind of reproducible test case.  I
> > will try and find some time to run some experiments on Monday.  Maybe I can
> > reproduce this by setting the backpressure limit to a very small amount.
> >
> > On Fri, Jul 28, 2023 at 6:48 AM Wenbo Hu <huwenbo1...@gmail.com> wrote:
> >
> > > Hi,
> > >    Thanks for your detailed explanation, I made some experiment today.
> > >
> > > Before experiment,
> > > 1. To limit the resources used by the server, I use docker, which uses
> > > cgroups. But "free" does not respect the resource limit inside the
> > > container.
> > > 2. I measured the write speed on the host by "dd if=/dev/zero
> > > of=./test.img bs=1G count=45 oflag=dsync", the output is "48318382080
> > > Byets(48 GB) Copies,132.558 s,365 MB/s"
> > > 3. I do not limit the memory of the container ( available over 140GB),
> > > but the CPU is still limit to 8. According to you explanation,  the
> > > write process should never slow down, since it will write to the
> > > "memory cached" which is accounted as used memory until it is flushed
> > > to the storage by the OS.
> > >
> > > Additionally, the file format is "feather", writing with/without
> > > partitioning leads to different result.
> > >
> > > ## write without partitioning
> > > Everything works fine, no matter what value I set to io_thread_count
> > > or cpu_count.
> > > the performance of same configuration varies a lot, the initial peak
> > > speed may result in different memory usage, but the max average flow
> > > speed not varies a lot.
> > > Some records are below,
> > > 1. With cpu_count and io_thread_count to 128 CPU is less than 100% and
> > > RES is less than 1G (after initial peak speed), average flow speed is
> > > 6.83M rows/s (45bytes per row).
> > > 2. With cpu_count to 1, io_thread_count to 16, CPU is a little over
> > > 100%, RES is about 3g at max, average flow speed is 4.64M rows/s, but
> > > it takes additional 6s to complete writing after transferring done.
> > > 3. With cpu_count to 1, io_thread_count to 128, performs almost as
> > > same as record 2.
> > >
> > > ## write with partitioning
> > > Writing with partitioning fails most of the time, setting lower cpu
> > > count not helping.
> > >  1. With cpu_count and io_thread_count to 128, CPU is 800% from
> > > begining, RES is growing slowing to 40.7G to the end of transferring,
> > > average flow speed is 3.24M rows/s. After that, CPU is still 800%, but
> > > RES going down very slow at 200MB/minute. Write speed not recovered.
> > >  2.With cpu_count to 1, io_thread_count to 16, CPU goes up to 800%
> > > slower than record1, RES is growing to 44.1G to the end of
> > > transferring, average flow speed is 6.75M rows/s. Same happens as
> > > record 1 after transferring done.
> > > 3. With cpu_count to 1, io_thread_count to 128, CPU goes to 800% much
> > > slower than record2 (due to slower flow speed?), RES is growing to 30G
> > > to the end of transferring, average flow speed is 1.62M rows/s. Same
> > > happens as record 1 after transferring done.
> > >
> > > Then I'm trying to limit the flow speed before writing queue got full
> > > with custom flow control (sleep on reader iteration based on available
> > > memory) But the sleep time curve is not accurate, sometimes flow slows
> > > down, but the queue got full anyway.
> > > Then the interesting thing happens, before the queue is full (memory
> > > quickly grows up), the CPU is not fully used. When memory grows up
> > > quickly, CPU goes up as well, to 800%.
> > > 1. Sometimes, the writing queue can overcome, CPU will goes down after
> > > the memory accumulated. The writing speed recoved and memory back to
> > > normal.
> > > 2. Sometimes, it can't. IOBPS goes down sharply, and CPU never goes
> > > down after that.
> > >
> > > How many io threads are writing concurrently in a single write_dataset
> > > call? How do they schedule? The throttle code seems only one task got
> > > running?
> > > What else can I do to inspect the problem?
> > >
> > > Weston Pace <weston.p...@gmail.com> 于2023年7月28日周五 00:33写道:
> > > >
> > > > You'll need to measure more but generally the bottleneck for writes is
> > > > usually going to be the disk itself.  Unfortunately, standard OS 
> > > > buffered
> > > > I/O has some pretty negative behaviors in this case.  First I'll 
> > > > describe
> > > > what I generally see happen (the last time I profiled this was a while
> > > back
> > > > but I don't think anything substantial has changed).
> > > >
> > > > * Initially, writes are very fast.  The OS `write` call is simply a
> > > memcpy
> > > > from user space into kernel space.  The actual flushing the data from
> > > > kernel space to disk happens asynchronously unless you are using direct
> > > I/O
> > > > (which is not currently supported).
> > > > * Over time, assuming the data arrival rate is faster than the data 
> > > > write
> > > > rate, the data will accumulate in kernel memory.  For example, if you
> > > > continuously run the Linux `free` program you will see the `free` column
> > > > decrease and the `buff/cache` column decreases.  The `available` column
> > > > should generally stay consistent (kernel memory that is in use but can
> > > > technically be flushed to disk if needed is still considered "available"
> > > > but not "free")
> > > > * Once the `free` column reaches 0 then a few things happen.  First, the
> > > > calls to `write` are no longer fast (the write cannot complete until 
> > > > some
> > > > existing data has been flushed to disk).  Second, other processes that
> > > > aren't in use might start swapping their data to disk (you will 
> > > > generally
> > > > see the entire system, if it is interactive, grind to a halt).  Third, 
> > > > if
> > > > you have an OOM-killer active, it may start to kill running 
> > > > applications.
> > > > It isn't supposed to do so but there are sometimes bugs[1].
> > > > * Assuming the OOM killer does not kill your application then, because
> > > the
> > > > `write` calls slow down, the number of rows in the dataset writer's 
> > > > queue
> > > > will start to fill up (this is captured by the variable
> > > > `rows_in_flight_throttle`.
> > > > * Once the rows_in_flight_throttle is full it will pause and the dataset
> > > > writer will return an unfinished future (asking the caller to back off).
> > > > * Once this happens the caller will apply backpressure (if being used in
> > > > Acero) which will pause the reader.  This backpressure is not instant 
> > > > and
> > > > generally each running CPU thread still delivers whatever batch it is
> > > > working on.  These batches essentially get added to an asynchronous
> > > > condition variable waiting on the dataset writer queue to free up.  This
> > > is
> > > > the spot where the ThrottledAsyncTaskScheduler is used.
> > > >
> > > > The stack dump that you reported is not exactly what I would have
> > > expected
> > > > but it might still match the above description.  At this point I am just
> > > > sort of guessing.  When the dataset writer frees up enough to receive
> > > > another batch it will do what is effectively a "notify all" and all of
> > > the
> > > > compute threads are waking up and trying to add their batch to the
> > > dataset
> > > > writer.  One of these gets through, gets added to the dataset writer, 
> > > > and
> > > > then backpressure is applied again and all the requests pile up once
> > > > again.  It's possible that a "resume sending" signal is sent and this
> > > might
> > > > actually lead to RAM filling up more.  We could probably mitigate this 
> > > > by
> > > > adding a low water mark to the dataset writer's backpressure throttle 
> > > > (so
> > > > it doesn't send the resume signal as soon as the queue has room but 
> > > > waits
> > > > until the queue is half full).
> > > >
> > > > I'd recommend watching the output of `free` (or monitoring memory in 
> > > > some
> > > > other way) and verifying the above.  I'd also suggest lowering the 
> > > > number
> > > > of CPU threads and see how that affects performance.  If you lower the
> > > CPU
> > > > threads enough then you should eventually get it to a point where your
> > > > supply of data is slower then your writer and I wouldn't expect memory 
> > > > to
> > > > accumulate.  These things are solutions but might give us more clues 
> > > > into
> > > > what is happening.
> > > >
> > > > [1]
> > > >
> > > https://unix.stackexchange.com/questions/300106/why-is-the-oom-killer-killing-processes-when-swap-is-hardly-used
> > > >
> > > > On Thu, Jul 27, 2023 at 4:53 AM Wenbo Hu <huwenbo1...@gmail.com> wrote:
> > > >
> > > > > Hi,
> > > > >     I'm using flight to receive streams from client and write to the
> > > > > storage with python `pa.dataset.write_dataset` API. The whole data is
> > > > > 1 Billion rows, over 40GB with one partition column ranges from 0~63.
> > > > > The container runs at 8-cores CPU and 4GB ram resources.
> > > > >     It can be done about 160s (6M rows/s, each record batch is about
> > > > > 32K rows) for completing transferring and writing almost
> > > > > synchronously, after setting 128 for io_thread_count.
> > > > >      Then I'd like to find out the bottleneck of the system, CPU or
> > > > > RAM or storage?
> > > > >     1. I extend the ram into 32GB, then the server consumes more ram,
> > > > > the writing progress works at the beginning, then suddenly slow down
> > > > > and the data accumulated into ram until OOM.
> > > > >     2. Then I set the ram to 64GB, so that the server will not killed
> > > > > by OOM. Same happens, also, after all the data is transferred (in
> > > > > memory), the server consumes all CPU shares (800%), but still very
> > > > > slow on writing (not totally stopped, but about 100MB/minute).
> > > > >     2.1 I'm wondering if the io thread is stuck, or the computation
> > > > > task is stuck. After setting both io_thread_count and cpu_count to 32,
> > > > > I wrapped the input stream of write_dataset with a callback on each
> > > > > record batch, I can tell that all the record batches are consumed into
> > > > > write_dataset API.
> > > > >     2.2 I dumped all threads stack traces and grab a flamegraph. See
> > > > > https://gist.github.com/hu6360567/e21ce04e7f620fafb5500cd93d44d3fb.
> > > > >
> > > > >      It seems that all threads stucks at
> > > ThrottledAsyncTaskSchedulerImpl.
> > > > >
> > > > > --
> > > > > ---------------------
> > > > > Best Regards,
> > > > > Wenbo Hu,
> > > > >
> > >
> > >
> > >
> > > --
> > > ---------------------
> > > Best Regards,
> > > Wenbo Hu,
> > >
>
>
>
> --
> ---------------------
> Best Regards,
> Wenbo Hu,



-- 
---------------------
Best Regards,
Wenbo Hu,

Reply via email to