Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r195588878
  
    --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java ---
    @@ -203,6 +197,76 @@ public void onFailure(Throwable e) {
         }
       }
     
    +  /**
    +   * Handle a request from the client to upload a stream of data.
    +   */
    +  private void processStreamUpload(final UploadStream req) {
    +    assert (req.body() == null);
    +    try {
    +      RpcResponseCallback callback = new RpcResponseCallback() {
    +        @Override
    +        public void onSuccess(ByteBuffer response) {
    +          respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
    +        }
    +
    +        @Override
    +        public void onFailure(Throwable e) {
    +          respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
    +        }
    +      };
    +      TransportFrameDecoder frameDecoder = (TransportFrameDecoder)
    +          channel.pipeline().get(TransportFrameDecoder.HANDLER_NAME);
    +      ByteBuffer meta = req.meta.nioByteBuffer();
    +      StreamCallbackWithID streamHandler = rpcHandler.receiveStream(reverseClient, meta, callback);
    +      StreamCallbackWithID wrappedCallback = new StreamCallbackWithID() {
    +        @Override
    +        public void onData(String streamId, ByteBuffer buf) throws IOException {
    +          streamHandler.onData(streamId, buf);
    +        }
    +
    +        @Override
    +        public void onComplete(String streamId) throws IOException {
    +           try {
    +             streamHandler.onComplete(streamId);
    +             callback.onSuccess(ByteBuffer.allocate(0));
    +           } catch (Exception ex) {
    +             IOException ioExc = new IOException("Failure post-processing complete stream;" +
    +               " failing this rpc and leaving channel active");
    +             callback.onFailure(ioExc);
    +             streamHandler.onFailure(streamId, ioExc);
    +           }
    +        }
    +
    +        @Override
    +        public void onFailure(String streamId, Throwable cause) throws IOException {
    +          callback.onFailure(new IOException("Destination failed while reading stream", cause));
    +          streamHandler.onFailure(streamId, cause);
    +        }
    +
    +        @Override
    +        public String getID() {
    +          return streamHandler.getID();
    +        }
    +      };
    +      if (req.bodyByteCount > 0) {
    +        StreamInterceptor interceptor = new StreamInterceptor(this, wrappedCallback.getID(),
    +          req.bodyByteCount, wrappedCallback);
    +        frameDecoder.setInterceptor(interceptor);
    +      } else {
    +        wrappedCallback.onComplete(wrappedCallback.getID());
    +      }
    +    } catch (Exception e) {
    +      logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
    +      respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
    +      // We choose to totally fail the channel, rather than trying to recover as we do in other
    +      // cases.  We don't know how many bytes of the stream the client has already sent for the
    +      // stream, its not worth trying to recover.
    --- End diff --
    
    it's


---
