Github user sohami commented on a diff in the pull request:

    https://github.com/apache/drill/pull/809#discussion_r115614854
  
    --- Diff: contrib/native/client/src/clientlib/drillClientImpl.cpp ---
    @@ -854,75 +990,328 @@ void DrillClientImpl::waitForResults(){
         }
     }
     
    -status_t DrillClientImpl::readMsg(ByteBuf_t _buf,
    -        AllocatedBufferPtr* allocatedBuffer,
    +/*
    + *  Decode the length of the message from bufWithLen and then read entire 
message from the socket.
    + *  Parameters:
    + *      bufWithLen          - in param  - buffer containing the bytes 
which has length of the RPC message/encrypted chunk
    + *      bufferWithLenBytes  - out param - buffer pointer which points to 
memory allocated in this function and has the
    + *                                        entire one RPC message / 
encrypted chunk along with the length of the message
    + *      lengthBytesLength   - out param - bytes of bufWithLen which 
contains the length of the entire RPC message or
    + *                                        encrypted chunk
    + *      lengthDecodeHandler - in param  - function pointer with length 
decoder to use. For encrypted chunk we use
    + *                                        lengthDecode and for plain RPC 
message we use rpcLengthDecode.
    + *  Return:
    + *      status_t    - QRY_SUCCESS    - In case of success.
    + *                  - 
QRY_COMM_ERROR/QRY_INTERNAL_ERROR/QRY_CLIENT_OUTOFMEM - In cases of error.
    + */
    +status_t DrillClientImpl::readLenBytesFromSocket(ByteBuf_t bufWithLen, 
AllocatedBufferPtr* bufferWithLenBytes,
    +           uint32_t& lengthBytesLength, lengthDecoder lengthDecodeHandler) 
{
    +
    +    uint32_t rmsgLen = 0;
    +    size_t bytes_read = 0;
    +    size_t leftover = 0;
    +    boost::system::error_code error;
    +    *bufferWithLenBytes = NULL;
    +    size_t bufferWithLenBytesSize = 0;
    +
    +    bytes_read = (this->*lengthDecodeHandler)(bufWithLen, rmsgLen);
    +    lengthBytesLength = bytes_read;
    +
    +    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Length bytes = " << bytes_read 
<< std::endl;)
    +    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Msg Length = " << rmsgLen << 
std::endl;)
    +
    +    if(rmsgLen>0){
    +        leftover = LEN_PREFIX_BUFLEN - bytes_read;
    +
    +        // Allocate a buffer for reading all the bytes in bufWithLen and 
length number of bytes
    +           bufferWithLenBytesSize = rmsgLen + bytes_read;
    +        *bufferWithLenBytes = new AllocatedBuffer(bufferWithLenBytesSize);
    +
    +        if(*bufferWithLenBytes == NULL){
    +            return handleQryError(QRY_CLIENT_OUTOFMEM, 
getMessage(ERR_QRY_OUTOFMEM), NULL);
    +        }
    +
    +        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << 
"DrillClientImpl::readLenBytesFromSocket: Allocated and locked buffer: [ "
    +                                          << *bufferWithLenBytes << ", 
size = " << bufferWithLenBytesSize << " ]\n";)
    +
    +        // Copy the memory of bufWithLen into bufferWithLenBytesSize
    +        memcpy((*bufferWithLenBytes)->m_pBuffer, bufWithLen, 
LEN_PREFIX_BUFLEN);
    +
    +        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Copied bufWithLen into 
bufferWithLenBytes. "
    +                                          << "Now reading data (rmsgLen - 
