GitHub user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r195284967
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java ---
    @@ -220,30 +196,91 @@ public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
         handler.addRpcRequest(requestId, callback);
     
         channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message)))
    -        .addListener(future -> {
    -          if (future.isSuccess()) {
    -            long timeTaken = System.currentTimeMillis() - startTime;
    -            if (logger.isTraceEnabled()) {
    -              logger.trace("Sending request {} to {} took {} ms", requestId,
    -                getRemoteAddress(channel), timeTaken);
    -            }
    -          } else {
    -            String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
    -              getRemoteAddress(channel), future.cause());
    -            logger.error(errorMsg, future.cause());
    -            handler.removeRpcRequest(requestId);
    -            channel.close();
    -            try {
    -              callback.onFailure(new IOException(errorMsg, future.cause()));
    -            } catch (Exception e) {
    -              logger.error("Uncaught exception in RPC response callback handler!", e);
    -            }
    -          }
    -        });
    +      .addListener(new RpcChannelListener(startTime, requestId, callback));
    +
    +    return requestId;
    +  }
    +
    +  /**
    +   * Send data to the remote end as a stream.   This differs from stream() in that this is a request
    +   * to *send* data to the remote end, not to receive it from the remote.
    +   *
    +   * @param meta meta data associated with the stream, which will be read completely on the
    +   *             receiving end before the stream itself.
    +   * @param data this will be streamed to the remote end to allow for transferring large amounts
    +   *             of data without reading into memory.
    +   * @param callback handles the reply -- onSuccess will only be called when both message and data
    +   *                 are received successfully.
    +   */
    +  public long uploadStream(
    +      ManagedBuffer meta,
    +      ManagedBuffer data,
    +      RpcResponseCallback callback) {
    +    long startTime = System.currentTimeMillis();
    +    if (logger.isTraceEnabled()) {
    +      logger.trace("Sending RPC to {}", getRemoteAddress(channel));
    +    }
    +
    +    long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
    --- End diff --
    
    The expression `Math.abs(UUID.randomUUID().getLeastSignificantBits())` appears twice. Please move it into a separate method.
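
A minimal sketch of that refactoring, for illustration only. The class `RequestIdSketch` and the helper name `newRequestId()` are hypothetical and not taken from the PR; the body is just the expression quoted above, moved into one place:

```java
import java.util.UUID;

// Hypothetical stand-in class; in the PR the helper would presumably
// be a private method of TransportClient.
class RequestIdSketch {

  // Hypothetical helper name: wraps the expression quoted in the review
  // comment so that every caller obtains its request id from one place.
  static long newRequestId() {
    return Math.abs(UUID.randomUUID().getLeastSignificantBits());
  }

  public static void main(String[] args) {
    // Each call site would replace the inline expression with the helper.
    long requestId = newRequestId();
    System.out.println("requestId = " + requestId);
  }
}
```

With a helper like this, each place that currently builds a request id inline would reduce to `long requestId = newRequestId();`, and any later change to how ids are generated would touch a single method.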

