Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21346#discussion_r195819061
  
    --- Diff: 
common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
 ---
    @@ -130,6 +200,59 @@ public void onFailure(Throwable e) {
         return res;
       }
     
    +  private RpcResult sendRpcWithStream(String... streams) throws Exception {
    +    TransportClient client = 
clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
    +    final Semaphore sem = new Semaphore(0);
    +    RpcResult res = new RpcResult();
    +    res.successMessages = Collections.synchronizedSet(new 
HashSet<String>());
    +    res.errorMessages = Collections.synchronizedSet(new HashSet<String>());
    +
    +    for (String stream : streams) {
    +      int idx = stream.lastIndexOf('/');
    +      ManagedBuffer meta = new 
NioManagedBuffer(JavaUtils.stringToBytes(stream));
    +      String streamName = (idx == -1) ? stream : stream.substring(idx + 1);
    +      ManagedBuffer data = testData.openStream(conf, streamName);
    +      client.uploadStream(meta, data, new RpcStreamCallback(stream, res, 
sem));
    +    }
    +
    +    if (!sem.tryAcquire(streams.length, 5, TimeUnit.SECONDS)) {
    +      fail("Timeout getting response from the server");
    +    }
    +    streamCallbacks.values().forEach(streamCallback -> {
    +      try {
    +        
streamCallback.waitForCompletionAndVerify(TimeUnit.SECONDS.toMillis(5));
    --- End diff --
    
    Isn't the wait part now redundant, after you waited for the semaphore?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to