leftover) : " << (rmsgLen - leftover)
    +                                          << std::endl;)
    +
    +        // Read the entire data left from socket and copy to currentBuffer.
    +        ByteBuf_t b = (*bufferWithLenBytes)->m_pBuffer + LEN_PREFIX_BUFLEN;
    +        size_t bytesToRead = rmsgLen - leftover;
    +
    +        while(1){
    +            bytes_read = this->m_socket.read_some(boost::asio::buffer(b, 
bytesToRead), error);
    +            if(error) break;
    +            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Data Message: actual 
bytes read = " << bytes_read << std::endl;)
    +            if(bytes_read == bytesToRead) break;
    +            bytesToRead -= bytes_read;
    +            b += bytes_read;
    +        }
    +    } else {
    +        return handleQryError(QRY_INTERNAL_ERROR, 
getMessage(ERR_QRY_INVREADLEN), NULL);
    +    }
    +
    +    return error ? handleQryError(QRY_COMM_ERROR, 
getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL)
    +                 : QRY_SUCCESS;
    +}
    +
    +
    +/*
    + *  Function to read entire RPC message from socket and decode it to 
InboundRpcMessage
    + *  Parameters:
    + *      _buf            - in param  - Buffer containing the length bytes.
    + *      allocatedBuffer - out param - Buffer containing the length bytes 
and entire RPC message bytes.
    + *      msg             - out param - Decoded InBoundRpcMessage from the 
bytes in allocatedBuffer
    + *  Return:
    + *      status_t    - QRY_SUCCESS   - In case of success.
    + *                  - 
QRY_COMM_ERROR/QRY_INTERNAL_ERROR/QRY_CLIENT_OUTOFMEM - In cases of error.
    + */
    +status_t DrillClientImpl::readMsg(ByteBuf_t _buf, AllocatedBufferPtr* 
allocatedBuffer,
             rpc::InBoundRpcMessage& msg){
     
         DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Read 
message from buffer "
    -        <<  reinterpret_cast<int*>(_buf) << std::endl;)
    -    size_t leftover=0;
    -    uint32_t rmsgLen;
    -    AllocatedBufferPtr currentBuffer;
    -    *allocatedBuffer=NULL;
    +                                      <<  reinterpret_cast<int*>(_buf) << 
std::endl;)
    +    *allocatedBuffer = NULL;
         {
             // We need to protect the readLength and read buffer, and the 
pending requests counter,
             // but we don't have to keep the lock while we decode the rest of 
the buffer.
             boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
    -        std::size_t bytes_read = rpc::lengthDecode(_buf, rmsgLen);
    -        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "len bytes = " << bytes_read 
<< std::endl;)
    -        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "rmsgLen = " << rmsgLen << 
std::endl;)
    -
    -        if(rmsgLen>0){
    -            leftover = LEN_PREFIX_BUFLEN - bytes_read;
    -            // Allocate a buffer
    -            currentBuffer=new AllocatedBuffer(rmsgLen);
    -            DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << 
"DrillClientImpl::readMsg: Allocated and locked buffer: [ "
    -                << currentBuffer << ", size = " << rmsgLen << " ]\n";)
    -            if(currentBuffer==NULL){
    +        uint32_t lengthFieldSize = 0;
    +
    +        // Read the message length and extract length size bytes to form 
InBoundRpcMessage
    +        status_t statusCode = readLenBytesFromSocket(_buf, 
allocatedBuffer, lengthFieldSize, &DrillClientImpl::rpcLengthDecode);
    +
    +        // Check for error conditions
    +        if(QRY_SUCCESS != statusCode) {
    +            Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
    +            return statusCode;
    +        }
    +
    +        // Get the message size
    +        size_t msgLen = (*allocatedBuffer)->m_bufSize;
    +
    +        // Read data successfully, now let's try to decode the buffer and 
form a valid RPC message.
    +        // allocatedBuffer also contains the length bytes which is not 
needed by decodes so skip that part of buffer.
    +        // We have it since in case of encryption the unwrap function 
expects it
    +        if (!decode((*allocatedBuffer)->m_pBuffer + lengthFieldSize, 
msgLen - lengthFieldSize, msg)) {
    +            Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
    +            return handleQryError(QRY_COMM_ERROR, 
getMessage(ERR_QRY_COMMERR, "Cannot decode server message"), NULL);
    +        }
    +
    +        DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Successfully created a RPC 
message with Coordination id: "
    +                                          << msg.m_coord_id << std::endl;)
    +    }
    +    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Free 
