[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 13: Verified+1 -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Gerrit-Change-Number: 8023 Gerrit-PatchSet: 13 Gerrit-Owner: Michael Ho Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Mostafa Mokhtar Gerrit-Reviewer: Sailesh Mukil Gerrit-Comment-Date: Thu, 09 Nov 2017 20:05:07 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Impala Public Jenkins has submitted this change and it was merged. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. IMPALA-4856: Port data stream service to KRPC This patch implements a new data stream service which utilizes KRPC. Similar to the thrift RPC implementation, there are 3 major components to the data stream service: KrpcDataStreamSender serializes and sends row batches materialized by a fragment instance to a KrpcDataStreamRecvr. KrpcDataStreamMgr is responsible for routing an incoming row batch to the appropriate receiver. The data stream service runs on the port FLAGS_krpc_port, which is 29000 by default. Unlike the thrift RPC implementation, KRPC provides an asynchronous interface for invoking remote methods. As a result, KrpcDataStreamSender doesn't need to create a thread per connection. There is one connection between two Impalad nodes for each direction (i.e. client and server). Multiple queries can multiplex over the same connection for transmitting row batches between two Impalad nodes. The asynchronous interface also avoids the possibility that a thread is stuck in the RPC code for an extended amount of time without checking for cancellation. A TransmitData() call with KRPC is in essence a trio of an RpcController, a serialized protobuf request buffer and a protobuf response buffer. The call is invoked via a DataStreamService proxy object. The serialized tuple offsets and row batches are sent via "sidecars" in KRPC to avoid an extra copy into the serialized request buffer. Each impalad node creates a singleton DataStreamService object at start-up time. All incoming calls are served by a service thread pool created as part of DataStreamService. By default, the number of service threads equals the number of logical cores. The service threads are shared across all queries, so the RPC handler should avoid blocking as much as possible. 
In the thrift RPC implementation, the thrift thread handling a TransmitData() RPC blocks for an extended period of time if the receiver has not yet been created when the call arrives. In the KRPC implementation, TransmitData() or EndDataStream() requests which arrive before the receiver is ready are stored in a per-receiver early sender list in KrpcDataStreamMgr. These RPC calls will be processed and responded to when the receiver is created or when a timeout occurs. Similarly, there is limited space in the sender queues in KrpcDataStreamRecvr. If adding a row batch to a queue in KrpcDataStreamRecvr would exceed the buffer limit, the request is stashed in a queue for deferred processing. The stashed RPC requests are not responded to until they are processed, so as to exert back pressure on the senders. An alternative would be to reply with an error and require the request / row batches to be sent again, but this may end up consuming more network bandwidth than the thrift RPC implementation. This change adopts the behavior of allowing one stashed request per sender. All RPC requests and responses are serialized using protobuf. The equivalent of TRowBatch is ProtoRowBatch, which contains a serialized header with the row batch's metadata and two Kudu Slice objects which point to the actual data (i.e. tuple offsets and tuple data). This patch is based on an abandoned patch by Henry Robinson. TESTING --- * Builds {exhaustive/debug, core/release, asan} passed with FLAGS_use_krpc=true. TO DO - * Port some BE tests to KRPC services. 
Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Reviewed-on: http://gerrit.cloudera.org:8080/8023 Reviewed-by: Michael Ho Tested-by: Impala Public Jenkins --- M be/src/common/status.cc M be/src/common/status.h M be/src/exec/data-sink.cc M be/src/exec/exchange-node.cc M be/src/exec/kudu-util.h M be/src/rpc/CMakeLists.txt M be/src/rpc/rpc-mgr.cc M be/src/rpc/rpc-mgr.h M be/src/runtime/CMakeLists.txt M be/src/runtime/data-stream-mgr-base.h M be/src/runtime/data-stream-mgr.h M be/src/runtime/data-stream-recvr.h M be/src/runtime/data-stream-sender.h M be/src/runtime/exec-env.cc M be/src/runtime/exec-env.h M be/src/runtime/krpc-data-stream-mgr.cc M be/src/runtime/krpc-data-stream-mgr.h M be/src/runtime/krpc-data-stream-recvr.cc M be/src/runtime/krpc-data-stream-recvr.h A be/src/runtime/krpc-data-stream-sender.cc A be/src/runtime/krpc-data-stream-sender.h M be/src/runtime/row-batch.cc M be/src/runtime/row-batch.h M be/src/service/CMakeLists.txt A be/src/service/data-stream-service.cc A be/src/service/data-stream-service.h M be/src/service/impala-server.cc M cmake_modules/FindProtobuf.cmake M common/protobuf/CMakeLists.txt A common/protobuf/common.proto A common/protobuf/data_stream_service.proto A common/protobuf/row_batch.proto M common/thrift/generate_error_codes.py 33 files changed, 3,155 insertions(+), 184 deletions(-) Approvals: Michael Ho: Looks good to me,
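The asynchronous TransmitData() flow described in the commit message — a trio of controller, serialized request, and response, invoked through a proxy with a completion callback instead of a blocking thread — can be sketched as follows. This is a minimal illustration, not Impala's or KRPC's actual code: the type and method names (RpcController, TransmitDataAsync, and so on) are stand-ins, and the mock proxy runs the callback synchronously rather than from a reactor thread.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Illustrative stand-ins for the KRPC call "trio".
struct RpcController {
  bool failed = false;
  std::string error;
};
struct TransmitDataRequest { std::string serialized_row_batch; };
struct TransmitDataResponse { int status_code = 0; };

// Mock proxy: a real proxy would send the request over the shared
// per-direction connection and fire the callback when the response
// arrives. Completed synchronously here only to keep the sketch runnable.
class DataStreamServiceProxy {
 public:
  using Callback = std::function<void()>;
  void TransmitDataAsync(const TransmitDataRequest& req,
                         TransmitDataResponse* resp,
                         RpcController* controller,
                         Callback done) {
    (void)controller;
    // Pretend the server handled the batch; empty payload is an error.
    resp->status_code = req.serialized_row_batch.empty() ? 1 : 0;
    done();  // The sender thread never blocks waiting for this.
  }
};

// Issues one async TransmitData() and reports the response status.
int SendBatch(DataStreamServiceProxy& proxy, const std::string& batch) {
  RpcController controller;
  TransmitDataRequest req{batch};
  TransmitDataResponse resp;
  bool completed = false;
  proxy.TransmitDataAsync(req, &resp, &controller, [&] { completed = true; });
  assert(completed);  // holds only because this mock completes synchronously
  return resp.status_code;
}
```

Because the call returns immediately, many in-flight batches from different queries can share one connection, which is what removes the thread-per-connection requirement.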
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Dan Hecht has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 13: (1 comment) http://gerrit.cloudera.org:8080/#/c/8023/13/be/src/runtime/krpc-data-stream-recvr.cc File be/src/runtime/krpc-data-stream-recvr.cc: http://gerrit.cloudera.org:8080/#/c/8023/13/be/src/runtime/krpc-data-stream-recvr.cc@163 PS13, Line 163: condition_variable_any these should be changed to Impala ConditionVariables now, but you can do that as a follow-on -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Gerrit-Change-Number: 8023 Gerrit-PatchSet: 13 Gerrit-Owner: Michael Ho Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Mostafa Mokhtar Gerrit-Reviewer: Sailesh Mukil Gerrit-Comment-Date: Thu, 09 Nov 2017 18:24:52 + Gerrit-HasComments: Yes
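For context on the suggested follow-on: std::condition_variable_any works with any lockable type but typically carries extra overhead, whereas std::condition_variable is specialized for std::unique_lock&lt;std::mutex&gt; and can use the most efficient native implementation. A minimal wrapper along these lines is a hypothetical sketch of the shape of the replacement; Impala's actual ConditionVariable class may differ.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical minimal condition-variable wrapper bound to
// std::unique_lock<std::mutex>, in contrast to condition_variable_any.
class ConditionVariable {
 public:
  void Wait(std::unique_lock<std::mutex>& lock) { cv_.wait(lock); }
  // Returns true if notified before the timeout expired.
  bool WaitFor(std::unique_lock<std::mutex>& lock,
               std::chrono::milliseconds timeout) {
    return cv_.wait_for(lock, timeout) == std::cv_status::no_timeout;
  }
  void NotifyOne() { cv_.notify_one(); }
  void NotifyAll() { cv_.notify_all(); }

 private:
  std::condition_variable cv_;
};
```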
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Impala Public Jenkins has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 13: Build started: https://jenkins.impala.io/job/gerrit-verify-dryrun/1454/ -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Gerrit-Change-Number: 8023 Gerrit-PatchSet: 13 Gerrit-Owner: Michael Ho Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Impala Public Jenkins Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Mostafa Mokhtar Gerrit-Reviewer: Sailesh Mukil Gerrit-Comment-Date: Thu, 09 Nov 2017 16:44:09 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 13: Code-Review+2 Retested the new PS. Carry+2. -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Gerrit-Change-Number: 8023 Gerrit-PatchSet: 13 Gerrit-Owner: Michael Ho Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Mostafa Mokhtar Gerrit-Reviewer: Sailesh Mukil Gerrit-Comment-Date: Thu, 09 Nov 2017 16:35:40 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 13: (5 comments) Rebased. Removed a scoped timer which is racy, removed unneeded WARN_UNUSED_RESULTS and fixed some clang-tidy errors. http://gerrit.cloudera.org:8080/#/c/8023/10/be/src/runtime/krpc-data-stream-recvr.cc File be/src/runtime/krpc-data-stream-recvr.cc: http://gerrit.cloudera.org:8080/#/c/8023/10/be/src/runtime/krpc-data-stream-recvr.cc@408 PS10, Line 408: // Dequeues the deferred batch an > not needed. Done http://gerrit.cloudera.org:8080/#/c/8023/10/be/src/runtime/krpc-data-stream-recvr.cc@425 PS10, Line 425: deferred_rpcs_.push(move(ctx)); > can be done without holding the lock. Done http://gerrit.cloudera.org:8080/#/c/8023/10/be/src/runtime/krpc-data-stream-recvr.cc@562 PS10, Line 562: // Add all batches to the same queu There is a subtle race condition here. The receiver may already be closed at this point, so we need to do something similar to CANCEL_SAFE_SCOPED_TIMER(). However, the race may still happen if that macro is not used under the sender queue's lock. Removed from this patch for now; it will be added back in a follow-on patch which introduces more diagnostics information. http://gerrit.cloudera.org:8080/#/c/8023/10/be/src/runtime/krpc-data-stream-recvr.cc@569 PS10, Line 569: sender_queues_[use_sender_id]->De Same comment as AddBatch(). http://gerrit.cloudera.org:8080/#/c/8023/10/be/src/runtime/row-batch.h File be/src/runtime/row-batch.h: http://gerrit.cloudera.org:8080/#/c/8023/10/be/src/runtime/row-batch.h@145 PS10, Line 145: > typo. 
Done -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Gerrit-Change-Number: 8023 Gerrit-PatchSet: 13 Gerrit-Owner: Michael Ho Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Mostafa Mokhtar Gerrit-Reviewer: Sailesh Mukil Gerrit-Comment-Date: Thu, 09 Nov 2017 00:42:11 + Gerrit-HasComments: Yes
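The deferred-RPC back-pressure scheme that these comments revolve around — stash the request instead of responding when the queue is over its limit, then admit and respond once a batch is drained — can be sketched as follows. This is a simplified, single-threaded illustration: SenderQueue, TransmitDataCtx, and deferred_rpcs_ mirror the patch's vocabulary, but the code is not Impala's and omits locking, timeouts, and cancellation.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <memory>
#include <string>

// Context for one in-flight TransmitData() RPC; 'respond' completes it.
struct TransmitDataCtx {
  std::string batch;
  std::function<void(bool ok)> respond;
};

class SenderQueue {
 public:
  explicit SenderQueue(size_t limit) : limit_(limit) {}

  // Enqueue a batch; if the queue is at its limit, stash the RPC without
  // responding. The unanswered RPC is what exerts back pressure on the
  // sender (the design allows at most one stashed request per sender).
  void AddBatch(std::unique_ptr<TransmitDataCtx> ctx) {
    if (batches_.size() >= limit_) {
      deferred_rpcs_.push_back(std::move(ctx));
      return;
    }
    Admit(std::move(ctx));
  }

  // Called when a consumer drains a batch; room opens up, so one deferred
  // RPC can be admitted and finally responded to.
  void DequeueBatch() {
    if (!batches_.empty()) batches_.pop_front();
    if (!deferred_rpcs_.empty() && batches_.size() < limit_) {
      Admit(std::move(deferred_rpcs_.front()));
      deferred_rpcs_.pop_front();
    }
  }

  size_t num_deferred() const { return deferred_rpcs_.size(); }
  size_t num_queued() const { return batches_.size(); }

 private:
  void Admit(std::unique_ptr<TransmitDataCtx> ctx) {
    batches_.push_back(ctx->batch);
    ctx->respond(true);  // only now does the sender get its response
  }
  size_t limit_;
  std::deque<std::string> batches_;
  std::deque<std::unique_ptr<TransmitDataCtx>> deferred_rpcs_;
};
```

Compared with replying with an error and forcing a retransmit, holding the response costs no extra network bandwidth, which is the trade-off the commit message calls out.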
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Hello Sailesh Mukil, Mostafa Mokhtar, Dan Hecht, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/8023 to look at the new patch set (#13). Change subject: IMPALA-4856: Port data stream service to KRPC .. (commit message and file list identical to the merged change above) Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 33 files changed, 3,155 insertions(+), 184 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/8023/13 --
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Hello Sailesh Mukil, Mostafa Mokhtar, Dan Hecht, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/8023 to look at the new patch set (#12). Change subject: IMPALA-4856: Port data stream service to KRPC .. (commit message and file list identical to the merged change above) Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 33 files changed, 3,154 insertions(+), 184 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/8023/12 --
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Hello Sailesh Mukil, Mostafa Mokhtar, Dan Hecht, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/8023 to look at the new patch set (#11). Change subject: IMPALA-4856: Port data stream service to KRPC .. (commit message and file list identical to the merged change above) Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 33 files changed, 3,151 insertions(+), 184 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/8023/11 --
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Dan Hecht has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 10: Code-Review+2 -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Gerrit-Change-Number: 8023 Gerrit-PatchSet: 10 Gerrit-Owner: Michael Ho Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Mostafa Mokhtar Gerrit-Reviewer: Sailesh Mukil Gerrit-Comment-Date: Wed, 08 Nov 2017 20:22:06 + Gerrit-HasComments: No
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 10: (3 comments) http://gerrit.cloudera.org:8080/#/c/8023/10/be/src/runtime/krpc-data-stream-recvr.cc File be/src/runtime/krpc-data-stream-recvr.cc: http://gerrit.cloudera.org:8080/#/c/8023/10/be/src/runtime/krpc-data-stream-recvr.cc@408 PS10, Line 408: deferred_rpcs_.front().release(); not needed. http://gerrit.cloudera.org:8080/#/c/8023/10/be/src/runtime/krpc-data-stream-recvr.cc@425 PS10, Line 425: COUNTER_ADD(recvr_->num_deferred_batches_, 1); can be done without holding the lock. http://gerrit.cloudera.org:8080/#/c/8023/10/be/src/runtime/row-batch.h File be/src/runtime/row-batch.h: http://gerrit.cloudera.org:8080/#/c/8023/10/be/src/runtime/row-batch.h@145 PS10, Line 145: tuple_ofsets typo. -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Gerrit-Change-Number: 8023 Gerrit-PatchSet: 10 Gerrit-Owner: Michael Ho Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Mostafa Mokhtar Gerrit-Reviewer: Sailesh Mukil Gerrit-Comment-Date: Wed, 08 Nov 2017 19:32:05 + Gerrit-HasComments: Yes
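The "can be done without holding the lock" comment on the COUNTER_ADD call reflects a general pattern: only the shared-structure mutation needs the mutex, while a metrics counter can be an atomic bumped after the lock is released, shortening the critical section. A hedged sketch of that pattern (names are illustrative, not Impala's actual counter machinery):

```cpp
#include <atomic>
#include <deque>
#include <mutex>

class DeferredRpcQueue {
 public:
  void Defer(int rpc_id) {
    {
      std::lock_guard<std::mutex> l(lock_);
      deferred_.push_back(rpc_id);  // queue mutation: must hold the lock
    }
    // Counter update moved outside the critical section: atomics don't
    // need the mutex, so contending threads are blocked for less time.
    num_deferred_.fetch_add(1, std::memory_order_relaxed);
  }

  long NumDeferred() const { return num_deferred_.load(); }

  size_t Size() {
    std::lock_guard<std::mutex> l(lock_);
    return deferred_.size();
  }

 private:
  std::mutex lock_;
  std::deque<int> deferred_;
  std::atomic<long> num_deferred_{0};
};
```

The counter may briefly lag the queue as seen by another thread, which is acceptable for a diagnostic metric and is why it can safely leave the critical section.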
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Hello Sailesh Mukil, Mostafa Mokhtar, Dan Hecht, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/8023 to look at the new patch set (#10). Change subject: IMPALA-4856: Port data stream service to KRPC .. IMPALA-4856: Port data stream service to KRPC This patch implements a new data stream service which utilizes KRPC. Similar to the thrift RPC implementation, there are 3 major components to the data stream service: KrpcDataStreamSender serializes and sends row batches materialized by a fragment instance to a KrpcDataStreamRecvr. KrpcDataStreamMgr is responsible for routing an incoming row batch to the appropriate receiver. The data stream service runs on the port FLAGS_krpc_port, which is 29000 by default. Unlike the thrift RPC implementation, KRPC provides an asynchronous interface for invoking remote methods. As a result, KrpcDataStreamSender doesn't need to create a thread per connection. There is one connection between two Impalad nodes for each direction (i.e. client and server). Multiple queries can multiplex over the same connection for transmitting row batches between two Impalad nodes. The asynchronous interface also prevents issues seen with thrift RPC in which a thread may be stuck in an RPC call without being able to check for cancellation. A TransmitData() call with KRPC is in essence a trio of an RpcController, a serialized protobuf request buffer and a protobuf response buffer. The call is invoked via a DataStreamService proxy object. The serialized tuple offsets and row batches are sent via "sidecars" in KRPC to avoid an extra copy into the serialized request buffer. Each impalad node creates a singleton DataStreamService object at start-up time. All incoming calls are served by a service thread pool created as part of DataStreamService. By default, there are 64 service threads. The service threads are shared across all queries, so the RPC handler should avoid blocking as much as possible. 
In the thrift RPC implementation, the thrift thread handling a TransmitData() RPC blocks for an extended period of time if the receiver has not yet been created when the call arrives. In the KRPC implementation, TransmitData() or EndDataStream() requests which arrive before the receiver is ready are stored in a per-receiver early sender list in KrpcDataStreamMgr. These RPC calls will be processed and responded to when the receiver is created or when a timeout occurs. Similarly, there is limited space in the sender queues in KrpcDataStreamRecvr. If adding a row batch to a queue in KrpcDataStreamRecvr would exceed the buffer limit, the request is stashed in a blocked_sender_ queue to be processed later. The stashed RPC request is not responded to until it is processed, so as to exert back pressure on the client. An alternative would be to reply with an error and require the request / row batches to be sent again, but this may end up consuming more network bandwidth than the thrift RPC implementation. This change adopts the behavior of allowing one stashed request per sender. All RPC requests and responses are serialized using protobuf. The equivalent of TRowBatch is ProtoRowBatch, which contains a serialized header with the row batch's metadata and two Kudu Slice objects which point to the actual data (i.e. tuple offsets and tuple data). This patch is based on an abandoned patch by Henry Robinson. TESTING --- * Build passed with FLAGS_use_krpc=true. TO DO - * Port some BE tests to KRPC services. 
Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 --- (file list identical to the merged change above) 33 files changed, 3,159 insertions(+), 183 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/8023/10 --
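The ProtoRowBatch layout described in the commit message — a serialized metadata header plus two Slice objects that point at the tuple-offset and tuple-data buffers — can be illustrated with a minimal sketch. The types here are hypothetical, modeled loosely on kudu::Slice; they are not the actual Impala or Kudu definitions. The point is that a Slice references an existing buffer rather than copying it, which is what makes sidecar transmission zero-copy on the send path.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// A Slice is just a (pointer, length) view over memory owned elsewhere.
struct Slice {
  const char* data = nullptr;
  size_t size = 0;
};

struct ProtoRowBatch {
  std::string header;   // serialized metadata about the row batch
  Slice tuple_offsets;  // view over the sender's offset buffer
  Slice tuple_data;     // view over the sender's tuple-data buffer
};

// Builds a ProtoRowBatch that references, not copies, the payload buffers.
// Caller must keep 'offsets' and 'data' alive while the batch is in flight.
inline ProtoRowBatch MakeBatch(const std::string& header,
                               const std::string& offsets,
                               const std::string& data) {
  return ProtoRowBatch{header,
                       {offsets.data(), offsets.size()},
                       {data.data(), data.size()}};
}
```

Attaching these views as KRPC sidecars lets the payload travel alongside the protobuf request without being copied into the serialized request buffer.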
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 10: (6 comments) http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-mgr.cc File be/src/runtime/krpc-data-stream-mgr.cc: http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-mgr.cc@62 PS9, Line 62: FLAGS_datastream_service_num_deserialization_threads, 1, > why disallow the setting of this greater than num_cores? Given that it take Yes. The assumption is that the critical section is small and most threads won't block for too long, so there is no point in pushing above the number of cores. To be consistent with other knobs we have (e.g. # service threads, reactor threads), we shouldn't impose an implicit upper bound. http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-recvr.cc File be/src/runtime/krpc-data-stream-recvr.cc: http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-recvr.cc@67 PS9, Line 67: the > the resources Done http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-recvr.cc@195 PS9, Line 195: current_ba > current_batch_ Done http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-recvr.cc@225 PS9, Line 225: DCHECK > should we also DCHECK that num_pending_enqueue_ == 0 (otherwise, we could h Done http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-recvr.cc@248 PS9, Line 248: current_batch_.reset(resu > this is part of the same operation as line 244 (both are adjusting the queu Done http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-recvr.cc@252 PS9, Line 252: // It's important that the dequeuing of 'deferred_rpcs_' is done after the entry > how about moving this to line 250 now that it's that scope that drops the l Done -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings 
Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Gerrit-Change-Number: 8023 Gerrit-PatchSet: 10 Gerrit-Owner: Michael Ho Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Mostafa Mokhtar Gerrit-Reviewer: Sailesh Mukil Gerrit-Comment-Date: Wed, 08 Nov 2017 03:35:43 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 9: (1 comment) http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-recvr.h File be/src/runtime/krpc-data-stream-recvr.h: http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-recvr.h@128 PS9, Line 128: Takes over the RPC payload of an early sender to 'deferred_rpcs_' queue of the Takes over the RPC state of an early sender for deferred processing and kicks... -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Gerrit-Change-Number: 8023 Gerrit-PatchSet: 9 Gerrit-Owner: Michael Ho Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Mostafa Mokhtar Gerrit-Reviewer: Sailesh Mukil Gerrit-Comment-Date: Wed, 08 Nov 2017 00:54:50 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Dan Hecht has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 9: (6 comments) http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-mgr.cc File be/src/runtime/krpc-data-stream-mgr.cc: http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-mgr.cc@62 PS9, Line 62: min(FLAGS_datastream_service_num_deserialization_threads, CpuInfo::num_cores()), why disallow the setting of this greater than num_cores? Given that it takes locks, there could be some benefit to doing so, right? http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-recvr.cc File be/src/runtime/krpc-data-stream-recvr.cc: http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-recvr.cc@67 PS9, Line 67: data the resources http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-recvr.cc@195 PS9, Line 195: cur_batch_ current_batch_ http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-recvr.cc@225 PS9, Line 225: DCHECK should we also DCHECK that num_pending_enqueue_ == 0 (otherwise, we could have acquired the lock while it was dropped for deserialization and there is still a row batch)? http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-recvr.cc@248 PS9, Line 248: batch_queue_.pop_front(); this is part of the same operation as line 244 (both are adjusting the queue state), so mind moving it to be adjacent? http://gerrit.cloudera.org:8080/#/c/8023/9/be/src/runtime/krpc-data-stream-recvr.cc@252 PS9, Line 252: // Don't hold lock when calling EnqueueDeserializeTask() as it may block. how about moving this to line 250 now that it's that scope that drops the lock. Also, it's important that this happens after batch_queue_.pop_front(), right? maybe note that. 
-- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Gerrit-Change-Number: 8023 Gerrit-PatchSet: 9 Gerrit-Owner: Michael Ho Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Mostafa Mokhtar Gerrit-Reviewer: Sailesh Mukil Gerrit-Comment-Date: Wed, 08 Nov 2017 01:32:58 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Hello Sailesh Mukil, Mostafa Mokhtar, Dan Hecht, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/8023 to look at the new patch set (#9). Change subject: IMPALA-4856: Port data stream service to KRPC .. IMPALA-4856: Port data stream service to KRPC This patch implements a new data stream service which utilizes KRPC. Similar to the thrift RPC implementation, there are 3 major components to the data stream service: KrpcDataStreamSender serializes and sends row batches materialized by a fragment instance to a KrpcDataStreamRecvr. KrpcDataStreamMgr is responsible for routing an incoming row batch to the appropriate receiver. The data stream service runs on the port FLAGS_krpc_port which is 29000 by default. Unlike the implementation with thrift RPC, KRPC provides an asynchronous interface for invoking remote methods. As a result, KrpcDataStreamSender doesn't need to create a thread per connection. There is one connection between two Impalad nodes for each direction (i.e. client and server). Multiple queries can multiplex on the same connection for transmitting row batches between two Impalad nodes. The asynchronous interface also prevents some issues with thrift RPC in which a thread may be stuck in an RPC call without being able to check for cancellation. A TransmitData() call with KRPC is in essence a trio of an RpcController, a serialized protobuf request buffer and a protobuf response buffer. The call is invoked via a DataStreamService proxy object. The serialized tuple offsets and row batches are sent via "sidecars" in KRPC to avoid an extra copy into the serialized request buffer. Each impalad node creates a singleton DataStreamService object at start-up time. All incoming calls are served by a service thread pool created as part of DataStreamService. By default, there are 64 service threads. The service threads are shared across all queries so the RPC handler should avoid blocking as much as possible. 
In the thrift RPC implementation, the thrift thread handling a TransmitData() RPC may block for an extended period of time if the receiver has not yet been created when the call arrives. In the KRPC implementation, TransmitData() or EndDataStream() requests which arrive before the receiver is ready are stored in a per-receiver early sender list in KrpcDataStreamMgr. These RPC calls are processed and responded to when the receiver is created or when a timeout occurs. Similarly, there is limited space in the sender queues in KrpcDataStreamRecvr. If adding a row batch to a queue in KrpcDataStreamRecvr would exceed the buffer limit, the request is stashed in a blocked_sender_ queue to be processed later. The stashed RPC request is not responded to until it is processed, so as to exert back pressure on the client. An alternative would be to reply with an error, requiring the request / row batches to be sent again; this may end up consuming more network bandwidth than the thrift RPC implementation. This change adopts the behavior of allowing one stashed request per sender. All RPC requests and responses are serialized using protobuf. The equivalent of TRowBatch is ProtoRowBatch, which contains a serialized header with the metadata of the row batch and two Kudu Slice objects holding pointers to the actual data (i.e. tuple offsets and tuple data). This patch is based on an abandoned patch by Henry Robinson. TESTING --- * Build passed with FLAGS_use_krpc=true. TO DO - * Port some BE tests to KRPC services. 
Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 --- M be/src/common/status.cc M be/src/common/status.h M be/src/exec/data-sink.cc M be/src/exec/exchange-node.cc M be/src/exec/kudu-util.h M be/src/rpc/CMakeLists.txt M be/src/rpc/rpc-mgr.cc M be/src/rpc/rpc-mgr.h M be/src/runtime/CMakeLists.txt M be/src/runtime/data-stream-mgr-base.h M be/src/runtime/data-stream-mgr.h M be/src/runtime/data-stream-recvr.h M be/src/runtime/data-stream-sender.h M be/src/runtime/exec-env.cc M be/src/runtime/exec-env.h M be/src/runtime/krpc-data-stream-mgr.cc M be/src/runtime/krpc-data-stream-mgr.h M be/src/runtime/krpc-data-stream-recvr.cc M be/src/runtime/krpc-data-stream-recvr.h A be/src/runtime/krpc-data-stream-sender.cc A be/src/runtime/krpc-data-stream-sender.h M be/src/runtime/row-batch.cc M be/src/runtime/row-batch.h M be/src/service/CMakeLists.txt A be/src/service/data-stream-service.cc A be/src/service/data-stream-service.h M be/src/service/impala-server.cc M cmake_modules/FindProtobuf.cmake M common/protobuf/CMakeLists.txt A common/protobuf/common.proto A common/protobuf/data_stream_service.proto A common/protobuf/row_batch.proto M common/thrift/generate_error_codes.py 33 files changed, 3,156 insertions(+), 183 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/8023/9 -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 9: (21 comments) http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-mgr.h File be/src/runtime/krpc-data-stream-mgr.h: http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-mgr.h@435 PS8, Line 435: 'num_requ > 'num_request' requests Done http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-mgr.cc File be/src/runtime/krpc-data-stream-mgr.cc: http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-mgr.cc@62 PS8, Line 62: ds, C > how was that chosen? do we have a test case that causes this queue to fill Just a large enough number to reduce the chance of the queue filling up. Yes, we need to come up with a test case to fill up this queue. http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-mgr.cc@109 PS8, Line 109: // Let the receiver take over the RPC payloads of early senders and process them : // asynchronously. : for (unique_ptr this comment seems out of place. this is more an implementation detail of t Done http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.h File be/src/runtime/krpc-data-stream-recvr.h: http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.h@129 PS8, Line 129: n > and start a deserialization task to process it asynchronously. Done http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.h@130 PS8, Line 130: ask to p > this "transfer" is in the opposite direction of how our "Transfer" methods us Renamed to TakeOverEarlySender(). http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.h@197 PS8, Line 197: eive > cpu time or wall time? wall-clock time. 
http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.h@200 PS8, Line 200: _; > same question wall-clock time. http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc File be/src/runtime/krpc-data-stream-recvr.cc: http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@67 PS8, Line 67: data > the resources Done http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@72 PS8, Line 72: imit, the > RPC state Done http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@73 PS8, Line 73: nto 'deferred_ > we shouldn't normally refer to private fields in public class comments, but Done http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@87 PS8, Line 87: void TakeOverEarlySender(std::unique_ptr ctx); > same comment as recvr header comment. Done http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@195 PS8, Line 195: // cur_batch_ must be replaced with the > I don't think we need this loop. see other comments in this function. Done http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@197 PS8, Line 197: atch = nullptr; > nit: consider swapping the order of these so that the fast case comes first Done http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@210 PS8, Line 210: } > nit: i think we could do without some of the blank lines in this method to I left a blank line between the loop-exit conditions. 
http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@211 PS8, Line 211: > Actually, I think the loop exiting condition is not quite right, which led There is an invariant that EOS cannot be sent by a sender when there is an outstanding TransmitData() RPC, so we should be able to get by with just checking for the termination condition of: (num_remaining_senders_ == 0 && batch_queue_.empty()) http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@218 PS8, Line 218: > given that we just checked the other two loop exit conditions, isn't this d Converted to DCHECK(). http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@229 PS8, Line 229: d > to parallelize the CPU bound deserialization work. Done http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@239 PS8, Line 239: } > once you get rid of the loop, I think you'll be able to eliminate this unlo Done http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@389 PS8, Line 389: status.ToProto(ctx->response->mutable_status()); > at this point, 'ctx' effectively takes ownership, right? we should add a co Done http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@406 PS8, Line 406: const RowBatchHeaderPB& header = ctx->request->row_bat
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Dan Hecht has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 8: (1 comment) http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc File be/src/runtime/krpc-data-stream-recvr.cc: http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@211 PS8, Line 211: deferred_rpcs_.empty() && batch_queue_.empty() > why not write this condition as: Actually, I think the loop exiting condition is not quite right, which led to this confusing conditional. The loop exiting condition for "we're done" should check that there are no more senders, and that there's nothing left to drain from the deferred_rpcs_ queue and that there's no pending insert into batch_queue_. So, the third wait loop condition should be something like: (num_remaining_senders_ > 0 || !deferred_rpcs_.empty() || num_pending_enqueue_ > 0) and then this if-stmt conditional can just be: if (batch_queue_.empty()) and then the DCHECK can be the negation of that third wait loop conditional: // Wait loop is exited with an empty batch_queue_ only when there will be no more batches. DCHECK(num_remaining_senders_ == 0 && deferred_rpcs_.empty() && num_pending_enqueue_ == 0); And then you can get rid of the outer loop. That outer loop should be removed since it's effectively a busy wait (and I think we could get into a busy wait state in the previous patchsets). -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Gerrit-Change-Number: 8023 Gerrit-PatchSet: 8 Gerrit-Owner: Michael Ho Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Mostafa Mokhtar Gerrit-Reviewer: Sailesh Mukil Gerrit-Comment-Date: Tue, 07 Nov 2017 15:58:21 + Gerrit-HasComments: Yes
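The single wait loop described in the comment above can be sketched as follows. This is an illustrative model with invented types, not Impala's KrpcDataStreamRecvr: the member names mirror the discussion (batch_queue_, deferred_rpcs_, num_remaining_senders_, num_pending_enqueue_), but deferred_rpcs_ is modeled as a plain counter and batches as ints. The shape is the point: one condition-variable wait whose exit condition is "a batch is available, or no more batches can ever arrive", which removes the outer loop that amounted to a busy wait.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>

class SenderQueueModel {
 public:
  // Returns true and pops a batch into *out, or returns false once no more
  // batches can ever arrive.
  bool GetBatch(int* out) {
    std::unique_lock<std::mutex> l(lock_);
    // Wait while the queue is empty but more batches may still show up:
    // some sender remains, a deferred RPC is queued, or an insert is in
    // flight with the lock temporarily dropped.
    cv_.wait(l, [this] {
      return !batch_queue_.empty() ||
             (num_remaining_senders_ == 0 && deferred_rpcs_ == 0 &&
              num_pending_enqueue_ == 0);
    });
    if (batch_queue_.empty()) {
      // Exited with an empty queue only when no more batches are possible
      // (the negation of the "may still show up" condition above).
      assert(num_remaining_senders_ == 0 && deferred_rpcs_ == 0 &&
             num_pending_enqueue_ == 0);
      return false;
    }
    *out = batch_queue_.front();
    batch_queue_.pop_front();
    return true;
  }

  void AddBatch(int b) {
    std::lock_guard<std::mutex> l(lock_);
    batch_queue_.push_back(b);
    cv_.notify_one();
  }

  void SenderDone() {
    std::lock_guard<std::mutex> l(lock_);
    if (--num_remaining_senders_ == 0) cv_.notify_all();
  }

  std::mutex lock_;
  std::condition_variable cv_;
  std::deque<int> batch_queue_;
  int num_remaining_senders_ = 1;
  int deferred_rpcs_ = 0;      // stands in for deferred_rpcs_.size()
  int num_pending_enqueue_ = 0;
};
```

Note that all three "more batches may arrive" sources must be in the predicate: checking only num_remaining_senders_ would let GetBatch() return false while a deferred RPC or an in-flight enqueue still holds a batch.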
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 8: (1 comment) http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-sender.cc File be/src/runtime/krpc-data-stream-sender.cc: http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-sender.cc@434 PS8, Line 434: proxy_->TransmitDataAsync(req, &resp_, &rpc_controller_, May want to call resp_.Clear() too. -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Gerrit-Change-Number: 8023 Gerrit-PatchSet: 8 Gerrit-Owner: Michael Ho Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Mostafa Mokhtar Gerrit-Reviewer: Sailesh Mukil Gerrit-Comment-Date: Mon, 06 Nov 2017 22:06:34 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Dan Hecht has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 8: (23 comments) http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-mgr.h File be/src/runtime/krpc-data-stream-mgr.h: http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-mgr.h@435 PS8, Line 435: a request 'num_request' requests http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-mgr.cc File be/src/runtime/krpc-data-stream-mgr.cc: http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-mgr.cc@62 PS8, Line 62: 1 how was that chosen? do we have a test case that causes this queue to fill up? http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-mgr.cc@109 PS8, Line 109: // Transfer the early senders into 'deferred_rpcs_' queue of the corresponding : // sender queue. This makes sure new incoming RPCs won't pass these early senders, : // leading to starvation. this comment seems out of place. this is more an implementation detail of the receiver and handled properly inside ProcessEarlySender(). You could incorporate this in the comment for ProcessEarlySender() (to motivate why it uses the deferred_rpcs_ queue). http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.h File be/src/runtime/krpc-data-stream-recvr.h: http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.h@129 PS8, Line 129: . and start a deserialization task to process it asynchronously. http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.h@130 PS8, Line 130: Transfer this "transfer" is in the opposite direction of how our "Transfer" methods usually go (e.g. src->TransferResourcesOwnership(dest)). Maybe call this ProcessEarlySender() (though I don't love "process" either since it's so vague). 
http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.h@197 PS8, Line 197: time cpu time or wall time? http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.h@200 PS8, Line 200: time same question http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc File be/src/runtime/krpc-data-stream-recvr.cc: http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@67 PS8, Line 67: data the resources http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@72 PS8, Line 72: 'payload' RPC state http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@73 PS8, Line 73: deferred_rpcs_ we shouldn't normally refer to private fields in public class comments, but given this is an internal class to the recvr, we can leave this. http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@87 PS8, Line 87: void TransferEarlySender(std::unique_ptr ctx); same comment as recvr header comment. http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@195 PS8, Line 195: while (current_batch_.get() == nullptr) { I don't think we need this loop. see other comments in this function. http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@197 PS8, Line 197: !is_cancelled_ && batch_queue_.empty() nit: consider swapping the order of these so that the fast case comes first (!batch_queue_.empty()) but also to match the comment ("or we know we're done" corresponds to the is_cancelled_ and num_remaining_senders_ == 0 cases). http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@201 PS8, Line 201: CANCEL_SAFE_SCOPED_TIMER(recvr_->data_arrival_timer_, &is_cancelled_); : CANCEL_SAFE_SCOPED_TIMER(recvr_->inactive_timer_, &is_cancelled_); : CANCEL_SAFE_SCOPED_TIMER( : received_first_batch_ ? 
nullptr : recvr_->first_batch_wait_total_timer_, : &is_cancelled_); there's got to be a cleaner way to do this but ignore for now http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@210 PS8, Line 210: nit: i think we could do without some of the blank lines in this method to make more code fit on a screen http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@211 PS8, Line 211: deferred_rpcs_.empty() && batch_queue_.empty() why not write this condition as: num_remaining_senders_ == 0 then, it's more clear that these three conditions correspond to the loop exit conditions. http://gerrit.cloudera.org:8080/#/c/8023/8/be/src/runtime/krpc-data-stream-recvr.cc@218 PS8, Line 218: !batch_queue_.empty() given that we just checked the other two loop exit conditions, isn't this definitely true? i
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 8: (36 comments) http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h File be/src/runtime/krpc-data-stream-mgr.h: http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h@102 PS7, Line 102: the RPC state is : /// saved into the receiver's 'deferred_r > it's not really the batch added. and it's not just a single structure for t Done http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h@104 PS7, Line 104: C is removed from the 'deferr > how about: ... from a deferred RPC queue and the row batch is deserialized. Done http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h@393 PS7, Line 393: EarlySendersList& operator=(EarlySendersList&& other) { > quick comment for why we define a move constructor and move operator=, sinc Done http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h@430 PS7, Line 430: n > id Done http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h@436 PS7, Line 436: nt instan > identifies Done http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc File be/src/runtime/krpc-data-stream-mgr.cc: http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@61 PS7, Line 61: > nit: blank space Done http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@217 PS7, Line 217: > that shouldn't be possible in the DEFERRED_BATCHES case, right? so i'd prob This is actually possible if the receiver is closed before the deserialization threads execute this task. 
http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@236 PS7, Line 236: << " node_id=" << r > why is this possible in the waiting_sender_ctxs case but not the closed_sen The response for the closed_sender_ctxs case is the same regardless of whether the receiver is unregistered or not. http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@247 PS7, Line 247: !already_unregistere > why is that possible? It's possible that the receiver is cancelled and closed before the deserialization thread gets around to processing this request. In which case, the deferred RPCs would all be replied to in KrpcDataStreamRecvr::SenderQueue::Cancel(). http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@248 PS7, Line 248: AddEarlyClosedSender(finst_id, request, > So I guess we no longer multithread within a single sender queue (and for n PS8 will queue multiple deserialization requests to drain multiple deferred RPCs at the same time. http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.h File be/src/runtime/krpc-data-stream-recvr.h: http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.h@78 PS7, Line 78: The caller must call TransferAllResources() : /// to acquire dat > is that talking about calling TransferAllResources(), or can the caller do Yes, it's TransferAllResources(). Comments clarified. http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc File be/src/runtime/krpc-data-stream-recvr.cc: http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@72 PS7, Line 72: h > typo Done http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@97 PS7, Line 97: > the HasSpace name seems wrong for condition (1). 
From the name HasSpace, I Done http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@113 PS7, Line 113: B* r > deserialized or serialized size? deserialized size http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@169 PS7, Line 169: > these aren't really deferred batches though. They are deferred RPCs (which Renamed to deferred_rpcs_. http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@220 PS7, Line 220: recvr_->num_buffered_bytes_.Add(-batch_queue_.front().first); > rather than explaining what, we should explain why: Simplified in the new patch. http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@224 PS7, Line 224: batch_queue_.pop_front(); > why do we need to drop the lock? Comments added. TriggerDeferredBatchesDrain() may block. Holding the lock here may lead to deadlock. http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@263 PS7, Line 263: KUDU_RETURN_IF_ERROR(rpc_context->GetInboundSidecar( > let's add a comment: Done http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@269 PS7, Line 269: e data sidecar");
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Hello Sailesh Mukil, Mostafa Mokhtar, Dan Hecht, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/8023 to look at the new patch set (#8). Change subject: IMPALA-4856: Port data stream service to KRPC .. IMPALA-4856: Port data stream service to KRPC This patch implements a new data stream service which utilizes KRPC. Similar to the thrift RPC implementation, there are 3 major components to the data stream service: KrpcDataStreamSender serializes and sends row batches materialized by a fragment instance to a KrpcDataStreamRecvr. KrpcDataStreamMgr is responsible for routing an incoming row batch to the appropriate receiver. The data stream service runs on the port FLAGS_krpc_port which is 29000 by default. Unlike the implementation with thrift RPC, KRPC provides an asynchronous interface for invoking remote methods. As a result, KrpcDataStreamSender doesn't need to create a thread per connection. There is one connection between two Impalad nodes for each direction (i.e. client and server). Multiple queries can multiplex on the same connection for transmitting row batches between two Impalad nodes. The asynchronous interface also prevents some issues with thrift RPC in which a thread may be stuck in an RPC call without being able to check for cancellation. A TransmitData() call with KRPC is in essence a trio of an RpcController, a serialized protobuf request buffer and a protobuf response buffer. The call is invoked via a DataStreamService proxy object. The serialized tuple offsets and row batches are sent via "sidecars" in KRPC to avoid an extra copy into the serialized request buffer. Each impalad node creates a singleton DataStreamService object at start-up time. All incoming calls are served by a service thread pool created as part of DataStreamService. By default, there are 64 service threads. The service threads are shared across all queries so the RPC handler should avoid blocking as much as possible. 
In the thrift RPC implementation, the thrift thread handling a TransmitData() RPC may block for an extended period of time if the receiver has not yet been created when the call arrives. In the KRPC implementation, TransmitData() or EndDataStream() requests which arrive before the receiver is ready are stored in a per-receiver early sender list in KrpcDataStreamMgr. These RPC calls are processed and responded to when the receiver is created or when a timeout occurs. Similarly, there is limited space in the sender queues in KrpcDataStreamRecvr. If adding a row batch to a queue in KrpcDataStreamRecvr would exceed the buffer limit, the request is stashed in a blocked_sender_ queue to be processed later. The stashed RPC request is not responded to until it is processed, so as to exert back pressure on the client. An alternative would be to reply with an error, requiring the request / row batches to be sent again; this may end up consuming more network bandwidth than the thrift RPC implementation. This change adopts the behavior of allowing one stashed request per sender. All RPC requests and responses are serialized using protobuf. The equivalent of TRowBatch is ProtoRowBatch, which contains a serialized header with the metadata of the row batch and two Kudu Slice objects holding pointers to the actual data (i.e. tuple offsets and tuple data). This patch is based on an abandoned patch by Henry Robinson. TESTING --- * Build passed with FLAGS_use_krpc=true. TO DO - * Port some BE tests to KRPC services. 
Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 --- M be/src/common/status.cc M be/src/common/status.h M be/src/exec/data-sink.cc M be/src/exec/exchange-node.cc M be/src/exec/kudu-util.h M be/src/rpc/CMakeLists.txt M be/src/rpc/rpc-mgr.cc M be/src/rpc/rpc-mgr.h M be/src/runtime/CMakeLists.txt M be/src/runtime/data-stream-mgr-base.h M be/src/runtime/data-stream-mgr.h M be/src/runtime/data-stream-recvr.h M be/src/runtime/data-stream-sender.h M be/src/runtime/exec-env.cc M be/src/runtime/exec-env.h M be/src/runtime/krpc-data-stream-mgr.cc M be/src/runtime/krpc-data-stream-mgr.h M be/src/runtime/krpc-data-stream-recvr.cc M be/src/runtime/krpc-data-stream-recvr.h A be/src/runtime/krpc-data-stream-sender.cc A be/src/runtime/krpc-data-stream-sender.h M be/src/runtime/row-batch.cc M be/src/runtime/row-batch.h M be/src/service/CMakeLists.txt A be/src/service/data-stream-service.cc A be/src/service/data-stream-service.h M be/src/service/impala-server.cc M cmake_modules/FindProtobuf.cmake M common/protobuf/CMakeLists.txt A common/protobuf/common.proto A common/protobuf/data_stream_service.proto A common/protobuf/row_batch.proto M common/thrift/generate_error_codes.py 33 files changed, 3,138 insertions(+), 183 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/8023/8 -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 7: (2 comments) http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc File be/src/runtime/krpc-data-stream-mgr.cc: http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@199 PS7, Line 199: DeserializeTask payload = : {DeserializeTaskType::EARLY_SENDERS, finst_id, dest_node_id, 0}; : deserialize_pool_.Offer(move(payload)); > doesn't this mean we make early sender draining single threaded? should we Yes. Actually, I just realized that early senders may also be bypassed by incoming row batches before they are deserialized by the deserialization thread pool, leading to extended response time for early senders. A simpler scheme would be to actually put the early senders into the 'deferred_batches' queue of the corresponding sender's queue so new incoming row batches cannot bypass it. Regarding the parallelism for draining the deferred_batches queue, one simple thing to do is to enqueue as many deserialization requests as there are entries in the 'deferred_batch' queue. The deserialization thread logic will simply peek at the first entry of the queue and try to insert it if there is space. An entry is popped off the queue only if it can be inserted. This may be wasteful if the 'batch_queue' fills up before all deserialization thread requests are drained, but hopefully the peeking logic shouldn't take too long. We could be fancier and record the deserialized size of each entry in deferred_batches_ and determine how many entries we can pop off the deferred_batches_ queue without going over the limit. http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@235 PS7, Line 235: for (const unique_ptr& ctx : early_senders.waiting_sender_ctxs) { > shouldn't we process waiting_sender_ctxs before closed_sender_ctxs? 
Otherwi Yes, it's impossible for the same sender to be in both queues at the same time, but yeah, I can switch the order if it's easier to understand. -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Gerrit-Change-Number: 8023 Gerrit-PatchSet: 7 Gerrit-Owner: Michael Ho Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Mostafa Mokhtar Gerrit-Reviewer: Sailesh Mukil Gerrit-Comment-Date: Sat, 04 Nov 2017 04:00:00 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Dan Hecht has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 7: (37 comments) http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h File be/src/runtime/krpc-data-stream-mgr.h: http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h@102 PS7, Line 102: the batch is added : /// to the receiver's 'deferred_batches_' it's not really the batch that's added, and it's not just a single structure for the receiver (it may go into one of many queues for merging exchange). So how about saying: ... the RPC state is saved into a deferred queue. http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h@104 PS7, Line 104: from the pending sender queue how about: ... from a deferred RPC queue and the row batch is deserialized. http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h@393 PS7, Line 393: quick comment for why we define a move constructor and move operator=, since we don't typically want to define those. http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc File be/src/runtime/krpc-data-stream-mgr.cc: http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@199 PS7, Line 199: DeserializeTask payload = : {DeserializeTaskType::EARLY_SENDERS, finst_id, dest_node_id, 0}; : deserialize_pool_.Offer(move(payload)); doesn't this mean we make early sender draining single threaded? should we instead use the sender_id in this case as well and offer work per sender? or do we think this doesn't matter? http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@217 PS7, Line 217: already_unregistered that shouldn't be possible in the DEFERRED_BATCHES case, right? so I'd probably move this DCHECK into the cases below so you can tighten it up. 
http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@235 PS7, Line 235: for (const unique_ptr& ctx : early_senders.waiting_sender_ctxs) { shouldn't we process waiting_sender_ctxs before closed_sender_ctxs? Otherwise, if the same sender is in both lists we'll process those RPCs out of order. I guess that can't really happen given the current implementation of not responding to early RPCs and that senders only let one in flight, but it still seems to make more sense to do it the other way around. http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@236 PS7, Line 236: already_unregistered why is this possible in the waiting_sender_ctxs case but not the closed_sender_ctxs case? http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@247 PS7, Line 247: already_unregistered why is that possible? http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@248 PS7, Line 248: recvr->AddDeferredBatches(task.sender_id); So I guess we no longer multithread within a single sender queue (and for non-merging, within a single receiver) doing it this way. I think that's okay but was it intentional? http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.h File be/src/runtime/krpc-data-stream-recvr.h: http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.h@78 PS7, Line 78: The caller must acquire data from the : /// returned batch is that talking about calling TransferAllResources(), or can the caller do it directly? http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc File be/src/runtime/krpc-data-stream-recvr.cc: http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@127 PS6, Line 127: // If true, the receiver fragment for this stream got cancelled. > For the non-merging case, there is essentially only one queue. 
As mentioned elsewhere, I'm not totally convinced yet that this is the right way to do it but, yes, we can think about it more and change it later if necessary. http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc File be/src/runtime/krpc-data-stream-recvr.cc: http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@72 PS7, Line 72: s typo http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@77 PS7, Line 77: Adds as many deferred batches as possible hmm I'm still not convinced this is the right thing to do (in the merging case). It seems like it's left up to chance as to the order that deferred batches are drained across the sender queues. But we can think about this more and address it later. http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-recvr.cc@97 PS7, Line 97: (1) 'batch_queue' is empty and there is no p
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 7: (5 comments) http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h File be/src/runtime/krpc-data-stream-mgr.h: http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h@430 PS7, Line 430: i id http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.h@436 PS7, Line 436: specifies identifies http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc File be/src/runtime/krpc-data-stream-mgr.cc: http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-mgr.cc@61 PS7, Line 61: nit: blank space http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-sender.h File be/src/runtime/krpc-data-stream-sender.h: http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-sender.h@136 PS7, Line 136: // nit: /// http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-sender.cc File be/src/runtime/krpc-data-stream-sender.cc: http://gerrit.cloudera.org:8080/#/c/8023/7/be/src/runtime/krpc-data-stream-sender.cc@182 PS7, Line 182: It points to delete. -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Gerrit-Change-Number: 8023 Gerrit-PatchSet: 7 Gerrit-Owner: Michael Ho Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Mostafa Mokhtar Gerrit-Reviewer: Sailesh Mukil Gerrit-Comment-Date: Fri, 03 Nov 2017 18:13:57 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 7: (81 comments) http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/exec/kudu-util.h File be/src/exec/kudu-util.h: http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/exec/kudu-util.h@45 PS6, Line 45: .ok()) > nit: use same mangling between this and KUDU_RETURN_IF_ERROR (prepend seems Done http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/exec-env.cc File be/src/runtime/exec-env.cc: http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/exec-env.cc@89 PS6, Line 89: "processing threads. If left at default value 0, it will be set to number of CPU " > how are these defaults chosen? I didn't really change these parameters from Henry's patch. There are no good defaults for the service queue size anyway (IMPALA-6116), but 1024 seems to be a reasonable bound for untracked memory consumption. The latest PS sets num_svc_threads to match the number of cores on the system. http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h File be/src/runtime/krpc-data-stream-mgr.h: http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@97 PS6, Line 97: etion at any one : /// time). The rate of transmission is controlled by the receiver: a sender will only : /// schedule batch transmission when the previous transmissi > i'm confused what the "two" cases are from this comment. Also, I think it' Done http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@103 PS6, Line 103: batch que > deserialized? Done http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@104 PS6, Line 104: moved from t > what is this "respectively" referring to? The two cases. Removed as this seems unnecessary. 
http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@249 PS6, Line 249: for Transm > unclear what that means, maybe stale comment? And this should say something Done http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@258 PS6, Line 258: > isn't that part of "memory pointed to by 'request'"? If so and you want to Done http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@294 PS6, Line 294: > How about getting rid of this typedef? The code seems easier to understand This data structure has been renamed to DeserializeTask and also changed in the latest patch. http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@428 PS6, Line 428: > a sender Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h File be/src/runtime/krpc-data-stream-mgr.h: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@314 PS3, Line 314: t > Done Oops, missed it. Will update in a later patch. http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.cc File be/src/runtime/krpc-data-stream-mgr.cc: http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.cc@59 PS6, Line 59: > I don't have a strong preference either way, but it'd be nice to be consist My general guideline is to use a lambda if it's one or two lines and use bind for larger functions. I guess there are many different opinions on this topic. I try to optimize for readability. 
http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.cc@322 PS6, Line 322: tonicMillis() + STREAM_E > shouldn't be plural Done http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.h File be/src/runtime/krpc-data-stream-recvr.h: http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.h@119 PS6, Line 119: row batch i > row batch is deserialized Done http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc File be/src/runtime/krpc-data-stream-recvr.cc: http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@52 PS6, Line 52: total amount of > it's not clear what that means just from reading the comment. It'd be nice Done http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@113 PS6, Line 113: erialized > how about using unique_ptr since this owns the row batch (until it's transf Done http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@127 PS6, Line 127: // If true, the receiver fragment for this stream got cancelled. > given that a single soft limit is imposed across all sender queues, does it For the non-merging case, there is essentially only one queue. For the merging case, I can see that it's possible for a blocked row batch destined for one sender queue to be bypassed by either new incoming row batches or by blocked row batches destined for another sender queue. H
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Hello Sailesh Mukil, Mostafa Mokhtar, Dan Hecht, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/8023 to look at the new patch set (#7). Change subject: IMPALA-4856: Port data stream service to KRPC .. IMPALA-4856: Port data stream service to KRPC This patch implements a new data stream service which utilizes KRPC. Similar to the thrift RPC implementation, there are three major components to the data stream service: KrpcDataStreamSender serializes and sends row batches materialized by a fragment instance to a KrpcDataStreamRecvr. KrpcDataStreamMgr is responsible for routing an incoming row batch to the appropriate receiver. The data stream service runs on the port FLAGS_krpc_port which is 29000 by default. Unlike the implementation with thrift RPC, KRPC provides an asynchronous interface for invoking remote methods. As a result, KrpcDataStreamSender doesn't need to create a thread per connection. There is one connection between two Impalad nodes for each direction (i.e. client and server). Multiple queries can multiplex on the same connection for transmitting row batches between two Impalad nodes. The asynchronous interface also avoids an issue with thrift RPC in which a thread may be stuck in an RPC call without being able to check for cancellation. A TransmitData() call with KRPC is in essence a trio of an RpcController, a serialized protobuf request buffer and a protobuf response buffer. The call is invoked via a DataStreamService proxy object. The serialized tuple offsets and row batches are sent via "sidecars" in KRPC to avoid an extra copy into the serialized request buffer. Each impalad node creates a singleton DataStreamService object at start-up time. All incoming calls are served by a service thread pool created as part of DataStreamService. By default, there are 64 service threads. The service threads are shared across all queries so the RPC handler should avoid blocking as much as possible. 
In the thrift RPC implementation, a thrift thread handling a TransmitData() RPC blocks for an extended period of time if the receiver has not yet been created when the call arrives. In the KRPC implementation, TransmitData() or EndDataStream() requests which arrive before the receiver is ready are stored in a per-receiver early sender list in KrpcDataStreamMgr. These RPC calls will be processed and responded to when the receiver is created or when a timeout occurs. Similarly, there is limited space in the sender queues in KrpcDataStreamRecvr. If adding a row batch to a queue in KrpcDataStreamRecvr would cause the buffer limit to be exceeded, the request is stashed in a blocked_sender_ queue to be processed later. The stashed RPC request is not responded to until it is processed, so as to exert back pressure on the client. An alternative would be to reply with an error, requiring the request / row batches to be sent again; this may end up consuming more network bandwidth than the thrift RPC implementation. This change adopts the behavior of allowing one stashed request per sender. All RPC requests and responses are serialized using protobuf. The equivalent of TRowBatch is ProtoRowBatch, which contains a serialized header with the metadata of the row batch and two Kudu Slice objects which point to the actual data (i.e. tuple offsets and tuple data). This patch is based on an abandoned patch by Henry Robinson. TESTING --- * Build passed with FLAGS_use_krpc=true. TO DO - * Port some BE tests to KRPC services. 
Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 --- M be/src/common/status.cc M be/src/common/status.h M be/src/exec/data-sink.cc M be/src/exec/exchange-node.cc M be/src/exec/kudu-util.h M be/src/rpc/CMakeLists.txt M be/src/rpc/rpc-mgr.cc M be/src/rpc/rpc-mgr.h M be/src/runtime/CMakeLists.txt M be/src/runtime/data-stream-mgr-base.h M be/src/runtime/data-stream-mgr.h M be/src/runtime/data-stream-recvr.h M be/src/runtime/data-stream-sender.h M be/src/runtime/exec-env.cc M be/src/runtime/exec-env.h M be/src/runtime/krpc-data-stream-mgr.cc M be/src/runtime/krpc-data-stream-mgr.h M be/src/runtime/krpc-data-stream-recvr.cc M be/src/runtime/krpc-data-stream-recvr.h A be/src/runtime/krpc-data-stream-sender.cc A be/src/runtime/krpc-data-stream-sender.h M be/src/runtime/row-batch.cc M be/src/runtime/row-batch.h M be/src/service/CMakeLists.txt A be/src/service/data-stream-service.cc A be/src/service/data-stream-service.h M be/src/service/impala-server.cc M cmake_modules/FindProtobuf.cmake M common/protobuf/CMakeLists.txt A common/protobuf/common.proto A common/protobuf/data_stream_service.proto A common/protobuf/row_batch.proto M common/thrift/generate_error_codes.py 33 files changed, 3,124 insertions(+), 180 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/8023/7 -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Dan Hecht has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 6: (8 comments) Here's my last set of comments for this round. http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.cc File be/src/runtime/krpc-data-stream-mgr.cc: http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.cc@59 PS6, Line 59: boost::bind I don't have a strong preference either way, but it'd be nice to be consistent between either using bind or [], rather than both... http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.cc@322 PS6, Line 322: RespondToTimedOutSenders shouldn't be plural http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc File be/src/runtime/krpc-data-stream-recvr.cc: http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@52 PS6, Line 52: overflows the queue it's not clear what that means just from reading the comment. It'd be nice to briefly explain that this is talking about the soft limit of the number of bytes across all sender queues. http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@105 PS6, Line 105: // number of pending row batch insertion. : int num_pending_enqueue_ = 0; it's not clear what a "pending insertion" means or why we have this. http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@113 PS6, Line 113: RowBatch* how about using unique_ptr since this owns the row batch (until it's transferred to current_batch_)? http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@127 PS6, Line 127: queue> blocked_senders_; given that a single soft limit is imposed across all sender queues, does it make sense that the blocked_senders_ are maintained per sender? Why don't we maintain a single blocked_senders_ list per datastream recvr? 
Hmm, I guess we need to know if this sender has a blocked sender in GetBatch(). But given the single limit, it seems wrong that one sender's row batches can bypass another sender once we get into the blocked sender situation. i.e. the flow of batches across senders seems quite different depending on when the limit was reached. http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@231 PS6, Line 231: proto_batch update http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@322 PS6, Line 322: payload->rpc_context->RespondSuccess(); doing this in Close() goes against the paradigm that Close() is only about releasing (local) resources. We've been going that way because there might be nowhere to bubble up a status from Close(). At least RespondSuccess() doesn't return a status, I suppose. But is there any place sooner we could do this? Does it make sense to do it during Cancel instead? -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Gerrit-Change-Number: 8023 Gerrit-PatchSet: 6 Gerrit-Owner: Michael Ho Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Mostafa Mokhtar Gerrit-Reviewer: Sailesh Mukil Gerrit-Comment-Date: Thu, 02 Nov 2017 00:10:49 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Dan Hecht has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 6: (14 comments) Note to self: remaining files: krpc-data-stream-{mgr,recvr}.cc http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/exec-env.cc File be/src/runtime/exec-env.cc: http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/exec-env.cc@89 PS6, Line 89: "Number of datastream service processing threads"); how are these defaults chosen? http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h File be/src/runtime/row-batch.h: http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@50 PS6, Line 50: outbound I think we should say something about KRPC to at least give that hint. maybe: A KRPC outbound row batch... http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@60 PS6, Line 60: sizeof(int32_t) sizeof(tuple_offsets_[0]) seems clearer and more robust http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@354 PS6, Line 354: /// it is ignored. This function does not Reset(). we should preserve this comment when removing the thrift variant. So you could just put the new decl here now so we don't forget that. http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@424 PS6, Line 424: /// nit: i don't think we generally have all these line breaks between parameter comments. http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@426 PS6, Line 426: . delete space http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@444 PS6, Line 444: nput_ delete http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@447 PS6, Line 447: input_ delete http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@537 PS6, Line 537: std::string compression_scratch_; this seems like a hack and we could do something simpler, but let's leave it alone for now. 
http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.cc File be/src/runtime/row-batch.cc: http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.cc@241 PS6, Line 241: // as sidecars to the RpcController. this comment was probably meant to be deleted? http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/data_stream_service.proto File common/protobuf/data_stream_service.proto: http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/data_stream_service.proto@29 PS6, Line 29: fragment isn't this the id of the instance? The comment in KrpcDataStreamSender is clearer, let's copy that: /// Sender instance id, unique within a fragment. int sender_id_; http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/data_stream_service.proto@59 PS6, Line 59: // Id of this fragment in its role as a sender. same http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/row_batch.proto File common/protobuf/row_batch.proto: http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/row_batch.proto@32 PS3, Line 32: = 2; > That's the tuple data sent as sidecar. Clarified in the new comments. My point is that writing it like 'tuple_data' doesn't make sense since it's not a field in this struct. You should just write: Size of the tuple data (sent as a sidecar) in bytes ... http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/row_batch.proto File common/protobuf/row_batch.proto: http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/row_batch.proto@32 PS6, Line 32: epeated int32 row_tuples = 2; why is this needed? i don't see it used. The size of it is used, though it seems like even that can be inferred from the descriptors. 
-- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Gerrit-Change-Number: 8023 Gerrit-PatchSet: 6 Gerrit-Owner: Michael Ho Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Mostafa Mokhtar Gerrit-Reviewer: Sailesh Mukil Gerrit-Comment-Date: Wed, 01 Nov 2017 21:48:21 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Dan Hecht has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 6: (37 comments) Next batch. http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h File be/src/runtime/krpc-data-stream-mgr.h: http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@103 PS6, Line 103: processed deserialized? http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@104 PS6, Line 104: respectively what is this "respectively" referring to? http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@249 PS6, Line 249: proto batch unclear what that means, maybe stale comment? And this should say something about the row batch being contained in 'request'. http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@258 PS6, Line 258: the actual row batch isn't that part of "memory pointed to by 'request'"? If so and you want to explicitly mention row batch, maybe say "including the serialized row batch"? http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@294 PS6, Line 294: typedef std::unique_ptr DeserializeWorkItem; How about getting rid of this typedef? The code seems easier to understand if the unique_ptr is visible in the fn decls. It's a bit harder than necessary to reason about DeserializeWorkItem& and &&, given that this is now directly a unique_ptr. 
http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@428 PS6, Line 428: senders a sender http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.h File be/src/runtime/krpc-data-stream-recvr.h: http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.h@119 PS6, Line 119: 'row_batch' row batch is deserialized ('row_batch' isn't a variable in this context) http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.h File be/src/runtime/krpc-data-stream-sender.h: http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.h@57 PS6, Line 57: per_channel_buffer_size' is the buffer size in bytes allocated to each channel's : /// accumulated row batch. still not clear what that means. This isn't really the size of a buffer, is it? How about something like: ... is a soft limit on the buffering, in bytes, into the channel's accumulating row batch before it will be sent. http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.h@111 PS6, Line 111: cached protobuf serialized http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.h@130 PS6, Line 130: cached proto outbound row http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.h@133 PS6, Line 133: Two OutboundRowBatch reused across RPCs Maybe say: The outbound row batches are double-buffered. http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc File be/src/runtime/krpc-data-stream-sender.cc: http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@67 PS6, Line 67: can it looks like there's a third interface now: SerializeAndSendBatch() that takes an Impala rowbatch. But now that we have the outbound batch on the sender, why not just use that for the RANDOM case and do SerializeBatch() then TransmitData(), so that we can simplify the abstraction? 
http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@81 PS6, Line 81: TearDown Teardown() http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@82 PS6, Line 82: TearDown Teardown() http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@126 PS6, Line 126: . or if the preceding RPC failed. http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@141 PS6, Line 141: // Flushes any buffered row batches and sends the EOS RPC to close the channel. Returns error status if... Also indicate that it blocks until the EOS RPC is complete. http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@157 PS6, Line 157: below delete. If that's all we need it for, maybe just remember the capacity? http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@166 PS6, Line 166: RuntimeState* runtime_state_ = nullptr; is that actually used? http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@176 PS6, Line 176: current_outbound_batch_ the name of this is confusing because it's so similar to current_batch_idx_, but it means something different (and often the opposite). How about calling this: rpc_outbound_batch_ or rpc_in_flight_batch_? You could even get rid of the r
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Dan Hecht has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 6: (5 comments) Some initial comments for this round. http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/exec/kudu-util.h File be/src/exec/kudu-util.h: http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/exec/kudu-util.h@45 PS6, Line 45: status_ nit: use same mangling between this and KUDU_RETURN_IF_ERROR (prepend seems better too since it's not a member of a class and status_ is a common member name). http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h File be/src/runtime/krpc-data-stream-mgr.h: http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-mgr.h@97 PS6, Line 97: two things: : /// process it immediately, add it to a fixed-size 'batch queue' for later processing, : /// or defer processing the RPC if the 'batch-queue' is full I'm confused about what the "two" cases are from this comment. Also, I think it's kind of confusing what "processing" means. Should it read something like: two things: deserialize it immediately, adding it to a 'batch queue', or defer deserializing and respond to the RPC later if the 'batch queue' is full. Also, the batch queue isn't really "fixed size", right? http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc File be/src/runtime/krpc-data-stream-recvr.cc: http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@171 PS6, Line 171: if (!blocked_senders_.empty()) { we talked about this in person, but want to note it so I don't forget: this loop will dequeue a "random" number of blocked senders, until the first one happens to finish deserializing and shows up in batch_queue_. That seems wrong. http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@220 PS6, Line 220: DCHECK_GT(num_remaining_senders_, 0); why do we have this DCHECK (i.e. 
why is this condition important here), and could it be violated with out-of-order RPCs? http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-recvr.cc@237 PS6, Line 237: recvr_->ExceedsLimit(batch_size) it seems like we should be ORing that with !blocked_senders_.empty(), or something. Otherwise, stuff that's been in the blocked_senders_ queue for a while can be passed by new stuff. i.e. blocked_senders_ senders can get starved -- maybe that explains the longer-than-expected RPC times we've seen in some cases? -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Gerrit-Change-Number: 8023 Gerrit-PatchSet: 6 Gerrit-Owner: Michael Ho Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Mostafa Mokhtar Gerrit-Reviewer: Sailesh Mukil Gerrit-Comment-Date: Mon, 30 Oct 2017 19:35:45 + Gerrit-HasComments: Yes
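The deferral and starvation points raised above can be sketched with a toy model of the receiver's queue. This is illustrative only (names and types are simplified, not Impala's); it also shows one way to avoid starving parked senders, per the last comment, by admitting them in FIFO order ahead of new arrivals.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <utility>

// Simplified model of receiver-side back pressure: when the batch queue
// exceeds its soft byte limit, the incoming request is parked and its RPC
// response is deferred until space frees up.
class SenderQueueModel {
 public:
  explicit SenderQueueModel(int64_t soft_limit) : soft_limit_(soft_limit) {}

  // Returns true if the batch was admitted (RPC answered immediately);
  // false if the sender was parked with its response deferred.
  bool AddBatch(int64_t batch_size, std::function<void()> respond_rpc) {
    // Park behind any already-blocked senders even when there is room, so
    // parked senders are not passed by new arrivals (the starvation concern).
    if (queued_bytes_ + batch_size > soft_limit_ || !blocked_senders_.empty()) {
      blocked_senders_.push_back({batch_size, std::move(respond_rpc)});
      return false;
    }
    queued_bytes_ += batch_size;
    respond_rpc();
    return true;
  }

  // Called when the consumer drains a batch; admits parked senders FIFO.
  void BatchDrained(int64_t batch_size) {
    queued_bytes_ -= batch_size;
    while (!blocked_senders_.empty() &&
           queued_bytes_ + blocked_senders_.front().size <= soft_limit_) {
      queued_bytes_ += blocked_senders_.front().size;
      blocked_senders_.front().respond();  // now answer the deferred RPC
      blocked_senders_.pop_front();
    }
  }

  std::size_t num_blocked() const { return blocked_senders_.size(); }

 private:
  struct BlockedSender { int64_t size; std::function<void()> respond; };
  const int64_t soft_limit_;
  int64_t queued_bytes_ = 0;
  std::deque<BlockedSender> blocked_senders_;
};
```

Deferring the response rather than replying with an error is what exerts back pressure on the sender without burning network bandwidth on retransmits.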
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 6: (15 comments) http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc File be/src/runtime/krpc-data-stream-sender.cc: http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@123 PS6, Line 123: until : // any in-flight RPC completes if the preceding RPC is still in-flight. http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@135 PS6, Line 135: free frees http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@258 PS6, Line 258: stack code http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@261 PS6, Line 261: stack code http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@264 PS6, Line 264: thread threads http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/krpc-data-stream-sender.cc@279 PS6, Line 279: thread threads http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h File be/src/runtime/row-batch.h: http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@104 PS6, Line 104: CachedProtobufRowBatch OutboundRowBatch http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@144 PS6, Line 144: /// Populate a row batch from a serialized protobuf input_batch by copying : /// input_batch's tuple_data into the row batch's mempool and converting all : /// offsets in the data back into pointers. 
stale http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@455 PS6, Line 455: tuple_offsets input_* http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/runtime/row-batch.h@531 PS6, Line 531: CachedProtobufRowBatch OutboundProtoRowBatch http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/util/network-util.cc File be/src/util/network-util.cc: http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/util/network-util.cc@41 PS6, Line 41: using kudu::Sockaddr; undo. Bad rebase. http://gerrit.cloudera.org:8080/#/c/8023/6/be/src/util/network-util.cc@120 PS6, Line 120: Sockaddr sock; undo http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/data_stream_service.proto File common/protobuf/data_stream_service.proto: http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/data_stream_service.proto@38 PS6, Line 38: 'tuple_offsets' tuple offsets' buffer http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/data_stream_service.proto@43 PS6, Line 43: 'tuple_data' tuple data's buffer http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/row_batch.proto File common/protobuf/row_batch.proto: http://gerrit.cloudera.org:8080/#/c/8023/6/common/protobuf/row_batch.proto@25 PS6, Line 25: The indices of the sidecars are included in the header below. delete -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Gerrit-Change-Number: 8023 Gerrit-PatchSet: 6 Gerrit-Owner: Michael Ho Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Mostafa Mokhtar Gerrit-Reviewer: Sailesh Mukil Gerrit-Comment-Date: Fri, 27 Oct 2017 22:41:44 + Gerrit-HasComments: Yes
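For context on the "tuple offsets' buffer" / "tuple data's buffer" and sidecar-index comments above: the serialized request stays small because the row batch header carries only the *indices* of the sidecars, while the actual buffers travel outside the protobuf. The following schematic uses plain structs as stand-ins for the protobuf header and the KRPC sidecar mechanism; the field names are hypothetical.

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical stand-in for a RowBatchHeaderPB: stores the sidecar indices
// of the two payload buffers rather than the buffers themselves, avoiding an
// extra copy into the serialized request.
struct RowBatchHeader {
  int32_t num_rows = 0;
  int32_t tuple_offsets_sidecar_idx = -1;  // index of tuple offsets' buffer
  int32_t tuple_data_sidecar_idx = -1;     // index of tuple data's buffer
  int64_t uncompressed_size = 0;
};

// Stand-in for an inbound call: the deserialized header plus the raw,
// indexed sidecar byte buffers delivered alongside the request.
struct InboundCall {
  RowBatchHeader header;
  std::vector<std::string> sidecars;
};

// Receiver-side lookup: resolve the header's indices into the buffers.
const std::string& TupleOffsetsBuffer(const InboundCall& call) {
  return call.sidecars[call.header.tuple_offsets_sidecar_idx];
}
const std::string& TupleDataBuffer(const InboundCall& call) {
  return call.sidecars[call.header.tuple_data_sidecar_idx];
}
```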
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 6: (6 comments) http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc File be/src/runtime/krpc-data-stream-sender.cc: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@75 PS3, Line 75: g two > see comment in row-batch.h about this terminology. Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@178 PS3, Line 178: // The two OutboundRowBatch which are re-used across multiple RPCs. Each entry contains : // a RowBatchHeaderPB and the buffers for the serialized tuple offsets and data. When : // one is being used for an in-flight RPC, the execution thread continues to run and : // serializes another row batch into the other entry. 'current_batch_idx_' is the index : // of the entry being used by the in-flight or last completed RPC. : // : // TODO: replace this with an ac > We need to access these fields from the callback (e.g. due to retry). req_ removed in PS5. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/row-batch.h File be/src/runtime/row-batch.h: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/row-batch.h@52 PS3, Line 52: class OutboundRowBatch { > ProtoRowBatch is a conceptual representation of a serialized row batch in b Removed in PS5. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/row-batch.h@89 PS3, Line 89: > Had hard time coming up with a good name to indicate the re-use of the vect Renamed to OutboundRowBatch in PS5. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/row-batch.h@431 PS3, Line 431: /// : /// 'uncompressed_size': Updated with the uncompressed size of 'tuple_data'. : /// : /// 'is_compressed': Sets to true if compression is applied on 'tuple_data'. : /// False otherwise. : /// : /// Returns error status if serialization failed. 
Returns OK otherwise. : /// TODO: clean this up once the thrift RPC implementation is removed. : Status Serialize(bool full_dedup, vector* tuple_offsets, string* tuple_data, : int64_t* uncompressed_size, bool* is_compressed); : : /// Shared implementation between thrift and protobuf to deserialize a row batch. : /// : /// 'input_tuple_offsets': an int32_t array of tuples; offsets into 'input_tuple_data'. : /// Used for populating the tuples in the row batch with actual pointers. : /// : /// 'input_tuple_data': contains pointer and size of tuples' data buffer. : /// If 'is_compressed' is true, the data is compressed. : /// : /// 'uncompressed_size': the uncompressed size of 'input_tuple_data' if it's compressed. : /// : /// 'is_compressed': True if 'input_tuple_data' is compressed. : /// : /// TODO: clean this up once the thrift RPC implementation is removed. : void Deserialize(const kudu::Slice& tuple_offsets, const kudu::Slice& tuple_data, : int64_t uncompressed_size, bool is_compressed); : : typedef FixedSizeHashTable DedupMap; : : /// The total size of all data represented in this row batch (tuples and referenced : /// string and collection data). This is the size of the row batch after removing all : /// gaps in the auxiliary and deduplicated tuples (i.e. the smallest footprint for the : /// row batch). If the distinct_tuples argument is non-null, full deduplication is : /// enabled. The distinct_tuples map must be empty. : int64_t TotalByteSize(DedupMap* distinct_tuples); : : void SerializeInternal(int64_t size, DedupMap* distinct_tuples, : vector* tuple_offsets, string* tuple_data); : : /// All members below need to be handled in R > let's leave a TODO about cleaning this all up once we can remove the thrift TODO added. Cannot file a JIRA as apache JIRA is being re-indexed. 
http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/service/data-stream-service.cc File be/src/service/data-stream-service.cc: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/service/data-stream-service.cc@64 PS3, Line 64: : : : : : : : : > see comment in row-batch.h. I think we should do this later when we actual D
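The Serialize()/Deserialize() doc comments quoted above describe converting tuple pointers into offsets on the wire and back into pointers on receipt. A simplified round-trip sketch, under loose assumptions: real Impala tuples are fixed-layout structs in fragment memory, but here a "tuple" is just a C string, and -1 marks a NULL tuple.

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Serialized form: a flat data buffer plus int32 offsets into it.
struct Serialized {
  std::vector<int32_t> tuple_offsets;
  std::string tuple_data;
};

// Flattens pointers into offsets, appending each tuple's bytes to the
// data buffer (the sender-side direction of the quoted comments).
Serialized Serialize(const std::vector<const std::string*>& tuples) {
  Serialized out;
  for (const std::string* t : tuples) {
    if (t == nullptr) {
      out.tuple_offsets.push_back(-1);  // NULL tuple
      continue;
    }
    out.tuple_offsets.push_back(static_cast<int32_t>(out.tuple_data.size()));
    out.tuple_data.append(*t);
    out.tuple_data.push_back('\0');
  }
  return out;
}

// Copies the data buffer into the receiver's "mempool" and converts the
// offsets back into pointers (the receiver-side direction).
std::vector<const char*> Deserialize(const Serialized& in, std::string* pool) {
  *pool = in.tuple_data;
  std::vector<const char*> tuples;
  for (int32_t off : in.tuple_offsets) {
    tuples.push_back(off < 0 ? nullptr : pool->data() + off);
  }
  return tuples;
}
```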
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Hello Sailesh Mukil, Mostafa Mokhtar, Dan Hecht, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/8023 to look at the new patch set (#6). Change subject: IMPALA-4856: Port data stream service to KRPC .. IMPALA-4856: Port data stream service to KRPC This patch implements a new data stream service which utilizes KRPC. Similar to the thrift RPC implementation, there are 3 major components to the data stream service: KrpcDataStreamSender serializes and sends row batches materialized by a fragment instance to a KrpcDataStreamRecvr. KrpcDataStreamMgr is responsible for routing an incoming row batch to the appropriate receiver. The data stream service runs on the port FLAGS_krpc_port, which is 29000 by default. Unlike the thrift RPC implementation, KRPC provides an asynchronous interface for invoking remote methods. As a result, KrpcDataStreamSender doesn't need to create a thread per connection. There is one connection between two Impalad nodes for each direction (i.e. client and server). Multiple queries can multiplex on the same connection for transmitting row batches between two Impalad nodes. The asynchronous interface also avoids some issues with thrift RPC in which a thread may be stuck in an RPC call without being able to check for cancellation. A TransmitData() call with KRPC is in essence a trio of an RpcController, a serialized protobuf request buffer and a protobuf response buffer. The call is invoked via a DataStreamService proxy object. The serialized tuple offsets and row batches are sent via "sidecars" in KRPC to avoid an extra copy into the serialized request buffer. Each impalad node creates a singleton DataStreamService object at start-up time. All incoming calls are served by a service thread pool created as part of DataStreamService. By default, there are 64 service threads. The service threads are shared across all queries, so the RPC handler should avoid blocking as much as possible. 
In the thrift RPC implementation, a thrift thread handling a TransmitData() RPC blocks for an extended period of time if the receiver has not yet been created when the call arrives. In the KRPC implementation, TransmitData() or EndDataStream() requests which arrive before the receiver is ready are stored in a per-receiver early sender list in KrpcDataStreamMgr. These RPC calls will be processed and responded to when the receiver is created or when a timeout occurs. Similarly, there is limited space in the sender queues in KrpcDataStreamRecvr. If adding a row batch to a queue in KrpcDataStreamRecvr would cause the buffer limit to be exceeded, the request will be stashed in a blocked_sender_ queue to be processed later. The stashed RPC request will not be responded to until it is processed, so as to exert back pressure on the client. An alternative would be to reply with an error, in which case the request / row batches would need to be sent again. This may end up consuming more network bandwidth than the thrift RPC implementation. This change adopts the behavior of allowing one stashed request per sender. All RPC requests and responses are serialized using protobuf. The equivalent of TRowBatch is ProtoRowBatch, which contains a serialized header with the meta-data of the row batch and two Kudu Slice objects which contain pointers to the actual data (i.e. tuple offsets and tuple data). This patch is based on an abandoned patch by Henry Robinson. TESTING --- * Build passed with FLAGS_use_krpc=true. TO DO - * Port some BE tests to KRPC services. 
Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 --- M be/src/common/status.cc M be/src/common/status.h M be/src/exec/data-sink.cc M be/src/exec/exchange-node.cc M be/src/exec/kudu-util.h M be/src/rpc/CMakeLists.txt M be/src/rpc/rpc-mgr.cc M be/src/rpc/rpc-mgr.h M be/src/runtime/CMakeLists.txt M be/src/runtime/data-stream-mgr-base.h M be/src/runtime/data-stream-mgr.h M be/src/runtime/data-stream-recvr.h M be/src/runtime/data-stream-sender.h M be/src/runtime/exec-env.cc M be/src/runtime/exec-env.h M be/src/runtime/krpc-data-stream-mgr.cc M be/src/runtime/krpc-data-stream-mgr.h M be/src/runtime/krpc-data-stream-recvr.cc M be/src/runtime/krpc-data-stream-recvr.h A be/src/runtime/krpc-data-stream-sender.cc A be/src/runtime/krpc-data-stream-sender.h M be/src/runtime/row-batch.cc M be/src/runtime/row-batch.h M be/src/service/CMakeLists.txt A be/src/service/data-stream-service.cc A be/src/service/data-stream-service.h M be/src/service/impala-server.cc M be/src/util/network-util.cc M cmake_modules/FindProtobuf.cmake M common/protobuf/CMakeLists.txt A common/protobuf/common.proto A common/protobuf/data_stream_service.proto A common/protobuf/row_batch.proto M common/thrift/generate_error_codes.py 34 files changed, 2,932 insertions(+), 175 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/8023/6 -- To view, visit http://gerrit.cloudera.org:8
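The early-sender mechanism described in the commit message above (TransmitData()/EndDataStream() requests that arrive before the receiver exists are parked per receiver and handed over once it registers) can be sketched as a toy model. Names and types are simplified stand-ins, not KrpcDataStreamMgr's actual interface, and the timeout path is omitted.

```cpp
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Key identifying a receiver: (destination fragment instance id, plan node id).
using RecvrKey = std::pair<std::string, int>;

// Toy model of the per-receiver "early sender" list.
class EarlySenderModel {
 public:
  // Returns true if the receiver was ready (the caller can process the
  // request immediately); false if the request was parked for later.
  bool OfferRequest(const RecvrKey& key, int request_id) {
    if (ready_.count(key) > 0) return true;
    early_senders_[key].push_back(request_id);
    return false;
  }

  // Registers the receiver and returns the parked requests, in arrival
  // order, that must now be processed and responded to.
  std::vector<int> CreateRecvr(const RecvrKey& key) {
    ready_.insert(key);
    std::vector<int> parked = std::move(early_senders_[key]);
    early_senders_.erase(key);
    return parked;
  }

 private:
  std::set<RecvrKey> ready_;
  std::map<RecvrKey, std::vector<int>> early_senders_;
};
```

In the real service, parking a request also means deferring its RPC response, which is what keeps the sender from retransmitting while the receiver starts up.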
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Hello Sailesh Mukil, Mostafa Mokhtar, Dan Hecht, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/8023 to look at the new patch set (#5). Change subject: IMPALA-4856: Port data stream service to KRPC 
Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 --- M be/src/common/status.cc M be/src/common/status.h M be/src/exec/data-sink.cc M be/src/exec/exchange-node.cc M be/src/exec/kudu-util.h M be/src/rpc/CMakeLists.txt M be/src/rpc/rpc-mgr.cc M be/src/rpc/rpc-mgr.h M be/src/runtime/CMakeLists.txt M be/src/runtime/data-stream-mgr-base.h M be/src/runtime/data-stream-mgr.h M be/src/runtime/data-stream-recvr.h M be/src/runtime/data-stream-sender.h M be/src/runtime/exec-env.cc M be/src/runtime/exec-env.h M be/src/runtime/krpc-data-stream-mgr.cc M be/src/runtime/krpc-data-stream-mgr.h M be/src/runtime/krpc-data-stream-recvr.cc M be/src/runtime/krpc-data-stream-recvr.h A be/src/runtime/krpc-data-stream-sender.cc A be/src/runtime/krpc-data-stream-sender.h M be/src/runtime/row-batch.cc M be/src/runtime/row-batch.h M be/src/service/CMakeLists.txt A be/src/service/data-stream-service.cc A be/src/service/data-stream-service.h M be/src/service/impala-server.cc M be/src/util/network-util.cc M cmake_modules/FindProtobuf.cmake M common/protobuf/CMakeLists.txt A common/protobuf/common.proto A common/protobuf/data_stream_service.proto A common/protobuf/row_batch.proto M common/thrift/generate_error_codes.py 34 files changed, 2,929 insertions(+), 175 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/8023/5 -- To view, visit http://gerrit.cloudera.org:8
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 4: (86 comments) Reply to some of the comments for now. Will look into removing ProtoRowBatch next. Will not rebase until next version of the patch is pushed. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/common/status.cc File be/src/common/status.cc: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/common/status.cc@246 PS3, Line 246: void Status::FromProto(const StatusPB& status) { > this is the same as FromThrift() effectively, right? Can we make the two lo Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/common/status.cc@262 PS3, Line 262: void Status::FreeMessage() noexcept { > same comment. let's make this and ToThrift look the same so it's obvious th Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/exec/exchange-node.cc File be/src/exec/exchange-node.cc: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/exec/exchange-node.cc@109 PS3, Line 109: RETURN_IF_CANCELLED(state); > why do we do this in some Open() but not all? Should we just do it in ExecN Actually, I noticed similar patterns in other exec nodes. Let me keep this line of change for now and do the refactoring in another change. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/rpc/rpc-mgr.h File be/src/rpc/rpc-mgr.h: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/rpc/rpc-mgr.h@154 PS3, Line 154: ~RpcMgr() { > nit: one-liner? Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/rpc/rpc-mgr.cc File be/src/rpc/rpc-mgr.cc: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/rpc/rpc-mgr.cc@77 PS3, Line 77: VLOG_QUERY << "Registered KRPC service: " << service_pool->service_name(); > Should we add a log message stating which services we registered with KRPC? 
Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h File be/src/runtime/krpc-data-stream-mgr.h: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@63 PS3, Line 63: tiple RPCs. The logical connection between a pair of client > I don't think that's accurate. see questions in krpc-data-stream-recvr.h ab Comment rephrased. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@93 PS3, Line 93: After the first batch has been received, a sender continues to send batches, one > XXX check whether these are really different Rephrased. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@94 PS3, Line 94: () RPC > what buffer? do you mean queue? Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@108 PS3, Line 108: > what does that mean? Is it saying that during unordinary operation, a send It means the fragment instance completes without hitting any error. If a fragment instance ends early, it may end up not calling EOS() RPC. For instance, if there is any cancellation, the stream will just be torn down without sending EOS as it's expected that the receivers' fragments will be cancelled too. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@140 PS3, Line 140: RPCs, and may be cancell > what are "it" and "its" here? "the sender" and "the RPC's"? the result will be dropped silently by the RPC layer. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@141 PS3, Line 141: /// time. If an in-flight RPC is cancelled at the sender side, the reply from the receiver > is that still true now that we have cancellation of RPCs? Yup. If an RPC is cancelled before the result arrives, the KRPC code will just ignore the result. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@153 PS3, Line 153: > sending fragment? 
Done http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@164 PS3, Line 164: structure is const > is that because the recvr hasn't showed up yet, or because the stream's que both. Comments updated. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@166 PS3, Line 166: > is that talking about the 'request' field below, or something different? Yes. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@175 PS3, Line 175: kudu::rpc::RpcContext* rpc_context; > what's the relationship between this and proto_batch? proto_batch is the inbound row_batch populated from information in 'request' and 'rpc_context'. I agree that it's not strictly necessary to keep it in TransmitDataCtx. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@178 PS3, Line 178: /// such as the destination finst ID, plan node ID and the row batch header. > who owns it? 'context'. Comments added. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@235 PS3, L
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Michael Ho has uploaded a new patch set (#4). ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC 
Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 --- M be/src/common/status.cc M be/src/common/status.h M be/src/exec/data-sink.cc M be/src/exec/exchange-node.cc M be/src/exec/kudu-util.h M be/src/rpc/CMakeLists.txt M be/src/rpc/rpc-mgr.cc M be/src/rpc/rpc-mgr.h M be/src/runtime/CMakeLists.txt M be/src/runtime/data-stream-mgr-base.h M be/src/runtime/data-stream-mgr.h M be/src/runtime/data-stream-recvr.h M be/src/runtime/data-stream-sender.h M be/src/runtime/exec-env.cc M be/src/runtime/exec-env.h M be/src/runtime/krpc-data-stream-mgr.cc M be/src/runtime/krpc-data-stream-mgr.h M be/src/runtime/krpc-data-stream-recvr.cc M be/src/runtime/krpc-data-stream-recvr.h A be/src/runtime/krpc-data-stream-sender.cc A be/src/runtime/krpc-data-stream-sender.h M be/src/runtime/row-batch.cc M be/src/runtime/row-batch.h M be/src/service/CMakeLists.txt A be/src/service/data-stream-service.cc A be/src/service/data-stream-service.h M be/src/service/impala-server.cc M be/src/util/network-util.cc M cmake_modules/FindProtobuf.cmake M common/protobuf/CMakeLists.txt A common/protobuf/common.proto A common/protobuf/data_stream_service.proto A common/protobuf/row_batch.proto M common/thrift/generate_error_codes.py 34 files changed, 2,969 insertions(+), 175 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/8023/4 -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project:
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Dan Hecht has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 3: (16 comments) Some more comments, still going though. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc File be/src/runtime/krpc-data-stream-sender.cc: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@75 PS3, Line 75: cached see comment in row-batch.h about this terminology. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@89 PS3, Line 89: // safe to free them once the callback has been invoked. I think we should add a reference to KUDU-2011 somewhere here like: Note that due to KUDU-2011, timeout cannot be used with outbound sidecars. The client has no idea when it is safe to reclaim the sidecar buffer (~RpcSidecar() should be the right place, except that's currently called too early). RpcController::Cancel(), however, ensures that the callback is called only after the RPC layer no longer references the sidecar buffer. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@92 PS3, Line 92: query query? does it mean fragment instance? http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@124 PS3, Line 124: Shutdown the RPC thread is that still accurate? http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@139 PS3, Line 139: int buffer_size_; that could use a comment. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@141 PS3, Line 141: const TNetworkAddress address_; : TUniqueId fragment_instance_id_; : PlanNodeId dest_node_id_; those could be commented together to say they identify the destination. it's a little odd that plan node id is prefixed "dest" when the others are not. 
it also seems weird that we need both these and the req_ field since shouldn't they just be stored there? Or seems we should get rid of the req_ and just generate it when sending. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@159 PS3, Line 159: num_cached_proto_batches_ caps since constant? also, can this ever be something other than 2 without writing the code? i.e. doesn't the code assume this value is 2 in various ways? http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@175 PS3, Line 175: proxy proxy_ http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@197 PS3, Line 197: bool remote_recvr_closed_ = false; why is that needed now? also, shouldn't we do something different, at a higher level, in that case (like cancel this instance)? http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@200 PS3, Line 200: Status rpc_status_; I think it would help to associate this with rpc_in_flight_ - move them adjacent and say that rpc_status_ is valid only when rpc_in_flight_ is false, or something. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@218 PS3, Line 218: 2 if we have the constant, shouldn't that use it? http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@334 PS3, Line 334: auto pred = [this]() -> bool { return !rpc_in_flight_ || ShouldTerminate(); }; : auto timeout = std::chrono::system_clock::now() + milliseconds(50); : while (!rpc_done_cv_.wait_until(*lock, timeout, pred)) { : timeout = system_clock::now() + milliseconds(50); : } seems simpler to just write: while (rpc_in_flight_ && !ShouldTerminate()) { auto timeout = std::chrono::system_clock::now() + milliseconds(50); rpc_done_cv_.wait_until(*lock, timeout); } or even better to use wait_for() which takes the relative timeout. Or should we use our ConditionVariable wrapper? 
Especially if we want to start instrumenting these things better. But if it's work to switch it over, it's okay to keep it a condition_variable, but let's at least make the code more straightforward. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/row-batch.h File be/src/runtime/row-batch.h: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/row-batch.h@52 PS3, Line 52: struct ProtoRowBatch { I think we should get rid of this structure altogether. IMO, the indirection just adds confusion. On the receive side, it seems we can just get the header and sidecars directly from the request, which is already threaded through the RPC handler anyway. Pulling it into a ProtoRowBatch just makes it unclear where the not yet deserialized rowbatch comes from. On the send side, I think we should just work directly on CachedProtoRowBatch (but rename that thing, see below). The indirect
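For reference, the simpler wait loop suggested in the review comment above can be written with wait_for() and the loop condition spelled out directly. This is only a sketch: the member names (lock_, rpc_in_flight_, should_terminate_) are illustrative stand-ins for the sender channel's actual state, not Impala's real class.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>

// Stand-ins for the channel's state; names are illustrative only.
std::mutex lock_;
std::condition_variable rpc_done_cv_;
bool rpc_in_flight_ = false;
std::atomic<bool> should_terminate_{false};

// Wait until the in-flight RPC completes or the channel is asked to shut
// down. Waking every 50ms (as the review suggests) keeps the termination
// check cheap without the double-bookkeeping of a recomputed deadline.
void WaitForRpc(std::unique_lock<std::mutex>* lock) {
  while (rpc_in_flight_ && !should_terminate_.load()) {
    rpc_done_cv_.wait_for(*lock, std::chrono::milliseconds(50));
  }
}
```

The periodic wakeup also papers over a missed notify, which is why the relative-timeout form is preferred over a bare wait().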
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Tim Armstrong has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 3: (1 comment) http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h File be/src/runtime/krpc-data-stream-mgr.h: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@414 PS3, Line 414: Status > I think that status is not getting checked by the caller. I thought Tim mad GCC 4.9.2 doesn't support [[nodiscard]] - we need to upgrade GCC to get this in GCC builds. Any clang build (including clang-tidy) will catch this - LMK if it doesn't. -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Gerrit-Change-Number: 8023 Gerrit-PatchSet: 3 Gerrit-Owner: Michael Ho Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Mostafa Mokhtar Gerrit-Reviewer: Sailesh Mukil Gerrit-Reviewer: Tim Armstrong Gerrit-Comment-Date: Thu, 12 Oct 2017 21:06:02 + Gerrit-HasComments: Yes
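For context on the discussion above: the warn-on-unused-result behavior can be had on pre-C++17 toolchains via the GNU attribute rather than [[nodiscard]], which is what Clang builds catch. A minimal standalone sketch; the Status struct and DoTransmit() here are illustrative stand-ins, not Impala's real classes (Impala's actual macro lives in be/src/common/status.h).

```cpp
#include <cassert>

// On GCC/Clang this makes ignoring the return value warn under
// -Wunused-result; older GCC lacks [[nodiscard]] but supports this.
#if defined(__GNUC__) || defined(__clang__)
#define WARN_UNUSED_RESULT __attribute__((warn_unused_result))
#else
#define WARN_UNUSED_RESULT
#endif

struct Status {
  bool ok;
  static Status OK() { return Status{true}; }
};

// Callers that write `DoTransmit();` without using the Status now warn.
Status DoTransmit() WARN_UNUSED_RESULT;
Status DoTransmit() { return Status::OK(); }
```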
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Dan Hecht has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 3: (34 comments) Another batch of comments... http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h File be/src/runtime/krpc-data-stream-mgr.h: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@63 PS3, Line 63: ongoing transmission from a client to a server as a 'stream' I don't think that's accurate. see questions in krpc-data-stream-recvr.h about stream definition. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@93 PS3, Line 93: process it immediately, add it to a fixed-size 'batch queue' for later processing XXX check whether these are really different http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@164 PS3, Line 164: deferred processing is that because the recvr hasn't showed up yet, or because the stream's queue is full, or both? http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@235 PS3, Line 235: node_id dest_node_id? http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@239 PS3, Line 239: Ownership of the receiver is shared between this DataStream mgr instance and the : /// caller. that seems unnecessary but don't change it now. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@246 PS3, Line 246: 'proto_batch'? http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@248 PS3, Line 248: . 'request'. Also document what 'response' and 'context' are. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@266 PS3, Line 266: Notifies the receiver is this an RPC handler? I think we should just be explicit about which of these methods are RPC handlers. 
http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@267 PS3, Line 267: The RPC what RPC is this talking about? If this is a handler, then it's clear. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@274 PS3, Line 274: Closes Does it close or cancel? (or is there no difference?) http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@284 PS3, Line 284: RPCs which were buffered To be consistent with terminology used in class comment, maybe say "deferred RPCs" http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@340 PS3, Line 340: ragment instance id, 0 what is that saying? is that a misplaced comma or am I reading this wrong? http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@341 PS3, Line 341: instance id changes I don't understand this. it kinda sounds like we're trying to be able to find all instances of this fragment, but then wouldn't we iterate until the fragment id changes (not the instance id)? http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@349 PS3, Line 349: struct EarlySendersList { hmm, I guess we need this now that we can't block the RPC thread? http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@358 PS3, Line 358: Time Monotonic time http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@374 PS3, Line 374: time monotonic time http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@382 PS3, Line 382: boost::unordered_set closed_stream_cache_; all this parallel startup stuff really needs to be revisited (but not for this change). it's too complex and brittle. 
http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@386 PS3, Line 386: Deserialize maybe call it DeserializeDeferred() or DeserializeWorker() to make it clearer that this is only for the deferred (slow) path, since the normal path will also have to deserialize (but doesn't use this code). http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@404 PS3, Line 404: void EnqueueRowBatch(DeserializeWorkItem&& payload); how about grouping this with Deferred function above since it's related. Also, I think the name should be less generic. Like maybe EnqueueDeferredBatch() or EnqueueDeferredRpc() (does the response happen before or after this deferred work?) http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@413 PS3, Line 413: block what's that? http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@414 PS3, Line 414: Status I think that status is not getting checked by the caller. I thought Tim made Status warn on unused result -- why is it not catching this? (Or do we still need to annotate each method?). http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-da
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Sailesh Mukil has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 3: (23 comments) http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/rpc/rpc-mgr.cc File be/src/rpc/rpc-mgr.cc: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/rpc/rpc-mgr.cc@77 PS3, Line 77: Should we add a log message stating which services we registered with KRPC? It might be useful later on as we add more services, while trying to debug user issues, to know which services are on KRPC and which are on thrift. Granted there are other ways to find that, but this is easily accessible and straightforward. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h File be/src/runtime/krpc-data-stream-mgr.h: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@314 PS3, Line 314: w super-nit: Capital 'W' http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc File be/src/runtime/krpc-data-stream-mgr.cc: http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@198 PS1, Line 198: deseria > Deserialization pool's purpose is to avoid executing deserialization in lin Hmm, this seems like it would be a nice thing to have. Is the absence of a MemTracker the only hindrance to early deserialization? Is there some way we could add this to the process MemTracker if we can't attribute it to a query? If it's too complicated for now, let's track this with a JIRA and write down some ideas there for the next KRPC milestone. http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@227 PS1, Line 227: // to make sure that the close operation is performed so add to per-recvr list of : // pending closes. It's possible for a sender to issue EOS RPC without sending any : // rows if no rows are m > This may be a bit subtle but this is equivalent to the logic in the non-KRP Ah you're right.
Case 2 is what changed in IMPALA-5199, but it looks like that's automatically fixed here. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.cc File be/src/runtime/krpc-data-stream-mgr.cc: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.cc@112 PS3, Line 112: for (unique_ptr& ctx : early_senders.waiting_sender_ctxs) { : EnqueueRowBatch({recvr->fragment_instance_id(), move(ctx)}); : num_senders_waiting_->Increment(-1); : } : for (unique_ptr& ctx : early_senders.closed_sender_ctxs) { : recvr->RemoveSender(ctx->request->sender_id()); : Status::OK().ToProto(ctx->response->mutable_status()); : ctx->context->RespondSuccess(); : num_senders_waiting_->Increment(-1); : } It's not possible for the same sender to be in waiting_senders_ctxs and closed_sender_ctxs for a given receiver right? Because if it would, it would make more sense to service the 'closed_sender_ctxs' before the 'waiting_sender_ctxs' since we may as well close the receiver instead of wasting CPU processing those RPCs for a while. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.cc@140 PS3, Line 140: while (range.first != range.second) { : shared_ptr recvr = range.first->second; : if (recvr->fragment_instance_id() == finst_id && : recvr->dest_node_id() == node_id) { : return recvr; : } : ++range.first; : } I'm thinking it makes sense to prioritize finding the receiver with the assumption that we will find it in the receiver_map_, rather than assume that it most likely will already be unregistered. In other words, I think it may be more beneficial CPU-wise for general workloads to look in the 'receiver_map_' before looking into the 'closed_stream_cache_'. What do you think? 
http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.cc@151 PS3, Line 151: KrpcDataStreamMgr::AddEarlySender We could merge the implementations of AddEarlySender() and AddEarlyClosedSender() by using templates and some extra params, but maybe the code complexity isn't worth it. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.cc@174 PS3, Line 174: // that it is still preparing, so add payload to per-receiver list. Add comment "In the worst case, this RPC is so late that the receiver is already unregistered and removed from the closed_stream_cache_, in which case it will be responded to by the Maintenance thread after FLAGS_datastream_sender_timeout_ms." http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.cc@242 PS3, Line 242: {
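The lookup-ordering suggestion above (probe receiver_map_ first, consult closed_stream_cache_ only on a miss) can be sketched as follows. The int keys and Recvr struct are simplified stand-ins: the real map is keyed by a hash of the fragment instance id and the values are KrpcDataStreamRecvr objects.

```cpp
#include <cassert>
#include <memory>
#include <unordered_map>
#include <unordered_set>

// Simplified stand-in for KrpcDataStreamRecvr.
struct Recvr {
  int finst_id;
  int node_id;
};

// Live receivers, keyed by (a stand-in for the hash of) fragment instance id.
std::unordered_multimap<int, std::shared_ptr<Recvr>> receiver_map_;
// Streams already cleanly closed.
std::unordered_set<int> closed_stream_cache_;

// Check the live map first -- the common case for healthy queries -- and only
// fall back to the closed-stream cache when no live receiver matches.
std::shared_ptr<Recvr> FindRecvr(int finst_id, int node_id, bool* already_closed) {
  *already_closed = false;
  auto range = receiver_map_.equal_range(finst_id);
  for (auto it = range.first; it != range.second; ++it) {
    if (it->second->finst_id == finst_id && it->second->node_id == node_id) {
      return it->second;
    }
  }
  *already_closed = closed_stream_cache_.count(finst_id) > 0;
  return nullptr;
}
```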
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Mostafa Mokhtar has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 3: (1 comment) http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc File be/src/runtime/krpc-data-stream-sender.cc: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-sender.cc@644 PS3, Line 644: for (int i = 0; i < channels_.size(); ++i) { The RowBatch is serialized once per channel which is very wasteful. IMPALA-6041. Compare to https://github.com/michaelhkw/incubator-impala/blob/krpc-testing-hung/be/src/runtime/data-stream-sender.cc#L429 -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Gerrit-Change-Number: 8023 Gerrit-PatchSet: 3 Gerrit-Owner: Michael Ho Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Mostafa Mokhtar Gerrit-Reviewer: Sailesh Mukil Gerrit-Comment-Date: Wed, 11 Oct 2017 19:14:14 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Dan Hecht has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 3: (17 comments) Some initial comments. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/common/status.cc File be/src/common/status.cc: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/common/status.cc@246 PS3, Line 246: } this is the same as FromThrift() effectively, right? Can we make the two look the same to make that obvious? http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/common/status.cc@262 PS3, Line 262: same comment. let's make this and ToThrift look the same so it's obvious they do the same things. nit: also, could we order the functions consistently? We currently have ToThrift, FromThrift, FromProto, ToProto, and that ordering just makes it slightly slower to read through since it doesn't follow a pattern. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/exec/exchange-node.cc File be/src/exec/exchange-node.cc: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/exec/exchange-node.cc@109 PS3, Line 109: RETURN_IF_CANCELLED(state); why do we do this in some Open() but not all? Should we just do it in ExecNode::Open() and remove the ones in the derived classes? okay to do separately from this patch. http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/rpc/rpc-mgr.h File be/src/rpc/rpc-mgr.h: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/rpc/rpc-mgr.h@154 PS3, Line 154: } nit: one-liner? http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h File be/src/runtime/krpc-data-stream-mgr.h: http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@94 PS3, Line 94: buffer what buffer? do you mean queue? http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@108 PS3, Line 108: During ordinary operation what does that mean? 
Is it saying that during unordinary operation, a sender can have both a TransmitData() and EndDataStream() call in-flight simultaneously? http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@140 PS3, Line 140: it will quietly drop its what are "it" and "its" here? "the sender" and "the RPC's"? http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@141 PS3, Line 141: /// it returns. is that still true now that we have cancellation of RPCs? http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@153 PS3, Line 153: fragment sending fragment? http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@166 PS3, Line 166: request is that talking about the 'request' field below, or something different? http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@175 PS3, Line 175: const TransmitDataRequestPB* request; what's the relationship between this and proto_batch? http://gerrit.cloudera.org:8080/#/c/8023/3/be/src/runtime/krpc-data-stream-mgr.h@178 PS3, Line 178: /// responded to. who owns it? http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/data_stream_service.proto File common/protobuf/data_stream_service.proto: http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/data_stream_service.proto@29 PS3, Line 29: optional int32 sender_id = 2; : : optional int32 dest_node_id = 3; what are "IDs" in these cases? let's improve the documentation here. Especially since type is no longer PlanNodeId (and why is that?). http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/row_batch.proto File common/protobuf/row_batch.proto: http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/row_batch.proto@30 PS3, Line 30: int32 in thrift we had TTupleId. Is there a reason we aren't defining those types as well to make the structure clearer? 
http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/row_batch.proto@32 PS3, Line 32: tuple_data what's tuple_data? not a field in this structure... http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/row_batch.proto@39 PS3, Line 39: Size size of what? http://gerrit.cloudera.org:8080/#/c/8023/3/common/protobuf/row_batch.proto@42 PS3, Line 42: (TODO(KRPC): native enum) do we plan to fix that? -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Gerrit-Change-Number: 8023 Gerrit-PatchSet: 3 Gerrit-Owner: Michael Ho Gerrit-Reviewer: Dan Hecht Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Sailesh Mukil Gerrit-Comment-Date: Tue, 10 Oct 2017 20:15:20 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 3: (3 comments) http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-sender.cc File be/src/runtime/krpc-data-stream-sender.cc: http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-sender.cc@103 PS2, Line 103: fragment_instance_id_(fragment_instance_id), > address_(destination) Done http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-sender.cc@141 PS2, Line 141: const TNetworkAddress add > const TNetworkAddress address_; Done http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-sender.cc@485 PS2, Line 485: batch_->CommitLastRow(); : return Status::OK(); : } : : void > This is broken if the RPC was rejected for FLAGS_backend_client_connection_ Changed to using async RPC for this in the new patch. This ensures that we can check for cancellation while waiting for replies from the remote server. -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Gerrit-Change-Number: 8023 Gerrit-PatchSet: 3 Gerrit-Owner: Michael Ho Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Sailesh Mukil Gerrit-Comment-Date: Mon, 09 Oct 2017 18:10:45 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Hello Sailesh Mukil, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/8023 to look at the new patch set (#3). Change subject: IMPALA-4856: Port data stream service to KRPC .. IMPALA-4856: Port data stream service to KRPC This patch implements a new data stream service which utilizes KRPC. Similar to the thrift RPC implementation, there are 3 major components to the data stream service: KrpcDataStreamSender serializes and sends row batches materialized by a fragment instance to a KrpcDataStreamRecvr. KrpcDataStreamMgr is responsible for routing an incoming row batch to the appropriate receiver. The data stream service runs on the port FLAGS_krpc_port, which is 29000 by default. Unlike the implementation with thrift RPC, KRPC provides an asynchronous interface for invoking remote methods. As a result, KrpcDataStreamSender doesn't need to create a thread per connection. There is one connection between two Impalad nodes for each direction (i.e. client and server). Multiple queries can multiplex over the same connection for transmitting row batches between two Impalad nodes. The asynchronous interface also avoids an issue with thrift RPC in which a thread may be stuck in an RPC call without being able to check for cancellation. A TransmitData() call with KRPC is in essence a trio of an RpcController, a serialized protobuf request buffer and a protobuf response buffer. The call is invoked via a DataStreamService proxy object. The serialized tuple offsets and row batches are sent via "sidecars" in KRPC to avoid an extra copy into the serialized request buffer. Each impalad node creates a singleton DataStreamService object at start-up time. All incoming calls are served by a service thread pool created as part of DataStreamService. By default, there are 64 service threads. The service threads are shared across all queries so the RPC handler should avoid blocking as much as possible. 
In the thrift RPC implementation, the thrift thread handling a TransmitData() RPC blocks for an extended period of time if the receiver has not yet been created when the call arrives. In the KRPC implementation, TransmitData() or EndDataStream() requests which arrive before the receiver is ready are stored in a per-receiver early sender list in KrpcDataStreamMgr. These RPC calls are processed and responded to when the receiver is created or when a timeout occurs. Similarly, there is limited space in the sender queues in KrpcDataStreamRecvr. If adding a row batch to a queue in KrpcDataStreamRecvr would cause the buffer limit to be exceeded, the request is stashed in a blocked_senders_ queue to be processed later. The stashed RPC request is not responded to until it is processed, so as to exert back pressure on the client. An alternative would be to reply with an error, forcing the request / row batches to be sent again; this may end up consuming more network bandwidth than the thrift RPC implementation. This change adopts the behavior of allowing one stashed request per sender. All RPC requests and responses are serialized using protobuf. The equivalent of TRowBatch is ProtoRowBatch, which contains a serialized header holding the row batch's metadata and two Kudu Slice objects which point to the actual data (i.e. tuple offsets and tuple data). This patch is based on an abandoned patch by Henry Robinson. TESTING --- * Build passed with FLAGS_use_krpc=true. TO DO - * Port some BE tests to KRPC services. 
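The early-sender scheme in the commit message above amounts to parking request contexts per receiver and deferring the RPC response. A minimal sketch with hypothetical names: TransmitDataCtx here is a stripped-down stand-in (the real context also carries the request, response, and RpcContext, and responding is done via RespondSuccess() rather than a flag).

```cpp
#include <cassert>
#include <deque>
#include <memory>
#include <unordered_map>
#include <utility>

// Stand-in for the per-RPC state KrpcDataStreamMgr would park.
struct TransmitDataCtx {
  int sender_id = 0;
  bool responded = false;  // stands in for RespondSuccess() having been called
};

// Early senders, keyed by fragment instance id (simplified to int here).
std::unordered_map<int, std::deque<std::unique_ptr<TransmitDataCtx>>> early_senders_;

// An RPC that arrives before its receiver exists is parked, NOT responded to;
// withholding the response is what exerts back pressure on the sender.
void AddEarlySender(int finst_id, std::unique_ptr<TransmitDataCtx> ctx) {
  early_senders_[finst_id].push_back(std::move(ctx));
}

// Called when the receiver registers (or a timeout fires): drain the parked
// RPCs and respond to each one. Returns how many were drained.
int DrainEarlySenders(int finst_id) {
  auto it = early_senders_.find(finst_id);
  if (it == early_senders_.end()) return 0;
  int num = 0;
  for (auto& ctx : it->second) {
    ctx->responded = true;
    ++num;
  }
  early_senders_.erase(it);
  return num;
}
```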
Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 --- M be/src/common/status.cc M be/src/common/status.h M be/src/exec/data-sink.cc M be/src/exec/exchange-node.cc M be/src/rpc/CMakeLists.txt M be/src/rpc/rpc-mgr.cc M be/src/rpc/rpc-mgr.h M be/src/runtime/CMakeLists.txt M be/src/runtime/data-stream-mgr-base.h M be/src/runtime/data-stream-mgr.h M be/src/runtime/data-stream-recvr.h M be/src/runtime/exec-env.cc M be/src/runtime/exec-env.h M be/src/runtime/krpc-data-stream-mgr.cc M be/src/runtime/krpc-data-stream-mgr.h M be/src/runtime/krpc-data-stream-recvr.cc M be/src/runtime/krpc-data-stream-recvr.h A be/src/runtime/krpc-data-stream-sender.cc A be/src/runtime/krpc-data-stream-sender.h M be/src/runtime/row-batch.cc M be/src/runtime/row-batch.h M be/src/service/CMakeLists.txt A be/src/service/data-stream-service.cc A be/src/service/data-stream-service.h M be/src/service/impala-server.cc M be/src/util/network-util.cc M cmake_modules/FindProtobuf.cmake M common/protobuf/CMakeLists.txt A common/protobuf/common.proto A common/protobuf/data_stream_service.proto A common/protobuf/row_batch.proto M common/thrift/generate_error_codes.py 32 files changed, 2,867 insertions(+), 166 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/8023/3 -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Imp
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 2: (1 comment) http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-recvr.cc File be/src/runtime/krpc-data-stream-recvr.cc: http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-recvr.cc@141 PS2, Line 141: while (true) { : // wait until something shows up or we know we're done : while (!is_cancelled_ && batch_queue_.empty() && blocked_senders_.empty() : && num_remaining_senders_ > 0) { : VLOG_ROW << "wait arrival fragment_instance_id=" << recvr_->fragment_instance_id() :<< " node=" << recvr_->dest_node_id(); : // Don't count time spent waiting on the sender as active time. : CANCEL_SAFE_SCOPED_TIMER(recvr_->data_arrival_timer_, &is_cancelled_); : CANCEL_SAFE_SCOPED_TIMER(recvr_->inactive_timer_, &is_cancelled_); : CANCEL_SAFE_SCOPED_TIMER( : received_first_batch_ ? nullptr : recvr_->first_batch_wait_total_timer_, : &is_cancelled_); : data_arrival_cv_.wait(l); : } : : if (is_cancelled_) return Status::CANCELLED; : : if (blocked_senders_.empty() && batch_queue_.empty()) { : DCHECK_EQ(num_remaining_senders_, 0); : return Status::OK(); : } : : received_first_batch_ = true; : : // Either we'll consume a row batch from batch_queue_, or it's empty. In either case, : // take a blocked sender and retry delivering their batch. There is a window between : // which a deferred batch is dequeued from blocked_senders_ queue and when it's : // inserted into batch_queue_. However, a receiver won't respond to the sender until : // the deferred row batch has been inserted. The sender will wait for all in-flight : // RPCs to complete before sending EOS RPC so num_remaining_senders_ should be > 0. 
: if (!blocked_senders_.empty()) { : recvr_->mgr_->EnqueueRowBatch( : {recvr_->fragment_instance_id(), move(blocked_senders_.front())}); : blocked_senders_.pop(); : } : : if (!batch_queue_.empty()) { : RowBatch* result = batch_queue_.front().second; : recvr_->num_buffered_bytes_.Add(-batch_queue_.front().first); : VLOG_ROW << "fetched #rows=" << result->num_rows(); : current_batch_.reset(result); : *next_batch = current_batch_.get(); : batch_queue_.pop_front(); : return Status::OK(); : } > This loop may lead to live lock in the rare case in which blocked_senders_ Actually, mis-read the thing in the heat of debugging. If both queues are empty, we may return early in line 160 above if num_remaining_senders == 0. So, we shouldn't spin forever. Otherwise, the thread should sleep and wait in line 153. This loop tends to have the unfortunate behavior of popping all entries off blocked_senders_ first before dropping the lock and sleeping on line 153. Although there is a window in which both queues are empty when a row batch is deserialized and moved from blocked_senders_ to batch_queue_, it should be impossible for num_remaining_senders_ to reach 0 in that window. The reason is that the sender of that row batch will not be responded to until after the row batch has been inserted into batch_queue_ (after it has been popped from blocked_senders_). In which case, batch_queue_ will become non-empty first before the remote sender gets a reply. -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Gerrit-Change-Number: 8023 Gerrit-PatchSet: 2 Gerrit-Owner: Michael Ho Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Sailesh Mukil Gerrit-Comment-Date: Fri, 06 Oct 2017 22:03:51 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 2: (2 comments) http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-recvr.cc File be/src/runtime/krpc-data-stream-recvr.cc: http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-recvr.cc@141 PS2, Line 141: while (true) { : // wait until something shows up or we know we're done : while (!is_cancelled_ && batch_queue_.empty() && blocked_senders_.empty() : && num_remaining_senders_ > 0) { : VLOG_ROW << "wait arrival fragment_instance_id=" << recvr_->fragment_instance_id() :<< " node=" << recvr_->dest_node_id(); : // Don't count time spent waiting on the sender as active time. : CANCEL_SAFE_SCOPED_TIMER(recvr_->data_arrival_timer_, &is_cancelled_); : CANCEL_SAFE_SCOPED_TIMER(recvr_->inactive_timer_, &is_cancelled_); : CANCEL_SAFE_SCOPED_TIMER( : received_first_batch_ ? nullptr : recvr_->first_batch_wait_total_timer_, : &is_cancelled_); : data_arrival_cv_.wait(l); : } : : if (is_cancelled_) return Status::CANCELLED; : : if (blocked_senders_.empty() && batch_queue_.empty()) { : DCHECK_EQ(num_remaining_senders_, 0); : return Status::OK(); : } : : received_first_batch_ = true; : : // Either we'll consume a row batch from batch_queue_, or it's empty. In either case, : // take a blocked sender and retry delivering their batch. There is a window between : // which a deferred batch is dequeued from blocked_senders_ queue and when it's : // inserted into batch_queue_. However, a receiver won't respond to the sender until : // the deferred row batch has been inserted. The sender will wait for all in-flight : // RPCs to complete before sending EOS RPC so num_remaining_senders_ should be > 0. 
: if (!blocked_senders_.empty()) { : recvr_->mgr_->EnqueueRowBatch( : {recvr_->fragment_instance_id(), move(blocked_senders_.front())}); : blocked_senders_.pop(); : } : : if (!batch_queue_.empty()) { : RowBatch* result = batch_queue_.front().second; : recvr_->num_buffered_bytes_.Add(-batch_queue_.front().first); : VLOG_ROW << "fetched #rows=" << result->num_rows(); : current_batch_.reset(result); : *next_batch = current_batch_.get(); : batch_queue_.pop_front(); : return Status::OK(); : } This loop may lead to live lock in the rare case in which blocked_senders_ and batch_queue_ are both empty in the window in which the deserialization threads are still working on inserting the row batches in blocked_senders_ into batch_queue_ after batch_queue_ has been exhausted. Spinning here forever will not drop the lock, causing the deserialization threads to wait forever to insert into batch_queue_. http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-sender.cc File be/src/runtime/krpc-data-stream-sender.cc: http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-sender.cc@485 PS2, Line 485: // Sleep for sometime before retrying. : if (RpcMgr::IsServerTooBusy(rpc_controller_)) { : SleepForMs(FLAGS_rpc_retry_interval_ms); : continue; : } This is broken if the RPC was rejected for FLAGS_backend_client_connection_num_retries number of times in a row. In which case, we will break out of the loop and return Status::OK(). This can lead to remote receiver hanging forever as it still thinks there are still active senders. 
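The failure mode described in the last comment above -- exhausting the retry budget and falling through to Status::OK() -- can be illustrated with a small sketch: once the retries are spent, the loop must surface an error so the receiver can learn the sender is gone. The try_send callable and Status struct are stand-ins, not Impala's actual API; the real loop checks RpcMgr::IsServerTooBusy() and sleeps FLAGS_rpc_retry_interval_ms between attempts.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Minimal stand-in for Impala's Status.
struct Status {
  bool ok;
  std::string msg;
  static Status OK() { return Status{true, ""}; }
  static Status Error(std::string m) { return Status{false, std::move(m)}; }
};

// 'try_send' stands in for one RPC attempt: true means the server accepted
// the call, false means it rejected it as too busy. The key point is the
// error return after the loop, instead of silently returning OK.
template <typename TrySend>
Status TransmitWithRetry(TrySend try_send, int max_retries) {
  for (int attempt = 0; attempt < max_retries; ++attempt) {
    if (try_send()) return Status::OK();
    // Real code would SleepForMs(FLAGS_rpc_retry_interval_ms) here.
  }
  return Status::Error("RPC rejected: server too busy after retries");
}
```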
-- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Gerrit-Change-Number: 8023 Gerrit-PatchSet: 2 Gerrit-Owner: Michael Ho Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Sailesh Mukil Gerrit-Comment-Date: Fri, 06 Oct 2017 17:58:27 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 2: (2 comments) http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-sender.cc File be/src/runtime/krpc-data-stream-sender.cc: http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-sender.cc@103 PS2, Line 103: address_(MakeNetworkAddress(destination.hostname, destination.port)), address_(destination) http://gerrit.cloudera.org:8080/#/c/8023/2/be/src/runtime/krpc-data-stream-sender.cc@141 PS2, Line 141: TNetworkAddress address_; const TNetworkAddress address_; -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Gerrit-Change-Number: 8023 Gerrit-PatchSet: 2 Gerrit-Owner: Michael Ho Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Sailesh Mukil Gerrit-Comment-Date: Fri, 29 Sep 2017 17:49:44 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Michael Ho has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 1: (20 comments) http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h File be/src/runtime/krpc-data-stream-mgr.h: http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@110 PS1, Line 110: sent > nit: received Done http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@110 PS1, Line 110: no TransmitData() RPCs will successfully deliver their : /// payload. > Why would there be a TransmitData() RPC if EndDataStream() has already been It's the expectation that the sender will not send any TransmitData() RPC after the EndDataStream() RPC. Comments updated. http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@129 PS1, Line 129: /// In exceptional circumstances, the data stream manager will garbage-collect the closed Comments added. > sending its last batch is bounded. That seems possible to solve > with sender-side state if the receiver notifies the sender that the > receiver was not present and the sender can infer it was closed > cleanly. Not sure I followed the proposed solution. The sender can track whether it has ever successfully sent a row batch to the receiver. However, in the example above, instance 2 of F3 has never sent a row batch to the receiver before the receiver hits its limit and closes, so it's not clear how the sender can differentiate between an early sender case (i.e. the receiver is still being prepared) and a closed receiver. It seems a more fool-proof solution is for the coordinator to notify all backend nodes about completed/aborted/cancelled queries, and that seems to be the absolutely safe point to remove closed stream entries.
Alternately, we can use a statestore update to broadcast this information and make the maintenance thread in DataStreamMgr remove the receiver entries based on the updates. http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@133 PS1, Line 133: /// period expires. > As per Tim's comment above, I would also reference IMPALA-3990 as a TODO he Done http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@159 PS1, Line 159: Consider tracking, on the sender, whether a batch has been successfully sent or : /// not. That's enough state to realise that a receiver has failed (rather than not : /// prepared yet), and the data stream mgr can use that to fail an RPC fast, rather than : /// having the closed-stream list. > It would be nice to have a JIRA for this and reference it here. Actually, now that I think about it, this idea may not work due to examples such as IMPALA-3990 above. The problem is that the receiver may have been closed (legitimately) even before a particular sender managed to send a batch to it, in which case the sender would falsely assume that the receiver has failed. Similarly, if no rows were ever materialized on the sender side, we still need the closed-stream cache to differentiate between a closed receiver and a receiver which is still being prepared. http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@353 PS1, Line 353: waiting_senders > This is a little confusing to follow in the .cc file, since when I see "wai Done http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@356 PS1, Line 356: closed_senders > Similarly, we could call this 'closed_senders_ctxs'.
Done http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc File be/src/runtime/krpc-data-stream-mgr.cc: http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@168 PS1, Line 168: num_senders_waiting_->Increment(1); : total_senders_waited_->Increment(1); : RecvrId recvr_id = make_pair(fragment_instance_id, request->dest_node_id()); : auto payload = : make_unique(proto_batch, context, request, response); : early_senders_map_[recvr_id].waiting_senders.push_back(move(payload)); > I'm wondering if it makes sense to add simple inline functions that encapsu I don't find it too unreadable being inline, but I guess it's less distracting if the logic is encapsulated in a function http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@198 PS1, Line 198: AddData > Isn't the point of the deserialize pool to deserialize the payload early? The deserialization pool's purpose is to avoid executing deserialization inline in the main thread for early or blocked senders. For instance, if there are multiple row batches (from multiple early senders) for a given receiver, the deserialization thread c
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Hello Sailesh Mukil, I'd like you to reexamine a change. Please visit http://gerrit.cloudera.org:8080/8023 to look at the new patch set (#2). Change subject: IMPALA-4856: Port data stream service to KRPC .. IMPALA-4856: Port data stream service to KRPC This patch implements a new data stream service which utilizes KRPC. Similar to the thrift RPC implementation, there are 3 major components to the data stream services: KrpcDataStreamSender serializes and sends row batches materialized by a fragment instance to a KrpcDataStreamRecvr. KrpcDataStreamMgr is responsible for routing an incoming row batch to the appropriate receiver. The data stream service runs on the port FLAGS_krpc_port which is 29000 by default. Unlike the implementation with thrift RPC, KRPC provides an asynchronous interface for invoking remote methods. As a result, KrpcDataStreamSender doesn't need to create a thread per connection. There is one connection between two Impalad nodes for each direction (i.e. client and server). Multiple queries can multiplex on the same connection for transmitting row batches between two Impalad nodes. The asynchronous interface also prevents some issues with thrift RPC in which a thread may be stuck in an RPC call without being able to check for cancellation. A TransmitData() call with KRPC is in essence a trio of an RpcController, a serialized protobuf request buffer and a protobuf response buffer. The call is invoked via a DataStreamService proxy object. The serialized tuple offsets and row batches are sent via "sidecars" in KRPC to avoid an extra copy into the serialized request buffer. Each impalad node creates a singleton DataStreamService object at start-up time. All incoming calls are served by a service thread pool created as part of DataStreamService. By default, there are 64 service threads. The service threads are shared across all queries so the RPC handler should avoid blocking as much as possible.
In the thrift RPC implementation, a thrift thread handling a TransmitData() RPC blocks for an extended period of time when the receiver has not yet been created when the call arrives. In the KRPC implementation, we store TransmitData() or EndDataStream() requests which arrive before the receiver is ready in a per-receiver early sender list stored in KrpcDataStreamMgr. These RPC calls will be processed and responded to when the receiver is created or when the timeout occurs. Similarly, there is limited space in the sender queues in KrpcDataStreamRecvr. If adding a row batch to a queue in KrpcDataStreamRecvr would cause the buffer limit to be exceeded, the request will be stashed in a blocked_sender_ queue to be processed later. The stashed RPC request will not be responded to until it is processed, so as to exert back pressure on the client. An alternative would be to reply with an error, requiring the request / row batches to be sent again. This may end up consuming more network bandwidth than the thrift RPC implementation. This change adopts the behavior of allowing one stashed request per sender. All RPC requests and responses are serialized using protobuf. The equivalent of TRowBatch would be ProtoRowBatch, which contains a serialized header describing the metadata of the row batch and two Kudu Slice objects which contain pointers to the actual data (i.e. tuple offsets and tuple data). This patch is based on an abandoned patch by Henry Robinson. TESTING --- * Build passed with FLAGS_use_krpc=true. TO DO - * Port some BE tests to KRPC services.
Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 --- M be/src/common/status.cc M be/src/common/status.h M be/src/exec/data-sink.cc M be/src/exec/exchange-node.cc M be/src/rpc/CMakeLists.txt M be/src/rpc/rpc-mgr.cc M be/src/rpc/rpc-mgr.h M be/src/runtime/CMakeLists.txt M be/src/runtime/data-stream-mgr-base.h M be/src/runtime/data-stream-mgr.h M be/src/runtime/data-stream-recvr.h M be/src/runtime/exec-env.cc M be/src/runtime/exec-env.h M be/src/runtime/krpc-data-stream-mgr.cc M be/src/runtime/krpc-data-stream-mgr.h M be/src/runtime/krpc-data-stream-recvr.cc M be/src/runtime/krpc-data-stream-recvr.h A be/src/runtime/krpc-data-stream-sender.cc A be/src/runtime/krpc-data-stream-sender.h M be/src/runtime/row-batch.cc M be/src/runtime/row-batch.h M be/src/service/CMakeLists.txt A be/src/service/data-stream-service.cc A be/src/service/data-stream-service.h M be/src/service/impala-server.cc M common/protobuf/CMakeLists.txt A common/protobuf/common.proto A common/protobuf/data_stream_service.proto A common/protobuf/row_batch.proto M common/thrift/generate_error_codes.py 30 files changed, 2,822 insertions(+), 163 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/8023/2 -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: newpatchset Ger
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Sailesh Mukil has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 1: (5 comments) http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc File be/src/runtime/krpc-data-stream-mgr.cc: http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@198 PS1, Line 198: AddData Isn't the point of the deserialize pool to deserialize the payload early? Here, we're just calling AddData() on the payloads for early senders after the corresponding receiver has been created. http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc File be/src/runtime/krpc-data-stream-recvr.cc: http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@165 PS1, Line 165: Either we'll consume a row batch from batch_queue_, or it's empty Shouldn't there always be something in the batch_queue_ if there's something in the blocked_senders_ list? Since we fill the blocked_senders_ only if the queue is at its limit. And we also logically service the batches from batch_queue_ first before servicing the batches from the blocked_senders_ list. http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@166 PS1, Line 166: There is a window Just to make things clearer, could you specify what there's a window for? http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@225 PS1, Line 225: There is a problem here. When we release lock_ here, an arbitrary number of senders could call AddBatch(), and all their batches would get enqueued even though ExceedsLimit() would be true. This breaks the guarantee of the queue not being overcommitted by more than a single batch. http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@284 PS1, Line 284: for (const auto& queue_entry: batch_queue_) delete queue_entry.second; batch_queue_.clear() ?
-- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-MessageType: comment Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Gerrit-Change-Number: 8023 Gerrit-PatchSet: 1 Gerrit-Owner: Michael Ho Gerrit-Reviewer: Sailesh Mukil Gerrit-Comment-Date: Tue, 26 Sep 2017 18:11:25 + Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Sailesh Mukil has posted comments on this change. ( http://gerrit.cloudera.org:8080/8023 ) Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 1: (14 comments) http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h File be/src/runtime/krpc-data-stream-mgr.h: http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@110 PS1, Line 110: sent nit: received http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@110 PS1, Line 110: no TransmitData() RPCs will successfully deliver their : /// payload. Why would there be a TransmitData() RPC if EndDataStream() has already been sent? Doesn't the sender send it only if it knows all its TransmitData() RPCs have been processed? http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@133 PS1, Line 133: /// period expires. As per Tim's comment above, I would also reference IMPALA-3990 as a TODO here for later fixing. http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@159 PS1, Line 159: Consider tracking, on the sender, whether a batch has been successfully sent or : /// not. That's enough state to realise that a receiver has failed (rather than not : /// prepared yet), and the data stream mgr can use that to fail an RPC fast, rather than : /// having the closed-stream list. It would be nice to have a JIRA for this and reference it here. http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@353 PS1, Line 353: waiting_senders This is a little confusing to follow in the .cc file, since when I see "waiting_senders", I expect it to be a set of some unique identifiers for a Sender ID. Although this is unique to a specific sender, it would be a little clearer to call this 'waiting_senders_ctxs'. Let me know what you think. 
http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h@356 PS1, Line 356: closed_senders Similarly, we could call this 'closed_senders_ctxs'. http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc File be/src/runtime/krpc-data-stream-mgr.cc: http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@168 PS1, Line 168: num_senders_waiting_->Increment(1); : total_senders_waited_->Increment(1); : RecvrId recvr_id = make_pair(fragment_instance_id, request->dest_node_id()); : auto payload = : make_unique(proto_batch, context, request, response); : early_senders_map_[recvr_id].waiting_senders.push_back(move(payload)); I'm wondering if it makes sense to add simple inline functions that encapsulate this functionality; for the sake of readability. Eg: AddEarlyWaitingSender(), AddEarlyClosedSender() http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@213 PS1, Line 213: If no receiver found, but not in the closed stream cache nit: If no receiver is found, and the receiver is not in the closed stream cache as well, we still need... http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@218 PS1, Line 218: RecvrId recvr_id = make_pair(fragment_instance_id, dest_node_id); : auto payload = make_unique(context, request, response); : early_senders_map_[recvr_id].closed_senders.emplace_back(move(payload)); : num_senders_waiting_->Increment(1); : total_senders_waited_->Increment(1); AddEarlyClosedSender() as per comment above, if you agree. 
http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.cc@227 PS1, Line 227: if (LIKELY(recvr != nullptr)) recvr->RemoveSender(request->sender_id()); : Status::OK().ToProto(response->mutable_status()); : context->RespondSuccess(); This may need some modification based on the recent commit for IMPALA-5199: https://github.com/apache/incubator-impala/commit/5119ced50c0e0c4001621c9d4da598c187bdb580 http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc File be/src/runtime/krpc-data-stream-recvr.cc: http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@75 PS1, Line 75: ("new data") I'm having some trouble understanding what this means. Could you please clarify? http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@271 PS1, Line 271: data_arrival_cv_.notify_all(); Shouldn't this notify be done while holding the lock_ ? http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-recvr.cc@285 PS1, Line 285: while (!blocked_senders_.empty()) { nit: Add comment: Respond to blocked senders' RPCs http://gerrit.cloudera.org:808
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Tim Armstrong has posted comments on this change. Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 1: (1 comment) http://gerrit.cloudera.org:8080/#/c/8023/1/be/src/runtime/krpc-data-stream-mgr.h File be/src/runtime/krpc-data-stream-mgr.h: Line 129: /// In exceptional circumstances, the data stream manager will garbage-collect the closed There's a pre-existing flaw in the reasoning here that we should call out. "Exceptional circumstances" is vague and I think hides a distinction between an unhealthy cluster with extreme delays and the expected behaviour of certain long-running queries. I think the problem is an invalid assumption that the receiver sends batches on a regular cadence, with a bounded delay before the first batch is sent and before each subsequent batch is sent. That assumption is incorrect. I think we should call it out in this comment so that readers understand the current flaw. Here's an example where it's wrong. Consider a plan with three fragments.

F1 (long-running)
 |
 V
F2 (limit = 1 on exchange)
 |
 V
F3 (long-running selective scan)

1. The fragments all start up.
2. Instance 1 of F3 immediately finds and returns a matching row, which is sent to F2.
3. This causes F2 to hit its limit, close its exchange and tear itself down.
4. Let's assume F1 also has a lot of work to do and won't finish for 20 minutes.
5. Instance 2 of F3 is still churning away on the scan. After 10 minutes it finally finds a matching row.
6. F3 tries to send the row, can't find the receiver after a timeout and returns an error to the coordinator.
7. The coordinator cancels the query and returns an error.

There are two problems here:
1. The query failed when it shouldn't have.
2. F3 wasn't cancelled when it was no longer needed and used lots of resources unnecessarily.

The JIRA is IMPALA-3990. I believe that the main reason we haven't seen this in practice is that it can only occur when there's a limit without order in a subquery.
Most queries with that property are non-deterministic and it doesn't really make a lot of sense to have a long-running query that returns non-deterministic results. But this actually blocked me from implementing early-close for joins with empty build sides, which is a nice optimisation. There may also be a slightly different invalid assumption that the time between the receiver closing the exchange and the sender sending its last batch is bounded. That seems possible to solve with sender-side state if the receiver notifies the sender that the receiver was not present and the sender can infer it was closed cleanly. -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-MessageType: comment Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Gerrit-PatchSet: 1 Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-Owner: Michael Ho Gerrit-Reviewer: Tim Armstrong Gerrit-HasComments: Yes
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Michael Ho has uploaded a new change for review. http://gerrit.cloudera.org:8080/8023 Change subject: IMPALA-4856: Port data stream service to KRPC .. IMPALA-4856: Port data stream service to KRPC This patch implements a new data stream service which utilizes KRPC. Similar to the thrift RPC implementation, there are 3 major components to the data stream services: KrpcDataStreamSender serializes and sends row batches materialized by a fragment instance to a KrpcDataStreamRecvr. KrpcDataStreamMgr is responsible for routing an incoming row batch to the appropriate receiver. The data stream service runs on the port FLAGS_krpc_port which is 29000 by default. Unlike the implementation with thrift RPC, KRPC provides an asynchronous interface for invoking remote methods. As a result, KrpcDataStreamSender doesn't need to create a thread per connection. There is one connection between two Impalad nodes for each direction (i.e. client and server). Multiple queries can multiplex on the same connection for transmitting row batches between two Impalad nodes. The asynchronous interface also prevents some issues with thrift RPC in which a thread may be stuck in an RPC call without being able to check for cancellation. A TransmitData() call with KRPC is in essence a trio of an RpcController, a serialized protobuf request buffer and a protobuf response buffer. The call is invoked via a DataStreamService proxy object. The serialized tuple offsets and row batches are sent via "sidecars" in KRPC to avoid an extra copy into the serialized request buffer. Each impalad node creates a singleton DataStreamService object at start-up time. All incoming calls are served by a service thread pool created as part of DataStreamService. By default, there are 64 service threads. The service threads are shared across all queries so the RPC handler should avoid blocking as much as possible.
In the thrift RPC implementation, a thrift thread handling a TransmitData() RPC blocks for an extended period of time when the receiver has not yet been created when the call arrives. In the KRPC implementation, we store TransmitData() or EndDataStream() requests which arrive before the receiver is ready in a per-receiver early sender list stored in KrpcDataStreamMgr. These RPC calls will be processed and responded to when the receiver is created or when the timeout occurs. Similarly, there is limited space in the sender queues in KrpcDataStreamRecvr. If adding a row batch to a queue in KrpcDataStreamRecvr would cause the buffer limit to be exceeded, the request will be stashed in a blocked_sender_ queue to be processed later. The stashed RPC request will not be responded to until it is processed, so as to exert back pressure on the client. An alternative would be to reply with an error, requiring the request / row batches to be sent again. This may end up consuming more network bandwidth than the thrift RPC implementation. This change adopts the behavior of allowing one stashed request per sender. All RPC requests and responses are serialized using protobuf. The equivalent of TRowBatch would be ProtoRowBatch, which contains a serialized header describing the metadata of the row batch and two Kudu Slice objects which contain pointers to the actual data (i.e. tuple offsets and tuple data). This patch is based on an abandoned patch by Henry Robinson. TESTING --- * Build passed with FLAGS_use_krpc=true. TO DO - * Port some BE tests to KRPC services.
Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 --- M be/src/common/status.cc M be/src/common/status.h M be/src/exec/data-sink.cc M be/src/exec/exchange-node.cc M be/src/rpc/CMakeLists.txt M be/src/rpc/rpc-mgr.cc M be/src/rpc/rpc-mgr.h M be/src/runtime/CMakeLists.txt M be/src/runtime/data-stream-mgr-base.h M be/src/runtime/data-stream-mgr.h M be/src/runtime/data-stream-recvr.h M be/src/runtime/exec-env.cc M be/src/runtime/exec-env.h M be/src/runtime/krpc-data-stream-mgr.cc M be/src/runtime/krpc-data-stream-mgr.h M be/src/runtime/krpc-data-stream-recvr.cc M be/src/runtime/krpc-data-stream-recvr.h A be/src/runtime/krpc-data-stream-sender.cc A be/src/runtime/krpc-data-stream-sender.h M be/src/runtime/row-batch.cc M be/src/runtime/row-batch.h M be/src/service/CMakeLists.txt A be/src/service/data-stream-service.cc A be/src/service/data-stream-service.h M be/src/service/impala-server.cc M common/protobuf/CMakeLists.txt A common/protobuf/common.proto A common/protobuf/data_stream_service.proto A common/protobuf/row_batch.proto 29 files changed, 2,783 insertions(+), 163 deletions(-) git pull ssh://gerrit.cloudera.org:29418/Impala-ASF refs/changes/23/8023/1 -- To view, visit http://gerrit.cloudera.org:8080/8023 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-MessageType: newchange Gerrit-Change-Id: Ic0b8c1e50678da66ab1547d16530f88b323ed8c1 Gerrit-PatchSet: 1 Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-Owner: Michael Ho
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Henry Robinson has abandoned this change. Change subject: IMPALA-4856: Port data stream service to KRPC .. Abandoned -- To view, visit http://gerrit.cloudera.org:8080/7103 To unsubscribe, visit http://gerrit.cloudera.org:8080/settings Gerrit-MessageType: abandon Gerrit-Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b Gerrit-PatchSet: 6 Gerrit-Project: Impala-ASF Gerrit-Branch: master Gerrit-Owner: Henry Robinson Gerrit-Reviewer: Henry Robinson Gerrit-Reviewer: Michael Ho Gerrit-Reviewer: Mostafa Mokhtar Gerrit-Reviewer: Sailesh Mukil
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Sailesh Mukil has uploaded a new patch set (#6). Change subject: IMPALA-4856: Port data stream service to KRPC .. IMPALA-4856: Port data stream service to KRPC This patch ports the data-flow parts of ImpalaInternalService to KRPC. * ImpalaInternalService is split into two services. The first, ImpalaInternalService, deals with control messages for plan fragment instance execution, cancellation and reporting, and remains implemented in Thrift for now. The second, DataStreamService, handles large-payload RPCs for transmitting runtime filters and row batches between hosts. * In the DataStreamService, all RPCs use 'native' protobuf. The DataStreamService starts on the port previously reserved for the StatestoreSubscriberService (which is also a KRPC service), to avoid having to configure another port when starting Impala. When the ImpalaInternalService is ported to KRPC, all services will run on one port. * To support needing to address two different backend services, a data service port has been added to TBackendDescriptor. * This patch adds support for asynchronous RPCs to the RpcMgr and Rpc classes. Previously, Impala used fixed size thread pools + synchronous RPCs to achieve some parallelism for 'broadcast' RPCs like filter propagation, or a dedicated per-sender+receiver pair thread on the sender side in the DataStreamSender case. In this patch, the PublishFilter() and TransmitData() RPCs are sent asynchronously using KRPC's thread pools. * The TransmitData() protocol has changed to adapt to asynchronous RPCs. The full details are in data-stream-mgr.h. * As a result, DataStreamSender no longer creates a thread-per-connection on the sender side. * Both tuple transmission and runtime filter publication use sidecars to minimise the number of copies and serialization steps required. * Also include a fix for KUDU-2011 that properly allows sidecars to be shared between KRPC and the RPC caller (fixing IMPALA-5093, a corruption bug). 
* A large portion of this patch is the replacement of TRowBatch with its Protobuf equivalent, RowBatchPb. The replacement is a literal port of the data structure, and row-batch-test, row-batch-list-test and row-batch-serialize-benchmark continue to execute without major logic changes. * Simplify FindRecvr() logic in DataStreamManager. It no longer needs to handle blocking on the sender side, so there is no need for complex promise-based machinery. Instead, all senders with no receiver are added to a per-receiver list, which is processed when the receiver arrives. If it does not arrive promptly, the DataStreamManager cleans them up after FLAGS_datastream_sender_timeout_ms. * This patch also begins a clean-up of how ImpalaServer instances are created (by removing CreateImpalaServer), and clarifying the relationship between ExecEnv and ImpalaServer. ImpalaServer now follows the standard construct->Init()->Start()->Join() lifecycle that we use for other services. * Ensure that all addresses used for KRPCs are fully resolved, avoiding the need to resolve them for each RPC. TESTING --- * New tests added to rpc-mgr-test. TO DO - * Re-enable throughput and latency measurements per data-stream sender when that information is exposed from KRPC (KUDU-1738). * TLS and Kerberos are still not supported by KRPC in this patch.
Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b --- M .clang-format M CMakeLists.txt M be/CMakeLists.txt M be/src/benchmarks/bloom-filter-benchmark.cc M be/src/benchmarks/row-batch-serialize-benchmark.cc M be/src/common/init.cc M be/src/common/status.cc M be/src/common/status.h M be/src/exprs/expr-test.cc M be/src/kudu/rpc/rpc_sidecar.cc M be/src/kudu/rpc/rpc_sidecar.h M be/src/rpc/CMakeLists.txt M be/src/rpc/TAcceptQueueServer.cpp M be/src/rpc/common.proto M be/src/rpc/rpc-mgr-test.cc M be/src/rpc/rpc-mgr.h M be/src/rpc/rpc-mgr.inline.h M be/src/rpc/rpc.h M be/src/runtime/backend-client.h M be/src/runtime/client-cache-types.h M be/src/runtime/coordinator-backend-state.cc M be/src/runtime/coordinator-backend-state.h M be/src/runtime/coordinator-filter-state.h M be/src/runtime/coordinator.cc M be/src/runtime/coordinator.h M be/src/runtime/data-stream-mgr.cc M be/src/runtime/data-stream-mgr.h M be/src/runtime/data-stream-recvr.cc M be/src/runtime/data-stream-recvr.h M be/src/runtime/data-stream-sender.cc M be/src/runtime/data-stream-sender.h M be/src/runtime/data-stream-test.cc M be/src/runtime/exec-env.cc M be/src/runtime/exec-env.h M be/src/runtime/fragment-instance-state.cc M be/src/runtime/fragment-instance-state.h M be/src/runtime/query-state.cc M be/src/runtime/query-state.h M be/src/runtime/row-batch-serialize-test.cc M be/src/runtime/row-batch.cc M be/src/runtime/row-batch.h M be/src/runtime/runtime-filter-bank.cc M be/src/runtime/runtime-filter-bank.h M be/src/scheduling/backend-config-test.cc M be/src/scheduling/backe
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Michael Ho has posted comments on this change. Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 3: (1 comment) http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/data-stream-sender.cc File be/src/runtime/data-stream-sender.cc: PS3, Line 252: batch->compressed_tuple_data > The ownership is shared with the batch object. AddSidecar() internally move I see. I am still getting used to this subtlety of passing shared_ptr as an argument.
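The subtlety discussed in this exchange, passing a shared_ptr by value so that the sidecar code moves from the callee's copy rather than from the caller's pointer, can be demonstrated with a small sketch. RpcSidecar here is a hypothetical stand-in for illustration, not KRPC's real class or API.

```cpp
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Sketch of the ownership model: passing a shared_ptr BY VALUE copies it
// (refcount +1); AddSidecar() then moves from that copy, so the sidecar
// list holds its own reference and the caller's original shared_ptr stays
// valid for the batch object to reuse.
struct RpcSidecar {
  std::vector<std::shared_ptr<std::string>> slices;

  // Takes the argument by value; the move below steals the *copy*,
  // never the caller's shared_ptr.
  void AddSidecar(std::shared_ptr<std::string> buf) {
    slices.push_back(std::move(buf));
  }
};
```

The design choice is that the caller keeps shared ownership for the lifetime of the RPC, which is exactly what the row batch needs so its buffers are not freed while the bytes are still on the wire.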
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Henry Robinson has posted comments on this change. Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 5: (22 comments) PS4 is a rebase. PS5 includes the review responses (so diff 4->5 if you want to see what changed). http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/rpc/rpc.h File be/src/rpc/rpc.h: PS3, Line 106: Ownership is : // shared by the caller, and the RPC subsystem > Doesn't std::move transfer the ownership so the caller no longer shares the The shared_ptr is copied in. It's the copy that is then moved into the sidecar list. PS3, Line 143: are owned by the caller > the ownership is temporarily transferred to the RPC call when this function I don't think so - the RPC call has pointers, but doesn't have ownership in the sense that it has no responsibility for managing a reference count or freeing the memory. PS3, Line 147: > Having the names 'func', 'cb' and 'cb_wrapper' all close by each other make Done PS3, Line 153: > Does this move mean that the params_ member is invalid after this call? If Done PS3, Line 327: > Maybe name this 'completion_cb'. Done http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/coordinator-backend-state.cc File be/src/runtime/coordinator-backend-state.cc: PS3, Line 389: equest->mutable_bloom_filter()->set_log_heap_space(0); : request->mutable_bloom_filter()->set_directory_sidecar_idx(-1); : } > Why wouldn't a move capture ensure the same thing? proto_filter is a const shared_ptr&. You can't move from it. Instead, we could have the argument be shared_ptr, and move from it here; they're basically equivalent, it's just a question of where you make the copy. 
http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/coordinator.cc File be/src/runtime/coordinator.cc: PS3, Line 1207: VLOG_QUERY << "Not enough memory to allocate filter: " : << PrettyPrinter::Print(heap_space, TUnit::BYTES) : << " (query: " << coord->query_id() << ")"; : // Disable, as one missing update means a correct filter cannot be > I would add this to the commit message. This means we would take double the I don't think so - because params.directory is a sidecar I don't think it's been copied since it arrived on the socket. In the Thrift case, the bytestream had to be deserialized into a TBloomFilter. That's what's happening here - the equivalent 'deserialization' step. This path should only get taken the first time a filter arrives, and it does briefly keep two filters around (the sidecar should get destroyed as soon as the RPC is responded to). http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/data-stream-recvr.cc File be/src/runtime/data-stream-recvr.cc: PS3, Line 280: blocked_senders_.front() > Is this a right way to dispose a unique_ptr? Good point - release() is clearer, and get() may have been a benign bug. http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/data-stream-sender.cc File be/src/runtime/data-stream-sender.cc: PS3, Line 58: > Not used. Done PS3, Line 60: > Not used. Done PS3, Line 133: scoped_ptr batch_; > No one really calls GetNumDataBytesSent() (except from our BE test). So, I' We're gaining correctness - so worth doing (otherwise if someone decides to use it in the future, they might run into problems). PS3, Line 148: > A reader of this code might not immediately understand why this class needs I expanded the comment here. PS3, Line 170: > Why is this set in Init()? Wouldn't it ideally be set it in the constructor Moved to c'tor. PS3, Line 175: proto_batch_idx_ > Just want to make sure that this will increase the shared_ptr refcount? It Yep - this was a mistake. Removed auto to make it more explicit. 
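The release()-versus-get() point raised above can be illustrated with a minimal sketch of handing off ownership from the front of a queue of unique_ptr. Sender and TakeFront are illustrative names, not the patch's actual code.

```cpp
#include <deque>
#include <memory>

// release() makes the ownership transfer explicit: the queue's unique_ptr
// is emptied before pop_front() destroys it. Using get() instead would let
// pop_front() delete the object and leave the caller with a dangling raw
// pointer.
struct Sender {
  int id = 0;
};

// The caller becomes responsible for deleting the returned Sender.
Sender* TakeFront(std::deque<std::unique_ptr<Sender>>& q) {
  Sender* s = q.front().release();  // queue's unique_ptr now holds nullptr
  q.pop_front();                    // destroys the (now empty) unique_ptr only
  return s;
}
```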
PS3, Line 203: co > Prefer a more descriptive name "rpc_completion_cb" or something similar. Done PS3, Line 214: ck_guard > channel == nullptr Done PS3, Line 252: batch->tuple_data, &idx); > Is this transferring the ownership to the RPC subsystem ? AddSideCar() inte The ownership is shared with the batch object. AddSidecar() internally moves from the argument, which is a copy (i.e. its own reference). PS3, Line 266: .release(), rpc_complete_callback); > This is a subtle change in behavior from previous Impala version. In partic Any reasonably conservative timeout runs the risk of false negatives if a sender is blocked. I agree with your analysis about this being a change in behaviour. In practice, though, here's what I hope will happen: if one write to a node is slow enough to previously trigger the timeout, I would expect the statestore RPCs to also go slow (and they will time out); the node will be marked as offline and the query will be cancelled. If there is a situation where this RPC only is slow in writing (but all other RPCs to the server are ok), then I agree
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Henry Robinson has uploaded a new patch set (#5). Change subject: IMPALA-4856: Port data stream service to KRPC .. IMPALA-4856: Port data stream service to KRPC This patch ports the data-flow parts of ImpalaInternalService to KRPC. * ImpalaInternalService is split into two services. The first, ImpalaInternalService, deals with control messages for plan fragment instance execution, cancellation and reporting, and remains implemented in Thrift for now. The second, DataStreamService, handles large-payload RPCs for transmitting runtime filters and row batches between hosts. * In the DataStreamService, all RPCs use 'native' protobuf. The DataStreamService starts on the port previously reserved for the StatestoreSubscriberService (which is also a KRPC service), to avoid having to configure another port when starting Impala. When the ImpalaInternalService is ported to KRPC, all services will run on one port. * To support needing to address two different backend services, a data service port has been added to TBackendDescriptor. * This patch adds support for asynchronous RPCs to the RpcMgr and Rpc classes. Previously, Impala used fixed size thread pools + synchronous RPCs to achieve some parallelism for 'broadcast' RPCs like filter propagation, or a dedicated per-sender+receiver pair thread on the sender side in the DataStreamSender case. In this patch, the PublishFilter() and TransmitData() RPCs are sent asynchronously using KRPC's thread pools. * The TransmitData() protocol has changed to adapt to asynchronous RPCs. The full details are in data-stream-mgr.h. * As a result, DataStreamSender no longer creates a thread-per-connection on the sender side. * Both tuple transmission and runtime filter publication use sidecars to minimise the number of copies and serialization steps required. * Also include a fix for KUDU-2011 that properly allows sidecars to be shared between KRPC and the RPC caller (fixing IMPALA-5093, a corruption bug). 
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Henry Robinson has uploaded a new patch set (#4). Change subject: IMPALA-4856: Port data stream service to KRPC .. Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Michael Ho has posted comments on this change. Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 3: (6 comments) Some more comments. Still going through the patch. http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/rpc/rpc.h File be/src/rpc/rpc.h: PS3, Line 106: Ownership is : // shared by the caller, and the RPC subsystem Doesn't std::move transfer the ownership, so that the caller no longer shares it? PS3, Line 143: are owned by the caller The ownership is temporarily transferred to the RPC call when this function is invoked, right? http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/data-stream-sender.cc File be/src/runtime/data-stream-sender.cc: PS3, Line 214: !channel channel == nullptr PS3, Line 252: batch->compressed_tuple_data Is this transferring the ownership to the RPC subsystem? AddSidecar() internally uses std::move(). This seems subtle enough to warrant a comment. PS3, Line 266: MonoDelta::FromMilliseconds(numeric_limits::max()) This is a subtle change in behavior from the previous Impala version. In particular, FLAGS_backend_client_rpc_timeout_ms sets the timeout for a socket in case a thrift thread is stuck writing to it. Given that KRPC sockets are asynchronous, the DSS may stay blocked for quite a while until the query gets cancelled. Should we impose some reasonably conservative timeout here? http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/row-batch.cc File be/src/runtime/row-batch.cc: PS3, Line 117: DCHECK( DCHECK_EQ
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Sailesh Mukil has posted comments on this change. Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 3: (17 comments) http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/rpc/rpc.h File be/src/rpc/rpc.h: PS3, Line 147: func Having the names 'func', 'cb' and 'cb_wrapper' all close by each other makes some of this slightly more complicated to read. I would opt for renaming 'func' to 'rpc_method' or something, so that it's crystal clear that it may not be a callback itself. PS3, Line 153: std::move(params_) Does this move mean that the params_ member is invalid after this call? If so, it would be good to add a comment where it's declared. PS3, Line 327: cb Maybe name this 'completion_cb'. http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/coordinator-backend-state.cc File be/src/runtime/coordinator-backend-state.cc: PS3, Line 389: Copying proto_filter here ensures that its lifetime will last at least until this : // callback completes. : auto cb = [proto_filter] Why wouldn't a move capture ensure the same thing? http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/coordinator.cc File be/src/runtime/coordinator.cc: PS3, Line 1207: // Do an explicit copy of the directory: 'params' may have come from an RPC so we : // can't assume ownership of its directory. : bloom_filter_ = : make_unique(params.header, params.directory); I would add this to the commit message. This means we would take double the memory cost every time we merge filters right? http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/data-stream-recvr.cc File be/src/runtime/data-stream-recvr.cc: PS3, Line 280: blocked_senders_.front() Is this a right way to dispose a unique_ptr? http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/data-stream-sender.cc File be/src/runtime/data-stream-sender.cc: PS3, Line 58: DECLARE_int32(datastream_sender_timeout_ms); Not used. PS3, Line 60: DECLARE_int32(state_store_subscriber_port); Not used. 
PS3, Line 133: AtomicInt64 num_data_bytes_sent_ No one really calls GetNumDataBytesSent() (except from our BE test). So, I'm not sure we're gaining anything by making this atomic. Not a big deal, but it might be cheaper to leave it as an int64. Your call though. PS3, Line 148: self_ A reader of this code might not immediately understand why this class needs to always have a reference to itself. It would be good to explicitly mention this member name in the header where the explanation is given. PS3, Line 170: self_ = shared_from_this(); Why is this set in Init()? Wouldn't it ideally be set in the constructor? PS3, Line 175: auto proto_batch Just want to make sure that this will increase the shared_ptr refcount? It should because it will make an underlying copy of the pointer, but I just want to make sure. PS3, Line 203: cb Prefer a more descriptive name "rpc_completion_cb" or something similar. Line 336: batch_.reset(); DCHECK(self_.get()) http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/row-batch.cc File be/src/runtime/row-batch.cc: Line 216: output_batch->header.set_compression_type(THdfsCompression::LZ4); Do you think it would be good to add a comment on why we don't free the 'tuple_data' buffer here? Presumably so we can reuse the memory when the RowBatch is recycled? http://gerrit.cloudera.org:8080/#/c/7103/3/be/src/runtime/row-batch.h File be/src/runtime/row-batch.h: PS3, Line 73: == THdfsCompression::LZ4 != THdfsCompression::NONE Functionally the same, more readable. PS3, Line 441: FlushMode flush_ = FlushMode::NO_FLUSH_RESOURCES Why not initialize this and the other members below with the default values in the constructor member initialization list? 
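The self_ discussion above (an object holding a shared_ptr to itself so it stays alive while asynchronous RPCs are in flight, with callbacks detecting a torn-down channel through a weak_ptr) can be distilled into a sketch. Channel, Init(), Teardown() and MakeCompletionCb() are illustrative stand-ins for this pattern, not the patch's actual classes; note that shared_from_this() is only usable after the object is owned by a shared_ptr, which is why the sketch sets self_ outside the constructor.

```cpp
#include <functional>
#include <memory>

// Self-reference pattern: self_ keeps *this alive until Teardown(), and
// completion callbacks capture a weak_ptr so they can safely detect that
// the channel is already gone.
class Channel : public std::enable_shared_from_this<Channel> {
 public:
  // Set the self-reference after construction: shared_from_this() would
  // throw if called before a shared_ptr owns this object.
  void Init() { self_ = shared_from_this(); }
  void Teardown() { self_.reset(); }

  // Builds a completion callback that is safe to run after Teardown().
  std::function<bool()> MakeCompletionCb() {
    std::weak_ptr<Channel> weak_self = weak_from_this();
    return [weak_self]() {
      auto channel = weak_self.lock();
      if (!channel) return false;  // channel already destroyed; do nothing
      ++channel->completed_rpcs_;
      return true;
    };
  }

  int completed_rpcs() const { return completed_rpcs_; }

 private:
  std::shared_ptr<Channel> self_;  // keeps *this alive until Teardown()
  int completed_rpcs_ = 0;
};
```

The explicit Teardown() breaks the intentional self-cycle; without it the channel would never be destroyed, which is the trade-off this pattern makes to guarantee liveness during in-flight RPCs.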
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Henry Robinson has uploaded a new patch set (#3). Change subject: IMPALA-4856: Port data stream service to KRPC .. Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Henry Robinson has posted comments on this change. Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 2: (8 comments) http://gerrit.cloudera.org:8080/#/c/7103/2/be/src/rpc/rpc.h File be/src/rpc/rpc.h: PS2, Line 126: 'cb' > Can you please state the context in which 'cb' is called from (e.g. reactor Done PS2, Line 133: aattempted > typo Done PS2, Line 230: // > /// Done PS2, Line 319: Retries > Is there any way to write a be-test to exercise the retry path ? Yep - see rpc-mgr-test.cc, RetryAsyncTest. That injects ERROR_SERVER_TOO_BUSY into an RPC response which triggers the retry logic. PS2, Line 322: auto cb_wrapper = [params = std::move(params), mgr, func, req, resp, : cb = std::move(cb), controller_ptr = controller.release(), num_attempts]() : mutable { > An alternative to this lambda implementation would be to define a separate It would, yeah. My preference is for using a lambda here partly because it's very clear about how the arguments are copied, and how ownership is managed (I find the copying behaviour of bind() a bit inscrutable). PS2, Line 332: cb(Status::OK(), req, resp, controller_ptr); > Re-reading the comments above, status seems to indicate whether status was Right - I was in two minds about using different statuses, or merging together the one from the RpcController and the one that we must provide somehow. I think this is the simplest way to pass both statuses, but let me know if you have an idea! I added a comment for now. 
Line 337: kudu::MonoDelta retry_interval = kudu::MonoDelta::FromMilliseconds(params->retry_interval_ms); > long line Done http://gerrit.cloudera.org:8080/#/c/7103/2/be/src/runtime/data-stream-sender.cc File be/src/runtime/data-stream-sender.cc: PS2, Line 203: auto cb = [self_ptr = weak_ptr(self_), : instance_id = fragment_instance_id_, proto_batch = batch] : (const Status& status, TransmitDataRequestPb* request, : TransmitDataResponsePb* response, RpcController* controller) { : : // Ensure that request and response get deleted when this callback returns. : auto request_container = unique_ptr(request); : auto response_container = unique_ptr(response); : : // Check if this channel still exists. : auto channel = self_ptr.lock(); : if (!channel) return; : { : lock_guard l(channel->lock_); : Status rpc_status = status.ok() ? FromKuduStatus(controller->status()) : status; : : int32_t status_code = response->status().status_code(); : channel->recvr_gone_ = status_code == TErrorCode::DATASTREAM_RECVR_ALREADY_GONE; : : if (!rpc_status.ok()) { : channel->last_rpc_status_ = rpc_status; : } else if (!channel->recvr_gone_) { : if (status_code != TErrorCode::OK) { : // Don't bubble up the 'receiver gone' status, because it's not an error. : channel->last_rpc_status_ = Status(response->status()); : } else { : int size = proto_batch->GetSize(); : channel->num_data_bytes_sent_.Add(size); : VLOG_ROW << "incremented #data_bytes_sent=" :<< channel->num_data_bytes_sent_.Load(); : } : } : channel->rpc_in_flight_ = false; : } : channel->rpc_done_cv_.notify_one(); : }; > I am no C++ expert so this question may be stupid: can we not write this as We could write this as a method, and use bind(), or we could create a struct with one method (this one) that captures the context upon construction. The latter is what a lambda compiles down to, and I prefer the syntax sugar a lambda gives you. The former uses bind(), which I am not a great fan of. 
In my experience, gdb handles this just fine, and the stack is IMHO cleaner than using bind() (I've broken in this method lots of times!).
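The lambda-versus-bind() trade-off discussed in this exchange comes down to C++14 init-captures making the ownership transfer explicit at the capture site. A minimal sketch follows; Request, Response and MakeRpcCompletionCb are illustrative stand-ins for the protobuf request/response pair and the callback factory, not the patch's actual code.

```cpp
#include <memory>
#include <string>
#include <utility>

// Stand-ins for the protobuf request/response pair owned by a callback.
struct Request {
  std::string body;
};
struct Response {
  int status = 0;
};

// Init-captures move the buffers into the closure, so the callback object
// owns them and deletes them when it is itself destroyed. This is the
// explicitness that the thread prefers over bind()'s argument stashing.
auto MakeRpcCompletionCb(std::unique_ptr<Request> req,
                         std::unique_ptr<Response> resp) {
  return [req = std::move(req), resp = std::move(resp)]() {
    return resp->status;  // the closure reads state it exclusively owns
  };
}
```

Because the closure type is a plain (compiler-generated) struct with named members, a debugger can inspect the captured state directly, which matches the debuggability point made above.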
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Michael Ho has posted comments on this change. Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 2: (1 comment) http://gerrit.cloudera.org:8080/#/c/7103/2/be/src/rpc/rpc.h File be/src/rpc/rpc.h: PS2, Line 332: cb(Status::OK(), req, resp, controller_ptr); > Why do we pass Status::OK() for non-retryable error or after exceeding the Re-reading the comments above, the status seems to indicate whether the RPC was successfully attempted, so it may be okay. The assumption is that cb will check for a remote error from controller_ptr.
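The retry semantics discussed here (retry only a busy-server error up to a bounded number of attempts, then invoke the completion callback exactly once and let it inspect the final outcome itself) can be sketched as follows. RpcResult and RetryRpc are illustrative names, not KRPC's API.

```cpp
#include <functional>

// Simplified outcome of a single RPC attempt.
enum class RpcResult { kOk, kServerTooBusy, kNetworkError };

// Retries only kServerTooBusy, up to max_attempts. Any other outcome,
// success or failure, ends the loop. The completion callback runs exactly
// once, and it is the callback's job to inspect the final result (in the
// real system: the RpcController's status), mirroring the discussion above.
int RetryRpc(const std::function<RpcResult()>& attempt_once, int max_attempts,
             const std::function<void(RpcResult)>& completion_cb) {
  int attempts = 0;
  RpcResult result = RpcResult::kServerTooBusy;
  while (attempts < max_attempts) {
    ++attempts;
    result = attempt_once();
    if (result != RpcResult::kServerTooBusy) break;  // only "too busy" retries
  }
  completion_cb(result);  // always fires, even after exhausting retries
  return attempts;
}
```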
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Michael Ho has posted comments on this change. Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 2: (11 comments) http://gerrit.cloudera.org:8080/#/c/7103/2/be/src/rpc/rpc.h File be/src/rpc/rpc.h: PS2, Line 126: 'cb' Can you please state the context in which 'cb' is called from (e.g. reactor thread) and also add the precaution caller should take when implementing 'cb' (e.g. no blocking etc) ? PS2, Line 133: aattempted typo PS2, Line 230: // /// Same below. PS2, Line 319: Retries Is there any way to write a be-test to exercise the retry path ? PS2, Line 322: auto cb_wrapper = [params = std::move(params), mgr, func, req, resp, : cb = std::move(cb), controller_ptr = controller.release(), num_attempts]() : mutable { An alternative to this lambda implementation would be to define a separate function and uses boost::bind() to stash the arguments, right ? PS2, Line 332: cb(Status::OK(), req, resp, controller_ptr); Why do we pass Status::OK() for non-retryable error or after exceeding the maximum number of retries ? Line 337: kudu::MonoDelta retry_interval = kudu::MonoDelta::FromMilliseconds(params->retry_interval_ms); long line http://gerrit.cloudera.org:8080/#/c/7103/1/be/src/runtime/data-stream-sender.cc File be/src/runtime/data-stream-sender.cc: PS1, Line 265: 10 Mind commenting above what 10 stands for ? PS1, Line 266: numeric_limits::max() Why is this not FLAGS_datastream_sender_timeout_ms ? http://gerrit.cloudera.org:8080/#/c/7103/2/be/src/runtime/data-stream-sender.cc File be/src/runtime/data-stream-sender.cc: PS2, Line 203: auto cb = [self_ptr = weak_ptr(self_), : instance_id = fragment_instance_id_, proto_batch = batch] : (const Status& status, TransmitDataRequestPb* request, : TransmitDataResponsePb* response, RpcController* controller) { : : // Ensure that request and response get deleted when this callback returns. 
: auto request_container = unique_ptr(request); : auto response_container = unique_ptr(response); : : // Check if this channel still exists. : auto channel = self_ptr.lock(); : if (!channel) return; : { : lock_guard l(channel->lock_); : Status rpc_status = status.ok() ? FromKuduStatus(controller->status()) : status; : : int32_t status_code = response->status().status_code(); : channel->recvr_gone_ = status_code == TErrorCode::DATASTREAM_RECVR_ALREADY_GONE; : : if (!rpc_status.ok()) { : channel->last_rpc_status_ = rpc_status; : } else if (!channel->recvr_gone_) { : if (status_code != TErrorCode::OK) { : // Don't bubble up the 'receiver gone' status, because it's not an error. : channel->last_rpc_status_ = Status(response->status()); : } else { : int size = proto_batch->GetSize(); : channel->num_data_bytes_sent_.Add(size); : VLOG_ROW << "incremented #data_bytes_sent=" :<< channel->num_data_bytes_sent_.Load(); : } : } : channel->rpc_in_flight_ = false; : } : channel->rpc_done_cv_.notify_one(); : }; I am no C++ expert so this question may be stupid: could we avoid writing this as a lambda function ? I am not sure how well gdb can handle lambda functions when compiled with optimization, and this callback seems important enough that one may want to inspect its state in a core dump if necessary. http://gerrit.cloudera.org:8080/#/c/7103/1/be/src/service/impala-internal-service.cc File be/src/service/impala-internal-service.cc: PS1, Line 63: DataStreamService nit: Just wondering why we didn't put DataStreamService in data-stream-service.cc ?
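The alternative raised in the review (a separate function with its arguments stashed, rather than a large capture-heavy lambda) can be sketched as a named functor. A named type is easier to locate and inspect in a core dump than an anonymous closure, and the weak_ptr member preserves the lifetime check the quoted lambda performs via self_ptr.lock(). All names here are illustrative, not Impala's.

```cpp
#include <memory>
#include <string>
#include <vector>

// Toy stand-in for the sender channel the callback writes back into.
struct Channel {
  std::vector<std::string> results;
};

// Named callback functor: the captured state becomes inspectable members.
struct TransmitDoneCb {
  std::weak_ptr<Channel> self;  // weak: a late RPC completion must not
                                // touch an already-destroyed channel
  std::string instance_id;

  void operator()(const std::string& rpc_result) const {
    auto channel = self.lock();
    if (!channel) return;  // channel already gone: drop the completion
    channel->results.push_back(instance_id + ": " + rpc_result);
  }
};
```

The same object can be handed wherever a std::function-compatible callback is expected, so switching away from a lambda need not change the call sites.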
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Henry Robinson has posted comments on this change. Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 1: (10 comments) http://gerrit.cloudera.org:8080/#/c/7103/1/be/src/runtime/data-stream-mgr.cc File be/src/runtime/data-stream-mgr.cc: Line 61: bool unused = false; > Doesn't the contract for FindRecvr() state that we need to hold 'lock_' bef Done Line 105: EarlySendersList waiters; > Add brief comment: Done PS1, Line 123: for (int32_t sender_id: waiters.closing_senders) recvr->RemoveSender(sender_id); > According to the header comment in data-stream-mgr.h, a sender shouldn't be Done PS1, Line 300: early_senders_ > Assume the following case: The sender fragment instance would fail, and then the coordinator should cancel the receiver. I believe there's an outstanding issue where, if the coordinator fails to cancel a fragment instance, the fragment instance will not fail itself. I'm going to file a JIRA for that, but it's unrelated to KRPC. Line 321: // Wait for 10s > Add a brief comment stating that this is to check if the DataStreamMgr is b Done http://gerrit.cloudera.org:8080/#/c/7103/1/be/src/runtime/data-stream-mgr.h File be/src/runtime/data-stream-mgr.h: PS1, Line 81: will do one of three things > nit: would be nice to format them as bullet points. Done PS1, Line 83: if the buffer is full > "if the batch queues are full"? Done PS1, Line 87: the sender > "the sender along with its payload" ? Done Line 224: /// has not yet prepared 'payload' is queued until it arrives, or is timed out. If the > nit: been prepared, Done http://gerrit.cloudera.org:8080/#/c/7103/1/be/src/service/impala-server.h File be/src/service/impala-server.h: PS1, Line 255: void UpdateFilter > Leave a TODO stating that this should move to query-state.h/cc after IMPALA I'm going to leave that for now, since I don't want to make design decisions for IMPALA-3825 in this patch. 
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Henry Robinson has uploaded a new patch set (#2). Change subject: IMPALA-4856: Port data stream service to KRPC .. IMPALA-4856: Port data stream service to KRPC This patch ports the data-flow parts of ImpalaInternalService to KRPC.
* ImpalaInternalService is split into two services. The first, ImpalaInternalService, deals with control messages for plan fragment instance execution, cancellation and reporting, and remains implemented in Thrift for now. The second, DataStreamService, handles large-payload RPCs for transmitting runtime filters and row batches between hosts.
* In the DataStreamService, all RPCs use 'native' protobuf. The DataStreamService starts on the port previously reserved for the StatestoreSubscriberService (which is also a KRPC service), to avoid having to configure another port when starting Impala. When the ImpalaInternalService is ported to KRPC, all services will run on one port.
* To support addressing two different backend services, a data service port has been added to TBackendDescriptor.
* This patch adds support for asynchronous RPCs to the RpcMgr and Rpc classes. Previously, Impala used fixed-size thread pools + synchronous RPCs to achieve some parallelism for 'broadcast' RPCs like filter propagation, or a dedicated per-sender+receiver pair thread on the sender side in the DataStreamSender case. In this patch, the PublishFilter() and TransmitData() RPCs are sent asynchronously using KRPC's thread pools.
* The TransmitData() protocol has changed to adapt to asynchronous RPCs. The full details are in data-stream-mgr.h.
* As a result, DataStreamSender no longer creates a thread-per-connection on the sender side.
* Both tuple transmission and runtime filter publication use sidecars to minimise the number of copies and serialization steps required.
* Also include a fix for KUDU-2011 that properly allows sidecars to be shared between KRPC and the RPC caller (fixing IMPALA-5093, a corruption bug). 
* A large portion of this patch is the replacement of TRowBatch with its Protobuf equivalent, RowBatchPb. The replacement is a literal port of the data structure, and row-batch-test, row-batch-list-test and row-batch-serialize-benchmark continue to execute without major logic changes.
* Simplify FindRecvr() logic in DataStreamManager. There is no longer a need to block on the sender side, so no need for complex promise-based machinery. Instead, all senders with no receiver are added to a per-receiver list, which is processed when the receiver arrives. If it does not arrive promptly, the DataStreamManager cleans them up after FLAGS_datastream_sender_timeout_ms.
* This patch also begins a clean-up of how ImpalaServer instances are created (by removing CreateImpalaServer) and clarifies the relationship between ExecEnv and ImpalaServer. ImpalaServer now follows the standard construct->Init()->Start()->Join() lifecycle that we use for other services.
* Ensure that all addresses used for KRPCs are fully resolved, avoiding the need to resolve them for each RPC.
TESTING --- * New tests added to rpc-mgr-test.
TO DO - * Re-enable throughput and latency measurements per data-stream sender when that information is exposed from KRPC (KUDU-1738). 
* TLS and Kerberos are still not supported by KRPC in this patch. Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b --- M .clang-format M CMakeLists.txt M be/CMakeLists.txt M be/src/benchmarks/bloom-filter-benchmark.cc M be/src/benchmarks/row-batch-serialize-benchmark.cc M be/src/common/init.cc M be/src/common/status.cc M be/src/common/status.h M be/src/exprs/expr-test.cc M be/src/kudu/rpc/rpc_sidecar.cc M be/src/kudu/rpc/rpc_sidecar.h M be/src/rpc/CMakeLists.txt M be/src/rpc/TAcceptQueueServer.cpp M be/src/rpc/common.proto M be/src/rpc/rpc-mgr-test.cc M be/src/rpc/rpc-mgr.h M be/src/rpc/rpc-mgr.inline.h M be/src/rpc/rpc.h D be/src/runtime/backend-client.h M be/src/runtime/client-cache-types.h M be/src/runtime/coordinator-backend-state.cc M be/src/runtime/coordinator-backend-state.h M be/src/runtime/coordinator-filter-state.h M be/src/runtime/coordinator.cc M be/src/runtime/coordinator.h M be/src/runtime/data-stream-mgr.cc M be/src/runtime/data-stream-mgr.h M be/src/runtime/data-stream-recvr.cc M be/src/runtime/data-stream-recvr.h M be/src/runtime/data-stream-sender.cc M be/src/runtime/data-stream-sender.h M be/src/runtime/data-stream-test.cc M be/src/runtime/exec-env.cc M be/src/runtime/exec-env.h M be/src/runtime/fragment-instance-state.cc M be/src/runtime/fragment-instance-state.h M be/src/runtime/query-state.cc M be/src/runtime/query-state.h M be/src/runtime/row-batch-serialize-test.cc M be/src/runtime/row-batch.cc M be/src/runtime/row-batch.h M be/src/runtime/runtime-filter-bank.cc M be/src/runtime/runtime-filter-bank.h M be/src/runtime/runtime-state.cc M be/src/runtime/runtime
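The sidecar mechanism the commit message relies on can be illustrated with a toy model: instead of copying the serialized row batch into the request message, the request carries only small indices, and the payload travels alongside the frame by reference. This models the concept only; the real KRPC sidecar API and types differ.

```cpp
#include <string>
#include <vector>

// Toy request: small serialized metadata plus sidecar indices.
struct Request {
  std::string header;            // e.g. tuple offsets, destination ids
  std::vector<int> sidecar_idx;  // indices into the frame's sidecar table
};

// Toy wire frame: the sidecar table holds payloads by pointer, so no copy
// of the (potentially large) serialized row batch is made.
struct Frame {
  Request req;
  std::vector<const std::string*> sidecars;
};

// Attach a payload as a sidecar and record its index in the request.
int AddSidecar(Frame* frame, const std::string& payload) {
  frame->sidecars.push_back(&payload);  // reference, not a copy
  int idx = static_cast<int>(frame->sidecars.size()) - 1;
  frame->req.sidecar_idx.push_back(idx);
  return idx;
}
```

The payload must outlive the frame, which is exactly the sharing/lifetime concern the KUDU-2011 fix mentioned above addresses.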
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Sailesh Mukil has posted comments on this change. Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 1: (12 comments) > This patch passes core, exhaustive and ASAN tests. It can execute > 32 concurrent streams of TPCDS-Q17 @ scale factor 3 on a > 138-node cluster with Kerberos enabled. (I don't believe the > previous implementation could do this effectively because of the > number of Thrift connections required). > [...]
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Henry Robinson has posted comments on this change. Change subject: IMPALA-4856: Port data stream service to KRPC .. Patch Set 1: This patch passes core, exhaustive and ASAN tests. It can execute 32 concurrent streams of TPCDS-Q17 @ scale factor 3 on a 138-node cluster with Kerberos enabled. (I don't believe the previous implementation could do this effectively because of the number of Thrift connections required.)

Some perf results from a 20-node cluster:

+------------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
| Workload   | Query    | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%) | Base StdDev(%) | Num Clients | Iters |
+------------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
| TPCH(_300) | TPCH-Q3  | parquet / none / none |  32.55 |       28.18 |    +15.51% |     4.71% |          1.17% |           1 |     3 |
| TPCH(_300) | TPCH-Q13 | parquet / none / none |  24.43 |       22.21 |     +9.99% |     0.61% |          0.70% |           1 |     3 |
| TPCH(_300) | TPCH-Q8  | parquet / none / none |   7.53 |        7.05 |     +6.69% |     1.70% |          2.09% |           1 |     3 |
| TPCH(_300) | TPCH-Q22 | parquet / none / none |   6.35 |        6.04 |     +5.19% |     0.37% |          0.76% |           1 |     3 |
| TPCH(_300) | TPCH-Q14 | parquet / none / none |   4.28 |        4.10 |     +4.36% |     0.03% |          0.73% |           1 |     3 |
| TPCH(_300) | TPCH-Q15 | parquet / none / none |   3.53 |        3.41 |     +3.69% |     0.61% |          1.42% |           1 |     3 |
| TPCH(_300) | TPCH-Q16 | parquet / none / none |   6.09 |        5.87 |     +3.63% |     0.15% |          1.78% |           1 |     3 |
| TPCH(_300) | TPCH-Q11 | parquet / none / none |   1.73 |        1.70 |     +2.22% |     0.10% |          0.95% |           1 |     3 |
| TPCH(_300) | TPCH-Q21 | parquet / none / none | 105.84 |      103.71 |     +2.06% |     0.57% |          0.44% |           1 |     3 |
| TPCH(_300) | TPCH-Q9  | parquet / none / none |  30.76 |       30.46 |     +1.00% |     2.57% |          1.22% |           1 |     3 |
| TPCH(_300) | TPCH-Q1  | parquet / none / none |  22.14 |       21.94 |     +0.91% |     0.81% |          0.86% |           1 |     3 |
| TPCH(_300) | TPCH-Q4  | parquet / none / none |   5.09 |        5.05 |     +0.79% |     0.48% |          2.54% |           1 |     3 |
| TPCH(_300) | TPCH-Q18 | parquet / none / none |  31.76 |       32.54 |     -2.39% |     0.44% |          0.03% |           1 |     3 |
| TPCH(_300) | TPCH-Q2  | parquet / none / none |   1.98 |        2.04 |     -2.74% |     7.17% |          7.41% |           1 |     3 |
| TPCH(_300) | TPCH-Q5  | parquet / none / none |  47.62 |       48.98 |     -2.79% |     0.51% |          0.16% |           1 |     3 |
| TPCH(_300) | TPCH-Q20 | parquet / none / none |   3.18 |        3.27 |     -2.89% |     1.34% |          1.98% |           1 |     3 |
| TPCH(_300) | TPCH-Q6  | parquet / none / none |   1.32 |        1.37 |     -3.72% |     0.03% |          4.00% |           1 |     3 |
| TPCH(_300) | TPCH-Q10 | parquet / none / none |   9.00 |        9.48 |     -5.06% |     0.16% |          0.69% |           1 |     3 |
| TPCH(_300) | TPCH-Q17 | parquet / none / none |   5.16 |        5.75 |    -10.18% |     6.44% |          2.63% |           1 |     3 |
| TPCH(_300) | TPCH-Q12 | parquet / none / none |   3.01 |        3.39 |    -11.38% |     2.43% |          0.06% |           1 |     3 |
| TPCH(_300) | TPCH-Q19 | parquet / none / none |  25.20 |       28.82 |  I -12.57% |     0.01% |          0.75% |           1 |     3 |
| TPCH(_300) | TPCH-Q7  | parquet / none / none |  45.32 |       61.16 |  I -25.91% |     0.55% |          2.22% |           1 |     3 |
+------------+----------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+

Primitives (note the significant regression in many_independent_fragments, that needs further attention):

+---------------------+--------------------------------------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
| Workload            | Query                                | File Format           | Avg(s) | Base Avg(s) | Delta(Avg) | StdDev(%) | Base StdDev(%) | Num Clients | Iters |
+---------------------+--------------------------------------+-----------------------+--------+-------------+------------+-----------+----------------+-------------+-------+
| TARGETED-PERF(_300) | primitive_many_independent_fragments | parquet / none / none | 377.69 |      189.40 |  R +99.42% |     0.32% |          0.22% |           1 |
[Impala-ASF-CR] IMPALA-4856: Port data stream service to KRPC
Henry Robinson has uploaded a new change for review. http://gerrit.cloudera.org:8080/7103 Change subject: IMPALA-4856: Port data stream service to KRPC .. IMPALA-4856: Port data stream service to KRPC This patch ports the data-flow parts of ImpalaInternalService to KRPC.
* ImpalaInternalService is split into two services. The first, ImpalaInternalService, deals with control messages for plan fragment instance execution, cancellation and reporting, and remains implemented in Thrift for now. The second, DataStreamService, handles large-payload RPCs for transmitting runtime filters and row batches between hosts.
* In the DataStreamService, all RPCs use 'native' protobuf. The DataStreamService starts on the port previously reserved for the StatestoreSubscriberService (which is also a KRPC service), to avoid having to configure another port when starting Impala. When the ImpalaInternalService is ported to KRPC, all services will run on one port.
* To support addressing two different backend services, a data service port has been added to TBackendDescriptor.
* This patch adds support for asynchronous RPCs to the RpcMgr and Rpc classes. Previously, Impala used fixed-size thread pools + synchronous RPCs to achieve some parallelism for 'broadcast' RPCs like filter propagation, or a dedicated per-sender+receiver pair thread on the sender side in the DataStreamSender case. In this patch, the PublishFilter() and TransmitData() RPCs are sent asynchronously using KRPC's thread pools.
* The TransmitData() protocol has changed to adapt to asynchronous RPCs. The full details are in data-stream-mgr.h.
* As a result, DataStreamSender no longer creates a thread-per-connection on the sender side.
* Both tuple transmission and runtime filter publication use sidecars to minimise the number of copies and serialization steps required.
* Also include a fix for KUDU-2011 that properly allows sidecars to be shared between KRPC and the RPC caller (fixing IMPALA-5093, a corruption bug). 
* A large portion of this patch is the replacement of TRowBatch with its Protobuf equivalent, RowBatchPb. The replacement is a literal port of the data structure, and row-batch-test, row-batch-list-test and row-batch-serialize-benchmark continue to execute without major logic changes.
* Simplify FindRecvr() logic in DataStreamManager. There is no longer a need to block on the sender side, so no need for complex promise-based machinery. Instead, all senders with no receiver are added to a per-receiver list, which is processed when the receiver arrives. If it does not arrive promptly, the DataStreamManager cleans them up after FLAGS_datastream_sender_timeout_ms.
* This patch also begins a clean-up of how ImpalaServer instances are created (by removing CreateImpalaServer) and clarifies the relationship between ExecEnv and ImpalaServer. ImpalaServer now follows the standard construct->Init()->Start()->Join() lifecycle that we use for other services.
* Ensure that all addresses used for KRPCs are fully resolved, avoiding the need to resolve them for each RPC.
TESTING --- * New tests added to rpc-mgr-test.
TO DO - * Re-enable throughput and latency measurements per data-stream sender when that information is exposed from KRPC (KUDU-1738). 
* TLS and Kerberos are still not supported by KRPC in this patch. Change-Id: Ia66704be7a0a8162bb85556d07b583ec756c584b --- M .clang-format M CMakeLists.txt M be/CMakeLists.txt M be/src/benchmarks/bloom-filter-benchmark.cc M be/src/benchmarks/row-batch-serialize-benchmark.cc M be/src/common/init.cc M be/src/common/status.cc M be/src/common/status.h M be/src/exprs/expr-test.cc M be/src/kudu/rpc/rpc_sidecar.cc M be/src/kudu/rpc/rpc_sidecar.h M be/src/rpc/CMakeLists.txt M be/src/rpc/TAcceptQueueServer.cpp M be/src/rpc/common.proto M be/src/rpc/rpc-mgr-test.cc M be/src/rpc/rpc-mgr.h M be/src/rpc/rpc-mgr.inline.h M be/src/rpc/rpc.h D be/src/runtime/backend-client.h M be/src/runtime/client-cache-types.h M be/src/runtime/coordinator-backend-state.cc M be/src/runtime/coordinator-backend-state.h M be/src/runtime/coordinator-filter-state.h M be/src/runtime/coordinator.cc M be/src/runtime/coordinator.h M be/src/runtime/data-stream-mgr.cc M be/src/runtime/data-stream-mgr.h M be/src/runtime/data-stream-recvr.cc M be/src/runtime/data-stream-recvr.h M be/src/runtime/data-stream-sender.cc M be/src/runtime/data-stream-sender.h M be/src/runtime/data-stream-test.cc M be/src/runtime/exec-env.cc M be/src/runtime/exec-env.h M be/src/runtime/fragment-instance-state.cc M be/src/runtime/fragment-instance-state.h M be/src/runtime/query-state.cc M be/src/runtime/query-state.h M be/src/runtime/row-batch-serialize-test.cc M be/src/runtime/row-batch.cc M be/src/runtime/row-batch.h M be/src/runtime/runtime-filter-bank.cc M be/src/runtime/runtime-filter-bank.h M be/src/runtim