[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-26 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21346


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-26 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r198236456
  
--- Diff: common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java ---
@@ -207,9 +400,67 @@ private void assertErrorsContain(Set<String> errors, Set<String> contains) {
           break;
         }
       }
-      assertTrue("Could not find error containing " + contain + "; errors: " + errors, foundMatch);
+      if (!foundMatch) {
+        notFound.add(contain);
+      }
+    }
+    return new ImmutablePair<>(remainingErrors, notFound);
+  }
+
+  private static class VerifyingStreamCallback implements StreamCallbackWithID {
+    final String streamId;
+    final StreamSuite.TestCallback helper;
+    final OutputStream out;
+    final File outFile;
+    VerifyingStreamCallback(String streamId) throws IOException {
--- End diff --

Whoops, sorry I missed this one. Fixed now.


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-26 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r198233111
  
--- Diff: common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java ---
@@ -207,9 +400,67 @@ private void assertErrorsContain(Set<String> errors, Set<String> contains) {
           break;
         }
       }
-      assertTrue("Could not find error containing " + contain + "; errors: " + errors, foundMatch);
+      if (!foundMatch) {
+        notFound.add(contain);
+      }
+    }
+    return new ImmutablePair<>(remainingErrors, notFound);
+  }
+
+  private static class VerifyingStreamCallback implements StreamCallbackWithID {
+    final String streamId;
+    final StreamSuite.TestCallback helper;
+    final OutputStream out;
+    final File outFile;
+    VerifyingStreamCallback(String streamId) throws IOException {
--- End diff --

ping


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-19 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r196558660
  
--- Diff: common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java ---
@@ -130,6 +200,59 @@ public void onFailure(Throwable e) {
     return res;
   }
 
+  private RpcResult sendRpcWithStream(String... streams) throws Exception {
+    TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
+    final Semaphore sem = new Semaphore(0);
+    RpcResult res = new RpcResult();
+    res.successMessages = Collections.synchronizedSet(new HashSet<String>());
+    res.errorMessages = Collections.synchronizedSet(new HashSet<String>());
+
+    for (String stream : streams) {
+      int idx = stream.lastIndexOf('/');
+      ManagedBuffer meta = new NioManagedBuffer(JavaUtils.stringToBytes(stream));
+      String streamName = (idx == -1) ? stream : stream.substring(idx + 1);
+      ManagedBuffer data = testData.openStream(conf, streamName);
+      client.uploadStream(meta, data, new RpcStreamCallback(stream, res, sem));
+    }
+
+    if (!sem.tryAcquire(streams.length, 5, TimeUnit.SECONDS)) {
+      fail("Timeout getting response from the server");
+    }
+    streamCallbacks.values().forEach(streamCallback -> {
+      try {
+        streamCallback.waitForCompletionAndVerify(TimeUnit.SECONDS.toMillis(5));
+      } catch (IOException e) {
--- End diff --

`forEach` doesn't like the IOException
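
(As a side note for readers: the underlying issue is that `java.util.function.Consumer.accept` declares no checked exceptions, so a lambda passed to `forEach` cannot throw `IOException` directly. A minimal standalone sketch of the constraint and the usual workarounds -- the names here are illustrative, not from the PR:)

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class ForEachCheckedException {
  // A method with a checked exception, standing in for waitForCompletionAndVerify.
  static void verify(String s) throws IOException {
    if (s.isEmpty()) {
      throw new IOException("empty element");
    }
  }

  public static void main(String[] args) throws IOException {
    List<String> items = Arrays.asList("a", "b");

    // Does not compile: Consumer.accept(T) declares no checked exceptions.
    // items.forEach(s -> verify(s));

    // Option 1: a plain loop lets the checked exception propagate.
    for (String s : items) {
      verify(s);
    }

    // Option 2: keep forEach and wrap the checked exception, as the test does.
    items.forEach(s -> {
      try {
        verify(s);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    });
  }
}
```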


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r195819256
  
--- Diff: common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java ---
@@ -130,6 +200,59 @@ public void onFailure(Throwable e) {
     return res;
   }
 
+  private RpcResult sendRpcWithStream(String... streams) throws Exception {
+    TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
+    final Semaphore sem = new Semaphore(0);
+    RpcResult res = new RpcResult();
+    res.successMessages = Collections.synchronizedSet(new HashSet<String>());
+    res.errorMessages = Collections.synchronizedSet(new HashSet<String>());
+
+    for (String stream : streams) {
+      int idx = stream.lastIndexOf('/');
+      ManagedBuffer meta = new NioManagedBuffer(JavaUtils.stringToBytes(stream));
+      String streamName = (idx == -1) ? stream : stream.substring(idx + 1);
+      ManagedBuffer data = testData.openStream(conf, streamName);
+      client.uploadStream(meta, data, new RpcStreamCallback(stream, res, sem));
+    }
+
+    if (!sem.tryAcquire(streams.length, 5, TimeUnit.SECONDS)) {
+      fail("Timeout getting response from the server");
+    }
+    streamCallbacks.values().forEach(streamCallback -> {
+      try {
+        streamCallback.waitForCompletionAndVerify(TimeUnit.SECONDS.toMillis(5));
+      } catch (IOException e) {
--- End diff --

Method throws `Exception`, so this seems unnecessary.


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-15 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r195819061
  
--- Diff: common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java ---
@@ -130,6 +200,59 @@ public void onFailure(Throwable e) {
     return res;
   }
 
+  private RpcResult sendRpcWithStream(String... streams) throws Exception {
+    TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
+    final Semaphore sem = new Semaphore(0);
+    RpcResult res = new RpcResult();
+    res.successMessages = Collections.synchronizedSet(new HashSet<String>());
+    res.errorMessages = Collections.synchronizedSet(new HashSet<String>());
+
+    for (String stream : streams) {
+      int idx = stream.lastIndexOf('/');
+      ManagedBuffer meta = new NioManagedBuffer(JavaUtils.stringToBytes(stream));
+      String streamName = (idx == -1) ? stream : stream.substring(idx + 1);
+      ManagedBuffer data = testData.openStream(conf, streamName);
+      client.uploadStream(meta, data, new RpcStreamCallback(stream, res, sem));
+    }
+
+    if (!sem.tryAcquire(streams.length, 5, TimeUnit.SECONDS)) {
+      fail("Timeout getting response from the server");
+    }
+    streamCallbacks.values().forEach(streamCallback -> {
+      try {
+        streamCallback.waitForCompletionAndVerify(TimeUnit.SECONDS.toMillis(5));
--- End diff --

Isn't the wait part now redundant, after you waited for the semaphore?


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-15 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r195796301
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.protocol;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NettyManagedBuffer;
+
+/**
+ * An RPC with data that is sent outside of the frame, so it can be read as a stream.
+ */
+public final class UploadStream extends AbstractMessage implements RequestMessage {
--- End diff --

Perhaps, but do you think that is really that useful? The handling of them is different (both in the network layer and the outer RpcHandler). And, other things being equal, I'm biased toward fewer changes to existing code paths.


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-15 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r195795803
  
--- Diff: common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java ---
@@ -130,6 +200,60 @@ public void onFailure(Throwable e) {
     return res;
   }
 
+  private RpcResult sendRpcWithStream(String... streams) throws Exception {
+    TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
+    final Semaphore sem = new Semaphore(0);
+    RpcResult res = new RpcResult();
+    res.successMessages = Collections.synchronizedSet(new HashSet<String>());
+    res.errorMessages = Collections.synchronizedSet(new HashSet<String>());
+
+    for (String stream : streams) {
+      int idx = stream.lastIndexOf('/');
+      ManagedBuffer meta = new NioManagedBuffer(JavaUtils.stringToBytes(stream));
+      String streamName = (idx == -1) ? stream : stream.substring(idx + 1);
+      ManagedBuffer data = testData.openStream(conf, streamName);
+      client.uploadStream(meta, data, new RpcStreamCallback(stream, res, sem));
+    }
+    streamCallbacks.values().forEach(streamCallback -> {
--- End diff --

Oh, great point, thanks for catching that. I will move this after the semaphore check; that will ensure that everything has been added to `streamCallbacks`.
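
(For readers following along, a self-contained sketch of the reordering being described -- wait on the semaphore first, so every registration is visible before iterating. The map contents and thread bodies are simplified stand-ins, not the PR's code:)

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class RegisterThenVerify {
  public static void main(String[] args) throws Exception {
    Map<String, String> streamCallbacks = new ConcurrentHashMap<>();
    Semaphore sem = new Semaphore(0);
    int numStreams = 2;

    // Stand-in for the server side: each handler registers its callback and
    // only then releases a permit (mirroring the RPC response that the test's
    // RpcStreamCallback turns into a sem.release()).
    for (int i = 0; i < numStreams; i++) {
      final int id = i;
      new Thread(() -> {
        streamCallbacks.put("stream-" + id, "callback-" + id);
        sem.release();
      }).start();
    }

    // The fix: acquire all permits *before* iterating, so the forEach below
    // cannot miss a registration.
    if (!sem.tryAcquire(numStreams, 5, TimeUnit.SECONDS)) {
      throw new AssertionError("Timeout getting response from the server");
    }
    streamCallbacks.values().forEach(cb -> System.out.println("verifying " + cb));
  }
}
```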


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-15 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r195788253
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java ---
@@ -216,34 +192,99 @@ public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
       logger.trace("Sending RPC to {}", getRemoteAddress(channel));
     }
 
-    long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
+    long requestId = requestId();
     handler.addRpcRequest(requestId, callback);
 
     channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message)))
-        .addListener(future -> {
-          if (future.isSuccess()) {
-            long timeTaken = System.currentTimeMillis() - startTime;
-            if (logger.isTraceEnabled()) {
-              logger.trace("Sending request {} to {} took {} ms", requestId,
-                getRemoteAddress(channel), timeTaken);
-            }
-          } else {
-            String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
-              getRemoteAddress(channel), future.cause());
-            logger.error(errorMsg, future.cause());
-            handler.removeRpcRequest(requestId);
-            channel.close();
-            try {
-              callback.onFailure(new IOException(errorMsg, future.cause()));
-            } catch (Exception e) {
-              logger.error("Uncaught exception in RPC response callback handler!", e);
-            }
-          }
-        });
+      .addListener(new RpcChannelListener(startTime, requestId, callback));
 
     return requestId;
   }
 
+  /**
+   * Send data to the remote end as a stream.   This differs from stream() in that this is a request
+   * to *send* data to the remote end, not to receive it from the remote.
+   *
+   * @param meta meta data associated with the stream, which will be read completely on the
+   *             receiving end before the stream itself.
+   * @param data this will be streamed to the remote end to allow for transferring large amounts
+   *             of data without reading into memory.
+   * @param callback handles the reply -- onSuccess will only be called when both message and data
+   *                 are received successfully.
+   */
+  public long uploadStream(
+      ManagedBuffer meta,
+      ManagedBuffer data,
+      RpcResponseCallback callback) {
+    long startTime = System.currentTimeMillis();
--- End diff --

I didn't do that originally, as I figured you wanted the startTime to be before `writeAndFlush`, but I can work around that too.


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-14 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r195287202
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.protocol;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NettyManagedBuffer;
+
+/**
+ * An RPC with data that is sent outside of the frame, so it can be read as a stream.
+ */
+public final class UploadStream extends AbstractMessage implements RequestMessage {
--- End diff --

Is it possible to merge UploadStream and RpcRequest into a single class?


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r195592355
  
--- Diff: common/network-common/src/test/java/org/apache/spark/network/StreamTestHelper.java ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import com.google.common.io.Files;
+
+import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NioManagedBuffer;
+import org.apache.spark.network.util.TransportConf;
+
+class StreamTestHelper {
+  static final String[] STREAMS = { "largeBuffer", "smallBuffer", "emptyBuffer", "file" };
+
+  final File testFile;
+  File tempDir;
+
+  ByteBuffer emptyBuffer;
+  ByteBuffer smallBuffer;
+  ByteBuffer largeBuffer;
+
+  private static ByteBuffer createBuffer(int bufSize) {
+    ByteBuffer buf = ByteBuffer.allocate(bufSize);
+    for (int i = 0; i < bufSize; i ++) {
+      buf.put((byte) i);
+    }
+    buf.flip();
+    return buf;
+  }
+
+  StreamTestHelper() throws Exception {
+    tempDir = Files.createTempDir();
+    emptyBuffer = createBuffer(0);
+    smallBuffer = createBuffer(100);
+    largeBuffer = createBuffer(100000);
+
+    testFile = File.createTempFile("stream-test-file", "txt", tempDir);
+    FileOutputStream fp = new FileOutputStream(testFile);
+    try {
+      Random rnd = new Random();
+      for (int i = 0; i < 512; i++) {
+        byte[] fileContent = new byte[1024];
+        rnd.nextBytes(fileContent);
+        fp.write(fileContent);
+      }
+    } finally {
+      fp.close();
+    }
+  }
+
+  public ByteBuffer srcBuffer(String name) {
+    switch (name) {
+      case "largeBuffer":
+        return largeBuffer;
+      case "smallBuffer":
+        return smallBuffer;
+      case "emptyBuffer":
+        return emptyBuffer;
+      default:
+        throw new IllegalArgumentException("Invalid stream: " + name);
+    }
+  }
+
+  public ManagedBuffer openStream(TransportConf conf, String streamId) {
+    switch (streamId) {
+      case "file":
+        return new FileSegmentManagedBuffer(conf, testFile, 0, testFile.length());
+      default:
+        return new NioManagedBuffer(srcBuffer(streamId));
+    }
+  }
+
+
+  void cleanup() {
+    if (tempDir != null) {
+      for (File f : tempDir.listFiles()) {
--- End diff --

`JavaUtils.deleteRecursively`.
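
(For reference, a sketch of what the suggested cleanup could look like, assuming the `deleteRecursively(File)` helper in `org.apache.spark.network.util.JavaUtils`, which deletes a file or directory and its contents and throws `IOException` on failure:)

```java
import java.io.File;
import java.io.IOException;

import org.apache.spark.network.util.JavaUtils;

class CleanupSketch {
  private File tempDir;

  void cleanup() {
    if (tempDir != null) {
      try {
        // Replaces the manual listFiles() loop with the shared helper.
        JavaUtils.deleteRecursively(tempDir);
      } catch (IOException io) {
        throw new RuntimeException(io);
      }
    }
  }
}
```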


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r195591573
  
--- Diff: common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java ---
@@ -207,9 +400,67 @@ private void assertErrorsContain(Set<String> errors, Set<String> contains) {
           break;
         }
       }
-      assertTrue("Could not find error containing " + contain + "; errors: " + errors, foundMatch);
+      if (!foundMatch) {
+        notFound.add(contain);
+      }
+    }
+    return new ImmutablePair<>(remainingErrors, notFound);
+  }
+
+  private static class VerifyingStreamCallback implements StreamCallbackWithID {
+    final String streamId;
+    final StreamSuite.TestCallback helper;
+    final OutputStream out;
+    final File outFile;
+    VerifyingStreamCallback(String streamId) throws IOException {
+      if (streamId.equals("file")) {
+        outFile = File.createTempFile("data", ".tmp", testData.tempDir);
+        out = new FileOutputStream(outFile);
+      } else {
+        out = new ByteArrayOutputStream();
+        outFile = null;
+      }
+      this.streamId = streamId;
+      helper = new StreamSuite.TestCallback(out);
+    }
+
+    void waitForCompletionAndVerify(long timeoutMs) throws IOException {
+      helper.waitForCompletion(timeoutMs);
+      if (streamId.equals("file")) {
+        assertTrue("File stream did not match.", Files.equal(testData.testFile, outFile));
+      } else {
+        byte[] result = ((ByteArrayOutputStream)out).toByteArray();
+        ByteBuffer srcBuffer = testData.srcBuffer(streamId);
+        ByteBuffer base;
+        synchronized (srcBuffer) {
+          base = srcBuffer.duplicate();
+        }
+        byte[] expected = new byte[base.remaining()];
+        base.get(expected);
+        assertEquals(expected.length, result.length);
+        assertTrue("buffers don't match", Arrays.equals(expected, result));
+
--- End diff --

nit: remove


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r195588878
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java ---
@@ -203,6 +197,76 @@ public void onFailure(Throwable e) {
     }
   }
 
+  /**
+   * Handle a request from the client to upload a stream of data.
+   */
+  private void processStreamUpload(final UploadStream req) {
+    assert (req.body() == null);
+    try {
+      RpcResponseCallback callback = new RpcResponseCallback() {
+        @Override
+        public void onSuccess(ByteBuffer response) {
+          respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
+        }
+
+        @Override
+        public void onFailure(Throwable e) {
+          respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
+        }
+      };
+      TransportFrameDecoder frameDecoder = (TransportFrameDecoder)
+        channel.pipeline().get(TransportFrameDecoder.HANDLER_NAME);
+      ByteBuffer meta = req.meta.nioByteBuffer();
+      StreamCallbackWithID streamHandler = rpcHandler.receiveStream(reverseClient, meta, callback);
+      StreamCallbackWithID wrappedCallback = new StreamCallbackWithID() {
+        @Override
+        public void onData(String streamId, ByteBuffer buf) throws IOException {
+          streamHandler.onData(streamId, buf);
+        }
+
+        @Override
+        public void onComplete(String streamId) throws IOException {
+          try {
+            streamHandler.onComplete(streamId);
+            callback.onSuccess(ByteBuffer.allocate(0));
+          } catch (Exception ex) {
+            IOException ioExc = new IOException("Failure post-processing complete stream;" +
+              " failing this rpc and leaving channel active");
+            callback.onFailure(ioExc);
+            streamHandler.onFailure(streamId, ioExc);
+          }
+        }
+
+        @Override
+        public void onFailure(String streamId, Throwable cause) throws IOException {
+          callback.onFailure(new IOException("Destination failed while reading stream", cause));
+          streamHandler.onFailure(streamId, cause);
+        }
+
+        @Override
+        public String getID() {
+          return streamHandler.getID();
+        }
+      };
+      if (req.bodyByteCount > 0) {
+        StreamInterceptor interceptor = new StreamInterceptor(this, wrappedCallback.getID(),
+          req.bodyByteCount, wrappedCallback);
+        frameDecoder.setInterceptor(interceptor);
+      } else {
+        wrappedCallback.onComplete(wrappedCallback.getID());
+      }
+    } catch (Exception e) {
+      logger.error("Error while invoking RpcHandler#receive() on RPC id " + req.requestId, e);
+      respond(new RpcFailure(req.requestId, Throwables.getStackTraceAsString(e)));
+      // We choose to totally fail the channel, rather than trying to recover as we do in other
+      // cases.  We don't know how many bytes of the stream the client has already sent for the
+      // stream, its not worth trying to recover.
--- End diff --

it's


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r195592205
  
--- Diff: common/network-common/src/test/java/org/apache/spark/network/StreamTestHelper.java ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import com.google.common.io.Files;
+
+import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NioManagedBuffer;
+import org.apache.spark.network.util.TransportConf;
+
+class StreamTestHelper {
+  static final String[] STREAMS = { "largeBuffer", "smallBuffer", "emptyBuffer", "file" };
+
+  final File testFile;
+  File tempDir;
+
+  ByteBuffer emptyBuffer;
+  ByteBuffer smallBuffer;
+  ByteBuffer largeBuffer;
+
+  private static ByteBuffer createBuffer(int bufSize) {
+    ByteBuffer buf = ByteBuffer.allocate(bufSize);
+    for (int i = 0; i < bufSize; i ++) {
+      buf.put((byte) i);
+    }
+    buf.flip();
+    return buf;
+  }
+
+  StreamTestHelper() throws Exception {
+    tempDir = Files.createTempDir();
+    emptyBuffer = createBuffer(0);
+    smallBuffer = createBuffer(100);
+    largeBuffer = createBuffer(100000);
+
+    testFile = File.createTempFile("stream-test-file", "txt", tempDir);
+    FileOutputStream fp = new FileOutputStream(testFile);
+    try {
+      Random rnd = new Random();
+      for (int i = 0; i < 512; i++) {
+        byte[] fileContent = new byte[1024];
+        rnd.nextBytes(fileContent);
+        fp.write(fileContent);
+      }
+    } finally {
+      fp.close();
+    }
+  }
+
+  public ByteBuffer srcBuffer(String name) {
+    switch (name) {
+      case "largeBuffer":
+        return largeBuffer;
+      case "smallBuffer":
+        return smallBuffer;
+      case "emptyBuffer":
+        return emptyBuffer;
+      default:
+        throw new IllegalArgumentException("Invalid stream: " + name);
+    }
+  }
+
+  public ManagedBuffer openStream(TransportConf conf, String streamId) {
+    switch (streamId) {
+      case "file":
+        return new FileSegmentManagedBuffer(conf, testFile, 0, testFile.length());
+      default:
+        return new NioManagedBuffer(srcBuffer(streamId));
+    }
+  }
+
+
--- End diff --

nit: remove


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r195592158
  
--- Diff: common/network-common/src/test/java/org/apache/spark/network/StreamTestHelper.java ---
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.nio.ByteBuffer;
+import java.util.Random;
+
+import com.google.common.io.Files;
+
+import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NioManagedBuffer;
+import org.apache.spark.network.util.TransportConf;
+
+class StreamTestHelper {
+  static final String[] STREAMS = { "largeBuffer", "smallBuffer", "emptyBuffer", "file" };
+
+  final File testFile;
+  File tempDir;
--- End diff --

`final` for all these?


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r195586580
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java ---
@@ -216,34 +192,99 @@ public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
       logger.trace("Sending RPC to {}", getRemoteAddress(channel));
     }
 
-    long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
+    long requestId = requestId();
     handler.addRpcRequest(requestId, callback);
 
     channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message)))
-        .addListener(future -> {
-          if (future.isSuccess()) {
-            long timeTaken = System.currentTimeMillis() - startTime;
-            if (logger.isTraceEnabled()) {
-              logger.trace("Sending request {} to {} took {} ms", requestId,
-                getRemoteAddress(channel), timeTaken);
-            }
-          } else {
-            String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
-              getRemoteAddress(channel), future.cause());
-            logger.error(errorMsg, future.cause());
-            handler.removeRpcRequest(requestId);
-            channel.close();
-            try {
-              callback.onFailure(new IOException(errorMsg, future.cause()));
-            } catch (Exception e) {
-              logger.error("Uncaught exception in RPC response callback handler!", e);
-            }
-          }
-        });
+      .addListener(new RpcChannelListener(startTime, requestId, callback));
 
     return requestId;
   }
 
+  /**
+   * Send data to the remote end as a stream.   This differs from stream() in that this is a request
+   * to *send* data to the remote end, not to receive it from the remote.
+   *
+   * @param meta meta data associated with the stream, which will be read completely on the
+   *             receiving end before the stream itself.
+   * @param data this will be streamed to the remote end to allow for transferring large amounts
+   *             of data without reading into memory.
+   * @param callback handles the reply -- onSuccess will only be called when both message and data
+   *                 are received successfully.
+   */
+  public long uploadStream(
+      ManagedBuffer meta,
+      ManagedBuffer data,
+      RpcResponseCallback callback) {
+    long startTime = System.currentTimeMillis();
--- End diff --

Seems like it should be easy to move this to `StdChannelListener`'s 
constructor. Looks pretty similar in all methods.
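
(A hedged sketch of that refactoring -- the listener's name comes from the quoted diff, but the exact fields and interface here are assumptions:)

```java
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

// Capture startTime when the listener is constructed, so sendRpc(),
// fetchChunk(), and uploadStream() no longer each compute and pass it.
class StdChannelListener implements GenericFutureListener<Future<? super Void>> {
  private final long startTime;
  private final Object requestId;

  StdChannelListener(Object requestId) {
    this.startTime = System.currentTimeMillis();  // previously a constructor argument
    this.requestId = requestId;
  }

  @Override
  public void operationComplete(Future<? super Void> future) {
    long timeTaken = System.currentTimeMillis() - startTime;
    System.out.println("Request " + requestId + " took " + timeTaken + " ms");
  }
}
```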


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r195590730
  
--- Diff: common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java ---
@@ -130,6 +200,60 @@ public void onFailure(Throwable e) {
     return res;
   }
 
+  private RpcResult sendRpcWithStream(String... streams) throws Exception {
+    TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
+    final Semaphore sem = new Semaphore(0);
+    RpcResult res = new RpcResult();
+    res.successMessages = Collections.synchronizedSet(new HashSet<String>());
+    res.errorMessages = Collections.synchronizedSet(new HashSet<String>());
+
+    for (String stream : streams) {
+      int idx = stream.lastIndexOf('/');
+      ManagedBuffer meta = new NioManagedBuffer(JavaUtils.stringToBytes(stream));
+      String streamName = (idx == -1) ? stream : stream.substring(idx + 1);
+      ManagedBuffer data = testData.openStream(conf, streamName);
+      client.uploadStream(meta, data, new RpcStreamCallback(stream, res, sem));
+    }
+    streamCallbacks.values().forEach(streamCallback -> {
--- End diff --

I'm trying to follow the logic here...

- in L215, client sends a stream to the remote
- in L82, remote receives the stream and registers the callback
- here, you wait for the callbacks to finish in the order they were 
registered.

Isn't there a race between steps 2 and 3, as in you might miss one or more callback registrations?


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r195584408
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java ---
@@ -141,26 +141,14 @@ public void fetchChunk(
     StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
     handler.addFetchRequest(streamChunkId, callback);
 
-    channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(future -> {
-      if (future.isSuccess()) {
-        long timeTaken = System.currentTimeMillis() - startTime;
-        if (logger.isTraceEnabled()) {
-          logger.trace("Sending request {} to {} took {} ms", streamChunkId,
-            getRemoteAddress(channel), timeTaken);
+    channel.writeAndFlush(new ChunkFetchRequest(streamChunkId))
+      .addListener( new StdChannelListener(startTime, streamChunkId) {
--- End diff --

nit: no space after `(`


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r195585864
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java ---
@@ -216,34 +192,99 @@ public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
       logger.trace("Sending RPC to {}", getRemoteAddress(channel));
     }
 
-    long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
+    long requestId = requestId();
     handler.addRpcRequest(requestId, callback);
 
     channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message)))
-        .addListener(future -> {
-          if (future.isSuccess()) {
-            long timeTaken = System.currentTimeMillis() - startTime;
-            if (logger.isTraceEnabled()) {
-              logger.trace("Sending request {} to {} took {} ms", requestId,
-                getRemoteAddress(channel), timeTaken);
-            }
-          } else {
-            String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
-              getRemoteAddress(channel), future.cause());
-            logger.error(errorMsg, future.cause());
-            handler.removeRpcRequest(requestId);
-            channel.close();
-            try {
-              callback.onFailure(new IOException(errorMsg, future.cause()));
-            } catch (Exception e) {
-              logger.error("Uncaught exception in RPC response callback handler!", e);
-            }
-          }
-        });
+      .addListener(new RpcChannelListener(startTime, requestId, callback));
 
     return requestId;
   }
 
+  /**
+   * Send data to the remote end as a stream.   This differs from stream() in that this is a request
+   * to *send* data to the remote end, not to receive it from the remote.
+   *
+   * @param meta meta data associated with the stream, which will be read completely on the
+   *             receiving end before the stream itself.
+   * @param data this will be streamed to the remote end to allow for transferring large amounts
+   *             of data without reading into memory.
+   * @param callback handles the reply -- onSuccess will only be called when both message and data
+   *                 are received successfully.
+   */
+  public long uploadStream(
+      ManagedBuffer meta,
+      ManagedBuffer data,
+      RpcResponseCallback callback) {
+    long startTime = System.currentTimeMillis();
+    if (logger.isTraceEnabled()) {
+      logger.trace("Sending RPC to {}", getRemoteAddress(channel));
+    }
+
+    long requestId = requestId();
+    handler.addRpcRequest(requestId, callback);
+
+    channel.writeAndFlush(new UploadStream(requestId, meta, data))
+      .addListener(new RpcChannelListener(startTime, requestId, callback));
+
+    return requestId;
+  }
+
+  private class StdChannelListener
--- End diff --

I personally try to keep nested classes at the bottom of the enclosing 
class, but up to you.


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r195584794
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java ---
@@ -216,34 +192,99 @@ public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
       logger.trace("Sending RPC to {}", getRemoteAddress(channel));
     }
 
-    long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
+    long requestId = requestId();
     handler.addRpcRequest(requestId, callback);
 
     channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message)))
-        .addListener(future -> {
-          if (future.isSuccess()) {
-            long timeTaken = System.currentTimeMillis() - startTime;
-            if (logger.isTraceEnabled()) {
-              logger.trace("Sending request {} to {} took {} ms", requestId,
-                getRemoteAddress(channel), timeTaken);
-            }
-          } else {
-            String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
-              getRemoteAddress(channel), future.cause());
-            logger.error(errorMsg, future.cause());
-            handler.removeRpcRequest(requestId);
-            channel.close();
-            try {
-              callback.onFailure(new IOException(errorMsg, future.cause()));
-            } catch (Exception e) {
-              logger.error("Uncaught exception in RPC response callback handler!", e);
-            }
-          }
-        });
+      .addListener(new RpcChannelListener(startTime, requestId, callback));
 
     return requestId;
   }
 
+  /**
+   * Send data to the remote end as a stream.   This differs from stream() in that this is a request
--- End diff --

I know you're in the "2 spaces after period camp", but that's 3.


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-14 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r195591438
  
--- Diff: common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java ---
@@ -207,9 +400,67 @@ private void assertErrorsContain(Set<String> errors, Set<String> contains) {
           break;
         }
       }
-      assertTrue("Could not find error containing " + contain + "; errors: " + errors, foundMatch);
+      if (!foundMatch) {
+        notFound.add(contain);
+      }
+    }
+    return new ImmutablePair<>(remainingErrors, notFound);
+  }
+
+  private static class VerifyingStreamCallback implements StreamCallbackWithID {
+    final String streamId;
+    final StreamSuite.TestCallback helper;
+    final OutputStream out;
+    final File outFile;
+    VerifyingStreamCallback(String streamId) throws IOException {
--- End diff --

nit: add empty line


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r195464933
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java ---
@@ -38,15 +38,28 @@
    *
    * This method will not be called in parallel for a single TransportClient (i.e., channel).
    *
+   * The rpc *might* include a data stream in <code>streamData</code> (eg. for uploading a large
+   * amount of data which should not be buffered in memory here).  An error while reading data from
+   * the stream ({@link org.apache.spark.network.client.StreamCallback#onData(String, ByteBuffer)})
+   * will fail the entire channel.  A failure in "post-processing" the stream in
+   * {@link org.apache.spark.network.client.StreamCallback#onComplete(String)} will result in an
+   * rpcFailure, but the channel will remain active.
+   *
+   * If streamData is not null, you *must* call <code>streamData.registerStreamCallback</code>
+   * before this method returns.
+   *
    * @param client A channel client which enables the handler to make requests back to the sender
    *               of this RPC. This will always be the exact same object for a particular channel.
    * @param message The serialized bytes of the RPC.
+   * @param streamData StreamData if there is data which is meant to be read via a StreamCallback;
--- End diff --

I've done this refactoring, and I agree it made the change significantly 
simpler.


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-14 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r195464525
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java ---
@@ -220,30 +196,91 @@ public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
     handler.addRpcRequest(requestId, callback);
 
     channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message)))
-        .addListener(future -> {
-          if (future.isSuccess()) {
-            long timeTaken = System.currentTimeMillis() - startTime;
-            if (logger.isTraceEnabled()) {
-              logger.trace("Sending request {} to {} took {} ms", requestId,
-                getRemoteAddress(channel), timeTaken);
-            }
-          } else {
-            String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
-              getRemoteAddress(channel), future.cause());
-            logger.error(errorMsg, future.cause());
-            handler.removeRpcRequest(requestId);
-            channel.close();
-            try {
-              callback.onFailure(new IOException(errorMsg, future.cause()));
-            } catch (Exception e) {
-              logger.error("Uncaught exception in RPC response callback handler!", e);
-            }
-          }
-        });
+      .addListener(new RpcChannelListener(startTime, requestId, callback));
+
+    return requestId;
+  }
+
+  /**
+   * Send data to the remote end as a stream.   This differs from stream() in that this is a request
+   * to *send* data to the remote end, not to receive it from the remote.
+   *
+   * @param meta meta data associated with the stream, which will be read completely on the
+   *             receiving end before the stream itself.
+   * @param data this will be streamed to the remote end to allow for transferring large amounts
+   *             of data without reading into memory.
+   * @param callback handles the reply -- onSuccess will only be called when both message and data
+   *                 are received successfully.
+   */
+  public long uploadStream(
+      ManagedBuffer meta,
+      ManagedBuffer data,
+      RpcResponseCallback callback) {
+    long startTime = System.currentTimeMillis();
+    if (logger.isTraceEnabled()) {
+      logger.trace("Sending RPC to {}", getRemoteAddress(channel));
+    }
+
+    long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
--- End diff --

done


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-13 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r195284967
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java ---
@@ -220,30 +196,91 @@ public long sendRpc(ByteBuffer message, RpcResponseCallback callback) {
     handler.addRpcRequest(requestId, callback);
 
     channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message)))
-        .addListener(future -> {
-          if (future.isSuccess()) {
-            long timeTaken = System.currentTimeMillis() - startTime;
-            if (logger.isTraceEnabled()) {
-              logger.trace("Sending request {} to {} took {} ms", requestId,
-                getRemoteAddress(channel), timeTaken);
-            }
-          } else {
-            String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId,
-              getRemoteAddress(channel), future.cause());
-            logger.error(errorMsg, future.cause());
-            handler.removeRpcRequest(requestId);
-            channel.close();
-            try {
-              callback.onFailure(new IOException(errorMsg, future.cause()));
-            } catch (Exception e) {
-              logger.error("Uncaught exception in RPC response callback handler!", e);
-            }
-          }
-        });
+      .addListener(new RpcChannelListener(startTime, requestId, callback));
+
+    return requestId;
+  }
+
+  /**
+   * Send data to the remote end as a stream.   This differs from stream() in that this is a request
+   * to *send* data to the remote end, not to receive it from the remote.
+   *
+   * @param meta meta data associated with the stream, which will be read completely on the
+   *             receiving end before the stream itself.
+   * @param data this will be streamed to the remote end to allow for transferring large amounts
+   *             of data without reading into memory.
+   * @param callback handles the reply -- onSuccess will only be called when both message and data
+   *                 are received successfully.
+   */
+  public long uploadStream(
+      ManagedBuffer meta,
+      ManagedBuffer data,
+      RpcResponseCallback callback) {
+    long startTime = System.currentTimeMillis();
+    if (logger.isTraceEnabled()) {
+      logger.trace("Sending RPC to {}", getRemoteAddress(channel));
+    }
+
+    long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
--- End diff --

This `Math.abs(UUID.randomUUID().getLeastSignificantBits())` is repeated twice. Move it to a separate new method.
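
(Later diffs in this thread do call a `requestId()` helper, so the extraction presumably ended up looking something like this sketch:)

```java
import java.util.UUID;

final class RequestIdSketch {
  private RequestIdSketch() {}

  // Random, non-negative request id; shared by sendRpc() and uploadStream().
  static long requestId() {
    return Math.abs(UUID.randomUUID().getLeastSignificantBits());
  }
}
```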


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r192825636
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java ---
@@ -38,15 +38,28 @@
    *
    * This method will not be called in parallel for a single TransportClient (i.e., channel).
    *
+   * The rpc *might* include a data stream in <code>streamData</code> (eg. for uploading a large
+   * amount of data which should not be buffered in memory here).  An error while reading data from
+   * the stream ({@link org.apache.spark.network.client.StreamCallback#onData(String, ByteBuffer)})
+   * will fail the entire channel.  A failure in "post-processing" the stream in
+   * {@link org.apache.spark.network.client.StreamCallback#onComplete(String)} will result in an
+   * rpcFailure, but the channel will remain active.
+   *
+   * If streamData is not null, you *must* call <code>streamData.registerStreamCallback</code>
+   * before this method returns.
+   *
    * @param client A channel client which enables the handler to make requests back to the sender
    *               of this RPC. This will always be the exact same object for a particular channel.
    * @param message The serialized bytes of the RPC.
+   * @param streamData StreamData if there is data which is meant to be read via a StreamCallback;
--- End diff --

I'm wondering if a separate callback for these streams wouldn't be better. 
It would at the very least avoid having to change all the existing handlers.

But it would also make it clearer what the contract is. For example, the 
callback could return the stream callback to be registered. 

It also doesn't seem like `StreamData` itself has a lot of useful 
information other than the registration method, so it could be replaced with 
parameters in the new callback, avoiding having to expose that type to RPC 
handlers.
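
(This is roughly the shape the PR ended up with: the `receiveStream()` call is visible in the TransportRequestHandler diff earlier in this thread. A sketch of the proposed contract, with the default body being an assumption:)

```java
import java.nio.ByteBuffer;

import org.apache.spark.network.client.RpcResponseCallback;
import org.apache.spark.network.client.StreamCallbackWithID;
import org.apache.spark.network.client.TransportClient;

public abstract class RpcHandlerSketch {
  // Existing entry point for plain RPCs, left unchanged.
  public abstract void receive(
      TransportClient client,
      ByteBuffer message,
      RpcResponseCallback callback);

  // Separate entry point for RPCs carrying a stream: instead of a StreamData
  // parameter, the handler returns the StreamCallbackWithID to register,
  // which makes the registration contract explicit.
  public StreamCallbackWithID receiveStream(
      TransportClient client,
      ByteBuffer messageHeader,
      RpcResponseCallback callback) {
    throw new UnsupportedOperationException();
  }
}
```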


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-04 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r192823558
  
--- Diff: project/MimaExcludes.scala ---
@@ -36,6 +36,9 @@ object MimaExcludes {
 
   // Exclude rules for 2.4.x
   lazy val v24excludes = v23excludes ++ Seq(
+    // [SPARK-6237][NETWORK] Network-layer changes to allow stream upload
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.NettyBlockRpcServer.receive"),
--- End diff --

I only see references to them in Scala code... also `private[package]` 
translates to `public` in Java, so that would at least avoid the mima checks.


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r192797087
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java ---
@@ -38,15 +38,28 @@
    *
    * This method will not be called in parallel for a single TransportClient (i.e., channel).
    *
+   * The rpc *might* include a data stream in <code>streamData</code> (eg. for uploading a large
+   * amount of data which should not be buffered in memory here).  An error while reading data from
+   * the stream ({@link org.apache.spark.network.client.StreamCallback#onData(String, ByteBuffer)})
+   * will fail the entire channel.  A failure in "post-processing" the stream in
+   * {@link org.apache.spark.network.client.StreamCallback#onComplete(String)} will result in an
+   * rpcFailure, but the channel will remain active.
+   *
+   * If streamData is not null, you *must* call <code>streamData.registerStreamCallback</code>
+   * before this method returns.
+   *
    * @param client A channel client which enables the handler to make requests back to the sender
    *               of this RPC. This will always be the exact same object for a particular channel.
    * @param message The serialized bytes of the RPC.
+   * @param streamData StreamData if there is data which is meant to be read via a StreamCallback;
+   *                   otherwise it is null.
    * @param callback Callback which should be invoked exactly once upon success or failure of the
    *                 RPC.
    */
   public abstract void receive(
       TransportClient client,
       ByteBuffer message,
+      StreamData streamData,
--- End diff --

moving discussion from here: 
https://github.com/apache/spark/pull/21451#discussion_r191628993

@witgo suggested the `message` could be moved inside `streamData` -- any 
particular reason to do that?  It would work fine to do it that way as well, 
though I don't see any advantage.  I guess I'm in favor of keeping it this way.


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-04 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r192795662
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java ---
@@ -38,15 +38,24 @@
    *
    * This method will not be called in parallel for a single TransportClient (i.e., channel).
    *
+   * The rpc *might* include a data stream in <code>streamData</code> (eg. for uploading a large
+   * amount of data which should not be buffered in memory here).  Any errors while handling the
+   * streamData will lead to failing this entire connection -- all other in-flight rpcs will fail.
--- End diff --

You bring up a good point here. I was thinking about the places where an error might occur:

1) While reading the stream data (i.e. StreamCallback.onData). In the intended use case, this is basically just opening a file and writing bytes to it.

2) Post-processing the complete data (StreamCallback.onComplete). This is doing the whole BlockManager.put, which can be rather complex.

Failures in (1) are unlikely and are difficult to recover from; failures in (2) are more likely, but the channel should be totally fine. I've updated the code, comments, and test to make sure things are OK for (2): 
https://github.com/apache/spark/pull/21346/commits/6c086c51873c72fa0cf9f373afd069ac63de3b75

Your points are still valid for (1), though I think we can live with it.


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-02 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r192566116
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java ---
@@ -141,26 +141,14 @@ public void fetchChunk(
     StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
     handler.addFetchRequest(streamChunkId, callback);
 
-    channel.writeAndFlush(new ChunkFetchRequest(streamChunkId)).addListener(future -> {
--- End diff --

Thanks for explaining. I guess the re-ordering of `channel.close()` and the 
`handler` operations is safe because the handler doesn't hold references to the 
channel / otherwise does not interact with it (and doesn't hold references to 
objects tied to channel lifecycle (like buffers))?


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-02 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r192565980
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java ---
@@ -38,15 +38,24 @@
    *
    * This method will not be called in parallel for a single TransportClient (i.e., channel).
    *
+   * The rpc *might* include a data stream in <code>streamData</code> (eg. for uploading a large
+   * amount of data which should not be buffered in memory here).  Any errors while handling the
+   * streamData will lead to failing this entire connection -- all other in-flight rpcs will fail.
--- End diff --

I'm trying to think through whether we'll risk introducing any weird new 
failure modes (or increasing the occurrence of existing-but-improbable failure 
modes). For example, causing in-flight RPCs to fail could surface latent RPC 
timeout issues: if we have a timeout which is way too long and we drop 
in-flight responses on the floor without sending back negative ACKs then we 
could see (finite but potentially long) hangs.

On the other hand, this pathway is used for executor <-> executor transfers 
and generally not executor <-> driver transfers, so my understanding is that 
failures in this channel generally won't impact control RPCs.


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-02 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r192565530
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.protocol;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NettyManagedBuffer;
+
+/**
+ * An RPC with data that is sent outside of the frame, so it can be read as a stream.
+ */
+public final class UploadStream extends AbstractMessage implements RequestMessage {
+  /** Used to link an RPC request with its response. */
+  public final long requestId;
+  public final ManagedBuffer meta;
+  public final long bodyByteCount;
+
+  public UploadStream(long requestId, ManagedBuffer meta, ManagedBuffer body) {
+    super(body, false); // body is *not* included in the frame
+    this.requestId = requestId;
+    this.meta = meta;
+    bodyByteCount = body.size();
+  }
+
+  // this version is called when decoding the bytes on the receiving end.  The body is handled
+  // separately.
+  private UploadStream(long requestId, ManagedBuffer meta, long bodyByteCount) {
+    super(null, false);
+    this.requestId = requestId;
+    this.meta = meta;
+    this.bodyByteCount = bodyByteCount;
+  }
+
+  @Override
+  public Type type() { return Type.UploadStream; }
+
+  @Override
+  public int encodedLength() {
+    // the requestId, meta size, meta and bodyByteCount (body is not included)
+    return 8 + 4 + ((int) meta.size()) + 8;
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+    buf.writeLong(requestId);
+    try {
+      ByteBuffer metaBuf = meta.nioByteBuffer();
+      buf.writeInt(metaBuf.remaining());
+      buf.writeBytes(metaBuf);
+    } catch (IOException io) {
+      throw new RuntimeException(io);
+    }
+    buf.writeLong(bodyByteCount);
+  }
+
+  public static UploadStream decode(ByteBuf buf) {
+    long requestId = buf.readLong();
+    int metaSize = buf.readInt();
+    ManagedBuffer meta = new NettyManagedBuffer(buf.readRetainedSlice(metaSize));
+    long bodyByteCount = buf.readLong();
+    // This is called by the frame decoder, so the data is still null.  We need a StreamInterceptor
+    // to read the data.
+    return new UploadStream(requestId, meta, bodyByteCount);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(requestId, body());
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other instanceof UploadStream) {
+      UploadStream o = (UploadStream) other;
+      return requestId == o.requestId && super.equals(o);
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("requestId", requestId)
+      .add("body", body())
--- End diff --

I'm not actually sure. I wonder if this is a latent problem in the old code 
waiting to happen if we ever turn on trace logging. We can probably investigate 
that separately, but I just wanted to note it since it seemed a little dodgy.


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-30 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r191981552
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java
 ---
@@ -38,15 +38,24 @@
*
* This method will not be called in parallel for a single 
TransportClient (i.e., channel).
*
+   * The rpc *might* included a data stream in streamData 
(eg. for uploading a large
+   * amount of data which should not be buffered in memory here).  Any 
errors while handling the
+   * streamData will lead to failing this entire connection -- all other 
in-flight rpcs will fail.
--- End diff --

Pretty good question actually :)

I will take a closer look at this myself, but I believe this connection is 
shared by other tasks running on the same executor which are trying to talk to 
the same destination.  So that might mean another task which is replicating to 
the same destination, or reading data from that same remote executor.  Those 
don't have specific retry behavior for a closed connection -- it might result 
in the data just not getting replicated, in fetching the data from elsewhere, 
or in the task getting retried.

I think this is actually OK -- the existing code could cause an OOM on the 
remote end anyway, which obviously would fail a lot more.  This failure 
behavior seems reasonable.


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-30 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r191979425
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.protocol;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NettyManagedBuffer;
+
+/**
+ * An RPC with data that is sent outside of the frame, so it can be read 
as a stream.
+ */
+public final class UploadStream extends AbstractMessage implements 
RequestMessage {
+  /** Used to link an RPC request with its response. */
+  public final long requestId;
+  public final ManagedBuffer meta;
+  public final long bodyByteCount;
+
+  public UploadStream(long requestId, ManagedBuffer meta, ManagedBuffer 
body) {
+super(body, false); // body is *not* included in the frame
+this.requestId = requestId;
+this.meta = meta;
+bodyByteCount = body.size();
+  }
+
+  // this version is called when decoding the bytes on the receiving end.  
The body is handled
+  // separately.
+  private UploadStream(long requestId, ManagedBuffer meta, long 
bodyByteCount) {
+super(null, false);
+this.requestId = requestId;
+this.meta = meta;
+this.bodyByteCount = bodyByteCount;
+  }
+
+  @Override
+  public Type type() { return Type.UploadStream; }
+
+  @Override
+  public int encodedLength() {
+// the requestId, meta size, meta and bodyByteCount (body is not 
included)
+return 8 + 4 + ((int) meta.size()) + 8;
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+buf.writeLong(requestId);
+try {
+  ByteBuffer metaBuf = meta.nioByteBuffer();
+  buf.writeInt(metaBuf.remaining());
+  buf.writeBytes(metaBuf);
+} catch (IOException io) {
+  throw new RuntimeException(io);
+}
+buf.writeLong(bodyByteCount);
+  }
+
+  public static UploadStream decode(ByteBuf buf) {
+long requestId = buf.readLong();
+int metaSize = buf.readInt();
+ManagedBuffer meta = new 
NettyManagedBuffer(buf.readRetainedSlice(metaSize));
+long bodyByteCount = buf.readLong();
+// This is called by the frame decoder, so the data is still null.  We 
need a StreamInterceptor
+// to read the data.
+return new UploadStream(requestId, meta, bodyByteCount);
+  }
+
+  @Override
+  public int hashCode() {
+return Objects.hashCode(requestId, body());
+  }
+
+  @Override
+  public boolean equals(Object other) {
+if (other instanceof UploadStream) {
+  UploadStream o = (UploadStream) other;
+  return requestId == o.requestId && super.equals(o);
+}
+return false;
+  }
+
+  @Override
+  public String toString() {
+return Objects.toStringHelper(this)
+  .add("requestId", requestId)
+  .add("body", body())
--- End diff --

To be honest, this was also just parroted from other classes -- looking now at 
the implementations of ManagedBuffer, those that have a `toString()` do 
something reasonable.

Is that actually useful for debugging?  Maybe not; I don't think I ever 
actually looked at this.
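
A pattern that stays safe either way is to have `toString()` log only cheap, 
bounded fields -- a sketch (inside UploadStream, reusing the `bodyByteCount` 
field it already has):

    // Log the body's size rather than the body itself, so trace logging can
    // never dump (or fully materialize) a huge buffer.
    @Override
    public String toString() {
      return Objects.toStringHelper(this)
        .add("requestId", requestId)
        .add("bodyByteCount", bodyByteCount)
        .toString();
    }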


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-30 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r191979019
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.protocol;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NettyManagedBuffer;
+
+/**
+ * An RPC with data that is sent outside of the frame, so it can be read 
as a stream.
+ */
+public final class UploadStream extends AbstractMessage implements 
RequestMessage {
+  /** Used to link an RPC request with its response. */
+  public final long requestId;
+  public final ManagedBuffer meta;
+  public final long bodyByteCount;
+
+  public UploadStream(long requestId, ManagedBuffer meta, ManagedBuffer 
body) {
+super(body, false); // body is *not* included in the frame
+this.requestId = requestId;
+this.meta = meta;
+bodyByteCount = body.size();
+  }
+
+  // this version is called when decoding the bytes on the receiving end.  
The body is handled
+  // separately.
+  private UploadStream(long requestId, ManagedBuffer meta, long 
bodyByteCount) {
+super(null, false);
+this.requestId = requestId;
+this.meta = meta;
+this.bodyByteCount = bodyByteCount;
+  }
+
+  @Override
+  public Type type() { return Type.UploadStream; }
+
+  @Override
+  public int encodedLength() {
+// the requestId, meta size, meta and bodyByteCount (body is not 
included)
+return 8 + 4 + ((int) meta.size()) + 8;
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+buf.writeLong(requestId);
+try {
+  ByteBuffer metaBuf = meta.nioByteBuffer();
+  buf.writeInt(metaBuf.remaining());
+  buf.writeBytes(metaBuf);
+} catch (IOException io) {
+  throw new RuntimeException(io);
+}
+buf.writeLong(bodyByteCount);
+  }
+
+  public static UploadStream decode(ByteBuf buf) {
+long requestId = buf.readLong();
+int metaSize = buf.readInt();
+ManagedBuffer meta = new 
NettyManagedBuffer(buf.readRetainedSlice(metaSize));
+long bodyByteCount = buf.readLong();
+// This is called by the frame decoder, so the data is still null.  We 
need a StreamInterceptor
+// to read the data.
+return new UploadStream(requestId, meta, bodyByteCount);
+  }
+
+  @Override
+  public int hashCode() {
+return Objects.hashCode(requestId, body());
--- End diff --

This is a good point.  Admittedly I just copied this from `StreamResponse` 
without thinking about it too much -- that class exhibits the same issue.  I'll 
remove `body` from both.

(In practice, we're not sticking them in hash maps now, so there wouldn't be 
any bugs in behavior because of this.)
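
For illustration, one way to make the pair consistent is to key both on 
`requestId` alone (a sketch, not necessarily the exact change that lands):

    // Inside UploadStream: hashCode and equals agree regardless of whatever
    // ManagedBuffer's hashCode happens to do.
    @Override
    public int hashCode() {
      return Long.hashCode(requestId);
    }

    @Override
    public boolean equals(Object other) {
      return other instanceof UploadStream
        && requestId == ((UploadStream) other).requestId;
    }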


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-30 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r191978545
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
 ---
@@ -141,26 +141,14 @@ public void fetchChunk(
 StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
 handler.addFetchRequest(streamChunkId, callback);
 
-channel.writeAndFlush(new 
ChunkFetchRequest(streamChunkId)).addListener(future -> {
--- End diff --

Yes, exactly.  Marcelo asked for this refactoring in his review -- there was 
already a ton of copy-paste, and instead of adding more it made sense to 
refactor.  There shouldn't be any behavior change (there are minor changes that 
shouldn't matter: `channel.close()` happens before the more specific cleanup 
operations whereas previously it was in the middle, and the `try` encompasses a 
bit more than before).


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-30 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r191978140
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java
 ---
@@ -50,16 +52,22 @@
 
   @Override
   public void exceptionCaught(Throwable cause) throws Exception {
-handler.deactivateStream();
+deactivateStream();
 callback.onFailure(streamId, cause);
   }
 
   @Override
   public void channelInactive() throws Exception {
-handler.deactivateStream();
+deactivateStream();
 callback.onFailure(streamId, new ClosedChannelException());
   }
 
+  private void deactivateStream() {
+if (handler instanceof TransportResponseHandler) {
--- End diff --

The only purpose of `TransportResponseHandler.deactivateStream()` is to 
include the stream request in the count for `numOutstandingRequests` (it's not 
doing any critical cleanup).  I will include a comment here explaining that.
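
Roughly, the accounting looks like this (field names assumed from context; a 
sketch of the bookkeeping, not the exact code):

    // Inside TransportResponseHandler: outstandingFetches / outstandingRpcs are
    // the handler's existing request maps; the active stream only adds to the
    // count, so deactivateStream() is pure bookkeeping.
    private volatile boolean streamActive;

    public void deactivateStream() {
      streamActive = false;
    }

    public int numOutstandingRequests() {
      // an active stream counts as one outstanding request
      return outstandingFetches.size() + outstandingRpcs.size()
        + (streamActive ? 1 : 0);
    }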


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-30 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r191976952
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/StreamData.java
 ---
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.server;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.StreamCallback;
+import org.apache.spark.network.client.StreamInterceptor;
+import org.apache.spark.network.util.TransportFrameDecoder;
+
+/**
+ * A holder for streamed data sent along with an RPC message.
+ */
+public class StreamData {
+
+  private final TransportRequestHandler handler;
+  private final TransportFrameDecoder frameDecoder;
+  private final RpcResponseCallback rpcCallback;
+  private final ByteBuffer meta;
--- End diff --

Whoops, you're right.  I was using this at one point in the follow-on 
patch, then changed it and didn't fully clean this up.  Thanks.


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-30 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r191964329
  
--- Diff: project/MimaExcludes.scala ---
@@ -36,6 +36,9 @@ object MimaExcludes {
 
   // Exclude rules for 2.4.x
   lazy val v24excludes = v23excludes ++ Seq(
+// [SPARK-6237][NETWORK] Network-layer changes to allow stream upload
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.NettyBlockRpcServer.receive"),
--- End diff --

I suspect it's because we might want to access these across Java package 
boundaries, and Java doesn't have the equivalent of Scala's nested 
package-scoped `private[package]`.


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-30 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r191941962
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/StreamData.java
 ---
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.server;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.StreamCallback;
+import org.apache.spark.network.client.StreamInterceptor;
+import org.apache.spark.network.util.TransportFrameDecoder;
+
+/**
+ * A holder for streamed data sent along with an RPC message.
+ */
+public class StreamData {
+
+  private final TransportRequestHandler handler;
+  private final TransportFrameDecoder frameDecoder;
+  private final RpcResponseCallback rpcCallback;
+  private final ByteBuffer meta;
--- End diff --

It looks like this field is not actually used in the current 
implementation. Is that intentional?


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-30 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r191941503
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java
 ---
@@ -38,15 +38,24 @@
*
* This method will not be called in parallel for a single 
TransportClient (i.e., channel).
*
+   * The rpc *might* included a data stream in streamData 
(eg. for uploading a large
+   * amount of data which should not be buffered in memory here).  Any 
errors while handling the
+   * streamData will lead to failing this entire connection -- all other 
in-flight rpcs will fail.
--- End diff --

Perhaps a naive question: what are the implications of this? Is this 
referring to a scenario where we've multiplexed multiple asynchronous requests 
/ responses over a single network connection? I think I understand _why_ the 
failure mode is as stated (we're worried about leaving unconsumed leftover 
data in the channel), but I just wanted to ask about the implications of 
failing other in-flight RPCs.


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-30 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r191940304
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.protocol;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NettyManagedBuffer;
+
+/**
+ * An RPC with data that is sent outside of the frame, so it can be read 
as a stream.
+ */
+public final class UploadStream extends AbstractMessage implements 
RequestMessage {
+  /** Used to link an RPC request with its response. */
+  public final long requestId;
+  public final ManagedBuffer meta;
+  public final long bodyByteCount;
+
+  public UploadStream(long requestId, ManagedBuffer meta, ManagedBuffer 
body) {
+super(body, false); // body is *not* included in the frame
+this.requestId = requestId;
+this.meta = meta;
+bodyByteCount = body.size();
+  }
+
+  // this version is called when decoding the bytes on the receiving end.  
The body is handled
+  // separately.
+  private UploadStream(long requestId, ManagedBuffer meta, long 
bodyByteCount) {
+super(null, false);
+this.requestId = requestId;
+this.meta = meta;
+this.bodyByteCount = bodyByteCount;
+  }
+
+  @Override
+  public Type type() { return Type.UploadStream; }
+
+  @Override
+  public int encodedLength() {
+// the requestId, meta size, meta and bodyByteCount (body is not 
included)
+return 8 + 4 + ((int) meta.size()) + 8;
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+buf.writeLong(requestId);
+try {
+  ByteBuffer metaBuf = meta.nioByteBuffer();
+  buf.writeInt(metaBuf.remaining());
+  buf.writeBytes(metaBuf);
+} catch (IOException io) {
+  throw new RuntimeException(io);
+}
+buf.writeLong(bodyByteCount);
+  }
+
+  public static UploadStream decode(ByteBuf buf) {
+long requestId = buf.readLong();
+int metaSize = buf.readInt();
+ManagedBuffer meta = new 
NettyManagedBuffer(buf.readRetainedSlice(metaSize));
+long bodyByteCount = buf.readLong();
+// This is called by the frame decoder, so the data is still null.  We 
need a StreamInterceptor
+// to read the data.
+return new UploadStream(requestId, meta, bodyByteCount);
+  }
+
+  @Override
+  public int hashCode() {
+return Objects.hashCode(requestId, body());
+  }
+
+  @Override
+  public boolean equals(Object other) {
+if (other instanceof UploadStream) {
+  UploadStream o = (UploadStream) other;
+  return requestId == o.requestId && super.equals(o);
+}
+return false;
+  }
+
+  @Override
+  public String toString() {
+return Objects.toStringHelper(this)
+  .add("requestId", requestId)
+  .add("body", body())
--- End diff --

Similar question here about whether `body()` is useful in this context: 
will this actually end up printing buffer contents, which are potentially huge? 
Or will it do something reasonable and print only the buffer type?


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-30 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r191939431
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.protocol;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NettyManagedBuffer;
+
+/**
+ * An RPC with data that is sent outside of the frame, so it can be read 
as a stream.
+ */
+public final class UploadStream extends AbstractMessage implements 
RequestMessage {
+  /** Used to link an RPC request with its response. */
+  public final long requestId;
+  public final ManagedBuffer meta;
+  public final long bodyByteCount;
+
+  public UploadStream(long requestId, ManagedBuffer meta, ManagedBuffer 
body) {
+super(body, false); // body is *not* included in the frame
+this.requestId = requestId;
+this.meta = meta;
+bodyByteCount = body.size();
+  }
+
+  // this version is called when decoding the bytes on the receiving end.  
The body is handled
+  // separately.
+  private UploadStream(long requestId, ManagedBuffer meta, long 
bodyByteCount) {
+super(null, false);
+this.requestId = requestId;
+this.meta = meta;
+this.bodyByteCount = bodyByteCount;
+  }
+
+  @Override
+  public Type type() { return Type.UploadStream; }
+
+  @Override
+  public int encodedLength() {
+// the requestId, meta size, meta and bodyByteCount (body is not 
included)
+return 8 + 4 + ((int) meta.size()) + 8;
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+buf.writeLong(requestId);
+try {
+  ByteBuffer metaBuf = meta.nioByteBuffer();
+  buf.writeInt(metaBuf.remaining());
+  buf.writeBytes(metaBuf);
+} catch (IOException io) {
+  throw new RuntimeException(io);
+}
+buf.writeLong(bodyByteCount);
+  }
+
+  public static UploadStream decode(ByteBuf buf) {
+long requestId = buf.readLong();
+int metaSize = buf.readInt();
+ManagedBuffer meta = new 
NettyManagedBuffer(buf.readRetainedSlice(metaSize));
+long bodyByteCount = buf.readLong();
+// This is called by the frame decoder, so the data is still null.  We 
need a StreamInterceptor
+// to read the data.
+return new UploadStream(requestId, meta, bodyByteCount);
+  }
+
+  @Override
+  public int hashCode() {
+return Objects.hashCode(requestId, body());
--- End diff --

The `equals()` and `hashCode()` implementations of this `UploadStream` 
class appear to differ slightly: the `equals()` method only checks equality of 
the `requestId`s, whereas this `hashCode()` checks both the `requestId` and 
the `body()`. I'm not sure what a `ManagedBuffer`'s `hashCode()` does: it 
might not depend on the buffer contents, in which case this could lead to 
false hashCode mismatches for equal requests. Should we use just `requestId` 
here instead?


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-30 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r191938203
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
 ---
@@ -141,26 +141,14 @@ public void fetchChunk(
 StreamChunkId streamChunkId = new StreamChunkId(streamId, chunkIndex);
 handler.addFetchRequest(streamChunkId, callback);
 
-channel.writeAndFlush(new 
ChunkFetchRequest(streamChunkId)).addListener(future -> {
--- End diff --

Are the changes to these `.addListener()` calls primarily cleanup / 
refactoring? Is the intent to reduce the amount of _new_ duplicate code which 
would otherwise be added to `uploadStream` in this file?


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-30 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r191935821
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/client/StreamInterceptor.java
 ---
@@ -50,16 +52,22 @@
 
   @Override
   public void exceptionCaught(Throwable cause) throws Exception {
-handler.deactivateStream();
+deactivateStream();
 callback.onFailure(streamId, cause);
   }
 
   @Override
   public void channelInactive() throws Exception {
-handler.deactivateStream();
+deactivateStream();
 callback.onFailure(streamId, new ClosedChannelException());
   }
 
+  private void deactivateStream() {
+if (handler instanceof TransportResponseHandler) {
--- End diff --

Why don't we need to do this for `TransportRequestHandler`?


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-26 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r191059478
  
--- Diff: 
common/network-common/src/test/java/org/apache/spark/network/StreamTestHelper.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network;
+
+import com.google.common.io.Files;
+import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
--- End diff --

Ooops, sorry -- I got used to the style checker finding these in Scala.  
Fixed these now.


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r191001378
  
--- Diff: 
common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
 ---
@@ -85,10 +96,52 @@ public void receive(TransportClient client, ByteBuffer 
message) {
 oneWayMsgs = new ArrayList<>();
   }
 
+  private static void receiveStream(String msg, StreamData streamData) {
+try {
+  if (msg.startsWith("fail/")) {
+String[] parts = msg.split("/");
+switch(parts[1]) {
--- End diff --

space before `(`


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r191002520
  
--- Diff: 
common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
 ---
@@ -193,10 +299,78 @@ public void sendOneWayMessage() throws Exception {
 }
   }
 
+  @Test
+  public void sendRpcWithStreamOneAtATime() throws Exception {
+for (String stream: StreamTestHelper.STREAMS) {
--- End diff --

space before `:`


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r191003869
  
--- Diff: project/MimaExcludes.scala ---
@@ -36,6 +36,9 @@ object MimaExcludes {
 
   // Exclude rules for 2.4.x
   lazy val v24excludes = v23excludes ++ Seq(
+// [SPARK-6237][NETWORK] Network-layer changes to allow stream upload
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.network.netty.NettyBlockRpcServer.receive"),
--- End diff --

Kinda wondering why this class is public in the first place... along with 
`SparkTransportConf` in the same package.


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r190999338
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/StreamData.java
 ---
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.server;
+
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.StreamCallback;
+import org.apache.spark.network.client.StreamInterceptor;
+import org.apache.spark.network.util.TransportFrameDecoder;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * A holder for streamed data sent along with an RPC message.
+ */
+public class StreamData {
+
+  private final TransportRequestHandler handler;
+  private final TransportFrameDecoder frameDecoder;
+  private final RpcResponseCallback rpcCallback;
+  private final ByteBuffer meta;
+  private final long streamByteCount;
+  private boolean hasCallback = false;
+
+  public StreamData(
+  TransportRequestHandler handler,
+  TransportFrameDecoder frameDecoder,
+  RpcResponseCallback rpcCallback,
+  ByteBuffer meta,
+  long streamByteCount) {
+this.handler = handler;
+this.frameDecoder = frameDecoder;
+this.rpcCallback = rpcCallback;
+this.meta = meta;
+this.streamByteCount = streamByteCount;
+  }
+
+  public boolean hasCallback() {
+return hasCallback;
+  }
+
+  /**
+   * Register callback to receive the streaming data.
+   *
+   * If an exception is thrown from the callback, it will be propogated 
back to the sender as an rpc
+   * failure.
+   * @param callback
--- End diff --

Either remove or document all parameters (and add an empty line before them).
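
For instance, something like this (a sketch of the requested fix; the 
parameter description is assumed, not the exact wording that lands):

    /**
     * Register callback to receive the streaming data.
     *
     * If an exception is thrown from the callback, it will be propagated back
     * to the sender as an rpc failure.
     *
     * @param callback the StreamCallback that will receive the stream's bytes
     */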


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r191003553
  
--- Diff: 
common/network-common/src/test/java/org/apache/spark/network/StreamTestHelper.java
 ---
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network;
+
+import com.google.common.io.Files;
+import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
--- End diff --

Wrong place... basically in every file you've changed.
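
For reference, the grouping this codebase expects (visible in the 
UploadStream.java hunk above; the concrete imports here are just examples):

    // java.* first, then third-party, then org.apache.spark.*,
    // with a blank line between each group.
    import java.io.File;
    import java.io.IOException;

    import com.google.common.io.Files;
    import io.netty.buffer.ByteBuf;

    import org.apache.spark.network.buffer.FileSegmentManagedBuffer;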


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r190999775
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
 ---
@@ -23,25 +23,16 @@
 import com.google.common.base.Throwables;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import org.apache.spark.network.protocol.*;
--- End diff --

These are in the wrong place.


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r191002393
  
--- Diff: 
common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
 ---
@@ -130,6 +183,59 @@ public void onFailure(Throwable e) {
 return res;
   }
 
+  private RpcResult sendRpcWithStream(String... streams) throws Exception {
+TransportClient client = 
clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
+final Semaphore sem = new Semaphore(0);
+RpcResult res = new RpcResult();
+res.successMessages = Collections.synchronizedSet(new 
HashSet());
+res.errorMessages = Collections.synchronizedSet(new HashSet());
+
+for (String stream: streams) {
+  int idx = stream.lastIndexOf('/');
+  ManagedBuffer meta = new 
NioManagedBuffer(JavaUtils.stringToBytes(stream));
+  String streamName = (idx == -1) ? stream : stream.substring(idx + 1);
+  ManagedBuffer data = testData.openStream(conf, streamName);
+  client.uploadStream(meta, data, new RpcStreamCallback(stream, res, 
sem));
+}
+streamCallbacks.values().forEach(streamCallback -> {
+  try {
+
streamCallback.waitForCompletionAndVerify(TimeUnit.SECONDS.toMillis(5));
+  } catch (IOException e) {
+throw new RuntimeException(e);
+  }
+});
+
+
+if (!sem.tryAcquire(streams.length, 5, TimeUnit.SECONDS)) {
+  fail("Timeout getting response from the server");
+}
+client.close();
+return res;
+  }
+
+  private static class RpcStreamCallback implements RpcResponseCallback {
+final String streamId;
+final RpcResult res;
+final Semaphore sem;
+RpcStreamCallback(String streamId, RpcResult res, Semaphore sem) {
--- End diff --

add empty line


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r190997078
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
 ---
@@ -244,6 +242,54 @@ public long sendRpc(ByteBuffer message, 
RpcResponseCallback callback) {
 return requestId;
   }
 
+  /**
+   * Send data to the remote end as a stream.   This differs from stream() 
in that this is a request
+   * to *send* data to the remote end, not to receive it from the remote.
+   *
+   * @param meta meta data associated with the stream, which will be read 
completely on the
+   * receiving end before the stream itself.
+   * @param data this will be streamed to the remote end to allow for 
transferring large amounts
+   * of data without reading into memory.
+   * @param callback handles the reply -- onSuccess will only be called 
when both message and data
+   * are received successfully.
+   */
+  public long uploadStream(
+  ManagedBuffer meta,
+  ManagedBuffer data,
+  RpcResponseCallback callback) {
+long startTime = System.currentTimeMillis();
+if (logger.isTraceEnabled()) {
+  logger.trace("Sending RPC to {}", getRemoteAddress(channel));
+}
+
+long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
+handler.addRpcRequest(requestId, callback);
+
+channel.writeAndFlush(new UploadStream(requestId, meta, data))
+.addListener(future -> {
+  if (future.isSuccess()) {
--- End diff --

First reaction is that it's about the right time to refactor this into a 
helper method... all instances in this class look quite similar.
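
Something along these lines, say (a hypothetical sketch of the shared 
listener, not an actual patch; `getRemoteAddress` and the `handler` cleanup 
are the calls the existing code already uses):

    // Inside TransportClient: common success/failure handling for
    // writeAndFlush, parameterized by the request-specific cleanup to run on
    // failure. ChannelFutureListener is io.netty.channel.ChannelFutureListener.
    private ChannelFutureListener listenerFor(long requestId, long startTime,
        Runnable cleanup) {
      return future -> {
        if (future.isSuccess()) {
          long timeTaken = System.currentTimeMillis() - startTime;
          logger.trace("Sending request {} to {} took {} ms",
            requestId, getRemoteAddress(channel), timeTaken);
        } else {
          String errorMsg = String.format("Failed to send request %s to %s: %s",
            requestId, getRemoteAddress(channel), future.cause());
          logger.error(errorMsg, future.cause());
          cleanup.run();  // e.g. handler.removeRpcRequest(requestId)
          channel.close();
        }
      };
    }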


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r191001160
  
--- Diff: 
common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
 ---
@@ -17,17 +17,21 @@
 
 package org.apache.spark.network;
 
+import java.io.*;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.spark.network.buffer.ManagedBuffer;
--- End diff --

Wrong place.


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r190998463
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java
 ---
@@ -38,15 +38,24 @@
*
* This method will not be called in parallel for a single 
TransportClient (i.e., channel).
*
+   * The rpc *might* included a data stream in streamData(eg. 
for uploading a large
--- End diff --

space before `(`


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r190997532
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.protocol;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NettyManagedBuffer;
+
+/**
+ * An RPC with data that is sent outside of the frame, so it can be read 
in a stream.
--- End diff --

as a stream?


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-25 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r191001733
  
--- Diff: 
common/network-common/src/test/java/org/apache/spark/network/RpcIntegrationSuite.java
 ---
@@ -130,6 +183,59 @@ public void onFailure(Throwable e) {
 return res;
   }
 
+  private RpcResult sendRpcWithStream(String... streams) throws Exception {
+TransportClient client = 
clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
+final Semaphore sem = new Semaphore(0);
+RpcResult res = new RpcResult();
+res.successMessages = Collections.synchronizedSet(new 
HashSet());
+res.errorMessages = Collections.synchronizedSet(new HashSet());
+
+for (String stream: streams) {
--- End diff --

space before `:`


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-16 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r188801902
  
--- Diff: project/MimaExcludes.scala ---
@@ -73,7 +73,10 @@ object MimaExcludes {
 
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.ml.tree.InternalNode"),
 
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.ml.tree.Node"),
 
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.classification.DecisionTreeClassificationModel.this"),
-
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.regression.DecisionTreeRegressionModel.this")
+
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.regression.DecisionTreeRegressionModel.this"),
+
+// [SPARK-6237][NETWORK] Network-layer changes to allow stream upload
--- End diff --

I think we started adding these at the top since that is cleaner (doesn't 
require changing the previous exclusion rule).


---




[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-05-16 Thread squito
GitHub user squito opened a pull request:

https://github.com/apache/spark/pull/21346

[SPARK-6237][NETWORK] Network-layer changes to allow stream upload.

These changes allow an RPCHandler to receive an upload as a stream of
data, without having to buffer the entire message in the FrameDecoder.
The primary use case is for replicating large blocks.

Added unit tests for handling streaming data, covering both successfully 
sending data and failures in reading the stream, with concurrent requests.
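
For example, caller-side use of the new API looks roughly like this (the block 
id, file, and conf are hypothetical placeholders):

    import java.io.File;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
    import org.apache.spark.network.buffer.ManagedBuffer;
    import org.apache.spark.network.buffer.NioManagedBuffer;
    import org.apache.spark.network.client.RpcResponseCallback;
    import org.apache.spark.network.client.TransportClient;
    import org.apache.spark.network.util.TransportConf;

    class UploadExample {
      // Streams blockFile to the remote end without buffering it all in memory.
      static void replicateBlock(TransportClient client, TransportConf conf,
          File blockFile) {
        ManagedBuffer meta = new NioManagedBuffer(
          ByteBuffer.wrap("someBlockId".getBytes(StandardCharsets.UTF_8)));
        ManagedBuffer data =
          new FileSegmentManagedBuffer(conf, blockFile, 0, blockFile.length());
        client.uploadStream(meta, data, new RpcResponseCallback() {
          @Override public void onSuccess(ByteBuffer response) { /* stored */ }
          @Override public void onFailure(Throwable e) { /* retry or give up */ }
        });
      }
    }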

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/squito/spark upload_stream

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21346.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21346


commit 49e0a80f89433368d3a3116eb9fcd7854ceecb62
Author: Imran Rashid 
Date:   2018-05-02T14:55:15Z

[SPARK-6237][NETWORK] Network-layer changes to allow stream upload.

These changes allow an RPCHandler to receive an upload as a stream of
data, without having to buffer the entire message in the FrameDecoder.
The primary use case is for replicating large blocks.

Added unit tests.




---