buffer "
    +                                      <<  reinterpret_cast<int*>(_buf) << 
std::endl;)
    +    Utils::freeBuffer(_buf, LEN_PREFIX_BUFLEN);
    +    return QRY_SUCCESS;
    +}
    +
    +
    +/*
    + *  Read ENCRYPT_LEN_PREFIX_BUFLEN bytes to decode length of one complete 
encrypted chunk. The length bytes are expected
    + *  to be in network order. It is converted to host order and the value is 
stored in rmsgLen parameter.
    + *  Parameters:
    + *      _buf    - in param  - ByteBuf_t containing atleast the length 
bytes.
    + *      rmsgLen - out param - Contain the decoded value of length.
    + *  Return:
    + *      size_t  - length bytes read to decode
    + */
    +size_t DrillClientImpl::lengthDecode(const ByteBuf_t _buf, uint32_t& 
rmsgLen) {
    +    memcpy(&rmsgLen, _buf, ENCRYPT_LEN_PREFIX_BUFLEN);
    +    rmsgLen = ntohl(rmsgLen);
    +    return ENCRYPT_LEN_PREFIX_BUFLEN;
    +}
    +
    +/*
    + *  Wrapper which uses RPC message length decoder to get length of one 
complete RPC message from _buf.
    + *  Parameters:
    + *      _buf    - in param  - ByteBuf_t containing atleast the length 
bytes.
    + *      rmsgLen - out param - Contain the decoded value of length.
    + *  Return:
    + *      size_t     - length bytes read to decode
    + */
    +size_t DrillClientImpl::rpcLengthDecode(const ByteBuf_t _buf, uint32_t& 
rmsgLen) {
    +    return rpc::lengthDecode(_buf, rmsgLen);
    +}
    +
    +
    +/*
    + *  Read all the encrypted chunk needed to form a complete RPC message. 
Read an entire chunk from network, decrypt it
    + *  and put in a buffer. The same process is repeated until the entire 
buffer to form a completed RPC message is read.
    + *  Parameters:
    + *      _buf            - in param  - ByteBuf_t containing atleast the 
length bytes.
    + *      allocatedBuffer - out param - Buffer containing the entire RPC 
message bytes which is formed by reading all the
    + *                                    required encrypted chunk from 
network and decrypting each individual chunk.
    +.*      msg             - out param - InBoundRpcMessage formed from bytes 
in allocatedBuffer
    + *  Return:
    + *      status_t    - QRY_SUCCESS - In case of success.
    + *                  - 
QRY_COMM_ERROR/QRY_INTERNAL_ERROR/QRY_CLIENT_OUTOFMEM - In cases of error.
    + */
    +status_t DrillClientImpl::readAndDecryptMsg(
    +        ByteBuf_t _buf,
    +        AllocatedBufferPtr* allocatedBuffer,
    +        rpc::InBoundRpcMessage& msg) {
    +
    +    DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << 
"DrillClientImpl::readAndDecryptMsg: Read message from buffer "
    +                                      << reinterpret_cast<int*>(_buf) << 
std::endl;)
    +
    +    size_t leftover = 0;
    +    uint32_t rpcMsgLen = 0;
    +    size_t bytes_read = 0;
    +    uint32_t writeIndex = 0;
    +    size_t bytesToRead = 0;
    +    uint32_t unWrappedLen = 0;
    +
    +    AllocatedBufferPtr currentBuffer = NULL;
    +    *allocatedBuffer = NULL;
    +    const char* unWrappedData = NULL;
    +    boost::system::error_code error;
    +    std::stringstream errorMsg;
    +
    +    {
    +        // We need to protect the readLength and read buffer, and the 
pending requests counter,
    +        // but we don't have to keep the lock while we decode the rest of 
the buffer.
    +        boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
    --- End diff --
    
    Same as above. the decode referenced here is not unwrap but decode function 
below which creates a valid RpcMsg.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to