Hello Sailesh Mukil, Mostafa Mokhtar, Dan Hecht,

I'd like you to reexamine a change. Please visit

    http://gerrit.cloudera.org:8080/8023

to look at the new patch set (#10).

Change subject: IMPALA-4856: Port data stream service to KRPC
......................................................................

IMPALA-4856: Port data stream service to KRPC

This patch implements a new data stream service which utilizes KRPC. Similar to the Thrift RPC implementation, there are 3 major components to the data stream service: KrpcDataStreamSender serializes and sends row batches materialized by a fragment instance to a KrpcDataStreamRecvr. KrpcDataStreamMgr is responsible for routing an incoming row batch to the appropriate receiver. The data stream service runs on the port FLAGS_krpc_port, which is 29000 by default.

Unlike the implementation with Thrift RPC, KRPC provides an asynchronous interface for invoking remote methods. As a result, KrpcDataStreamSender doesn't need to create a thread per connection. There is one connection between two impalad nodes for each direction (i.e. client and server). Multiple queries can multiplex on the same connection for transmitting row batches between two impalad nodes. The asynchronous interface also prevents some issues with Thrift RPC in which a thread may be stuck in an RPC call without being able to check for cancellation.

A TransmitData() call with KRPC is in essence a trio of an RpcController, a serialized protobuf request buffer and a protobuf response buffer. The call is invoked via a DataStreamService proxy object. The serialized tuple offsets and row batches are sent via "sidecars" in KRPC to avoid an extra copy into the serialized request buffer.

Each impalad node creates a singleton DataStreamService object at start-up time. All incoming calls are served by a service thread pool created as part of DataStreamService. By default, there are 64 service threads. The service threads are shared across all queries, so the RPC handler should avoid blocking as much as possible.
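As a rough illustration of the asynchronous call pattern described above (not code from this patch), the sketch below shows a sender that issues a call with a completion callback and returns immediately, so it can still observe cancellation while the call is outstanding. ToyAsyncProxy, ToySender and all of their methods are hypothetical names invented for this example; they are not the KRPC or Impala APIs. The limit of one call in flight per sender is also an assumption of the sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-in for an asynchronous RPC proxy. This is NOT the KRPC API.
class ToyAsyncProxy {
 private:
  struct Call;  // defined below

 public:
  using Callback = std::function<void(bool ok)>;

  // Records the call and returns immediately; the caller never blocks on the reply.
  void TransmitDataAsync(std::string payload, Callback done) {
    pending_.push_back(Call{std::move(payload), std::move(done)});
  }

  // Simulates the arrival of one response. Returns false if nothing is pending.
  bool CompleteOne(bool ok) {
    if (pending_.empty()) return false;
    Call call = std::move(pending_.front());
    pending_.pop_front();
    call.done(ok);  // Runs the sender's completion callback.
    return true;
  }

  std::size_t num_pending() const { return pending_.size(); }

 private:
  struct Call {
    std::string payload;
    Callback done;
  };
  std::deque<Call> pending_;
};

// Hypothetical sender. Assumption of this sketch: at most one call in flight.
class ToySender {
 public:
  explicit ToySender(ToyAsyncProxy* proxy) : proxy_(proxy) {}

  // Starts an asynchronous send. Returns false without sending if the sender
  // was cancelled or a previous call has not completed yet.
  bool Send(const std::string& batch) {
    if (cancelled_ || in_flight_) return false;
    in_flight_ = true;
    proxy_->TransmitDataAsync(batch, [this](bool ok) {
      assert(in_flight_);
      in_flight_ = false;
      if (ok) ++num_acked_;
    });
    return true;
  }

  // Cancellation is visible to the next Send() because no thread is parked
  // inside a blocking RPC call.
  void Cancel() { cancelled_ = true; }

  bool in_flight() const { return in_flight_; }
  int num_acked() const { return num_acked_; }

 private:
  ToyAsyncProxy* proxy_;
  bool cancelled_ = false;
  bool in_flight_ = false;
  int num_acked_ = 0;
};
```

Because Send() only registers a callback, a single thread can drive many such senders, which is the property that removes the need for one thread per connection.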

In the Thrift RPC implementation, a Thrift thread handling a TransmitData() RPC blocks for an extended period of time if the receiver has not yet been created when the call arrives. In the KRPC implementation, TransmitData() or EndDataStream() requests which arrive before the receiver is ready are stored in a per-receiver early sender list kept in KrpcDataStreamMgr. These RPC calls will be processed and responded to when the receiver is created or when a timeout occurs.

Similarly, there is limited space in the sender queues in KrpcDataStreamRecvr. If adding a row batch to a queue in KrpcDataStreamRecvr would exceed the buffer limit, the request will be stashed in a blocked_sender_ queue to be processed later. The stashed RPC request will not be responded to until it is processed, so as to exert back pressure on the client. An alternative would be to reply with an error, but the request and its row batches would then need to be sent again, which may end up consuming more network bandwidth than the Thrift RPC implementation. This change adopts the behavior of allowing one stashed request per sender.

All RPC requests and responses are serialized using protobuf. The equivalent of TRowBatch is ProtoRowBatch, which contains a serialized header with the metadata of the row batch and two Kudu Slice objects which contain pointers to the actual data (i.e. tuple offsets and tuple data).

This patch is based on an abandoned patch by Henry Robinson.

TESTING
-------

* Build passed with FLAGS_use_krpc=true.

TO DO
-----

* Port some BE tests to KRPC services.
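
For reference, the deferred-response behavior described in the commit message for a full receiver queue can be sketched roughly as follows. This is a simplified, single-threaded illustration and not code from this patch: ToyRecvrQueue, AddBatch(), GetBatch() and the byte-limit policy are hypothetical. The sketch assumes each sender waits for its response before sending again, so at most one request per sender is ever stashed, and it admits a batch into an empty queue even if the batch alone exceeds the limit, so that an oversized batch cannot wait forever.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical receiver-side queue; names and policy are illustrative only.
class ToyRecvrQueue {
 public:
  using RespondFn = std::function<void()>;

  explicit ToyRecvrQueue(std::size_t byte_limit) : byte_limit_(byte_limit) {}

  // Handles one incoming batch without blocking the calling thread.
  // If the batch fits, it is queued and the RPC is answered right away.
  // Otherwise the request is stashed and the response is withheld, which
  // is what exerts back pressure on the sender.
  void AddBatch(std::string batch, RespondFn respond) {
    if (blocked_.empty() && Fits(batch)) {
      Enqueue(std::move(batch));
      respond();
      return;
    }
    blocked_.push_back(Stashed{std::move(batch), std::move(respond)});
  }

  // Consumer side: removes the oldest batch, then admits stashed requests
  // (in arrival order) for as long as they fit, answering each one.
  bool GetBatch(std::string* out) {
    if (batches_.empty()) return false;
    *out = std::move(batches_.front());
    batches_.pop_front();
    assert(bytes_queued_ >= out->size());
    bytes_queued_ -= out->size();
    while (!blocked_.empty() && Fits(blocked_.front().batch)) {
      Stashed s = std::move(blocked_.front());
      blocked_.pop_front();
      Enqueue(std::move(s.batch));
      s.respond();  // The deferred response is finally sent.
    }
    return true;
  }

  std::size_t num_blocked() const { return blocked_.size(); }

 private:
  struct Stashed {
    std::string batch;
    RespondFn respond;
  };

  // Assumption: an empty queue always accepts one batch, whatever its size.
  bool Fits(const std::string& batch) const {
    return batches_.empty() || bytes_queued_ + batch.size() <= byte_limit_;
  }

  void Enqueue(std::string batch) {
    bytes_queued_ += batch.size();
    batches_.push_back(std::move(batch));
  }

  const std::size_t byte_limit_;
  std::size_t bytes_queued_ = 0;
  std::deque<std::string> batches_;
  std::deque<Stashed> blocked_;
};
```

Replying with an error instead of stashing would free the request immediately, but the sender would have to retransmit the same row batch, which is the bandwidth cost the commit message mentions.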

Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
---
M be/src/common/status.cc
M be/src/common/status.h
M be/src/exec/data-sink.cc
M be/src/exec/exchange-node.cc
M be/src/exec/kudu-util.h
M be/src/rpc/CMakeLists.txt
M be/src/rpc/rpc-mgr.cc
M be/src/rpc/rpc-mgr.h
M be/src/runtime/CMakeLists.txt
M be/src/runtime/data-stream-mgr-base.h
M be/src/runtime/data-stream-mgr.h
M be/src/runtime/data-stream-recvr.h
M be/src/runtime/data-stream-sender.h
M be/src/runtime/exec-env.cc
M be/src/runtime/exec-env.h
M be/src/runtime/krpc-data-stream-mgr.cc
M be/src/runtime/krpc-data-stream-mgr.h
M be/src/runtime/krpc-data-stream-recvr.cc
M be/src/runtime/krpc-data-stream-recvr.h
A be/src/runtime/krpc-data-stream-sender.cc
A be/src/runtime/krpc-data-stream-sender.h
M be/src/runtime/row-batch.cc
M be/src/runtime/row-batch.h
M be/src/service/CMakeLists.txt
A be/src/service/data-stream-service.cc
A be/src/service/data-stream-service.h
M be/src/service/impala-server.cc
M cmake_modules/FindProtobuf.cmake
M common/protobuf/CMakeLists.txt
A common/protobuf/common.proto
A common/protobuf/data_stream_service.proto
A common/protobuf/row_batch.proto
M common/thrift/generate_error_codes.py
33 files changed, 3,159 insertions(+), 183 deletions(-)

  git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/8023/10
--
To view, visit http://gerrit.cloudera.org:8080/8023
To unsubscribe, visit http://gerrit.cloudera.org:8080/settings

Gerrit-Project: Impala-ASF
Gerrit-Branch: master
Gerrit-MessageType: newpatchset
Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1
Gerrit-Change-Number: 8023
Gerrit-PatchSet: 10
Gerrit-Owner: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Dan Hecht <dhe...@cloudera.com>
Gerrit-Reviewer: Michael Ho <k...@cloudera.com>
Gerrit-Reviewer: Mostafa Mokhtar <mmokh...@cloudera.com>
Gerrit-Reviewer: Sailesh Mukil <sail...@cloudera.com>