I ended up drafting an implementation of Flight based on UCX, and doing some
of the necessary refactoring to support additional backends in the future.
It can run the Flight benchmark, and performance is roughly comparable to
gRPC, as tested on AWS EC2.

The implementation is based on the UCP streams API. It's extremely
bare-bones and is really only a proof of concept; a good amount of work is
needed to turn it into a usable implementation. I had hoped it would perform
markedly better than gRPC, at least in this early test, but this seems not
to be the case. That said: I am likely not using UCX properly, UCX would
still open up support for additional hardware, and this work should allow
other backends to be implemented more easily.

The branch can be viewed at
https://github.com/lidavidm/arrow/tree/flight-ucx

I've attached the benchmark output at the end.

There are still quite a few TODOs and things that need investigating:

- Only DoGet and GetFlightInfo are implemented, and incompletely at that.
- Concurrent requests are not supported, nor is making more than one
  request on a connection, and the server does not support concurrent
  clients. We also need to decide whether to support concurrent requests at
  all, and if so how (e.g. pooling multiple connections, implementing a
  gRPC/HTTP2-style protocol, or possibly even implementing HTTP2 itself).
- We need to make sure we properly handle errors, etc. everywhere.
- Are we using UCX in a performant and idiomatic manner? Will the
  implementation work well on RDMA and other specialized hardware?
- Do we also need to support the UCX tag API?
- Can we refactor out interfaces that allow sharing more of the
  client/server implementation between different backends?
- Are the abstractions sufficient to support other potential backends like
  MPI, libfabric, or WebSockets?

If anyone has experience with UCX, I'd appreciate any feedback. Otherwise,
I'm hoping to plan out and try to tackle some of the TODOs above, and figure
out how this effort can proceed.

Antoine/Micah raised the possibility of extending gRPC instead. That would
be preferable, frankly, given that otherwise we might have to re-implement a
lot of what gRPC and HTTP2 provide ourselves. However, the necessary
proposal stalled and was dropped without much discussion:
https://groups.google.com/g/grpc-io/c/oIbBfPVO0lY

Benchmark results (also uploaded at
https://gist.github.com/lidavidm/c4676c5d9c89d4cc717d6dea07dee952):

Testing was done between two t3.xlarge instances in the same zone.
t3.xlarge has "up to 5 Gbps" of bandwidth (~600 MiB/s).
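For reference, the "~600 MiB/s" figure is just a unit conversion of the advertised 5 Gbps. A minimal sketch of that arithmetic, assuming decimal gigabits (10^9 bits) and binary mebibytes (2^20 bytes); the helper name is made up for illustration:

```python
def gbps_to_mib_per_s(gbps: float) -> float:
    """Convert a link rate in Gbit/s (decimal) to MiB/s (binary)."""
    bits_per_s = gbps * 1e9       # assumption: 1 Gbit = 10^9 bits
    bytes_per_s = bits_per_s / 8  # 8 bits per byte
    return bytes_per_s / 2**20    # 1 MiB = 2^20 bytes


if __name__ == "__main__":
    # Advertised "up to 5 Gbps" for t3.xlarge
    print(f"{gbps_to_mib_per_s(5.0):.1f} MiB/s")
```

This comes out slightly under 600 MiB/s, which is the ceiling the numbers below should be read against.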

(ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000 -records_per_batch=4096
Testing method: DoGet
[1640703417.639373] [ip-172-31-37-78:10110:0]     ucp_worker.c:1627 UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
[1640703417.650068] [ip-172-31-37-78:10110:1]     ucp_worker.c:1627 UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
Number of perf runs: 1
Number of concurrent gets/puts: 1
Batch size: 131072
Batches read: 10000
Bytes read: 1310720000
Nanos: 2165862969
Speed: 577.137 MB/s
Throughput: 4617.1 batches/s
Latency mean: 214 us
Latency quantile=0.5: 209 us
Latency quantile=0.95: 340 us
Latency quantile=0.99: 409 us
Latency max: 6350 us
(ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info ./relwithdebinfo/arrow-flight-benchmark -transport ucx -server_host 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000 -records_per_batch=65536
Testing method: DoGet
[1640703439.428785] [ip-172-31-37-78:10116:0]     ucp_worker.c:1627 UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
[1640703439.440359] [ip-172-31-37-78:10116:1]     ucp_worker.c:1627 UCX  INFO  ep_cfg[1]: tag(tcp/ens5); stream(tcp/ens5);
Number of perf runs: 1
Number of concurrent gets/puts: 1
Batch size: 2097152
Batches read: 10000
Bytes read: 20971520000
Nanos: 34184175236
Speed: 585.066 MB/s
Throughput: 292.533 batches/s
Latency mean: 3415 us
Latency quantile=0.5: 3408 us
Latency quantile=0.95: 3549 us
Latency quantile=0.99: 3800 us
Latency max: 20236 us
(ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=40960000 -records_per_batch=4096
Testing method: DoGet
Using standalone TCP server
Server host: 172.31.34.4
Server port: 31337
Number of perf runs: 1
Number of concurrent gets/puts: 1
Batch size: 131072
Batches read: 10000
Bytes read: 1310720000
Nanos: 2375289668
Speed: 526.252 MB/s
Throughput: 4210.01 batches/s
Latency mean: 235 us
Latency quantile=0.5: 203 us
Latency quantile=0.95: 328 us
Latency quantile=0.99: 1377 us
Latency max: 17860 us
(ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ env UCX_LOG_LEVEL=info ./relwithdebinfo/arrow-flight-benchmark -transport grpc -server_host 172.31.34.4 -num_streams=1 -num_threads=1 -records_per_stream=655360000 -records_per_batch=65536
Testing method: DoGet
Using standalone TCP server
Server host: 172.31.34.4
Server port: 31337
Number of perf runs: 1
Number of concurrent gets/puts: 1
Batch size: 2097152
Batches read: 10000
Bytes read: 20971520000
Nanos: 34202704498
Speed: 584.749 MB/s
Throughput: 292.375 batches/s
Latency mean: 3416 us
Latency quantile=0.5: 3406 us
Latency quantile=0.95: 3548 us
Latency quantile=0.99: 3764 us
Latency max: 17086 us
(ucx) ubuntu@ip-172-31-37-78:~/arrow/build$ iperf3 -c 172.31.34.4 -p 1337 -Z -l 1M
Connecting to host 172.31.34.4, port 1337
[  5] local 172.31.37.78 port 48422 connected to 172.31.34.4 port 1337
[ ID] Interval           Transfer     Bitrate         Retr  Cwnd
[  5]   0.00-1.00   sec   572 MBytes  4.79 Gbits/sec   36   2.35 MBytes
[  5]   1.00-2.00   sec   582 MBytes  4.88 Gbits/sec    0   2.43 MBytes
[  5]   2.00-3.00   sec   585 MBytes  4.91 Gbits/sec    0   2.43 MBytes
[  5]   3.00-4.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43 MBytes
[  5]   4.00-5.00   sec   587 MBytes  4.92 Gbits/sec    0   2.43 MBytes
[  5]   5.00-6.00   sec   586 MBytes  4.91 Gbits/sec    0   2.43 MBytes
[  5]   6.00-7.00   sec   586 MBytes  4.92 Gbits/sec    0   2.43 MBytes
[  5]   7.00-8.00   sec   580 MBytes  4.87 Gbits/sec    0   2.43 MBytes
[  5]   8.00-9.00   sec   584 MBytes  4.89 Gbits/sec    0   2.43 MBytes
[  5]   9.00-10.00  sec   577 MBytes  4.84 Gbits/sec    0   2.43 MBytes
- - - - - - - - - - - - - - - - - - - - - - - - -
[ ID] Interval           Transfer     Bitrate         Retr
[  5]   0.00-10.00  sec  5.69 GBytes  4.89 Gbits/sec   36             sender
[  5]   0.00-10.00  sec  5.69 GBytes  4.88 Gbits/sec                  receiver

iperf Done.
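
As a rough cross-check of the output above, the "Speed" lines can be recomputed from the "Bytes read" and "Nanos" lines. The sketch below assumes the benchmark's "MB/s" actually means MiB/s (2^20 bytes); under that assumption the recomputed values match the reported ones. The run labels and function name are only for illustration, and the iperf3 comparison uses the receiver-side 4.88 Gbits/sec figure.

```python
MIB = 2**20  # assumption: the benchmark's "MB" is 2^20 bytes

# (bytes read, nanoseconds elapsed), copied from the benchmark output above
runs = {
    "ucx, 4096 records/batch": (1310720000, 2165862969),
    "ucx, 65536 records/batch": (20971520000, 34184175236),
    "grpc, 4096 records/batch": (1310720000, 2375289668),
    "grpc, 65536 records/batch": (20971520000, 34202704498),
}


def mib_per_s(num_bytes: int, nanos: int) -> float:
    """Throughput in MiB/s from a byte count and an elapsed time in ns."""
    return (num_bytes / MIB) / (nanos / 1e9)


speeds = {name: mib_per_s(b, n) for name, (b, n) in runs.items()}

# iperf3 receiver-side bitrate, converted to the same unit
iperf_mib_per_s = 4.88e9 / 8 / MIB

if __name__ == "__main__":
    for name, speed in speeds.items():
        share = speed / iperf_mib_per_s
        print(f"{name}: {speed:.3f} MiB/s ({share:.0%} of iperf3)")
```

Read this way, both transports are close to what iperf3 measures for the raw link in the large-batch runs, so this test may simply be network-bound rather than showing a difference between the transports.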

Best,
David

On Tue, Nov 2, 2021, at 19:59, Jed Brown wrote:
> "David Li" <lidav...@apache.org> writes:
> 
> > Thanks for the clarification Yibo, looking forward to the results. Even if 
> > it is a very hacky PoC it will be interesting to see how it affects 
> > performance, though as Keith points out there are benefits in general to 
> > UCX (or similar library), and we can work out the implementation plan from 
> > there.
> >
> > To Benson's point - the work done to get UCX supported would pave the way 
> > to supporting other backends as well. I'm personally not familiar with UCX, 
> > MPI, etc. so is MPI here more about playing well with established practices 
> > or does it also offer potential hardware support/performance improvements 
> > like UCX would?
> 
> There are two main implementations of MPI, MPICH and Open MPI, both of which 
> are permissively licensed open source community projects. Both have direct 
> support for UCX and unless your needs are very specific, the overhead of 
> going through MPI is likely to be negligible. Both also have proprietary 
> derivatives, such as Cray MPI (MPICH derivative) and Spectrum MPI (Open MPI 
> derivative), which may have optimizations for proprietary networks. Both 
> MPICH and Open MPI can be built without UCX, and this is often easier (UCX 
> 'master' is more volatile in my experience).
> 
> The vast majority of distributed memory scientific applications use MPI or 
> higher level libraries, rather than writing directly to UCX (which provides 
> less coverage of HPC networks). I think MPI compatibility is important.
> 
> From way up-thread (sorry):
> 
> >> >>>>> Jed - how would you see MPI and Flight interacting? As another
> >> >>>>> transport/alternative to UCX? I admit I'm not familiar with the HPC
> >> >>>>> space.
> 
> MPI has collective operations like MPI_Allreduce (perform a reduction and 
> give every process the result; these run in log(P) or better time with small 
> constants -- 15 microseconds is typical for a cheap reduction operation on a 
> million processes). MPI supports user-defined operations for reductions and 
> prefix-scan operations. If we defined MPI_Ops for Arrow types, we could 
> compute summary statistics and other algorithmic building blocks fast at 
> arbitrary scale.
> 
> The collective execution model might not be everyone's bag, but MPI_Op can 
> also be used in one-sided operations (MPI_Accumulate and MPI_Fetch_and_op) 
> and dropping into collective mode has big advantages for certain algorithms 
> in computational statistics/machine learning.
> 
