Awesome, thanks for sharing this too!

The refactoring you have with DataClientStream what I would like to do as well 
- I think much of the existing code can be adapted to be more 
transport-agnostic and then it will be easier to support new transports 
(whether data-only or for all methods). 

Where do you see the gaps between gRPC and this? I think what would happen is 
1) client calls GetFlightInfo 2) server returns a `shm://` URI 3) client sees 
the unfamiliar prefix and creates a new client for the DoGet call (it would 
have to do this anyways if, for instance, the GetFlightInfo call returned the 
address of a different server).

I also wonder how this stacks up to UCX's shared memory backend (I did not test 
this though).

I would like to be able to support entire new transports for certain cases 
(namely browser support - though perhaps one of the gRPC proxies would suffice 
there), but even in that case, we could make it so that a new transport only 
needs to implement the data plane methods. Only having to support the data 
plane methods would save significant implementation effort for all non-browser 
cases so I think it's a worthwhile approach.

-David

On Wed, Dec 29, 2021, at 04:37, Yibo Cai wrote:
> Thanks David to initiate UCX integration, great work!
> I think 5Gbps network is too limited for performance evaluation. I will try 
> the patch on 100Gb RDMA network, hopefully we can see some improvements.
> I once benchmarked flight over 100Gb network [1], grpc based throughput is 
> 2.4GB/s for one thread, 8.8GB/s for six threads, about 60us latency. I also 
> benchmarked raw RDMA performance (same batch sizes as flight), one thread can 
> achive 9GB/s with 12us latency. Of couse the comparison is not fair. With 
> David's patch, we can get a more realistic comparison.
> 
> I'm implementing a data plane approach to hope we can adopt new data 
> acceleration methods easily. My approach is to replace only the FlighData 
> transmission of DoGet/Put/Exchange with data plane drivers, and grpc is still 
> used for all rpc calls.
> Code is at my github repo [2]. Besides the framework, I just implemented a 
> shared memory data plane driver as PoC. Get/Put/Exchange unit tests passed, 
> TestCancel hangs, some unit tests run longer than expected, still debugging. 
> The shared memory data plane performance is pretty bad now, due to repeated 
> map/unmap for each read/write, pre-allocated pages should improve much, still 
> experimenting.
> 
> Would like to hear community comments.
> 
> My personal opinion is the data plane approach reuses grpc control plane, may 
> be easier to add new data acceleration methods, but it needs to fit into grpc 
> seamlessly (there're still gaps not resolved). A new tranport requires much 
> more initial effort, but may payoff later. And looks these two approaches 
> don't conflict with each other.
> 
> [1] Test environment
> nics: mellanox connectx5
> hosts: client (neoverse n1), server (xeon gold 5218)
> os: ubuntu 20.04, linux kernel 5.4
> test case: 128k batch size, DoGet
> 
> [2] https://github.com/cyb70289/arrow/tree/flight-data-plane
> 
> ________________________________
> From: David Li <lidav...@apache.org>
> Sent: Wednesday, December 29, 2021 3:09 AM
> To: dev@arrow.apache.org <dev@arrow.apache.org>
> Subject: Re: Arrow in HPC
> 
> I ended up drafting an implementation of Flight based on UCX, and doing some
> of the necessary refactoring to support additional backends in the future.
> It can run the Flight benchmark, and performance is about comparable to
> gRPC, as tested on AWS EC2.
> 
> The implementation is based on the UCP streams API. It's extremely
> bare-bones and is really only a proof of concept; a good amount of work is
> needed to turn it into a usable implementation. I had hoped it would perform
> markedly better than gRPC, at least in this early test, but this seems not
> to be the case. That said: I am likely not using UCX properly, UCX would
> still open up support for additional hardware, and this work should allow
> other backends to be implemented more easily.
> 
> The branch can be viewed at
> https://github.com/lidavidm/arrow/tree/flight-ucx
> 
> I've attached the benchmark output at the end.
> 
> There are still quite a few TODOs and things that need investigating:
> 
> - Only DoGet and GetFlightInfo are implemented, and incompletely at that.
> - Concurrent requests are not supported, or even making more than one
>   request on a connection, nor does the server support concurrent clients.
>   We also need to decide whether to even support concurrent requests, and
>   how (e.g. pooling multiple connections, or implementing a gRPC/HTTP2 style
>   protocol, or even possibly implementing HTTP2).
> - We need to make sure we properly handle errors, etc. everywhere.
> - Are we using UCX in a performant and idiomatic manner? Will the
>   implementation work well on RDMA and other specialized hardware?
> - Do we also need to support the UCX tag API?
> - Can we refactor out interfaces that allow sharing more of the
>   client/server implementation between different backends?
> - Are the abstractions sufficient to support other potential backends like
>   MPI, libfabrics, or WebSockets?
> 
> If anyone has experience with UCX, I'd appreciate any feedback. Otherwise,
> I'm hoping to plan out and try to tackle some of the TODOs above, and figure
> out how this effort can proceed.
> 
> Antoine/Micah raised the possibility of extending gRPC instead. That would
> be preferable, frankly, given otherwise we'd might have to re-implement a
> lot of what gRPC and HTTP2 provide by ourselves. However, the necessary
> proposal stalled and was dropped without much discussion:
> https://groups.google.com/g/grpc-io/c/oIbBfPVO0lY
> 
> Benchmark results (also uploaded at
> https://gist.github.com/lidavidm/c4676c5d9c89d4cc717d6dea07dee952):
> 
> Testing was done between two t3.xlarge instances in the same zone.
> t3.xlarge has "up to 5 Gbps" of bandwidth (~600 MiB/s).
> 
> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info 
> ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host 
> 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000 
> -records_per_batch=4096
> Testing method: DoGet
> [1640703417.639373] [ip-172-31-37-78:10110:0]     ucp_worker.c:1627 UCX  INFO 
>  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> [1640703417.650068] [ip-172-31-37-78:10110:1]     ucp_worker.c:1627 UCX  INFO 
>  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> Number of perf runs: 1
> Number of concurrent gets/puts: 1
> Batch size: 131072
> Batches read: 10000
> Bytes read: 1310720000
> Nanos: 2165862969
> Speed: 577.137 MB/s
> Throughput: 4617.1 batches/s
> Latency mean: 214 us
> Latency quantile=0.5: 209 us
> Latency quantile=0.95: 340 us
> Latency quantile=0.99: 409 us
> Latency max: 6350 us
> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info 
> ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host 
> 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000 
> -records_per_batch=65536
> Testing method: DoGet
> [1640703439.428785] [ip-172-31-37-78:10116:0]     ucp_worker.c:1627 UCX  INFO 
>  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> [1640703439.440359] [ip-172-31-37-78:10116:1]     ucp_worker.c:1627 UCX  INFO 
>  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
> Number of perf runs: 1
> Number of concurrent gets/puts: 1
> Batch size: 2097152
> Batches read: 10000
> Bytes read: 20971520000
> Nanos: 34184175236
> Speed: 585.066 MB/s
> Throughput: 292.533 batches/s
> Latency mean: 3415 us
> Latency quantile=0.5: 3408 us
> Latency quantile=0.95: 3549 us
> Latency quantile=0.99: 3800 us
> Latency max: 20236 us
> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info 
> ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host 
> 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000 
> -records_per_batch=4096
> Testing method: DoGet
> Using standalone TCP server
> Server host: 172.31.34.4
> Server port: 31337
> Number of perf runs: 1
> Number of concurrent gets/puts: 1
> Batch size: 131072
> Batches read: 10000
> Bytes read: 1310720000
> Nanos: 2375289668
> Speed: 526.252 MB/s
> Throughput: 4210.01 batches/s
> Latency mean: 235 us
> Latency quantile=0.5: 203 us
> Latency quantile=0.95: 328 us
> Latency quantile=0.99: 1377 us
> Latency max: 17860 us
> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info 
> ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host 
> 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000 
> -records_per_batch=65536
> Testing method: DoGet
> Using standalone TCP server
> Server host: 172.31.34.4
> Server port: 31337
> Number of perf runs: 1
> Number of concurrent gets/puts: 1
> Batch size: 2097152
> Batches read: 10000
> Bytes read: 20971520000
> Nanos: 34202704498
> Speed: 584.749 MB/s
> Throughput: 292.375 batches/s
> Latency mean: 3416 us
> Latency quantile=0.5: 3406 us
> Latency quantile=0.95: 3548 us
> Latency quantile=0.99: 3764 us
> Latency max: 17086 us
> (ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ iperf3 -c 172.31.34.4 -p 1337 -Z 
> -l 1M
> Connecting to host 172.31.34.4, port 1337
> [  5] local 172.31.37.78 port 48422 connected to 172.31.34.4 port 1337
> [ ID] Interval           Transfer     Bitrate         Retr  Cwnd
> [  5]   0.00-1.00   sec   572 MBytes  4.79 Gbits/sec   36   2.35 MBytes
> [  5]   1.00-2.00   sec   582 MBytes  4.88 Gbits/sec    0   2.43 MBytes
> [  5]   2.00-3.00   sec   585 MBytes  4.91 Gbits/sec    0   2.43 MBytes
> [  5]   3.00-4.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43 MBytes
> [  5]   4.00-5.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43 MBytes
> [  5]   5.00-6.00   sec   586 MBytes  4.91 Gbits/sec    0   2.43 MBytes
> [  5]   6.00-7.00   sec   586 MBytes  4.92 Gbits/sec    0   2.43 MBytes
> [  5]   7.00-8.00   sec   580 MBytes  4.87 Gbits/sec    0   2.43 MBytes
> [  5]   8.00-9.00   sec   584 MBytes  4.89 Gbits/sec    0   2.43 MBytes
> [  5]   9.00-10.00  sec   577 MBytes  4.84 Gbits/sec    0   2.43 MBytes
> - - - - - - - - - - - - - - - - - - - - - - - - -
> [ ID] Interval           Transfer     Bitrate         Retr
> [  5]   0.00-10.00  sec  5.69 GBytes  4.89 Gbits/sec   36             sender
> [  5]   0.00-10.00  sec  5.69 GBytes  4.88 Gbits/sec                  receiver
> 
> iperf Done.
> 
> Best,
> David
> 
> On Tue, Nov 2, 2021, at 19:59, Jed Brown wrote:
> > "David Li" <lidav...@apache.org> writes:
> >
> > > Thanks for the clarification Yibo, looking forward to the results. Even 
> > > if it is a very hacky PoC it will be interesting to see how it affects 
> > > performance, though as Keith points out there are benefits in general to 
> > > UCX (or similar library), and we can work out the implementation plan 
> > > from there.
> > >
> > > To Benson's point - the work done to get UCX supported would pave the way 
> > > to supporting other backends as well. I'm personally not familiar with 
> > > UCX, MPI, etc. so is MPI here more about playing well with established 
> > > practices or does it also offer potential hardware support/performance 
> > > improvements like UCX would?
> >
> > There are two main implementations of MPI, MPICH and Open MPI, both of 
> > which are permissively licensed open source community projects. Both have 
> > direct support for UCX and unless your needs are very specific, the 
> > overhead of going through MPI is likely to be negligible. Both also have 
> > proprietary derivatives, such as Cray MPI (MPICH derivative) and Spectrum 
> > MPI (Open MPI derivative), which may have optimizations for proprietary 
> > networks. Both MPICH and Open MPI can be built without UCX, and this is 
> > often easier (UCX 'master' is more volatile in my experience).
> >
> > The vast majority of distributed memory scientific applications use MPI or 
> > higher level libraries, rather than writing directly to UCX (which provides 
> > less coverage of HPC networks). I think MPI compatibility is important.
> >
> > From way up-thread (sorry):
> >
> > >> >>>>> Jed - how would you see MPI and Flight interacting? As another
> > >> >>>>> transport/alternative to UCX? I admit I'm not familiar with the HPC
> > >> >>>>> space.
> >
> > MPI has collective operations like MPI_Allreduce (perform a reduction and 
> > give every process the result; these run in log(P) or better time with 
> > small constants -- 15 microseconds is typical for a cheap reduction 
> > operation on a million processes). MPI supports user-defined operations for 
> > reductions and prefix-scan operations. If we defined MPI_Ops for Arrow 
> > types, we could compute summary statistics and other algorithmic building 
> > blocks fast at arbitrary scale.
> >
> > The collective execution model might not be everyone's bag, but MPI_Op can 
> > also be used in one-sided operations (MPI_Accumulate and MPI_Fetch_and_op) 
> > and dropping into collective mode has big advantages for certain algorithms 
> > in computational statistics/machine learning.
> >
> IMPORTANT NOTICE: The contents of this email and any attachments are 
> confidential and may also be privileged. If you are not the intended 
> recipient, please notify the sender immediately and do not disclose the 
> contents to any other person, use it for any purpose, or store or copy the 
> information in any medium. Thank you.
> 

Reply via email to