I ended up drafting an implementation of Flight based on UCX, and doing some of the necessary refactoring to support additional backends in the future. It can run the Flight benchmark, and performance is about comparable to gRPC, as tested on AWS EC2.
The implementation is based on the UCP streams API. It's extremely bare-bones and really only a proof of concept; a good amount of work is needed to turn it into a usable implementation. I had hoped it would perform markedly better than gRPC, at least in this early test, but that seems not to be the case. That said: I am likely not using UCX optimally, UCX would still open up support for additional hardware, and this work should make other backends easier to implement.

The branch can be viewed at https://github.com/lidavidm/arrow/tree/flight-ucx and I've attached the benchmark output at the end.

There are still quite a few TODOs and things that need investigating:

- Only DoGet and GetFlightInfo are implemented, and incompletely at that.
- Concurrent requests are not supported, nor is making more than one request on a connection, nor does the server support concurrent clients. We also need to decide whether to support concurrent requests at all, and how (e.g. by pooling multiple connections, implementing a gRPC/HTTP2-style protocol, or even implementing HTTP2 itself).
- We need to make sure errors and edge cases are handled properly everywhere.
- Are we using UCX in a performant and idiomatic manner? Will the implementation work well on RDMA and other specialized hardware?
- Do we also need to support the UCX tag API?
- Can we factor out interfaces that allow sharing more of the client/server implementation between different backends?
- Are the abstractions sufficient to support other potential backends like MPI, libfabric, or WebSockets?

If anyone has experience with UCX, I'd appreciate any feedback. Otherwise, I'm hoping to plan out and tackle some of the TODOs above, and figure out how this effort can proceed.

Antoine/Micah raised the possibility of extending gRPC instead. That would frankly be preferable, given that otherwise we might have to re-implement much of what gRPC and HTTP2 provide ourselves.
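One concrete issue lurking behind those TODOs: the UCP streams API delivers an unstructured byte stream, so the transport has to frame Flight messages itself. As a toy illustration (Python for brevity; this is not code from the branch, and the "messages" are hypothetical), length-prefix framing looks like:

```python
import struct
from io import BytesIO

def frame(payload: bytes) -> bytes:
    # Length-prefix each message: a byte-stream transport has no
    # built-in message boundaries, so the reader needs the size up front.
    return struct.pack("<I", len(payload)) + payload

def read_frames(stream):
    # Recover the framed payloads from a byte stream.
    while True:
        header = stream.read(4)
        if len(header) < 4:
            return
        (length,) = struct.unpack("<I", header)
        yield stream.read(length)

# Two hypothetical messages concatenated on the wire.
wire = frame(b"GetFlightInfo") + frame(b"DoGet")
assert list(read_frames(BytesIO(wire))) == [b"GetFlightInfo", b"DoGet"]
```

A gRPC/HTTP2-style protocol would additionally need something like a stream ID in each frame header to multiplex concurrent requests over one connection, which is part of why that TODO is nontrivial.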
However, the necessary proposal stalled and was dropped without much discussion: https://groups.google.com/g/grpc-io/c/oIbBfPVO0lY

Benchmark results (also uploaded at https://gist.github.com/lidavidm/c4676c5d9c89d4cc717d6dea07dee952):

Testing was done between two t3.xlarge instances in the same zone. t3.xlarge has "up to 5 Gbps" of bandwidth (~600 MiB/s).

(ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000 -records_per_batch=4096
Testing method: DoGet
[1640703417.639373] [ip-172-31-37-78:10110:0] ucp_worker.c:1627 UCX INFO ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
[1640703417.650068] [ip-172-31-37-78:10110:1] ucp_worker.c:1627 UCX INFO ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
Number of perf runs: 1
Number of concurrent gets/puts: 1
Batch size: 131072
Batches read: 10000
Bytes read: 1310720000
Nanos: 2165862969
Speed: 577.137 MB/s
Throughput: 4617.1 batches/s
Latency mean: 214 us
Latency quantile=0.5: 209 us
Latency quantile=0.95: 340 us
Latency quantile=0.99: 409 us
Latency max: 6350 us

(ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000 -records_per_batch=65536
Testing method: DoGet
[1640703439.428785] [ip-172-31-37-78:10116:0] ucp_worker.c:1627 UCX INFO ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
[1640703439.440359] [ip-172-31-37-78:10116:1] ucp_worker.c:1627 UCX INFO ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
Number of perf runs: 1
Number of concurrent gets/puts: 1
Batch size: 2097152
Batches read: 10000
Bytes read: 20971520000
Nanos: 34184175236
Speed: 585.066 MB/s
Throughput: 292.533 batches/s
Latency mean: 3415 us
Latency quantile=0.5: 3408 us
Latency quantile=0.95: 3549 us
Latency quantile=0.99: 3800 us
Latency max: 20236 us
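As a quick sanity check on the UCX run above (my reading is that the benchmark's "MB/s" is binary mebibytes, consistent with the ~600 MiB/s note), the reported speed and throughput follow directly from the raw byte and batch counts:

```python
# Figures copied from the first UCX run above.
bytes_read = 1_310_720_000
batches_read = 10_000
nanos = 2_165_862_969

seconds = nanos / 1e9
speed_mib_s = bytes_read / (1 << 20) / seconds  # 1 MiB = 2**20 bytes
throughput = batches_read / seconds

assert abs(speed_mib_s - 577.137) < 0.001  # matches "Speed: 577.137 MB/s"
assert abs(throughput - 4617.1) < 0.05     # matches "Throughput: 4617.1 batches/s"
```

Both runs sit right at the iperf-measured link ceiling, which supports the conclusion that the transport is network-bound here rather than CPU-bound.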
(ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000 -records_per_batch=4096
Testing method: DoGet
Using standalone TCP server
Server host: 172.31.34.4
Server port: 31337
Number of perf runs: 1
Number of concurrent gets/puts: 1
Batch size: 131072
Batches read: 10000
Bytes read: 1310720000
Nanos: 2375289668
Speed: 526.252 MB/s
Throughput: 4210.01 batches/s
Latency mean: 235 us
Latency quantile=0.5: 203 us
Latency quantile=0.95: 328 us
Latency quantile=0.99: 1377 us
Latency max: 17860 us

(ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000 -records_per_batch=65536
Testing method: DoGet
Using standalone TCP server
Server host: 172.31.34.4
Server port: 31337
Number of perf runs: 1
Number of concurrent gets/puts: 1
Batch size: 2097152
Batches read: 10000
Bytes read: 20971520000
Nanos: 34202704498
Speed: 584.749 MB/s
Throughput: 292.375 batches/s
Latency mean: 3416 us
Latency quantile=0.5: 3406 us
Latency quantile=0.95: 3548 us
Latency quantile=0.99: 3764 us
Latency max: 17086 us

(ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ iperf3 -c 172.31.34.4 -p 1337 -Z -l 1M
Connecting to host 172.31.34.4, port 1337
[  5] local 172.31.37.78 port 48422 connected to 172.31.34.4 port 1337
[ ID] Interval           Transfer     Bitrate         Retr  Cwnd
[  5]   0.00-1.00   sec   572 MBytes  4.79 Gbits/sec   36   2.35 MBytes
[  5]   1.00-2.00   sec   582 MBytes  4.88 Gbits/sec    0   2.43 MBytes
[  5]   2.00-3.00   sec   585 MBytes  4.91 Gbits/sec    0   2.43 MBytes
[  5]   3.00-4.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43 MBytes
[  5]   4.00-5.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43 MBytes
[  5]   5.00-6.00   sec   586 MBytes  4.91 Gbits/sec    0   2.43 MBytes
[  5]   6.00-7.00   sec   586 MBytes  4.92 Gbits/sec    0   2.43 MBytes
[  5]   7.00-8.00   sec   580 MBytes  4.87 Gbits/sec    0   2.43 MBytes
[  5]   8.00-9.00   sec   584 MBytes  4.89 Gbits/sec    0   2.43 MBytes
[  5]   9.00-10.00  sec   577 MBytes  4.84 Gbits/sec    0   2.43 MBytes
- - - - - - - - - - - - - - - - - - - - - - - - -
[ ID] Interval           Transfer     Bitrate         Retr
[  5]   0.00-10.00  sec  5.69 GBytes  4.89 Gbits/sec   36             sender
[  5]   0.00-10.00  sec  5.69 GBytes  4.88 Gbits/sec                  receiver

iperf Done.

Best,
David

On Tue, Nov 2, 2021, at 19:59, Jed Brown wrote:
> "David Li" <lidav...@apache.org> writes:
>
> > Thanks for the clarification Yibo, looking forward to the results. Even if
> > it is a very hacky PoC it will be interesting to see how it affects
> > performance, though as Keith points out there are benefits in general to
> > UCX (or similar library), and we can work out the implementation plan from
> > there.
> >
> > To Benson's point - the work done to get UCX supported would pave the way
> > to supporting other backends as well. I'm personally not familiar with UCX,
> > MPI, etc. so is MPI here more about playing well with established practices
> > or does it also offer potential hardware support/performance improvements
> > like UCX would?
>
> There are two main implementations of MPI, MPICH and Open MPI, both of which
> are permissively licensed open source community projects. Both have direct
> support for UCX and unless your needs are very specific, the overhead of
> going through MPI is likely to be negligible. Both also have proprietary
> derivatives, such as Cray MPI (MPICH derivative) and Spectrum MPI (Open MPI
> derivative), which may have optimizations for proprietary networks. Both
> MPICH and Open MPI can be built without UCX, and this is often easier (UCX
> 'master' is more volatile in my experience).
>
> The vast majority of distributed memory scientific applications use MPI or
> higher level libraries, rather than writing directly to UCX (which provides
> less coverage of HPC networks). I think MPI compatibility is important.
>
> From way up-thread (sorry):
>
> >> >>>>> Jed - how would you see MPI and Flight interacting? As another
> >> >>>>> transport/alternative to UCX? I admit I'm not familiar with the HPC
> >> >>>>> space.
>
> MPI has collective operations like MPI_Allreduce (perform a reduction and
> give every process the result; these run in log(P) or better time with small
> constants -- 15 microseconds is typical for a cheap reduction operation on a
> million processes). MPI supports user-defined operations for reductions and
> prefix-scan operations. If we defined MPI_Ops for Arrow types, we could
> compute summary statistics and other algorithmic building blocks fast at
> arbitrary scale.
>
> The collective execution model might not be everyone's bag, but MPI_Op can
> also be used in one-sided operations (MPI_Accumulate and MPI_Fetch_and_op)
> and dropping into collective mode has big advantages for certain algorithms
> in computational statistics/machine learning.
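To make the log(P) claim in the quote concrete, here is a toy Python simulation of the recursive-doubling pattern commonly used to implement MPI_Allreduce. This sketches the communication structure only (it is not real MPI, and a power-of-two process count is assumed for simplicity):

```python
import math

def allreduce_rounds(p):
    # Rounds needed by recursive doubling: ceil(log2(P)).
    return math.ceil(math.log2(p))

def recursive_doubling_allreduce(values, op):
    # Simulate one value per rank. In round k, every rank r exchanges its
    # partial result with partner r XOR 2**k and combines with `op`, so
    # after ceil(log2(P)) rounds every rank holds the full reduction.
    p = len(values)
    assert p & (p - 1) == 0, "simulation assumes a power-of-two rank count"
    vals = list(values)
    step = 1
    while step < p:
        vals = [op(vals[r], vals[r ^ step]) for r in range(p)]
        step *= 2
    return vals

# A built-in-style sum, and `min` standing in for a user-defined MPI_Op.
assert recursive_doubling_allreduce([1, 2, 3, 4], lambda a, b: a + b) == [10, 10, 10, 10]
assert recursive_doubling_allreduce([5, 1, 7, 3], min) == [1, 1, 1, 1]

# A million processes finish in only ~20 rounds -- the basis for the
# "log(P) with small constants" figure quoted above.
assert allreduce_rounds(1_048_576) == 20
```

The `op` argument plays the role of an MPI_Op; defining such operations over Arrow buffers is the idea behind computing summary statistics at scale.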