This is an automated email from the ASF dual-hosted git repository. wesm pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push: new bd0dfe5 ARROW-4421: [C++][Flight] Handle large RPC messages in Flight bd0dfe5 is described below commit bd0dfe5d2760fcefdc8bba096d50335b166a3001 Author: David Li <david.m...@twosigma.com> AuthorDate: Tue Mar 12 20:22:44 2019 -0500 ARROW-4421: [C++][Flight] Handle large RPC messages in Flight gRPC has a default maximum message size of 4MB Author: David Li <david.m...@twosigma.com> Closes #3878 from lihalite/arrow-4421 and squashes the following commits: a4efbbd17 <David Li> Accept messages of any size in C++/Python Flight server 0d8c8ccb6 <David Li> Accept messages of any size in C++/Python Flight client --- cpp/src/arrow/flight/client.cc | 2 ++ cpp/src/arrow/flight/server.cc | 2 ++ python/pyarrow/tests/test_flight.py | 36 ++++++++++++++++++++++++++++++++++++ 3 files changed, 40 insertions(+) diff --git a/cpp/src/arrow/flight/client.cc b/cpp/src/arrow/flight/client.cc index 50c70ee..f3420c4 100644 --- a/cpp/src/arrow/flight/client.cc +++ b/cpp/src/arrow/flight/client.cc @@ -198,6 +198,8 @@ class FlightClient::FlightClientImpl { grpc::ChannelArguments args; // Try to reconnect quickly at first, in case the server is still starting up args.SetInt(GRPC_ARG_INITIAL_RECONNECT_BACKOFF_MS, 100); + // Receive messages of any size + args.SetMaxReceiveMessageSize(-1); stub_ = pb::FlightService::NewStub( grpc::CreateCustomChannel(ss.str(), grpc::InsecureChannelCredentials(), args)); return Status::OK(); diff --git a/cpp/src/arrow/flight/server.cc b/cpp/src/arrow/flight/server.cc index 3628821..cc1c03d 100644 --- a/cpp/src/arrow/flight/server.cc +++ b/cpp/src/arrow/flight/server.cc @@ -352,6 +352,8 @@ Status FlightServerBase::Init(int port) { impl_->service_.reset(new FlightServiceImpl(this)); grpc::ServerBuilder builder; + // Allow uploading messages of any length + builder.SetMaxReceiveMessageSize(-1); builder.AddListeningPort(impl_->address_, grpc::InsecureServerCredentials()); builder.RegisterService(impl_->service_.get()); diff --git a/python/pyarrow/tests/test_flight.py b/python/pyarrow/tests/test_flight.py index d225f77..b1b6a12 100644 --- a/python/pyarrow/tests/test_flight.py +++ b/python/pyarrow/tests/test_flight.py @@ -43,6 +43,20 @@ class ConstantFlightServer(flight.FlightServerBase): return flight.RecordBatchStream(table) +class EchoFlightServer(flight.FlightServerBase): + """A Flight server that returns the last data uploaded.""" + + def __init__(self): + super(EchoFlightServer, self).__init__() + self.last_message = None + + def do_get(self, ticket): + return flight.RecordBatchStream(self.last_message) + + def do_put(self, descriptor, reader): + self.last_message = reader.read_all() + + @contextlib.contextmanager def flight_server(server_base, *args, **kwargs): """Spawn a Flight server on a free port, shutting it down when done.""" @@ -78,3 +92,25 @@ def test_flight_do_get(): client = flight.FlightClient.connect('localhost', server_port) data = client.do_get(flight.Ticket(b''), table.schema).read_all() assert data.equals(table) + + +@pytest.mark.slow +def test_flight_large_message(): + """Try sending/receiving a large message via Flight. + + See ARROW-4421: by default, gRPC won't allow us to send messages > + 4MiB in size. + """ + data = pa.Table.from_arrays([ + pa.array(range(0, 10 * 1024 * 1024)) + ], names=['a']) + + with flight_server(EchoFlightServer) as server_port: + client = flight.FlightClient.connect('localhost', server_port) + writer = client.do_put(flight.FlightDescriptor.for_path('test'), + data.schema) + # Write a single giant chunk + writer.write_table(data, 10 * 1024 * 1024) + writer.close() + result = client.do_get(flight.Ticket(b''), data.schema).read_all() + assert result.equals(data)