One general observation: I think this implementation polls to check
progress. Because of the client-server semantics of Arrow Flight, you
may need event-driven (interrupt-based) waiting, such as epoll, to avoid
busy looping.
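
To illustrate what I mean, here is a minimal sketch in Python rather
than in the prototype's C++. It is not taken from the UCX branch, and the
names wait_for_message and demo are made up for the example. The waiting
thread sleeps in the kernel until the socket becomes readable instead of
spinning on a non-blocking read. I believe UCX offers a comparable
wakeup mechanism through ucp_worker_get_efd and ucp_worker_arm, but I
have not checked how that would fit into this prototype.

```python
import selectors
import socket
import threading
import time


def wait_for_message(sock, timeout=5.0):
    """Block until `sock` is readable, then return the received bytes.

    The calling thread sleeps inside select() rather than spinning on a
    non-blocking recv(). Returns None if the timeout expires first.
    """
    sel = selectors.DefaultSelector()  # backed by epoll on Linux
    sel.register(sock, selectors.EVENT_READ)
    try:
        events = sel.select(timeout)
        if not events:
            return None
        return sock.recv(4096)
    finally:
        sel.unregister(sock)
        sel.close()


def demo():
    """Simulate a server that answers after a short delay."""
    client, server = socket.socketpair()
    client.setblocking(False)

    def reply_later():
        time.sleep(0.1)
        server.sendall(b"batch-ready")

    worker = threading.Thread(target=reply_later)
    worker.start()
    message = wait_for_message(client)
    worker.join()
    client.close()
    server.close()
    return message


if __name__ == "__main__":
    print(demo())
```

The trade-off is latency versus CPU usage: busy polling reacts fastest
but pins a core, while blocking on a file descriptor frees the core at
the cost of a wakeup per event.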

Best,
Supun..

On Tue, Jan 18, 2022 at 8:13 AM David Li <lidav...@apache.org> wrote:

> Thanks for those results, Yibo! Looks like there's still more room for
> improvement here. Yes, things are a little unstable, though I didn't
> have that much trouble just starting the benchmark - I will need
> to find suitable hardware and iron out these issues. Note that I've
> only implemented DoGet, and I haven't implemented concurrent streams,
> which would explain why most benchmark configurations hang or error.
>
> Since the last time, I've rewritten the prototype to use UCX's "active
> message" functionality instead of trying to implement messages over
> the "streams" API. This simplified the code. I also did some
> refactoring along the lines of Yibo's prototype to share more code
> between the gRPC and UCX implementations. Here are some benchmark
> numbers:
>
> For IPC (server/client on the same machine): UCX with shared memory
> handily beats gRPC here. UCX with TCP isn't quite up to par, though.
>
> gRPC:
> 128KiB batches: 4463 MiB/s
> 2MiB batches:   3537 MiB/s
> 32MiB batches:  1828 MiB/s
>
> UCX (shared memory):
> 128KiB batches: 6500 MiB/s
> 2MiB batches:  13879 MiB/s
> 32MiB batches:  9045 MiB/s
>
> UCX (TCP):
> 128KiB batches: 1069 MiB/s
> 2MiB batches:   1735 MiB/s
> 32MiB batches:  1602 MiB/s
>
> For RPC (server/client on different machines): Two t3.xlarge (4 vCPU,
> 16 GiB memory) machines were used in AWS EC2. These have "up to" 5Gbps
> bandwidth. This isn't really a scenario where UCX is expected to
> shine; however, UCX performs comparably to gRPC here.
>
> gRPC:
> 128 KiB batches: 554 MiB/s
> 2 MiB batches:   575 MiB/s
>
> UCX:
> 128 KiB batches: 546 MiB/s
> 2 MiB batches:   567 MiB/s
>
> Raw test logs can be found here:
> https://gist.github.com/lidavidm/57d8a3cba46229e4d277ae0730939acc
>
> For IPC, the shared memory results are promising in that it could be
> feasible to expose a library purely over Flight without worrying about
> FFI bindings. Also, it seems results are roughly comparable to what
> Yibo observed in ARROW-15282 [1], meaning UCX will get us both a
> performant shared memory transport and support for more exotic
> hardware.
>
> There's still much work to be done; at this point, I'd like to start
> implementing the rest of the Flight methods, fixing up the many TODOs
> scattered around, trying to refactor more things to share code between
> gRPC/UCX, and finding and benchmarking some hardware that UCX has a fast
> path for.
>
> [1]: https://issues.apache.org/jira/browse/ARROW-15282
>
> -David
>
> On Tue, Jan 18, 2022, at 04:35, Yibo Cai wrote:
> > Some updates.
> >
> > I tested David's UCX transport patch over a 100Gb network. FlightRPC over
> > UCX/RDMA improves throughput by about 50%, with lower and flat latency.
> > And I think there are chances to improve further. See test report [1].
> >
> > For the data plane approach, the PoC shared memory data plane also
> > yields a significant performance boost. Details at [2].
> >
> > Glad to see there is great potential to improve FlightRPC performance.
> >
> > [1] https://issues.apache.org/jira/browse/ARROW-15229
> > [2] https://issues.apache.org/jira/browse/ARROW-15282
> >
> > On 12/30/21 11:57 PM, David Li wrote:
> > > Ah, I see.
> > >
> > > I think both projects can proceed as well. At some point we will have
> to figure out how to merge them, but I think it's too early to see how
> exactly we will want to refactor things.
> > >
> > > I looked over the code and I don't have any important comments for
> now. Looking forward to reviewing when it's ready.
> > >
> > > -David
> > >
> > > On Wed, Dec 29, 2021, at 22:16, Yibo Cai wrote:
> > >>
> > >>
> > >> On 12/29/21 11:03 PM, David Li wrote:
> > >>> Awesome, thanks for sharing this too!
> > >>>
> > > >>> The refactoring you have with DataClientStream is what I would like to
> do as well - I think much of the existing code can be adapted to be more
> transport-agnostic and then it will be easier to support new transports
> (whether data-only or for all methods).
> > >>>
> > >>> Where do you see the gaps between gRPC and this? I think what would
> happen is 1) client calls GetFlightInfo 2) server returns a `shm://` URI 3)
> client sees the unfamiliar prefix and creates a new client for the DoGet
> call (it would have to do this anyways if, for instance, the GetFlightInfo
> call returned the address of a different server).
> > >>>
> > >>
> > > >> I mean implementation details. Some unit tests run longer than
> > > >> expected (the data plane times out reading from an ended stream). It
> > > >> looks like the gRPC stream finish message is not correctly intercepted
> > > >> and forwarded to the data plane. I don't think it's a big problem; I
> > > >> just need some time to debug.
> > >>
> > >>> I also wonder how this stacks up to UCX's shared memory backend (I
> did not test this though).
> > >>>
> > >>
> > > >> I implemented a shared memory data plane only to verify and consolidate
> > > >> the data plane design, as it's the easiest (and a useful) driver. I also
> > > >> plan to implement a socket-based data plane, not useful in practice,
> > > >> only to make sure the design works across the network. Then we can add
> > > >> more useful drivers like UCX or DPDK (the benefit of DPDK is that it
> > > >> works on commodity hardware, unlike UCX/RDMA, which requires expensive
> > > >> equipment).
> > >>
> > >>> I would like to be able to support entire new transports for certain
> cases (namely browser support - though perhaps one of the gRPC proxies
> would suffice there), but even in that case, we could make it so that a new
> transport only needs to implement the data plane methods. Only having to
> support the data plane methods would save significant implementation effort
> for all non-browser cases so I think it's a worthwhile approach.
> > >>>
> > >>
> > > >> Thanks for your interest in this approach. My current plan is to first
> > > >> refactor the shared memory data plane to verify that it beats gRPC in
> > > >> local RPC by a considerable margin; otherwise there must be big mistakes
> > > >> in my design. After that I will fix the unit test issues and deliver it
> > > >> for community review.
> > >>
> > >> Anyway, don't let me block your implementations. And if you think it's
> > >> useful, I can push current code for more detailed discussion.
> > >>
> > >>> -David
> > >>>
> > >>> On Wed, Dec 29, 2021, at 04:37, Yibo Cai wrote:
> > > >>>> Thanks, David, for initiating the UCX integration, great work!
> > > >>>> I think a 5Gbps network is too limited for performance evaluation. I
> > > >>>> will try the patch on a 100Gb RDMA network; hopefully we can see some
> > > >>>> improvements.
> > > >>>> I once benchmarked Flight over a 100Gb network [1]: gRPC-based
> > > >>>> throughput is 2.4GB/s for one thread and 8.8GB/s for six threads, with
> > > >>>> about 60us latency. I also benchmarked raw RDMA performance (same batch
> > > >>>> sizes as Flight): one thread can achieve 9GB/s with 12us latency. Of
> > > >>>> course the comparison is not fair. With David's patch, we can get a
> > > >>>> more realistic comparison.
> > >>>>
> > > >>>> I'm implementing a data plane approach in the hope that we can adopt
> > > >>>> new data acceleration methods easily. My approach is to replace only
> > > >>>> the FlightData transmission of DoGet/Put/Exchange with data plane
> > > >>>> drivers, while gRPC is still used for all RPC calls.
> > > >>>> Code is at my GitHub repo [2]. Besides the framework, I have only
> > > >>>> implemented a shared memory data plane driver as a PoC. The
> > > >>>> Get/Put/Exchange unit tests pass, TestCancel hangs, and some unit tests
> > > >>>> run longer than expected; I'm still debugging. The shared memory data
> > > >>>> plane performance is pretty bad now, due to repeated map/unmap for each
> > > >>>> read/write; pre-allocated pages should improve it a lot. I'm still
> > > >>>> experimenting.
> > >>>>
> > >>>> Would like to hear community comments.
> > >>>>
> > > >>>> My personal opinion is that the data plane approach reuses the gRPC
> > > >>>> control plane and may make it easier to add new data acceleration
> > > >>>> methods, but it needs to fit into gRPC seamlessly (there are still
> > > >>>> unresolved gaps). A new transport requires much more initial effort,
> > > >>>> but may pay off later. And it looks like these two approaches don't
> > > >>>> conflict with each other.
> > >>>>
> > >>>> [1] Test environment
> > >>>> nics: mellanox connectx5
> > >>>> hosts: client (neoverse n1), server (xeon gold 5218)
> > >>>> os: ubuntu 20.04, linux kernel 5.4
> > >>>> test case: 128k batch size, DoGet
> > >>>>
> > >>>> [2] https://github.com/cyb70289/arrow/tree/flight-data-plane
> > >>>>
> > >>>> ________________________________
> > >>>> From: David Li <lidav...@apache.org>
> > >>>> Sent: Wednesday, December 29, 2021 3:09 AM
> > >>>> To: dev@arrow.apache.org <dev@arrow.apache.org>
> > >>>> Subject: Re: Arrow in HPC
> > >>>>
> > >>>> I ended up drafting an implementation of Flight based on UCX, and
> doing some
> > >>>> of the necessary refactoring to support additional backends in the
> future.
> > >>>> It can run the Flight benchmark, and performance is about
> comparable to
> > >>>> gRPC, as tested on AWS EC2.
> > >>>>
> > >>>> The implementation is based on the UCP streams API. It's extremely
> > >>>> bare-bones and is really only a proof of concept; a good amount of
> work is
> > >>>> needed to turn it into a usable implementation. I had hoped it
> would perform
> > >>>> markedly better than gRPC, at least in this early test, but this
> seems not
> > >>>> to be the case. That said: I am likely not using UCX properly, UCX
> would
> > >>>> still open up support for additional hardware, and this work should
> allow
> > >>>> other backends to be implemented more easily.
> > >>>>
> > >>>> The branch can be viewed at
> > >>>> https://github.com/lidavidm/arrow/tree/flight-ucx
> > >>>>
> > >>>> I've attached the benchmark output at the end.
> > >>>>
> > >>>> There are still quite a few TODOs and things that need
> investigating:
> > >>>>
> > >>>> - Only DoGet and GetFlightInfo are implemented, and incompletely at
> that.
> > >>>> - Concurrent requests are not supported, or even making more than
> one
> > >>>>     request on a connection, nor does the server support concurrent
> clients.
> > >>>>     We also need to decide whether to even support concurrent
> requests, and
> > >>>>     how (e.g. pooling multiple connections, or implementing a
> gRPC/HTTP2 style
> > >>>>     protocol, or even possibly implementing HTTP2).
> > >>>> - We need to make sure we properly handle errors, etc. everywhere.
> > >>>> - Are we using UCX in a performant and idiomatic manner? Will the
> > >>>>     implementation work well on RDMA and other specialized hardware?
> > >>>> - Do we also need to support the UCX tag API?
> > >>>> - Can we refactor out interfaces that allow sharing more of the
> > >>>>     client/server implementation between different backends?
> > >>>> - Are the abstractions sufficient to support other potential
> backends like
> > >>>>     MPI, libfabrics, or WebSockets?
> > >>>>
> > >>>> If anyone has experience with UCX, I'd appreciate any feedback.
> Otherwise,
> > >>>> I'm hoping to plan out and try to tackle some of the TODOs above,
> and figure
> > >>>> out how this effort can proceed.
> > >>>>
> > >>>> Antoine/Micah raised the possibility of extending gRPC instead.
> That would
> > > >>>> be preferable, frankly, given that otherwise we might have to
> re-implement a
> > >>>> lot of what gRPC and HTTP2 provide by ourselves. However, the
> necessary
> > >>>> proposal stalled and was dropped without much discussion:
> > >>>> https://groups.google.com/g/grpc-io/c/oIbBfPVO0lY
> > >>>>
> > >>>> Benchmark results (also uploaded at
> > >>>> https://gist.github.com/lidavidm/c4676c5d9c89d4cc717d6dea07dee952):
> > >>>>
> > >>>> Testing was done between two t3.xlarge instances in the same zone.
> > >>>> t3.xlarge has "up to 5 Gbps" of bandwidth (~600 MiB/s).
> > >>>>
> > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
> ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host
> 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000
> -records_per_batch=4096
> > >>>> Testing method: DoGet
> > >>>> [1640703417.639373] [ip-172-31-37-78:10110:0]     ucp_worker.c:1627
> UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> > >>>> [1640703417.650068] [ip-172-31-37-78:10110:1]     ucp_worker.c:1627
> UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> > >>>> Number of perf runs: 1
> > >>>> Number of concurrent gets/puts: 1
> > >>>> Batch size: 131072
> > >>>> Batches read: 10000
> > >>>> Bytes read: 1310720000
> > >>>> Nanos: 2165862969
> > >>>> Speed: 577.137 MB/s
> > >>>> Throughput: 4617.1 batches/s
> > >>>> Latency mean: 214 us
> > >>>> Latency quantile=0.5: 209 us
> > >>>> Latency quantile=0.95: 340 us
> > >>>> Latency quantile=0.99: 409 us
> > >>>> Latency max: 6350 us
> > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
> ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host
> 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000
> -records_per_batch=65536
> > >>>> Testing method: DoGet
> > >>>> [1640703439.428785] [ip-172-31-37-78:10116:0]     ucp_worker.c:1627
> UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> > >>>> [1640703439.440359] [ip-172-31-37-78:10116:1]     ucp_worker.c:1627
> UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> > >>>> Number of perf runs: 1
> > >>>> Number of concurrent gets/puts: 1
> > >>>> Batch size: 2097152
> > >>>> Batches read: 10000
> > >>>> Bytes read: 20971520000
> > >>>> Nanos: 34184175236
> > >>>> Speed: 585.066 MB/s
> > >>>> Throughput: 292.533 batches/s
> > >>>> Latency mean: 3415 us
> > >>>> Latency quantile=0.5: 3408 us
> > >>>> Latency quantile=0.95: 3549 us
> > >>>> Latency quantile=0.99: 3800 us
> > >>>> Latency max: 20236 us
> > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
> ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host
> 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000
> -records_per_batch=4096
> > >>>> Testing method: DoGet
> > >>>> Using standalone TCP server
> > >>>> Server host: 172.31.34.4
> > >>>> Server port: 31337
> > >>>> Number of perf runs: 1
> > >>>> Number of concurrent gets/puts: 1
> > >>>> Batch size: 131072
> > >>>> Batches read: 10000
> > >>>> Bytes read: 1310720000
> > >>>> Nanos: 2375289668
> > >>>> Speed: 526.252 MB/s
> > >>>> Throughput: 4210.01 batches/s
> > >>>> Latency mean: 235 us
> > >>>> Latency quantile=0.5: 203 us
> > >>>> Latency quantile=0.95: 328 us
> > >>>> Latency quantile=0.99: 1377 us
> > >>>> Latency max: 17860 us
> > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info
> ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host
> 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000
> -records_per_batch=65536
> > >>>> Testing method: DoGet
> > >>>> Using standalone TCP server
> > >>>> Server host: 172.31.34.4
> > >>>> Server port: 31337
> > >>>> Number of perf runs: 1
> > >>>> Number of concurrent gets/puts: 1
> > >>>> Batch size: 2097152
> > >>>> Batches read: 10000
> > >>>> Bytes read: 20971520000
> > >>>> Nanos: 34202704498
> > >>>> Speed: 584.749 MB/s
> > >>>> Throughput: 292.375 batches/s
> > >>>> Latency mean: 3416 us
> > >>>> Latency quantile=0.5: 3406 us
> > >>>> Latency quantile=0.95: 3548 us
> > >>>> Latency quantile=0.99: 3764 us
> > >>>> Latency max: 17086 us
> > >>>> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ iperf3 -c 172.31.34.4
> -p 1337 -Z -l 1M
> > >>>> Connecting to host 172.31.34.4, port 1337
> > >>>> [  5] local 172.31.37.78 port 48422 connected to 172.31.34.4 port
> 1337
> > >>>> [ ID] Interval           Transfer     Bitrate         Retr  Cwnd
> > >>>> [  5]   0.00-1.00   sec   572 MBytes  4.79 Gbits/sec   36   2.35
> MBytes
> > >>>> [  5]   1.00-2.00   sec   582 MBytes  4.88 Gbits/sec    0   2.43
> MBytes
> > >>>> [  5]   2.00-3.00   sec   585 MBytes  4.91 Gbits/sec    0   2.43
> MBytes
> > >>>> [  5]   3.00-4.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43
> MBytes
> > >>>> [  5]   4.00-5.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43
> MBytes
> > >>>> [  5]   5.00-6.00   sec   586 MBytes  4.91 Gbits/sec    0   2.43
> MBytes
> > >>>> [  5]   6.00-7.00   sec   586 MBytes  4.92 Gbits/sec    0   2.43
> MBytes
> > >>>> [  5]   7.00-8.00   sec   580 MBytes  4.87 Gbits/sec    0   2.43
> MBytes
> > >>>> [  5]   8.00-9.00   sec   584 MBytes  4.89 Gbits/sec    0   2.43
> MBytes
> > >>>> [  5]   9.00-10.00  sec   577 MBytes  4.84 Gbits/sec    0   2.43
> MBytes
> > >>>> - - - - - - - - - - - - - - - - - - - - - - - - -
> > >>>> [ ID] Interval           Transfer     Bitrate         Retr
> > >>>> [  5]   0.00-10.00  sec  5.69 GBytes  4.89 Gbits/sec   36
>    sender
> > >>>> [  5]   0.00-10.00  sec  5.69 GBytes  4.88 Gbits/sec
>   receiver
> > >>>>
> > >>>> iperf Done.
> > >>>>
> > >>>> Best,
> > >>>> David
> > >>>>
> > >>>> On Tue, Nov 2, 2021, at 19:59, Jed Brown wrote:
> > >>>>> "David Li" <lidav...@apache.org> writes:
> > >>>>>
> > >>>>>> Thanks for the clarification Yibo, looking forward to the
> results. Even if it is a very hacky PoC it will be interesting to see how
> it affects performance, though as Keith points out there are benefits in
> general to UCX (or similar library), and we can work out the implementation
> plan from there.
> > >>>>>>
> > >>>>>> To Benson's point - the work done to get UCX supported would pave
> the way to supporting other backends as well. I'm personally not familiar
> with UCX, MPI, etc. so is MPI here more about playing well with established
> practices or does it also offer potential hardware support/performance
> improvements like UCX would?
> > >>>>>
> > >>>>> There are two main implementations of MPI, MPICH and Open MPI,
> both of which are permissively licensed open source community projects.
> Both have direct support for UCX and unless your needs are very specific,
> the overhead of going through MPI is likely to be negligible. Both also
> have proprietary derivatives, such as Cray MPI (MPICH derivative) and
> Spectrum MPI (Open MPI derivative), which may have optimizations for
> proprietary networks. Both MPICH and Open MPI can be built without UCX, and
> this is often easier (UCX 'master' is more volatile in my experience).
> > >>>>>
> > >>>>> The vast majority of distributed memory scientific applications
> use MPI or higher level libraries, rather than writing directly to UCX
> (which provides less coverage of HPC networks). I think MPI compatibility
> is important.
> > >>>>>
> > >>>>>   From way up-thread (sorry):
> > >>>>>
> > >>>>>>>>>>>> Jed - how would you see MPI and Flight interacting? As
> another
> > >>>>>>>>>>>> transport/alternative to UCX? I admit I'm not familiar with
> the HPC
> > >>>>>>>>>>>> space.
> > >>>>>
> > >>>>> MPI has collective operations like MPI_Allreduce (perform a
> reduction and give every process the result; these run in log(P) or better
> time with small constants -- 15 microseconds is typical for a cheap
> reduction operation on a million processes). MPI supports user-defined
> operations for reductions and prefix-scan operations. If we defined MPI_Ops
> for Arrow types, we could compute summary statistics and other algorithmic
> building blocks fast at arbitrary scale.
> > >>>>>
> > >>>>> The collective execution model might not be everyone's bag, but
> MPI_Op can also be used in one-sided operations (MPI_Accumulate and
> MPI_Fetch_and_op) and dropping into collective mode has big advantages for
> certain algorithms in computational statistics/machine learning.
> > >>>>>
> > >>>>
> > >>>
> > >>
> > >
> >
>


-- 
Supun Kamburugamuve
