[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-22 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r349760020
 
 

 ##
 File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/GetLocalDirsForExecutors.java
 ##
 @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.protocol.Encoders;
+
+import java.util.Arrays;
 
 Review comment:
   Still not quite right. Grouping is:
   
   - java
   - scala
   - others
   - spark
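
The grouping rule above can be sketched as a small sorter. This is only an illustration, not the style checker Spark uses: the class `ImportGroups` and its methods are hypothetical, treating `javax.*` like `java.*` is an assumption, and so is the alphabetical order inside each group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ImportGroups {
  /** Group index in the order given in the review: java, scala, others, spark. */
  public static int group(String name) {
    // Assumption: javax.* is grouped with java.*; the review comment only says "java".
    if (name.startsWith("java.") || name.startsWith("javax.")) return 0;
    if (name.startsWith("scala.")) return 1;
    if (name.startsWith("org.apache.spark.")) return 3;
    return 2; // everything else is a third-party ("others") import
  }

  /** Returns a copy sorted by group, then alphabetically within each group (assumed). */
  public static List<String> sorted(List<String> names) {
    List<String> out = new ArrayList<>(names);
    out.sort((a, b) -> {
      int byGroup = Integer.compare(group(a), group(b));
      return byGroup != 0 ? byGroup : a.compareTo(b);
    });
    return out;
  }

  public static void main(String[] args) {
    // The four imports from the hunk above, in the order they appear in the diff.
    List<String> imports = Arrays.asList(
        "com.google.common.base.Objects",
        "io.netty.buffer.ByteBuf",
        "org.apache.spark.network.protocol.Encoders",
        "java.util.Arrays");
    int previous = -1;
    for (String name : sorted(imports)) {
      int current = group(name);
      if (previous != -1 && current != previous) {
        System.out.println(); // blank line between groups
      }
      System.out.println("import " + name + ";");
      previous = current;
    }
  }
}
```

Run against the imports in this hunk, the sketch moves `java.util.Arrays` to the top, keeps the Guava and Netty imports together in the middle, and puts the Spark import last.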


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-21 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r349368751
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 ##
 @@ -420,12 +446,85 @@ final class ShuffleBlockFetcherIterator(
 }
   }
 
+  private[this] def fetchHostLocalBlock(
+  blockId: BlockId,
+  mapIndex: Int,
+  localDirs: Array[String],
+  blockManagerId: BlockManagerId): Boolean = {
+try {
+  val buf = blockManager.getHostLocalShuffleData(blockId, localDirs)
+  buf.retain()
+  results.put(SuccessFetchResult(blockId, mapIndex, blockManagerId, buf.size(), buf,
+isNetworkReqDone = false))
+  true
+} catch {
+  case e: Exception =>
+// If we see an exception, stop immediately.
+logError(s"Error occurred while fetching local blocks", e)
+results.put(FailureFetchResult(blockId, mapIndex, blockManagerId, e))
+false
+}
+  }
+
+  /**
+   * Fetch the host-local blocks while we are fetching remote blocks. This is ok because
+   * `ManagedBuffer`'s memory is allocated lazily when we create the input stream, so all we
+   * track in-memory are the ManagedBuffer references themselves.
+   */
+  private[this] def fetchHostLocalBlocks(hostLocalDirManager: HostLocalDirManager): Unit = {
+val cachedDirsByExec = hostLocalDirManager.getCachedHostLocalDirs()
+val (hostLocalBlocksWithCachedDirs, hostLocalBlocksWithMissingDirs) =
+  hostLocalBlocksByExecutor
+.map { case (hostLocalBmId, bmInfos) =>
+  (hostLocalBmId, bmInfos, cachedDirsByExec.get(hostLocalBmId.executorId))
+}.partition(_._3.isDefined)
+val bmId = blockManager.blockManagerId
+val immutableHostLocalBlocksWithoutDirs =
+  hostLocalBlocksWithMissingDirs.map { case (hostLocalBmId, bmInfos, _) =>
+hostLocalBmId -> bmInfos
+  }.toMap
+if (immutableHostLocalBlocksWithoutDirs.nonEmpty) {
+  logDebug(s"Asynchronous fetching host-local blocks without cached executors' dir: " +
+s"${immutableHostLocalBlocksWithoutDirs.mkString(", ")}")
+  hostLocalDirManager.getHostLocalDirs(
+immutableHostLocalBlocksWithoutDirs.keys.map(_.executorId).toArray, {
 
 Review comment:
   minor: if you use currying, this looks slightly better (the closure doesn't need to be inside the `()`).
   
   This line should also be indented a little more.





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-21 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r349369532
 
 

 ##
 File path: core/src/test/scala/org/apache/spark/shuffle/BlockStoreShuffleReaderSuite.scala
 ##
 @@ -20,6 +20,7 @@ package org.apache.spark.shuffle
 import java.io.{ByteArrayOutputStream, InputStream}
 import java.nio.ByteBuffer
 
+import org.mockito.ArgumentMatchers.{any, eq => meq}
 
 Review comment:
   `any` not used.





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-21 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r349364740
 
 

 ##
 File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/GetLocalDirsForExecutors.java
 ##
 @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+import org.apache.spark.network.protocol.Encoders;
+
+import java.util.Arrays;
 
 Review comment:
   nit: import order in this class and next is a bit messed up.





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-21 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r349369435
 
 

 ##
 File path: core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
 ##
 @@ -96,6 +99,52 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi
 e.getMessage should include ("Fetch failure will not retry stage due to testing config")
   }
 
+  test("SPARK-27651: read host local shuffle blocks from disk and avoid network remote fetches") {
+val confWithHostLocalRead =
+  conf.clone.set(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true)
+confWithHostLocalRead.set(config.STORAGE_LOCAL_DISK_BY_EXECUTORS_CACHE_SIZE, 5)
+sc = new SparkContext("local-cluster[2,1,1024]", "test", confWithHostLocalRead)
+sc.getConf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) should equal(true)
+sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
+sc.env.blockManager.hostLocalDirManager.isDefined should equal(true)
+sc.env.blockManager.blockStoreClient.getClass should equal(classOf[ExternalBlockStoreClient])
+
+// In a slow machine, one slave may register hundreds of milliseconds ahead of the other one.
+// If we don't wait for all slaves, it's possible that only one executor runs all jobs. Then
+// all shuffle blocks will be in this executor, ShuffleBlockFetcherIterator will directly fetch
+// local blocks from the local BlockManager and won't send requests to ExternalShuffleService.
+// In this case, we won't receive FetchFailed. And it will make this test fail.
+// Therefore, we should wait until all slaves are up
+TestUtils.waitUntilExecutorsUp(sc, 2, 6)
+
+val rdd = sc.parallelize(0 until 1000, 10)
+  .map { i => (i, 1) }
+  .reduceByKey(_ + _)
+
+rdd.count()
+rdd.count()
+
+val cachedExecutors = rdd.mapPartitions { _ =>
+  SparkEnv.get.blockManager.hostLocalDirManager.map { localDirManager =>
+localDirManager.getCachedHostLocalDirs().keySet.iterator
+  }.getOrElse(Iterator.empty)
+}.collect().toSet
+
+// both executors are caching the dirs of the other one
+cachedExecutors should equal(sc.getExecutorIds().toSet)
+
+// Invalidate the registered executors, disallowing access to their shuffle blocks (without
+// deleting the actual shuffle files, so we could access them without the shuffle service).
+// As directories are already cached there is no request to external shuffle service.
+rpcHandler.applicationRemoved(sc.conf.getAppId, false /* cleanupLocalDirs */)
+
+// Now Spark will not receive FetchFailed as host local blocks are read from the cached local
+// disk directly
+rdd.count()
 
 Review comment:
   Can you use just the job below (or just this one)? Or are both needed for some reason?





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-21 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r349364804
 
 

 ##
 File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/GetLocalDirsForExecutors.java
 ##
 @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+import org.apache.spark.network.protocol.Encoders;
+
+import java.util.Arrays;
+
+// Needed by ScalaDoc. See SPARK-7726
+import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
+
+/** Request to get the local dirs for the given executors. */
+public class GetLocalDirsForExecutors extends BlockTransferMessage {
+  public final String appId;
+
 
 Review comment:
   nit: remove





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-21 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r349365348
 
 

 ##
 File path: common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/BlockTransferMessagesSuite.java
 ##
 @@ -23,6 +23,9 @@
 
 import org.apache.spark.network.shuffle.protocol.*;
 
+import java.util.HashMap;
 
 Review comment:
   nit: import order





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-21 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r349365272
 
 

 ##
 File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/LocalDirsForExecutors.java
 ##
 @@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+import org.apache.spark.network.protocol.Encoders;
+
+import java.util.*;
+
+// Needed by ScalaDoc. See SPARK-7726
+import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
+
+/** The reply to get local dirs giving back the dirs for each of the requested executors. */
+public class LocalDirsForExecutors extends BlockTransferMessage {
+  private final String[] execIds;
+  private final int[] numLocalDirsByExec;
+  private final String[] allLocalDirs;
+
+  public LocalDirsForExecutors(Map<String, String[]> localDirsByExec) {
+this.execIds = new String[localDirsByExec.size()];
+this.numLocalDirsByExec = new int[localDirsByExec.size()];
+ArrayList<String> localDirs = new ArrayList<>();
+int index = 0;
+for (Map.Entry<String, String[]> e: localDirsByExec.entrySet()) {
+  execIds[index] = e.getKey();
+  numLocalDirsByExec[index] = e.getValue().length;
+  Collections.addAll(localDirs, e.getValue());
+  index++;
+}
+this.allLocalDirs = localDirs.toArray(new String[0]);
+  }
+
+  private LocalDirsForExecutors(String[] execIds, int[] numLocalDirsByExec, String[] allLocalDirs) {
+this.execIds = execIds;
+this.numLocalDirsByExec = numLocalDirsByExec;
+this.allLocalDirs = allLocalDirs;
+  }
+
+  @Override
+  protected Type type() { return Type.LOCAL_DIRS_FOR_EXECUTORS; }
+
+  @Override
+  public int hashCode() {
+return Arrays.hashCode(execIds);
+  }
+
+  @Override
+  public String toString() {
+return Objects.toStringHelper(this)
+  .add("execIds", Arrays.toString(execIds))
+  .add("numLocalDirsByExec", Arrays.toString(numLocalDirsByExec))
+  .add("allLocalDirs", Arrays.toString(allLocalDirs))
+  .toString();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+if (other instanceof LocalDirsForExecutors) {
+  LocalDirsForExecutors o = (LocalDirsForExecutors) other;
+  return Arrays.equals(execIds, o.execIds)
+&& Arrays.equals(numLocalDirsByExec, o.numLocalDirsByExec)
+&& Arrays.equals(allLocalDirs, o.allLocalDirs);
+}
+return false;
+  }
+
+  @Override
+  public int encodedLength() {
+return Encoders.StringArrays.encodedLength(execIds)
+  + Encoders.IntArrays.encodedLength(numLocalDirsByExec)
+  + Encoders.StringArrays.encodedLength(allLocalDirs);
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+Encoders.StringArrays.encode(buf, execIds);
+Encoders.IntArrays.encode(buf, numLocalDirsByExec);
+Encoders.StringArrays.encode(buf, allLocalDirs);
+  }
+
+  public static LocalDirsForExecutors decode(ByteBuf buf) {
+String[] execIds = Encoders.StringArrays.decode(buf);
+int[] numLocalDirsByExec = Encoders.IntArrays.decode(buf);
+String[] allLocalDirs = Encoders.StringArrays.decode(buf);
+return new LocalDirsForExecutors(execIds, numLocalDirsByExec, allLocalDirs);
+  }
+
+  public Map<String, String[]> getLocalDirsByExec() {
+Map<String, String[]> localDirsByExec = new HashMap<>();
+int index = 0;
+int localDirsIndex = 0;
+for (int length: numLocalDirsByExec) {
+  localDirsByExec.put(execIds[index],
+Arrays.copyOfRange(allLocalDirs, localDirsIndex, localDirsIndex + length));
+  localDirsIndex += length;
+  index++;
+}
+return  localDirsByExec;
 
 Review comment:
   nit: too many spaces



[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-19 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r348096296
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
 ##
 @@ -33,6 +33,9 @@ import scala.util.Random
 import scala.util.control.NonFatal
 
 import com.codahale.metrics.{MetricRegistry, MetricSet}
+import com.google.common.cache.CacheBuilder
+import com.google.common.util.concurrent.FutureCallback
+import java.util
 
 Review comment:
   Doesn't look like you're using this (I only see `java.util.Map`, for example).





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-19 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r348107457
 
 

 ##
 File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
 ##
 @@ -369,6 +371,16 @@ public int removeBlocks(String appId, String execId, String[] blockIds) {
 return numRemovedBlocks;
   }
 
+  public Map<String, String[]> getLocalDirs(String appId, String[] execIds) {
+return Arrays.stream(execIds)
+  .map(exec -> {
+ExecutorShuffleInfo info = executors.get(new AppExecId(appId, exec));
+return Pair.of(exec, (info != null) ? info.localDirs : null);
+  })
+  .filter(pair -> pair.getValue() != null)
 
 Review comment:
   After going through the block manager code again, I think it's better to throw an exception than to ignore this. It's unlikely to happen, but if it does (probably because of a bug somewhere rather than environmental issues), returning a different set of executors than the one requested will probably cause hard-to-debug issues in the application, so a useful exception at this point (instead of just an NPE) would be better for debugging.
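
A minimal sketch of the suggestion above, under stated assumptions: `LocalDirsLookup`, its `register` method, and the "appId/execId" string key are hypothetical stand-ins for the resolver's executor table, and `IllegalStateException` is just one plausible exception type. It is not the code that was merged.

```java
import java.util.HashMap;
import java.util.Map;

final class LocalDirsLookup {
  // Hypothetical stand-in for the resolver's registered-executor table,
  // keyed by "appId/execId" purely for illustration.
  private final Map<String, String[]> registered = new HashMap<>();

  public void register(String appId, String execId, String[] localDirs) {
    registered.put(appId + "/" + execId, localDirs);
  }

  /** Returns the dirs of every requested executor, or fails loudly if one is unknown. */
  public Map<String, String[]> getLocalDirs(String appId, String[] execIds) {
    Map<String, String[]> result = new HashMap<>();
    for (String execId : execIds) {
      String[] dirs = registered.get(appId + "/" + execId);
      if (dirs == null) {
        // Name the missing executor instead of silently dropping it from the reply.
        throw new IllegalStateException(
            "Executor is not registered (appId=" + appId + ", execId=" + execId + ")");
      }
      result.put(execId, dirs);
    }
    return result;
  }

  public static void main(String[] args) {
    LocalDirsLookup lookup = new LocalDirsLookup();
    lookup.register("app-1", "exec-1", new String[] {"/tmp/a", "/tmp/b"});
    System.out.println(lookup.getLocalDirs("app-1", new String[] {"exec-1"}).size());
    try {
      lookup.getLocalDirs("app-1", new String[] {"exec-1", "exec-2"});
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

With this shape the reply always covers exactly the requested executors, so the caller never has to reconcile a shorter map against its request.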





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-19 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r348105864
 
 

 ##
 File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
 ##
 @@ -121,37 +177,88 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
   false)
 
 // 3 local blocks fetched in initialization
-verify(blockManager, times(3)).getBlockData(any())
+verify(blockManager, times(3)).getLocalBlockData(any())
 
-for (i <- 0 until 5) {
-  assert(iterator.hasNext, s"iterator should have 5 elements but actually has $i elements")
+val allBlocks = localBlocks ++ remoteBlocks ++ hostLocalBlocks
+for (i <- 0 until allBlocks.size) {
+  assert(iterator.hasNext,
+s"iterator should have ${allBlocks.size} elements but actually has $i elements")
   val (blockId, inputStream) = iterator.next()
 
   // Make sure we release buffers when a wrapped input stream is closed.
-  val mockBuf = localBlocks.getOrElse(blockId, remoteBlocks(blockId))
-  // Note: ShuffleBlockFetcherIterator wraps input streams in a BufferReleasingInputStream
-  val wrappedInputStream = inputStream.asInstanceOf[BufferReleasingInputStream]
-  verify(mockBuf, times(0)).release()
-  val delegateAccess = PrivateMethod[InputStream](Symbol("delegate"))
-
-  verify(wrappedInputStream.invokePrivate(delegateAccess()), times(0)).close()
-  wrappedInputStream.close()
-  verify(mockBuf, times(1)).release()
-  verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
-  wrappedInputStream.close() // close should be idempotent
-  verify(mockBuf, times(1)).release()
-  verify(wrappedInputStream.invokePrivate(delegateAccess()), times(1)).close()
+  val mockBuf = allBlocks(blockId)
+  verifyBufferRelease(mockBuf, inputStream)
 }
 
-// 3 local blocks, and 2 remote blocks
-// (but from the same block manager so one call to fetchBlocks)
-verify(blockManager, times(3)).getBlockData(any())
+// 4 host-local locks fetched
+verify(blockManager, times(4))
+  .getHostLocalShuffleData(any(), meq(Array("local-dir")))
+
+// 2 remote blocks are read from the same block manager
 verify(transfer, times(1)).fetchBlocks(any(), any(), any(), any(), any(), any())
+assert(blockManager.hostLocalDirManager.get.getCachedHostLocalDirs().size === 1)
   }
 
-  test("fetch continuous blocks in batch successful 3 local reads + 2 remote reads") {
+  test("error during accessing host local dirs for executors") {
 val blockManager = mock(classOf[BlockManager])
-val localBmId = BlockManagerId("test-client", "test-client", 1)
+val localBmId = BlockManagerId("test-local-client", "test-local-host", 1)
+doReturn(localBmId).when(blockManager).blockManagerId
+val hostLocalBlocks = Map[BlockId, ManagedBuffer](
+  ShuffleBlockId(0, 1, 0) -> createMockManagedBuffer())
+
+hostLocalBlocks.foreach { case (blockId, buf) =>
+  doReturn(buf)
+.when(blockManager)
+.getHostLocalShuffleData(meq(blockId.asInstanceOf[ShuffleBlockId]), any())
+}
+val hostLocalBmId = BlockManagerId("test-host-local-client-1", "test-local-host", 3)
+
+val mockExternalBlockStoreClient = mock(classOf[ExternalBlockStoreClient])
+val hostLocalDirManager = new HostLocalDirManager(
+  futureExecutionContext = global,
+  cacheSize = 1,
+  externalBlockStoreClient = mockExternalBlockStoreClient,
+  host = "localhost",
+  externalShuffleServicePort = 7337)
+
+when(blockManager.hostLocalDirManager).thenReturn(Some(hostLocalDirManager))
+when(mockExternalBlockStoreClient.getHostLocalDirs(any(), any(), any(), any()))
+  .thenAnswer((invocation: InvocationOnMock) => {
 
 Review comment:
   Same.





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-19 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r348090451
 
 

 ##
 File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
 ##
 @@ -182,14 +183,54 @@ public void onSuccess(ByteBuffer response) {
   @Override
   public void onFailure(Throwable e) {
 logger.warn("Error trying to remove RDD blocks " + Arrays.toString(blockIds) +
-" via external shuffle service from executor: " + execId, e);
+  " via external shuffle service from executor: " + execId, e);
 numRemovedBlocksFuture.complete(0);
 client.close();
   }
 });
 return numRemovedBlocksFuture;
   }
 
+  public void getHostLocalDirs(
+  String host,
+  int port,
+  String[] execIds,
+  CompletableFuture<Map<String, String[]>> hostLocalDirsCompletable) {
+checkInit();
+GetLocalDirsForExecutors getLocalDirsMessage = new GetLocalDirsForExecutors(appId, execIds);
+try {
+  TransportClient client = clientFactory.createClient(host, port);
+  client.sendRpc(getLocalDirsMessage.toByteBuffer(), new RpcResponseCallback() {
+@Override
+public void onSuccess(ByteBuffer response) {
+  try {
+BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(response);
+hostLocalDirsCompletable.complete(
+  ((LocalDirsForExecutors) msgObj).getLocalDirsByExec());
+  } catch (Throwable t) {
+logger.warn("Error trying to get the host local dirs for " +
+  Arrays.toString(getLocalDirsMessage.execIds) + " via external shuffle service",
+  t.getCause());
+hostLocalDirsCompletable.completeExceptionally(t);
+  } finally {
+client.close();
+  }
+}
+
+@Override
+public void onFailure(Throwable t) {
+  logger.warn("Error trying to get the host local dirs for " +
+Arrays.toString(getLocalDirsMessage.execIds) + " via external shuffle service",
+  t.getCause());
+hostLocalDirsCompletable.completeExceptionally(t);
 
 Review comment:
   indentation





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-19 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r348104838
 
 

 ##
 File path: core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
 ##
 @@ -65,6 +65,29 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
 transfer
   }
 
+  private def initHostLocalDirManager(
+  blockManager: BlockManager,
+  hostLocalDirs: Map[String, Array[String]]): Unit = {
+val mockExternalBlockStoreClient = mock(classOf[ExternalBlockStoreClient])
+val hostLocalDirManager = new HostLocalDirManager(
+  futureExecutionContext = global,
+  cacheSize = 1,
+  externalBlockStoreClient = mockExternalBlockStoreClient,
+  host = "localhost",
+  externalShuffleServicePort = 7337)
+
+when(blockManager.hostLocalDirManager).thenReturn(Some(hostLocalDirManager))
+when(mockExternalBlockStoreClient.getHostLocalDirs(any(), any(), any(), any()))
+  .thenAnswer((invocation: InvocationOnMock) => {
 
 Review comment:
   `thenAnswer { invocation =>` works, doesn't it? If it does, it looks better.





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-19 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r348105943
 
 

 ##
 File path: 
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
 ##
 @@ -121,37 +177,88 @@ class ShuffleBlockFetcherIteratorSuite extends 
SparkFunSuite with PrivateMethodT
   false)
 
 // 3 local blocks fetched in initialization
-verify(blockManager, times(3)).getBlockData(any())
+verify(blockManager, times(3)).getLocalBlockData(any())
 
-for (i <- 0 until 5) {
-  assert(iterator.hasNext, s"iterator should have 5 elements but actually 
has $i elements")
+val allBlocks = localBlocks ++ remoteBlocks ++ hostLocalBlocks
+for (i <- 0 until allBlocks.size) {
+  assert(iterator.hasNext,
+s"iterator should have ${allBlocks.size} elements but actually has $i 
elements")
   val (blockId, inputStream) = iterator.next()
 
   // Make sure we release buffers when a wrapped input stream is closed.
-  val mockBuf = localBlocks.getOrElse(blockId, remoteBlocks(blockId))
-  // Note: ShuffleBlockFetcherIterator wraps input streams in a 
BufferReleasingInputStream
-  val wrappedInputStream = 
inputStream.asInstanceOf[BufferReleasingInputStream]
-  verify(mockBuf, times(0)).release()
-  val delegateAccess = PrivateMethod[InputStream](Symbol("delegate"))
-
-  verify(wrappedInputStream.invokePrivate(delegateAccess()), 
times(0)).close()
-  wrappedInputStream.close()
-  verify(mockBuf, times(1)).release()
-  verify(wrappedInputStream.invokePrivate(delegateAccess()), 
times(1)).close()
-  wrappedInputStream.close() // close should be idempotent
-  verify(mockBuf, times(1)).release()
-  verify(wrappedInputStream.invokePrivate(delegateAccess()), 
times(1)).close()
+  val mockBuf = allBlocks(blockId)
+  verifyBufferRelease(mockBuf, inputStream)
 }
 
-// 3 local blocks, and 2 remote blocks
-// (but from the same block manager so one call to fetchBlocks)
-verify(blockManager, times(3)).getBlockData(any())
+// 4 host-local locks fetched
+verify(blockManager, times(4))
+  .getHostLocalShuffleData(any(), meq(Array("local-dir")))
+
+// 2 remote blocks are read from the same block manager
 verify(transfer, times(1)).fetchBlocks(any(), any(), any(), any(), any(), 
any())
+assert(blockManager.hostLocalDirManager.get.getCachedHostLocalDirs().size 
=== 1)
   }
 
-  test("fetch continuous blocks in batch successful 3 local reads + 2 remote 
reads") {
+  test("error during accessing host local dirs for executors") {
 val blockManager = mock(classOf[BlockManager])
-val localBmId = BlockManagerId("test-client", "test-client", 1)
+val localBmId = BlockManagerId("test-local-client", "test-local-host", 1)
+doReturn(localBmId).when(blockManager).blockManagerId
+val hostLocalBlocks = Map[BlockId, ManagedBuffer](
+  ShuffleBlockId(0, 1, 0) -> createMockManagedBuffer())
+
+hostLocalBlocks.foreach { case (blockId, buf) =>
+  doReturn(buf)
+.when(blockManager)
+.getHostLocalShuffleData(meq(blockId.asInstanceOf[ShuffleBlockId]), 
any())
+}
+val hostLocalBmId = BlockManagerId("test-host-local-client-1", 
"test-local-host", 3)
+
+val mockExternalBlockStoreClient = mock(classOf[ExternalBlockStoreClient])
+val hostLocalDirManager = new HostLocalDirManager(
+  futureExecutionContext = global,
+  cacheSize = 1,
+  externalBlockStoreClient = mockExternalBlockStoreClient,
+  host = "localhost",
+  externalShuffleServicePort = 7337)
+
+
when(blockManager.hostLocalDirManager).thenReturn(Some(hostLocalDirManager))
+when(mockExternalBlockStoreClient.getHostLocalDirs(any(), any(), any(), 
any()))
+  .thenAnswer((invocation: InvocationOnMock) => {
+val completableFuture = invocation.getArguments()(3)
+  .asInstanceOf[CompletableFuture[java.util.Map[String, 
Array[String
+Future {
 
 Review comment:
   ?





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-19 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r348095460
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
 ##
 @@ -113,6 +116,47 @@ private[spark] class ByteBufferBlockData(
 
 }
 
+private[spark] class HostLocalDirManager(
+futureExecutionContext: ExecutionContext,
+cacheSize: Int,
+externalBlockStoreClient: ExternalBlockStoreClient,
+host: String,
+externalShuffleServicePort: Int) extends Logging {
+
+  private val executorIdToLocalDirsCache =
+CacheBuilder
+  .newBuilder()
+  .maximumSize(cacheSize)
+  .build[String, Array[String]]()
+
+  private[spark] def getCachedHostLocalDirs()
+  : scala.collection.Map[String, Array[String]] = 
executorIdToLocalDirsCache.synchronized {
+import scala.collection.JavaConverters._
+return executorIdToLocalDirsCache.asMap().asScala
+  }
+
+  private[spark] def getHostLocalDirs(
+  executorIds: Array[String],
+  callback: FutureCallback[java.util.Map[String, Array[String]]]): Unit = {
 
 Review comment:
   Is this ever called outside of core? Because you can't use guava types 
across modules (because of shading).
   
   Any reason why you didn't just go with my suggestion of using a closure that 
takes a `Try[java.util.Map[String, Array[String]]]`? It looks much nicer in the 
caller (since you can use currying).
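
A rough Java analogue of the callback shape suggested here, using only JDK types so that no Guava type appears in the signature. This is a sketch, not the PR's code: `DirsRpc` is a hypothetical stand-in for the real RPC client, and a `BiConsumer` of (dirs, error) stands in for a Scala closure taking a `Try`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

class HostLocalDirsLookup {
  /** Hypothetical stand-in for the RPC client: it completes the future it is handed. */
  interface DirsRpc {
    void send(String[] execIds, CompletableFuture<Map<String, String[]>> reply);
  }

  private final DirsRpc rpc;

  HostLocalDirsLookup(DirsRpc rpc) {
    this.rpc = rpc;
  }

  /**
   * The caller only supplies a callback. The error argument is null on success,
   * and the dirs argument is null on failure.
   */
  void getHostLocalDirs(
      String[] execIds,
      BiConsumer<Map<String, String[]>, Throwable> callback) {
    // The future is created here and never exposed to the caller.
    CompletableFuture<Map<String, String[]>> reply = new CompletableFuture<>();
    // Registered before the RPC is sent, so the callback runs in whichever
    // thread completes the reply.
    reply.whenComplete(callback);
    rpc.send(execIds, reply);
  }
}
```

With this shape the caller never sees a future type, so the choice of future implementation stays an internal detail of the method.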





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-19 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r348094293
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
 ##
 @@ -51,12 +52,36 @@ private[spark] class IndexShuffleBlockResolver(
 
   private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
 
-  def getDataFile(shuffleId: Int, mapId: Long): File = {
-blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, 
NOOP_REDUCE_ID))
+
+  def getDataFile(shuffleId: Int, mapId: Long): File = getDataFile(shuffleId, 
mapId, None)
 
 Review comment:
   Use a default parameter like for `getIndexFile`?





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-18 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r347532257
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
 ##
 @@ -393,6 +402,7 @@ class BlockManagerMasterEndpoint(
   topologyMapper.getTopologyForHost(idWithoutTopologyInfo.host))
 
 val time = System.currentTimeMillis()
+executorIdToLocalDirs.get(id.executorId, () => localDirs)
 
 Review comment:
   But you are not using that pattern. You're just adding something to the cache.





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-15 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r347016993
 
 

 ##
 File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
 ##
 @@ -369,6 +371,12 @@ public int removeBlocks(String appId, String execId, 
String[] blockIds) {
 return numRemovedBlocks;
   }
 
+  public Map<String, String[]> getLocalDirs(String appId, String[] execIds) {
+return Arrays.stream(execIds)
+  .map(exec -> Pair.of(exec, executors.get(new AppExecId(appId, 
exec)).localDirs))
 
 Review comment:
   I can't think of a case, but if it happens, do you want to throw an NPE or 
return as much data as is available?





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-15 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r346997499
 
 

 ##
 File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/LocalDirsForExecutors.java
 ##
 @@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+import org.apache.spark.network.protocol.Encoders;
+
+import java.util.*;
+import java.util.Map.Entry;
+
+// Needed by ScalaDoc. See SPARK-7726
+
+
+/** The reply to get local dirs giving back the dirs for each of the requested 
executors. */
+public class LocalDirsForExecutors extends BlockTransferMessage {
+  private final String[] execIds;
+  private final int[] numLocalDirsByExec;
+  private final String[] allLocalDirs;
+
+  public LocalDirsForExecutors(Map<String, String[]> localDirsByExec) {
+this.execIds = new String[localDirsByExec.size()];
+this.numLocalDirsByExec = new int[localDirsByExec.size()];
+ArrayList<String> localDirs = new ArrayList<>();
+int index = 0;
+for (Entry<String, String[]> e: localDirsByExec.entrySet()) {
+  execIds[index] = e.getKey();
+  numLocalDirsByExec[index] = e.getValue().length;
+  Collections.addAll(localDirs, e.getValue());
+  index++;
+}
+this.allLocalDirs = localDirs.toArray(new String[0]);
 
 Review comment:
   Read the javadoc for `toArray`.
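
The comment does not say which part of the javadoc is meant. One likely reading is the rule that `toArray(T[])` fills and returns the array it is given when the list fits, and allocates a new one otherwise. A minimal sketch of that rule (the class and method names are illustrative only):

```java
import java.util.List;

class ToArrayDemo {
  /** Zero-length argument: the list does not fit, so a new array is allocated. */
  static String[] withEmptyArray(List<String> dirs) {
    return dirs.toArray(new String[0]);
  }

  /** Pre-sized argument: the list fits, so the very same array is filled and returned. */
  static boolean reusesPresizedArray(List<String> dirs) {
    String[] target = new String[dirs.size()];
    return dirs.toArray(target) == target;
  }
}
```

Both forms produce the same contents; they differ only in which array instance ends up holding them.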





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-15 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r346953082
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
 ##
 @@ -190,7 +218,8 @@ private[spark] class IndexShuffleBlockResolver(
 }
   }
 
-  override def getBlockData(blockId: BlockId): ManagedBuffer = {
+  override def getBlockData(blockId: BlockId, dirs: Option[Array[String]])
+: ManagedBuffer = {
 
 Review comment:
   I don't think there's an "official" style for this case; but this line 
shouldn't be at the same indent level as the method implementation.
   
   So I'd use the multi-line argument style.





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-15 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r346950387
 
 

 ##
 File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/LocalDirsForExecutors.java
 ##
 @@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+import org.apache.spark.network.protocol.Encoders;
+
+import java.util.*;
+import java.util.Map.Entry;
+
+// Needed by ScalaDoc. See SPARK-7726
+
+
+/** The reply to get local dirs giving back the dirs for each of the requested 
executors. */
+public class LocalDirsForExecutors extends BlockTransferMessage {
+  private final String[] execIds;
+  private final int[] numLocalDirsByExec;
+  private final String[] allLocalDirs;
+
+  public LocalDirsForExecutors(Map<String, String[]> localDirsByExec) {
+this.execIds = new String[localDirsByExec.size()];
+this.numLocalDirsByExec = new int[localDirsByExec.size()];
+ArrayList<String> localDirs = new ArrayList<>();
+int index = 0;
+for (Entry<String, String[]> e: localDirsByExec.entrySet()) {
 
 Review comment:
   I prefer using `Map.Entry` to importing the nested type.
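
For illustration, the same flattening loop written with the qualified `Map.Entry` and explicit imports instead of `java.util.*` and `java.util.Map.Entry`. This is a standalone sketch that mirrors the constructor in the diff; `FlattenLocalDirs` is a made-up name and the class does not extend `BlockTransferMessage`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

class FlattenLocalDirs {
  final String[] execIds;
  final int[] numLocalDirsByExec;
  final String[] allLocalDirs;

  FlattenLocalDirs(Map<String, String[]> localDirsByExec) {
    execIds = new String[localDirsByExec.size()];
    numLocalDirsByExec = new int[localDirsByExec.size()];
    List<String> localDirs = new ArrayList<>();
    int index = 0;
    // Map.Entry is referenced through its enclosing type, so only Map is imported.
    for (Map.Entry<String, String[]> e : localDirsByExec.entrySet()) {
      execIds[index] = e.getKey();
      numLocalDirsByExec[index] = e.getValue().length;
      Collections.addAll(localDirs, e.getValue());
      index++;
    }
    allLocalDirs = localDirs.toArray(new String[0]);
  }
}
```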





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-15 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r346952354
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
 ##
 @@ -51,12 +52,39 @@ private[spark] class IndexShuffleBlockResolver(
 
   private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
 
-  def getDataFile(shuffleId: Int, mapId: Long): File = {
-blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, 
NOOP_REDUCE_ID))
+
+  def getDataFile(shuffleId: Int, mapId: Long): File = getDataFile(shuffleId, 
mapId, None)
+
+  /**
+   * Get the shuffle data file.
+   *
+   * When the dirs parameter is None then use the disk manager's local 
directories. Otherwise,
+   * read from the specified directories.
+   */
+   def getDataFile(
 
 Review comment:
   declaration fits in one line





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-15 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r346947325
 
 

 ##
 File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
 ##
 @@ -182,14 +183,54 @@ public void onSuccess(ByteBuffer response) {
   @Override
   public void onFailure(Throwable e) {
 logger.warn("Error trying to remove RDD blocks " + 
Arrays.toString(blockIds) +
-" via external shuffle service from executor: " + execId, e);
+  " via external shuffle service from executor: " + execId, e);
 numRemovedBlocksFuture.complete(0);
 client.close();
   }
 });
 return numRemovedBlocksFuture;
   }
 
+  public void getHostLocalDirs(
+  String host,
+  int port,
+  String[] execIds,
+  CompletableFuture<Map<String, String[]>> hostLocalDirsCompletable) {
+checkInit();
+GetLocalDirsForExecutors getLocalDirsMessage = new 
GetLocalDirsForExecutors(appId, execIds);
+try {
+  TransportClient client = clientFactory.createClient(host, port);
+  client.sendRpc(getLocalDirsMessage.toByteBuffer(), new 
RpcResponseCallback() {
+@Override
+public void onSuccess(ByteBuffer response) {
+  try {
+BlockTransferMessage msgObj = 
BlockTransferMessage.Decoder.fromByteBuffer(response);
+hostLocalDirsCompletable.complete(
+  ((LocalDirsForExecutors) msgObj).getLocalDirsByExec());
+  } catch (Throwable t) {
+logger.warn("Error trying to get the host local dirs for " +
+Arrays.toString(getLocalDirsMessage.execIds) + " via external 
shuffle service",
 
 Review comment:
   Indentation.





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-15 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r346953940
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
 ##
 @@ -113,6 +114,41 @@ private[spark] class ByteBufferBlockData(
 
 }
 
+private[spark] class HostLocalDirManager(
+futureExecutionContext: ExecutionContext,
+cacheSize: Int,
+externalBlockStoreClient: ExternalBlockStoreClient,
+host: String,
+externalShuffleServicePort: Int) extends Logging {
+
+  private val executorIdToLocalDirsCache =
+CacheBuilder
+  .newBuilder()
+  .maximumSize(cacheSize)
+  .build[String, Array[String]]()
+
+  private[spark] def getCachedHostLocalDirs()
+: scala.collection.Map[String, Array[String]] = 
executorIdToLocalDirsCache.synchronized {
+import scala.collection.JavaConverters._
+return executorIdToLocalDirsCache.asMap().asScala
+  }
+
+  private[spark] def getHostLocalDirs(
+  executorIds: Array[String],
+  hostLocalDirsCompletable: CompletableFuture[java.util.Map[String, 
Array[String]]]): Unit = {
+Future {
 
 Review comment:
   You're mixing the Java API with the Scala API here and it becomes a little 
weird to follow what's going on where.
   
   For example, the `thenAccept` callback here will always run in the global 
execution context (because that's what `futureExecutionContext` is in your 
code). It also means the RPC will block one thread in that context, since `get` 
is blocking. That's not good.
   
   On the call site, though, it depends. If the RPC is ultra fast, the 
`whenComplete` will run in the same thread as the caller; otherwise it will run 
in the RPC thread completing the future, which will be different from the 
thread processing the `thenAccept`.
   
   I think instead you should be hiding these details from the caller. Have the 
caller provide a callback that will receive a `Try[java.util.Map[String, 
Array[String]]]` when done, and have it always be executed asynchronously in the 
RPC thread completing the call. (That means setting up the `CompletableFuture` 
internally in this method before sending the RPC.)
   
   This avoids mixing the Java and Scala futures, and makes it easier to 
understand what's going on in what thread. (e.g., the caller will always be 
asynchronous)
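
The thread dependence of `whenComplete` can be reproduced with plain JDK code. The sketch below is not taken from the PR: the class name and the `fake-rpc-thread` thread are made up for the demonstration. It records which thread runs the callback in the two situations described.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class WhenCompleteThreadDemo {
  /** The future is already complete, so the callback runs in the registering thread. */
  static String callbackThreadWhenAlreadyDone() {
    CompletableFuture<String> reply = CompletableFuture.completedFuture("dirs");
    AtomicReference<String> seen = new AtomicReference<>();
    reply.whenComplete((value, error) -> seen.set(Thread.currentThread().getName()));
    return seen.get();
  }

  /** The future is still pending, so the callback runs in the thread that completes it. */
  static String callbackThreadWhenPending() {
    CompletableFuture<String> reply = new CompletableFuture<>();
    AtomicReference<String> seen = new AtomicReference<>();
    reply.whenComplete((value, error) -> seen.set(Thread.currentThread().getName()));
    // Stand-in for the RPC thread that delivers the response later.
    Thread rpc = new Thread(() -> reply.complete("dirs"), "fake-rpc-thread");
    rpc.start();
    try {
      rpc.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return seen.get();
  }
}
```

The same registration call therefore lands on different threads depending only on timing, which is the ambiguity the comment asks to hide from the caller.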





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-15 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r346972207
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
 ##
 @@ -393,6 +402,7 @@ class BlockManagerMasterEndpoint(
   topologyMapper.getTopologyForHost(idWithoutTopologyInfo.host))
 
 val time = System.currentTimeMillis()
+executorIdToLocalDirs.get(id.executorId, () => localDirs)
 
 Review comment:
   Is this call to populate the cache? Why not use `put`?
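
To make the difference concrete without pulling in Guava, the sketch below uses `ConcurrentHashMap.computeIfAbsent` as a JDK stand-in for the get-or-load call in the diff (`get(key, loader)`). It is not the Guava API itself, and the class and method names are illustrative. The point it shows is that get-or-load keeps an existing entry, while `put` always stores the new value.

```java
import java.util.Map;

class RegisterLocalDirs {
  /** Get-or-load: stores dirs only if execId has no entry yet, then returns the entry. */
  static String[] registerWithGetOrLoad(
      Map<String, String[]> cache, String execId, String[] dirs) {
    return cache.computeIfAbsent(execId, key -> dirs);
  }

  /** Plain put: always replaces whatever was stored for execId. */
  static void registerWithPut(Map<String, String[]> cache, String execId, String[] dirs) {
    cache.put(execId, dirs);
  }
}
```

If an executor ID could ever register again with different directories, the get-or-load form would silently keep the stale value, which is one reason a plain `put` states the intent more directly.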





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-15 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r346979781
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 ##
 @@ -420,12 +447,88 @@ final class ShuffleBlockFetcherIterator(
 }
   }
 
+  private[this] def fetchHostLocalBlock(
+  blockId: BlockId,
+  mapIndex: Int,
+  localDirs: Array[String],
+  blockManagerId: BlockManagerId): Boolean = {
+try {
+  val buf = blockManager.getHostLocalShuffleData(blockId, localDirs)
+  buf.retain()
+  results.put(SuccessFetchResult(blockId, mapIndex, blockManagerId, 
buf.size(), buf,
+isNetworkReqDone = false))
+  true
+} catch {
+  case e: Exception =>
+// If we see an exception, stop immediately.
+logError(s"Error occurred while fetching local blocks", e)
+results.put(FailureFetchResult(blockId, mapIndex, blockManagerId, e))
+false
+}
+  }
+
+  /**
+   * Fetch the host-local blocks while we are fetching remote blocks. This is 
ok because
+   * `ManagedBuffer`'s memory is allocated lazily when we create the input 
stream, so all we
+   * track in-memory are the ManagedBuffer references themselves.
+   */
+  private[this] def fetchHostLocalBlocks(hostLocalDirManager: 
HostLocalDirManager): Unit = {
+val cachedDirsByExec = hostLocalDirManager.getCachedHostLocalDirs()
+val (hostLocalBlocksWithCachedDirs, hostLocalBlocksWithMissingDirs) =
+  hostLocalBlocksByExecutor
+.map { case (hostLocalBmId, bmInfos) =>
+  (hostLocalBmId, bmInfos, 
cachedDirsByExec.get(hostLocalBmId.executorId))
+}.partition(_._3.isDefined)
+val bmId = blockManager.blockManagerId
+val immutableHostLocalBlocksWithoutDirs =
+  hostLocalBlocksWithMissingDirs.map { case (hostLocalBmId, bmInfos, _) =>
+hostLocalBmId -> bmInfos
+  }.toMap
+if (immutableHostLocalBlocksWithoutDirs.nonEmpty) {
+  logDebug(s"Asynchronous fetching host-local blocks without cached 
executors' dir: " +
+s"${immutableHostLocalBlocksWithoutDirs.mkString(", ")}")
+  val hostLocalDirsCompletable = new CompletableFuture[util.Map[String, 
Array[String]]]()
+  hostLocalDirManager.getHostLocalDirs(
+immutableHostLocalBlocksWithoutDirs.keys.map(_.executorId).toArray,
+hostLocalDirsCompletable)
+  hostLocalDirsCompletable.whenComplete((dirs, throwable) => {
+if (dirs != null ) {
+  immutableHostLocalBlocksWithoutDirs.foreach { case (hostLocalBmId, 
blockInfos) =>
+blockInfos.takeWhile { case (blockId, _, mapIndex) =>
+  fetchHostLocalBlock(
+blockId,
+mapIndex,
+dirs.get(hostLocalBmId.executorId),
+hostLocalBmId)
+}
+  }
+} else {
+  logError(s"Error occurred while fetching host local blocks", 
throwable)
+  val (hostLocalBmId, blockInfoSeq) = 
immutableHostLocalBlocksWithoutDirs.head
+  val (blockId, _, mapIndex) = blockInfoSeq.head
+  results.put(FailureFetchResult(blockId, mapIndex, hostLocalBmId, 
throwable))
 
 Review comment:
   If I'm following the code correctly, you're requiring that, when this 
feature is on, all reads from the same host are made from disk, right?
   
   Wouldn't it be better to allow remote requests if you detect that you're 
missing this information, and replace pending remote requests with local ones 
when this RPC finishes? Not sure how easy that would be though.
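
A very rough sketch of the alternative raised here, under heavy assumptions: requests are tracked per executor ID, and a request can be switched as long as it has not been sent yet. `HostLocalFetchPlan` and its methods are hypothetical and do not exist in Spark; the sketch only shows the bookkeeping, not the fetch itself.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class HostLocalFetchPlan {
  // Executors whose blocks are queued as ordinary remote requests (not yet sent).
  private final Set<String> pendingRemote = new LinkedHashSet<>();
  // Executors whose blocks will be read from local disk instead.
  private final List<String> switchedToLocal = new ArrayList<>();

  /** Dirs are not known yet, so fall back to a normal remote request. */
  void queueRemote(String execId) {
    pendingRemote.add(execId);
  }

  /** Called when the local-dirs reply arrives: convert requests that are still pending. */
  void onDirsAvailable(Set<String> execIdsWithDirs) {
    Iterator<String> it = pendingRemote.iterator();
    while (it.hasNext()) {
      String execId = it.next();
      if (execIdsWithDirs.contains(execId)) {
        it.remove();
        switchedToLocal.add(execId);
      }
    }
  }

  Set<String> pendingRemote() {
    return pendingRemote;
  }

  List<String> switchedToLocal() {
    return switchedToLocal;
  }
}
```

Requests that were already sent before the reply arrived would simply complete over the network, so a failed or slow lookup would not block the fetch.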





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-15 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r346948299
 
 

 ##
 File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleBlockResolver.java
 ##
 @@ -369,6 +371,12 @@ public int removeBlocks(String appId, String execId, 
String[] blockIds) {
 return numRemovedBlocks;
   }
 
+  public Map<String, String[]> getLocalDirs(String appId, String[] execIds) {
+return Arrays.stream(execIds)
+  .map(exec -> Pair.of(exec, executors.get(new AppExecId(appId, 
exec)).localDirs))
 
 Review comment:
   If for some reason the exec ID is not known, won't this throw an NPE? 
Shouldn't you try to return as much data as possible instead of that happening?
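
A minimal sketch of the "return as much data as possible" behaviour, assuming the executor registry can be modelled as a plain map from executor ID to local dirs. `PartialLocalDirs` and `registeredDirs` are illustrative names, not the resolver's actual fields.

```java
import java.util.HashMap;
import java.util.Map;

class PartialLocalDirs {
  /** Returns dirs for every known executor and skips unknown IDs instead of throwing. */
  static Map<String, String[]> getLocalDirs(
      Map<String, String[]> registeredDirs, String[] execIds) {
    Map<String, String[]> found = new HashMap<>();
    for (String execId : execIds) {
      String[] dirs = registeredDirs.get(execId);
      // An unknown executor yields null here; dereferencing it is what would cause the NPE.
      if (dirs != null) {
        found.put(execId, dirs);
      }
    }
    return found;
  }
}
```

The caller can then compare the returned keys with the requested IDs to see which executors were missing.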





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-15 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r346948729
 
 

 ##
 File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/GetLocalDirsForExecutors.java
 ##
 @@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+import org.apache.spark.network.protocol.Encoders;
+
+import java.util.Arrays;
+
+// Needed by ScalaDoc. See SPARK-7726
 
 Review comment:
   What is needed by scaladoc?





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-15 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r346949222
 
 

 ##
 File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/LocalDirsForExecutors.java
 ##
 @@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+import org.apache.spark.network.protocol.Encoders;
+
+import java.util.*;
+import java.util.Map.Entry;
+
+// Needed by ScalaDoc. See SPARK-7726
 
 Review comment:
   ?





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-15 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r346960133
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
 ##
 @@ -113,6 +114,41 @@ private[spark] class ByteBufferBlockData(
 
 }
 
+private[spark] class HostLocalDirManager(
+futureExecutionContext: ExecutionContext,
+cacheSize: Int,
+externalBlockStoreClient: ExternalBlockStoreClient,
+host: String,
+externalShuffleServicePort: Int) extends Logging {
+
+  private val executorIdToLocalDirsCache =
+CacheBuilder
+  .newBuilder()
+  .maximumSize(cacheSize)
+  .build[String, Array[String]]()
+
+  private[spark] def getCachedHostLocalDirs()
+: scala.collection.Map[String, Array[String]] = executorIdToLocalDirsCache.synchronized {
 
 Review comment:
   indent more





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-15 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r346950639
 
 

 ##
 File path: 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/LocalDirsForExecutors.java
 ##
 @@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+import org.apache.spark.network.protocol.Encoders;
+
+import java.util.*;
+import java.util.Map.Entry;
+
+// Needed by ScalaDoc. See SPARK-7726
+
+
+/** The reply to get local dirs giving back the dirs for each of the requested executors. */
+public class LocalDirsForExecutors extends BlockTransferMessage {
+  private final String[] execIds;
+  private final int[] numLocalDirsByExec;
+  private final String[] allLocalDirs;
+
+  public LocalDirsForExecutors(Map<String, String[]> localDirsByExec) {
+    this.execIds = new String[localDirsByExec.size()];
+    this.numLocalDirsByExec = new int[localDirsByExec.size()];
+    ArrayList<String> localDirs = new ArrayList<>();
+    int index = 0;
+    for (Entry<String, String[]> e: localDirsByExec.entrySet()) {
+      execIds[index] = e.getKey();
+      numLocalDirsByExec[index] = e.getValue().length;
+      Collections.addAll(localDirs, e.getValue());
+      index++;
+    }
+    this.allLocalDirs = localDirs.toArray(new String[0]);
 
 Review comment:
   `new String[localDirs.size()]`
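
For illustration, an `ArrayList` reports its element count through `size()`, and `toArray` fills and returns the argument array when it is exactly that size. The sketch below assumes a hypothetical helper, `ToArraySizing.flatten`, that mirrors the flattening done in the constructor; it is not the PR's code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper mirroring the flattening step in the constructor.
class ToArraySizing {
  /** Flattens per-executor dir arrays into one array, presizing the target. */
  static String[] flatten(List<String[]> dirsPerExec) {
    List<String> all = new ArrayList<>();
    for (String[] dirs : dirsPerExec) {
      Collections.addAll(all, dirs);
    }
    // With an exactly sized argument, toArray fills and returns that array.
    return all.toArray(new String[all.size()]);
  }
}
```

Both `new String[0]` and the presized form produce the same contents; the difference is only in which array instance `toArray` ends up returning.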





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-15 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r346976078
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 ##
 @@ -272,73 +280,92 @@ final class ShuffleBlockFetcherIterator(
 }
   }
 
-  private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
-// Make remote requests at most maxBytesInFlight / 5 in length; the reason to keep them
-// smaller than maxBytesInFlight is to allow multiple, parallel fetches from up to 5
-// nodes, rather than blocking on reading output from one node.
-val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
-logDebug("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " + targetRequestSize
-  + ", maxBlocksInFlightPerAddress: " + maxBlocksInFlightPerAddress)
-
-// Split local and remote blocks. Remote blocks are further split into FetchRequests of size
-// at most maxBytesInFlight in order to limit the amount of data in flight.
-val remoteRequests = new ArrayBuffer[FetchRequest]
+  private[this] def partitionBlocksByFetchMode(): ArrayBuffer[FetchRequest] = {
+logDebug(s"maxBytesInFlight: $maxBytesInFlight, targetRemoteRequestSize: "
+  + s"$targetRemoteRequestSize, maxBlocksInFlightPerAddress: $maxBlocksInFlightPerAddress")
+
+// Partition to local, host-local and remote blocks. Remote blocks are further split into
+// FetchRequests of size at most maxBytesInFlight in order to limit the amount of data in flight
+val collectedRemoteRequests = new ArrayBuffer[FetchRequest]
 var localBlockBytes = 0L
+var hostLocalBlockBytes = 0L
 var remoteBlockBytes = 0L
+var numRemoteBlocks = 0
+
+val hostLocalDirReadingEnabled =
+  blockManager.hostLocalDirManager != null && blockManager.hostLocalDirManager.isDefined
 
 for ((address, blockInfos) <- blocksByAddress) {
   if (address.executorId == blockManager.blockManagerId.executorId) {
-blockInfos.find(_._2 <= 0) match {
-  case Some((blockId, size, _)) if size < 0 =>
-throw new BlockException(blockId, "Negative block size " + size)
-  case Some((blockId, size, _)) if size == 0 =>
-throw new BlockException(blockId, "Zero-sized blocks should be excluded.")
-  case None => // do nothing.
-}
+checkBlockSizes(blockInfos)
 val mergedBlockInfos = mergeContinuousShuffleBlockIdsIfNeeded(
   blockInfos.map(info => FetchBlockInfo(info._1, info._2, info._3)).to[ArrayBuffer])
 localBlocks ++= mergedBlockInfos.map(info => (info.blockId, info.mapIndex))
 localBlockBytes += mergedBlockInfos.map(_.size).sum
+  } else if (hostLocalDirReadingEnabled && address.host == blockManager.blockManagerId.host) {
+checkBlockSizes(blockInfos)
+val mergedBlockInfos = mergeContinuousShuffleBlockIdsIfNeeded(
+  blockInfos.map(info => FetchBlockInfo(info._1, info._2, info._3)).to[ArrayBuffer])
+val blocksForAddress =
+  mergedBlockInfos.map(info => (info.blockId, info.size, info.mapIndex))
+hostLocalBlocksByExecutor += address -> blocksForAddress
+hostLocalBlocks ++= blocksForAddress.map(info => (info._1, info._3))
+hostLocalBlockBytes += mergedBlockInfos.map(_.size).sum
   } else {
-val iterator = blockInfos.iterator
-var curRequestSize = 0L
-var curBlocks = new ArrayBuffer[FetchBlockInfo]
-while (iterator.hasNext) {
-  val (blockId, size, mapIndex) = iterator.next()
-  remoteBlockBytes += size
-  if (size < 0) {
-throw new BlockException(blockId, "Negative block size " + size)
-  } else if (size == 0) {
-throw new BlockException(blockId, "Zero-sized blocks should be excluded.")
-  } else {
-curBlocks += FetchBlockInfo(blockId, size, mapIndex)
-curRequestSize += size
-  }
-  if (curRequestSize >= targetRequestSize ||
-  curBlocks.size >= maxBlocksInFlightPerAddress) {
-// Add this FetchRequest
-val mergedBlocks = mergeContinuousShuffleBlockIdsIfNeeded(curBlocks)
-remoteBlocks ++= mergedBlocks.map(_.blockId)
-remoteRequests += new FetchRequest(address, mergedBlocks)
-logDebug(s"Creating fetch request of $curRequestSize at $address "
-  + s"with ${mergedBlocks.size} blocks")
-curBlocks = new ArrayBuffer[FetchBlockInfo]
-curRequestSize = 0
-  }
-}
-// Add in the final request
-if (curBlocks.nonEmpty) {
-  val mergedBlocks = mergeContinuousShuffleBlockIdsIfNeeded(curBlocks)
-  remoteBlocks ++= mergedBlocks.map(_.blockId)
-  remoteRequests += new FetchRequ

[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-15 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r346951488
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/SparkContext.scala
 ##
 @@ -44,7 +44,7 @@ import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.deploy.StandaloneResourceUtils._
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.input.{FixedLengthBinaryInputFormat, PortableDataStream, StreamInputFormat, WholeTextFileInputFormat}
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{config, Logging}
 
 Review comment:
   `config._` is already imported below





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-14 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r346459236
 
 

 ##
 File path: 
common/network-common/src/main/java/org/apache/spark/network/client/AsyncResponseCallback.java
 ##
 @@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.client;
+
+/**
+ * Callback for the result of a single call.
+ * This will be invoked once with either success or failure.
+ */
+public interface AsyncResponseCallback {
 
 Review comment:
   Do this then.
   
   - Get rid of this interface. It doesn't really belong in network-common 
anyway. At worst it would belong in network-shuffle, but I think it's not 
needed.
   - Make `getHostLocalDirs` return a `java.util.concurrent.Future`, and 
internally use a `java.util.concurrent.CompletableFuture` to update it.
   
   You should also use `sendRpc` instead of `sendRpcSync` plus a thread pool. That
   makes retries a little more complicated, but not by much: `fetchBlocks` already
   handles them, and in this case it should be much simpler.
   
   Or, to simplify, you could just avoid retries altogether. How often will the 
call fail, really? And if it does, you just lose an optimization; you can try 
again next time there is a shuffle.
   
   On the Scala side, just wrap the Java future:
   ```
   val scalaFuture = Future { javaFuture.get }
   ```
   
   Or expose `CompletableFuture` and use its callback methods directly.
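
The suggestion above can be sketched in Java roughly as follows. This is an illustration under stated assumptions, not the PR's implementation: `ResponseCallback` and the `transport` parameter are hypothetical stand-ins for the real RPC callback and for an asynchronous send, and the reply is assumed to be already decoded into a `String[]`. No retry is attempted, matching the simplification proposed in the comment.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class HostLocalDirsClient {
  // Hypothetical callback shape, loosely modeled on an RPC response callback.
  interface ResponseCallback {
    void onSuccess(String[] localDirs);
    void onFailure(Throwable cause);
  }

  /**
   * Starts an asynchronous request through `transport` (a hypothetical
   * stand-in for the real asynchronous send) and returns a future that is
   * completed by whichever callback method fires.
   */
  static CompletableFuture<String[]> getHostLocalDirs(Consumer<ResponseCallback> transport) {
    CompletableFuture<String[]> result = new CompletableFuture<>();
    transport.accept(new ResponseCallback() {
      @Override public void onSuccess(String[] localDirs) {
        result.complete(localDirs);
      }
      @Override public void onFailure(Throwable cause) {
        // No retry: a failure only means the optimization is skipped this time.
        result.completeExceptionally(cause);
      }
    });
    return result;
  }
}
```

Because `CompletableFuture` implements `java.util.concurrent.Future`, the method could declare the narrower return type and still be completed internally this way; returning `CompletableFuture` directly lets callers attach callbacks instead of blocking on `get`.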





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-14 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r346437749
 
 

 ##
 File path: 
common/network-common/src/main/java/org/apache/spark/network/client/AsyncResponseCallback.java
 ##
 @@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.client;
+
+/**
+ * Callback for the result of a single call.
+ * This will be invoked once with either success or failure.
+ */
+public interface AsyncResponseCallback {
 
 Review comment:
   Ok, I missed that it's used in `ExternalBlockStoreClient`, which is not in
   core. Still, it doesn't look like it belongs in this module for the reason I
   explained: it exposes functionality that is intentionally not provided by
   this module.
   
   Looking at it, it seems that `ExternalBlockStoreClient.getHostLocalDirs` 
could be implemented in `BlockManager`, thus avoiding this. It's just sending 
an RPC, so all you need are the request and response types defined in this 
library, and the code to actually call `sendRpc` could be in `BlockManager`. 
Then you could just use a `Promise`, which is both async (if needed) and typed, 
in the call sites.





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-14 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r346415943
 
 

 ##
 File path: 
common/network-common/src/main/java/org/apache/spark/network/client/AsyncResponseCallback.java
 ##
 @@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.client;
+
+/**
+ * Callback for the result of a single call.
+ * This will be invoked once with either success or failure.
+ */
+public interface AsyncResponseCallback {
 
 Review comment:
   Ok, the only difference I noticed is that this has a type parameter. So it 
has nothing to do with it being async, but with deserialization. Which makes me 
think that it doesn't belong here, since this library intentionally does not 
expose serialization functionality to its users.
   
   (The fact that no method in this library even uses this type directly is 
another tell that it doesn't really belong here.)
   
   Instead, you should have something in core if you want to have a typed 
response callback and reuse it in those two methods. e.g. a `Promise`.
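
To make the layering argument concrete: the transport can keep dealing in raw bytes while the caller's module owns the decoding and the typed result. The sketch below is only an illustration; `TypedReply.decode` is a hypothetical helper, and a Java `CompletableFuture` is used here in place of the Scala `Promise` mentioned above.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical helper that would live with the caller, not in the network library.
class TypedReply {
  /** Decodes the raw reply bytes with `decoder` and exposes a typed future. */
  static <T> CompletableFuture<T> decode(
      CompletableFuture<ByteBuffer> raw, Function<ByteBuffer, T> decoder) {
    // Failures of the raw future propagate unchanged to the typed one.
    return raw.thenApply(decoder);
  }
}
```

Under this split the network library never needs to know the reply type, which is the property the comment says the library intentionally preserves.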





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-14 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r346412640
 
 

 ##
 File path: 
common/network-common/src/main/java/org/apache/spark/network/client/AsyncResponseCallback.java
 ##
 @@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.client;
+
+/**
+ * Callback for the result of a single call.
+ * This will be invoked once with either success or failure.
+ */
+public interface AsyncResponseCallback {
 
 Review comment:
   That's not my question though. My question is what is different? Because to 
me, other than the name, this interface is exactly the same as the existing 
one. I don't see any special handling of it in your code, or really anything 
that tells me it has different functionality.
   
   In fact, in all the places I noticed, `onSuccess` and `onFailure` are being 
called just like in a regular `RpcResponseCallback`, so I'm not even seeing 
anything "async" about its use at all.
   
   So why does it have to exist?





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-11-13 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r346059908
 
 

 ##
 File path: 
common/network-common/src/main/java/org/apache/spark/network/client/AsyncResponseCallback.java
 ##
 @@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.client;
+
+/**
+ * Callback for the result of a single call.
+ * This will be invoked once with either success or failure.
+ */
+public interface AsyncResponseCallback {
 
 Review comment:
   I'm not sure why you need this interface. What in the existing
   `RpcResponseCallback` interface makes it unsuitable for asynchronous replies?





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-08-27 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r318239195
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 ##
 @@ -358,12 +382,65 @@ final class ShuffleBlockFetcherIterator(
 }
   }
 
+
+  /**
+   * Fetch the host-local blocks while we are fetching remote blocks. This is ok because
+   * `ManagedBuffer`'s memory is allocated lazily when we create the input stream, so all we
+   * track in-memory are the ManagedBuffer references themselves.
+   */
+  private[this] def fetchHostLocalBlocks() {
+logDebug(s"Start fetching host-local blocks: ${hostLocalBlocks.mkString(", ")}")
+val hostLocalExecutorIds = hostLocalBlocksByExecutor.keySet.map(_.executorId)
+val readsWithoutLocalDir = LinkedHashMap[BlockManagerId, Seq[(BlockId, Long)]]()
+val localDirsByExec = blockManager.getHostLocalDirs(hostLocalExecutorIds.toArray)
+hostLocalBlocksByExecutor.foreach { case (bmId, blockInfos) =>
+  val localDirs = localDirsByExec.get(bmId.executorId)
+  if (localDirs.isDefined) {
+blockInfos.foreach {  case (blockId, _) =>
+  try {
+val buf = blockManager
+  .getHostLocalShuffleData(blockId.asInstanceOf[ShuffleBlockId], localDirs.get)
 
 Review comment:
   Actually this is a good point. I think it might be a good idea for 
`blockManager.getHostLocalDirs` to be more asynchronous: return whatever 
current data it has about local dirs, and asynchronously ask the driver the 
location of local dirs for the missing executors.
   
   This would allow the shuffle to make progress without synchronizing all 
tasks on the driver to retrieve this information. You'll still send potentially 
a lot of messages to the driver in that initial burst of activity, but at least 
it won't actually slow down the shuffle.
   
   The RPC to the driver timing out is not a big issue for this code (since the 
executor knows how to get the data through other means), but it would indicate 
that the driver is backed up for other reasons. I don't think this particular 
RPC by itself would cause those issues.
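
A rough Java sketch of that behavior follows: the call returns whatever is cached right away and requests the missing executors in the background. Everything here is an assumption made for illustration. `AsyncLocalDirsCache` is a hypothetical class, and the `fetch` function stands in for the asynchronous RPC to the driver.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class AsyncLocalDirsCache {
  private final Map<String, String[]> cache = new ConcurrentHashMap<>();
  // Hypothetical asynchronous lookup, standing in for the RPC to the driver.
  private final Function<List<String>, CompletableFuture<Map<String, String[]>>> fetch;

  AsyncLocalDirsCache(Function<List<String>, CompletableFuture<Map<String, String[]>>> fetch) {
    this.fetch = fetch;
  }

  /** Returns what is cached now; missing executors are requested in the background. */
  Map<String, String[]> getHostLocalDirs(List<String> execIds) {
    Map<String, String[]> known = new HashMap<>();
    List<String> missing = new ArrayList<>();
    for (String id : execIds) {
      String[] dirs = cache.get(id);
      if (dirs != null) {
        known.put(id, dirs);
      } else {
        missing.add(id);
      }
    }
    if (!missing.isEmpty()) {
      // A failed or timed-out lookup only loses the optimization, so it is ignored.
      fetch.apply(missing).thenAccept(cache::putAll).exceptionally(t -> null);
    }
    return known;
  }
}
```

Blocks whose executor is not in the returned map would be fetched through the regular path, so the shuffle never waits on the driver; later calls pick up the dirs once the background reply has populated the cache.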





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-08-26 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r317803799
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
 ##
 @@ -995,6 +1003,19 @@ private[spark] class BlockManager(
 None
   }
 
+  private[spark] def getHostLocalDirs(executorIds: Array[String])
+  : scala.collection.Map[String, Array[String]] = {
+val cachedItems = executorIdToLocalDirsCache.filterKeys(executorIds.contains(_))
+if (cachedItems.size < executorIds.length) {
+  val notCachedItems = master
+.getHostLocalDirs(executorIds.filterNot(executorIdToLocalDirsCache.contains))
+  executorIdToLocalDirsCache ++= notCachedItems
 
 Review comment:
   I think it's ok for now but might be good to add a TODO comment; there may 
be really weird edge cases, especially for long running apps (e.g. a Thrift 
Server instance with dynamic allocation on).





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-08-26 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r317802789
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
 ##
 @@ -995,6 +1003,19 @@ private[spark] class BlockManager(
 None
   }
 
+  private[spark] def getHostLocalDirs(executorIds: Array[String])
+  : scala.collection.Map[String, Array[String]] = {
 
 Review comment:
   Sounds like a small gain given the rest of what the method is doing, but ok.
   
   On another topic, shouldn't this method be `synchronized`? Since it may 
actually modify the cache.
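
The concern is a check-then-populate sequence on a shared mutable map. A minimal Java sketch of guarding it with a single lock is shown below; `SynchronizedDirsCache` and its `loader` parameter are hypothetical and only illustrate the pattern, not the `BlockManager` code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class SynchronizedDirsCache {
  private final Map<String, String[]> cache = new HashMap<>();

  /**
   * Looks up and, if needed, populates the cache under one lock, so two
   * threads cannot interleave the check and the write on the plain HashMap.
   */
  synchronized String[] getOrLoad(String execId, Function<String, String[]> loader) {
    String[] dirs = cache.get(execId);
    if (dirs == null) {
      dirs = loader.apply(execId);  // hypothetical lookup, e.g. a call to the master
      cache.put(execId, dirs);
    }
    return dirs;
  }
}
```

The trade-off in this sketch is that the lock is held while the loader runs, so a slow lookup blocks other callers; that is acceptable for an illustration but worth weighing in real code.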





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-08-20 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r315847443
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 ##
 @@ -379,6 +456,13 @@ final class ShuffleBlockFetcherIterator(
 // Get Local Blocks
 fetchLocalBlocks()
 logDebug(s"Got local blocks in ${Utils.getUsedTimeNs(startTimeNs)}")
+
+// Get Host-local Blocks
 
 Review comment:
   The comment just repeats the code.





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-08-20 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r315841144
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 ##
 @@ -74,16 +76,22 @@ final class ShuffleBlockFetcherIterator(
 maxReqSizeShuffleToMem: Long,
 detectCorrupt: Boolean,
 detectCorruptUseExtraMemory: Boolean,
-shuffleMetrics: ShuffleReadMetricsReporter)
+shuffleMetrics: ShuffleReadMetricsReporter,
+enableHostLocalDiskReading: Boolean = true)
   extends Iterator[(BlockId, InputStream)] with DownloadFileManager with 
Logging {
 
   import ShuffleBlockFetcherIterator._
 
+  // Make remote requests at most maxBytesInFlight / 5 in length; the reason 
to keep them
+  // smaller than maxBytesInFlight is to allow multiple, parallel fetches from 
up to 5
+  // nodes, rather than blocking on reading output from one node.
+  val targetRemoteRequestSize = math.max(maxBytesInFlight / 5, 1L)
+
   /**
* Total number of blocks to fetch. This should be equal to the total number 
of blocks
* in [[blocksByAddress]] because we already filter out zero-sized blocks in 
[[blocksByAddress]].
*
-   * This should equal localBlocks.size + remoteBlocks.size.
+   * This should equal localBlocks.size + remoteBlocks.size + 
hostLocalBlocks.size
 
 Review comment:
   Since you're updating this, you're removing `remoteBlocks`, so...





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-08-20 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r315854332
 
 

 ##
 File path: 
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
 ##
 @@ -120,32 +160,65 @@ class ShuffleBlockFetcherIteratorSuite extends 
SparkFunSuite with PrivateMethodT
   metrics)
 
 // 3 local blocks fetched in initialization
-verify(blockManager, times(3)).getBlockData(any())
-
-for (i <- 0 until 5) {
-  assert(iterator.hasNext, s"iterator should have 5 elements but actually 
has $i elements")
+verify(blockManager, times(3)).getLocalBlockData(any())
+// 4 host-local locks fetched in initialization
+verify(blockManager, times(4))
+  .getHostLocalShuffleData(any(), isA(classOf[Array[String]]))
+
+val allBlocks = localBlocks ++ remoteBlocks ++ remoteBlocksAsFallback ++ 
hostLocalBlocks
+for (i <- 0 until allBlocks.size) {
+  assert(iterator.hasNext,
+s"iterator should have ${allBlocks.size} elements but actually has $i 
elements")
   val (blockId, inputStream) = iterator.next()
 
   // Make sure we release buffers when a wrapped input stream is closed.
-  val mockBuf = localBlocks.getOrElse(blockId, remoteBlocks(blockId))
-  // Note: ShuffleBlockFetcherIterator wraps input streams in a 
BufferReleasingInputStream
-  val wrappedInputStream = 
inputStream.asInstanceOf[BufferReleasingInputStream]
-  verify(mockBuf, times(0)).release()
-  val delegateAccess = PrivateMethod[InputStream]('delegate)
-
-  verify(wrappedInputStream.invokePrivate(delegateAccess()), 
times(0)).close()
-  wrappedInputStream.close()
-  verify(mockBuf, times(1)).release()
-  verify(wrappedInputStream.invokePrivate(delegateAccess()), 
times(1)).close()
-  wrappedInputStream.close() // close should be idempotent
-  verify(mockBuf, times(1)).release()
-  verify(wrappedInputStream.invokePrivate(delegateAccess()), 
times(1)).close()
+  val mockBuf = allBlocks(blockId)
+  verifyBufferRelease(mockBuf, inputStream)
 }
 
-// 3 local blocks, and 2 remote blocks
-// (but from the same block manager so one call to fetchBlocks)
-verify(blockManager, times(3)).getBlockData(any())
-verify(transfer, times(1)).fetchBlocks(any(), any(), any(), any(), any(), 
any())
+// 2 remote blocks are read from the same block manager and 1 host-local 
is fall back on to
+// a remote read so there are 2 calls to fetchBlock
+verify(transfer, times(2)).fetchBlocks(any(), any(), any(), any(), any(), 
any())
+  }
+
+  test("only 1 host local without local dir which falls back to remote fetch") 
{
+val blockManager = mock(classOf[BlockManager])
+val localBmId = BlockManagerId("test-local-client", "test-local-host", 1)
+doReturn(localBmId).when(blockManager).blockManagerId
+
+val blockId1 = ShuffleBlockId(0, 5, 0)
+val mockBuf = createMockManagedBuffer()
+val remoteBlocksAsFallback = Map[BlockId, ManagedBuffer](blockId1 -> 
mockBuf)
+val transfer = createMockTransfer(remoteBlocksAsFallback)
+val hostLocalBmIdNoLocDir = BlockManagerId("test-host-local-client-2", 
"test-local-host", 4)
+
+doReturn(Map("test-host-local-client-1" -> Array("local-dir")))
+  .when(blockManager).getHostLocalDirs(any())
+
+val blocksByAddress = Seq[(BlockManagerId, Seq[(BlockId, Long)])](
+  (hostLocalBmIdNoLocDir, remoteBlocksAsFallback.keys.map(blockId => 
(blockId, 1L)).toSeq)
+).toIterator
+
+val taskContext = TaskContext.empty()
+val metrics = taskContext.taskMetrics.createTempShuffleReadMetrics()
+val iterator = new ShuffleBlockFetcherIterator(
+  taskContext,
+  transfer,
+  blockManager,
+  blocksByAddress,
+  (_, in) => in,
+  48 * 1024 * 1024,
+  Int.MaxValue,
+  Int.MaxValue,
+  Int.MaxValue,
+  true,
+  false,
+  metrics)
+
+assert(iterator.hasNext, s"iterator should have 1 element")
+val (_, inputStream) = iterator.next()
 
 Review comment:
   Is there a way to check that the remote path was really taken? That is currently assumed to have happened based on the test setup, but it would be good to check it explicitly.
   
   Probably just:
   
   verify(transfer, times(1)).fetchBlocks(any(), any(), any(), any(), any(), any())
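
   The same idea, checking explicitly that the fallback path ran rather than inferring it from the setup, can be sketched without Mockito by using a hand-written counting fake. All names below (`BlockFetcher`, `CountingFetcher`, `FallbackRead.readBlocks`) are hypothetical and heavily simplified; they do not mirror Spark's classes.

```scala
// Hypothetical, simplified stand-ins; none of these names come from Spark.
trait BlockFetcher {
  def fetchBlocks(blockIds: Seq[String]): Seq[String]
}

// A fake that records how many times the remote path was used.
class CountingFetcher extends BlockFetcher {
  var calls = 0
  def fetchBlocks(blockIds: Seq[String]): Seq[String] = {
    calls += 1
    blockIds.map(id => s"remote:$id")
  }
}

object FallbackRead {
  // Reads from local dirs when they are known, otherwise falls back to the fetcher.
  def readBlocks(
      blockIds: Seq[String],
      localDirs: Option[Array[String]],
      fetcher: BlockFetcher): Seq[String] = localDirs match {
    case Some(dirs) => blockIds.map(id => s"${dirs.head}/$id")
    case None => fetcher.fetchBlocks(blockIds)
  }
}
```

   A test would then assert on `calls` after the read, which plays the same role as `verify(transfer, times(1))` in the suggestion above.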
   



[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-08-20 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r315831188
 
 

 ##
 File path: project/MimaExcludes.scala
 ##
 @@ -36,6 +36,9 @@ object MimaExcludes {
 
   // Exclude rules for 3.0.x
   lazy val v30excludes = v24excludes ++ Seq(
+// [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host
+ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.LiveEntityHelpers.createMetrics"),
 
 Review comment:
   Is this still needed?





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-08-20 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r315815432
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
 ##
 @@ -995,6 +1003,19 @@ private[spark] class BlockManager(
 None
   }
 
+  private[spark] def getHostLocalDirs(executorIds: Array[String])
+  : scala.collection.Map[String, Array[String]] = {
 
 Review comment:
   Just `Map`?
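
   For context on this question: in Scala an unqualified `Map` refers to `scala.collection.immutable.Map`, while `scala.collection.Map` is the common supertype of the mutable and immutable variants. A minimal sketch of the difference, using a hypothetical `MapReturnTypes` object that is not part of the PR:

```scala
import scala.collection.mutable

object MapReturnTypes {
  private val cache = mutable.HashMap("exec-1" -> Array("/tmp/a"))

  // Wide return type: callers receive the live mutable map
  // behind a read-only interface.
  def dirsWide(): scala.collection.Map[String, Array[String]] = cache

  // Plain `Map` is the immutable one, so an immutable snapshot is returned.
  def dirsSnapshot(): Map[String, Array[String]] = cache.toMap
}
```

   Returning plain `Map` costs a copy here but gives callers a value that cannot change underneath them.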





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-08-20 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r315851620
 
 

 ##
 File path: 
core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
 ##
 @@ -96,6 +99,39 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with 
BeforeAndAfterAll wi
 e.getMessage should include ("Fetch failure will not retry stage due to 
testing config")
   }
 
+  test("SPARK-27651: host local disk reading avoids external shuffle service 
on the same node") {
+val confWithHostLocalRead =
+  conf.clone.set(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true)
+sc = new SparkContext("local-cluster[2,1,1024]", "test", 
confWithHostLocalRead)
+sc.getConf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) should 
equal(true)
+sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
+sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
 
 Review comment:
   Duplicate





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-08-20 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r315832831
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 ##
 @@ -265,70 +277,82 @@ final class ShuffleBlockFetcherIterator(
 }
   }
 
-  private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
-// Make remote requests at most maxBytesInFlight / 5 in length; the reason 
to keep them
-// smaller than maxBytesInFlight is to allow multiple, parallel fetches 
from up to 5
-// nodes, rather than blocking on reading output from one node.
-val targetRequestSize = math.max(maxBytesInFlight / 5, 1L)
-logDebug("maxBytesInFlight: " + maxBytesInFlight + ", targetRequestSize: " 
+ targetRequestSize
-  + ", maxBlocksInFlightPerAddress: " + maxBlocksInFlightPerAddress)
-
-// Split local and remote blocks. Remote blocks are further split into 
FetchRequests of size
-// at most maxBytesInFlight in order to limit the amount of data in flight.
-val remoteRequests = new ArrayBuffer[FetchRequest]
+  private[this] def partitionBlocksByFetchMode(): ArrayBuffer[FetchRequest] = {
+logDebug(s"maxBytesInFlight: $maxBytesInFlight, targetRemoteRequestSize: "
+  + s"$targetRemoteRequestSize, maxBlocksInFlightPerAddress: 
$maxBlocksInFlightPerAddress")
+
+// Partition to local, host-local and remote blocks. Remote blocks are 
further split into
+// FetchRequests of size at most maxBytesInFlight in order to limit the 
amount of data in flight
+val collectedRemoteRequests = new ArrayBuffer[FetchRequest]
 var localBlockBytes = 0L
+var hostLocalBlockBytes = 0L
 var remoteBlockBytes = 0L
+var numRemoteBlocks = 0
 
 for ((address, blockInfos) <- blocksByAddress) {
+  numBlocksToFetch += blockInfos.size
   if (address.executorId == blockManager.blockManagerId.executorId) {
-blockInfos.find(_._2 <= 0) match {
-  case Some((blockId, size)) if size < 0 =>
-throw new BlockException(blockId, "Negative block size " + size)
-  case Some((blockId, size)) if size == 0 =>
-throw new BlockException(blockId, "Zero-sized blocks should be 
excluded.")
-  case None => // do nothing.
-}
+checkBlockSizes(blockInfos)
 localBlocks ++= blockInfos.map(_._1)
 localBlockBytes += blockInfos.map(_._2).sum
-numBlocksToFetch += localBlocks.size
+  } else if (enableHostLocalDiskReading && address.host == 
blockManager.blockManagerId.host) {
 
 Review comment:
   Could you add a comment here saying that there's a case where not all of these blocks can be read from the local disk (because of the limit on the cache size), and that this case is handled later by `fetchHostLocalBlocks`?
   
   That also means that the log message printed later may not be 100% accurate, although that's a minor issue.
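
   The two-step behaviour described here (classify by address first, then fall back for host-local candidates whose directories are not available) can be sketched as follows. The types `Address` and `FetchMode` and the functions `classify` and `resolve` are simplified assumptions for illustration, not the PR's code.

```scala
object FetchModes {
  // Simplified, hypothetical stand-in for BlockManagerId.
  final case class Address(executorId: String, host: String)

  sealed trait FetchMode
  case object Local extends FetchMode
  case object HostLocal extends FetchMode
  case object Remote extends FetchMode

  // First pass: classify purely by address, as the diff above does.
  def classify(current: Address, other: Address, hostLocalEnabled: Boolean): FetchMode =
    if (other.executorId == current.executorId) Local
    else if (hostLocalEnabled && other.host == current.host) HostLocal
    else Remote

  // Second pass: a host-local candidate whose local dirs are unknown
  // (for example, not present in the cache) is read remotely instead.
  def resolve(mode: FetchMode, other: Address, knownLocalDirs: Set[String]): FetchMode =
    mode match {
      case HostLocal if !knownLocalDirs.contains(other.executorId) => Remote
      case m => m
    }
}
```

   Because the second pass can turn a host-local block into a remote one, counts logged after the first pass may overstate the number of host-local reads.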





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-08-20 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r315812795
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/SparkContext.scala
 ##
 @@ -2851,6 +2851,9 @@ object SparkContext extends Logging {
   memoryPerSlaveInt, sc.executorMemory))
 }
 
+// for local cluster mode the SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED 
defaults to false
 
 Review comment:
   The comment just repeats the code. The comment should say why this is needed / wanted.





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-08-20 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r315833901
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 ##
 @@ -358,12 +382,65 @@ final class ShuffleBlockFetcherIterator(
 }
   }
 
+
+  /**
+   * Fetch the host-local blocks while we are fetching remote blocks. This is 
ok because
+   * `ManagedBuffer`'s memory is allocated lazily when we create the input 
stream, so all we
+   * track in-memory are the ManagedBuffer references themselves.
+   */
+  private[this] def fetchHostLocalBlocks() {
+logDebug(s"Start fetching host-local blocks: ${hostLocalBlocks.mkString(", 
")}")
+val hostLocalExecutorIds = 
hostLocalBlocksByExecutor.keySet.map(_.executorId)
+val readsWithoutLocalDir = LinkedHashMap[BlockManagerId, Seq[(BlockId, 
Long)]]()
+val localDirsByExec = 
blockManager.getHostLocalDirs(hostLocalExecutorIds.toArray)
+hostLocalBlocksByExecutor.foreach { case (bmId, blockInfos) =>
+  val localDirs = localDirsByExec.get(bmId.executorId)
+  if (localDirs.isDefined) {
+blockInfos.foreach {  case (blockId, _) =>
+  try {
+val buf = blockManager
+  .getHostLocalShuffleData(blockId.asInstanceOf[ShuffleBlockId], 
localDirs.get)
+shuffleMetrics.incLocalBlocksFetched(1)
+shuffleMetrics.incLocalBytesRead(buf.size)
+buf.retain()
+results.put(SuccessFetchResult(blockId, 
blockManager.blockManagerId,
+  buf.size(), buf, isNetworkReqDone = false))
+  } catch {
+case e: Exception =>
+  // If we see an exception, stop immediately.
+  logError(s"Error occurred while fetching local blocks", e)
+  results.put(FailureFetchResult(blockId, 
blockManager.blockManagerId, e))
+  return
+  }
+}
+  } else {
+readsWithoutLocalDir += bmId -> blockInfos
+  }
+}
+
+if (readsWithoutLocalDir.nonEmpty) {
+  val collectedRemoteRequests = new ArrayBuffer[FetchRequest]
+  readsWithoutLocalDir.foreach( { case (bmId, blockInfos) =>
 
 Review comment:
   `.foreach { case (bmId, blockInfos) =>`





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-08-20 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r315826558
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
 ##
 @@ -995,6 +1003,19 @@ private[spark] class BlockManager(
 None
   }
 
+  private[spark] def getHostLocalDirs(executorIds: Array[String])
+  : scala.collection.Map[String, Array[String]] = {
+val cachedItems = 
executorIdToLocalDirsCache.filterKeys(executorIds.contains(_))
+if (cachedItems.size < executorIds.length) {
+  val notCachedItems = master
+
.getHostLocalDirs(executorIds.filterNot(executorIdToLocalDirsCache.contains))
+  executorIdToLocalDirsCache ++= notCachedItems
 
 Review comment:
   Isn't this potentially going to grow without bound?
   
   You added a cache to `BlockManagerMasterEndpoint`, but that's something that only exists in the driver. So it doesn't look like there's anything limiting the number of entries cached in executors, at least directly.





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-08-20 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r315829047
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 ##
 @@ -74,16 +76,22 @@ final class ShuffleBlockFetcherIterator(
 maxReqSizeShuffleToMem: Long,
 detectCorrupt: Boolean,
 detectCorruptUseExtraMemory: Boolean,
-shuffleMetrics: ShuffleReadMetricsReporter)
+shuffleMetrics: ShuffleReadMetricsReporter,
+enableHostLocalDiskReading: Boolean = true)
   extends Iterator[(BlockId, InputStream)] with DownloadFileManager with 
Logging {
 
   import ShuffleBlockFetcherIterator._
 
+  // Make remote requests at most maxBytesInFlight / 5 in length; the reason 
to keep them
+  // smaller than maxBytesInFlight is to allow multiple, parallel fetches from 
up to 5
+  // nodes, rather than blocking on reading output from one node.
+  val targetRemoteRequestSize = math.max(maxBytesInFlight / 5, 1L)
 
 Review comment:
   `private`?
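
   A minimal sketch of what the suggestion amounts to. The wrapper class `RequestSizing` and the `target` accessor are hypothetical and exist only so the value can be exercised; the expression itself is the one from the diff.

```scala
// Hypothetical holder class; only the expression itself comes from the diff.
class RequestSizing(maxBytesInFlight: Long) {
  // `private` keeps the derived value out of the class's public surface.
  private val targetRemoteRequestSize: Long = math.max(maxBytesInFlight / 5, 1L)

  // Exposed here only so the sketch can be exercised.
  def target: Long = targetRemoteRequestSize
}
```

   With the 48 MiB value used in the test suite (`48 * 1024 * 1024`), the target works out to 10066329 bytes; the `math.max(..., 1L)` guard only matters when `maxBytesInFlight` is below 5.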





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-08-20 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r315833073
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 ##
 @@ -358,12 +382,65 @@ final class ShuffleBlockFetcherIterator(
 }
   }
 
+
+  /**
+   * Fetch the host-local blocks while we are fetching remote blocks. This is 
ok because
+   * `ManagedBuffer`'s memory is allocated lazily when we create the input 
stream, so all we
+   * track in-memory are the ManagedBuffer references themselves.
+   */
+  private[this] def fetchHostLocalBlocks() {
 
 Review comment:
   missing return type
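
   For reference, the fix is to replace procedure syntax (`def f() { ... }`) with an explicit `: Unit =`. A minimal sketch, using a hypothetical `ReturnTypes` object with a placeholder body:

```scala
object ReturnTypes {
  private var fetched = 0

  // Explicit `: Unit =` instead of `def fetchHostLocalBlocks() { ... }`.
  def fetchHostLocalBlocks(): Unit = {
    fetched += 1
  }

  def fetchedCount: Int = fetched
}
```

   The explicit form makes it obvious that the method is called for its side effects only.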





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-08-09 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r312673830
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
 ##
 @@ -995,6 +1003,20 @@ private[spark] class BlockManager(
 None
   }
 
+  private[spark] def getHostLocalDirs(executorIds: Array[String])
+: scala.collection.Map[String, Array[String]] = {
 
 Review comment:
   nit: indent more





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-08-09 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r312674806
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
 ##
 @@ -97,6 +97,13 @@ private[spark] object BlockManagerMessages {
   case class GetLocationsAndStatus(blockId: BlockId, requesterHost: String)
 extends ToBlockManagerMaster
 
+  case class GetLocalDirs(executorIds: Array[String]) extends 
ToBlockManagerMaster
+
+  /**
+   * The response message of `GetLocalDirs` request.
+   */
+  case class BlockManagerLocalDirs(localDirs: Map[String, Array[String]])
 
 Review comment:
   Do you need this message? Can't you just return `Map[String, Array[String]]` directly from the RPC?





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-08-09 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r312674680
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
 ##
 @@ -51,6 +51,9 @@ class BlockManagerMasterEndpoint(
   // Mapping from block manager id to the block manager's information.
   private val blockManagerInfo = new mutable.HashMap[BlockManagerId, 
BlockManagerInfo]
 
+  // Mapping from executor id to the block manager's local disk directories.
+  private val executorIdToLocalDirs = new mutable.HashMap[String, 
Array[String]]
 
 Review comment:
   I don't think letting it grow unbounded is a good idea.
   
   One idea that I haven't really thought through (or checked for viability) is to look into the `MapOutputTracker` and clean this up when there are no more known outputs needed from that executor's host.
   
   (Or maybe even keep this cache inside the `MapOutputTracker`?)
   
   Otherwise, having a limit on this cache would be good (e.g. with an LRU eviction policy). The worst case is what Imran says: you'll go over the network.
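
   One possible shape for a bounded cache with LRU eviction is sketched below, built on `java.util.LinkedHashMap` in access order with `removeEldestEntry` overridden. The class name `LruLocalDirsCache` and the `maxEntries` parameter are assumptions for illustration; this is not what the PR implements.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}

// Hypothetical bounded cache; the class name and `maxEntries` are assumptions.
class LruLocalDirsCache(maxEntries: Int) {
  // accessOrder = true keeps entries ordered from least to most recently accessed.
  private val underlying = new JLinkedHashMap[String, Array[String]](16, 0.75f, true) {
    override def removeEldestEntry(eldest: JMap.Entry[String, Array[String]]): Boolean =
      size() > maxEntries
  }

  def put(executorId: String, dirs: Array[String]): Unit = synchronized {
    underlying.put(executorId, dirs)
  }

  // A miss returns None; the caller would then fall back to a network fetch.
  def get(executorId: String): Option[Array[String]] = synchronized {
    Option(underlying.get(executorId))
  }
}
```

   An evicted entry only costs a fetch over the network later, which matches the worst case described above.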





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-07-31 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r309317674
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/status/api/v1/api.scala
 ##
 @@ -344,6 +348,7 @@ class ShuffleReadMetricDistributions private[spark](
 val readRecords: IndexedSeq[Double],
 val remoteBlocksFetched: IndexedSeq[Double],
 val localBlocksFetched: IndexedSeq[Double],
+val hostLocalBlocksFetched: IndexedSeq[Double],
 
 Review comment:
   It's possible, but we should still check and fix it if the problem exists.





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-07-30 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r308974718
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
 ##
 @@ -97,6 +97,15 @@ private[spark] object BlockManagerMessages {
   case class GetLocationsAndStatus(blockId: BlockId, requesterHost: String)
 extends ToBlockManagerMaster
 
+  case class GetLocalDirs(executorIds: Array[String]) extends 
ToBlockManagerMaster
+
+  /**
+   * The response message of `GetLocalDirs` request.
+   *
+   * @param localDirs follows the order of the block manager IDs used in the 
`GetLocalDirs` request.
 
 Review comment:
   Why? Maps are just as serializable as arrays.
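
   As a quick check of this claim, the sketch below round-trips a `Map[String, Array[String]]` through plain Java serialization. The helper `MapRoundTrip.roundTrip` is hypothetical, and the sketch does not exercise Spark's own RPC serializer; it only shows that the value itself is serializable.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object MapRoundTrip {
  // Serializes a value with plain Java serialization and reads it back.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

   Keying the response by executor ID also removes the need to document that the result "follows the order" of the request.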





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-07-30 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid 
the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r308938182
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
 ##
 @@ -542,11 +542,17 @@ private[spark] class BlockManager(
 }
   }
 
+  override def getHostLocalShuffleData(
+  blockId: ShuffleBlockId,
+  dirs: Array[String]): ManagedBuffer = {
+  shuffleManager.shuffleBlockResolver.getBlockData(blockId, Some(dirs))
 
 Review comment:
   indentation





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-07-30 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r308939389
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
 ##
 @@ -51,6 +51,9 @@ class BlockManagerMasterEndpoint(
   // Mapping from block manager id to the block manager's information.
   private val blockManagerInfo = new mutable.HashMap[BlockManagerId, BlockManagerInfo]
 
+  // Mapping from executor id to the block manager's local disk directories.
+  private val executorIdLocalDirs = new mutable.HashMap[String, Array[String]]
 
 Review comment:
   Name is a bit weird. Either `executorLocalDirs` or `executorIdToLocalDirs`.





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-07-30 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r308934633
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
 ##
 @@ -51,12 +52,43 @@ private[spark] class IndexShuffleBlockResolver(
 
   private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
 
-  def getDataFile(shuffleId: Int, mapId: Int): File = {
-blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
+
+  def getDataFile(shuffleId: Int, mapId: Int): File = getDataFile(shuffleId, mapId, None)
+
+  /**
+   * Get the shuffle data file.
+   *
+   * When the dirs param is None then the disk manager's local directories are used and this way
+   * local data files are read otherwise via the specified directories host-local blocks can be read
+   * (but this can be used only for reading as non-existing parent directory storing the data
+   * file won't be created this way).
+   */
+   def getDataFile(
+  shuffleId: Int,
+  mapId: Int,
+  dirs: Option[Array[String]]): File = {
+val blockId = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID)
+dirs
+  .map(ExecutorDiskUtils.getFile(_, blockManager.subDirsPerLocalDir, blockId.name))
+  .getOrElse(blockManager.diskBlockManager.getFile(blockId))
   }
 
-  private def getIndexFile(shuffleId: Int, mapId: Int): File = {
-blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
+  /**
+   * Get the shuffle index file.
+   *
+   * When the dirs param is None then the disk manager's local directories are used and this way
 
 Review comment:
   Same.





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-07-30 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r308937544
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/status/api/v1/api.scala
 ##
 @@ -344,6 +348,7 @@ class ShuffleReadMetricDistributions private[spark](
 val readRecords: IndexedSeq[Double],
 val remoteBlocksFetched: IndexedSeq[Double],
 val localBlocksFetched: IndexedSeq[Double],
+val hostLocalBlocksFetched: IndexedSeq[Double],
 
 Review comment:
   Hmmm.
   
   I wonder what happens if:
   - you run the current version of the SHS with the disk enabled
   - look at an app's stage in the UI, which will cache this information
   - shut down the SHS, and bring up the version with this change
   - the data in the disk store doesn't have this field, so what will happen?
   
   My feeling is that this field will be `null`, which may cause some problems.
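
The concern can be illustrated without the SHS: if an object is rebuilt from data that predates a field, reading that field may yield `null`. The sketch below shows one defensive way to read such a field. `ReadMetrics` and `hostLocalOrEmpty` are hypothetical names, and the explicit `null` only simulates an entry written by an older version; whether the real disk store behaves this way is the open question raised above, not something this example establishes.

```scala
object NullFieldSketch {
  // Hypothetical stand-in for a metrics class that gained a new field.
  class ReadMetrics(
      val localBlocksFetched: IndexedSeq[Double],
      val hostLocalBlocksFetched: IndexedSeq[Double])

  // Treat a missing (null) field as "no data" instead of failing later with an NPE.
  def hostLocalOrEmpty(m: ReadMetrics): IndexedSeq[Double] =
    Option(m.hostLocalBlocksFetched).getOrElse(IndexedSeq.empty[Double])

  def main(args: Array[String]): Unit = {
    // Simulates an entry stored before the field existed.
    val old = new ReadMetrics(IndexedSeq(1.0, 2.0), null)
    val current = new ReadMetrics(IndexedSeq(1.0), IndexedSeq(3.0))
    println(hostLocalOrEmpty(old).isEmpty)
    println(hostLocalOrEmpty(current))
  }
}
```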





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-07-30 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r308944004
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 ##
 @@ -365,12 +387,43 @@ final class ShuffleBlockFetcherIterator(
 }
   }
 
+
+  /**
+   * Fetch the host-local blocks while we are fetching remote blocks. This is ok because
+   * `ManagedBuffer`'s memory is allocated lazily when we create the input stream, so all we
+   * track in-memory are the ManagedBuffer references themselves.
+   */
+  private[this] def fetchHostLocalBlocks() {
+logDebug(s"Start fetching host-local blocks: ${hostLocalBlocks.mkString(", ")}")
+
+val localDirsByExec =
+  blockManager.master.getHostLocalDirs(hostLocalBlocksByExecutor.keySet.toArray).localDirs
+for ((localDirs, blocks) <- localDirsByExec.zip(hostLocalBlocksByExecutor.values);
+  blockId <- blocks) {
 
 Review comment:
   indent more
   
   I actually prefer this for multi-line fors:
   
   ```
   for (
     a <- b;
     c <- d;
     ...
   ) {
     ...
   }
   ```
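
Applied to a loop shaped like the one in the diff, that layout could look like the sketch below. It runs on plain collections instead of the block manager: `pairs` and its parameters are hypothetical stand-ins for `localDirsByExec` and `hostLocalBlocksByExecutor.values`, and the body only records what the loop visits.

```scala
object MultiLineForSketch {
  // Pair each executor's directories with its blocks, then walk the blocks.
  def pairs(
      localDirsByExec: Seq[Array[String]],
      blocksByExec: Seq[Seq[String]]): Seq[(String, String)] = {
    val result = Seq.newBuilder[(String, String)]
    // One generator per line, with the parentheses on their own lines.
    for (
      (localDirs, blocks) <- localDirsByExec.zip(blocksByExec);
      blockId <- blocks
    ) {
      result += ((localDirs.mkString(","), blockId))
    }
    result.result()
  }

  def main(args: Array[String]): Unit = {
    val visited = pairs(
      Seq(Array("/tmp/a", "/tmp/b"), Array("/tmp/c")),
      Seq(Seq("shuffle_0_0_0"), Seq("shuffle_0_1_0", "shuffle_0_2_0")))
    visited.foreach(println)
  }
}
```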





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-07-30 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r308945696
 
 

 ##
 File path: core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
 ##
 @@ -96,6 +98,35 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi
 e.getMessage should include ("Fetch failure will not retry stage due to testing config")
   }
 
+  test("SPARK-27651: host local disk reading avoids external shuffle service on the same node") {
+sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
+conf.get(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED) should equal(true)
+sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
+sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
+sc.env.blockManager.shuffleClient.getClass should equal(classOf[ExternalShuffleClient])
+
+// In a slow machine, one slave may register hundreds of milliseconds ahead of the other one.
+// If we don't wait for all slaves, it's possible that only one executor runs all jobs. Then
+// all shuffle blocks will be in this executor, ShuffleBlockFetcherIterator will directly fetch
+// local blocks from the local BlockManager and won't send requests to ExternalShuffleService.
+// In this case, we won't receive FetchFailed. And it will make this test fail.
+// Therefore, we should wait until all slaves are up
+TestUtils.waitUntilExecutorsUp(sc, 2, 6)
+
+val rdd = sc.parallelize(0 until 1000, 10).map(i => (i, 1)).reduceByKey(_ + _)
 
 Review comment:
   `.map { i => ... }`
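
A small sketch of the brace form, using plain Scala collections rather than an RDD so that it runs without Spark. `countByKey` is a hypothetical helper, and keying by `i % buckets` is a change from the test's `(i, 1)` made only so the grouping has something to sum.

```scala
object BraceLambdaSketch {
  // Counts how many of the first `n` integers fall into each of `buckets` keys.
  def countByKey(n: Int, buckets: Int): Map[Int, Int] =
    (0 until n)
      .map { i => (i % buckets, 1) }   // braces around the lambda, as requested
      .groupBy { case (k, _) => k }
      .map { case (k, vs) => k -> vs.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    println(countByKey(10, 3))
  }
}
```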





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-07-30 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r308939960
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
 ##
 @@ -514,15 +527,16 @@ class BlockManagerMasterEndpoint(
 
 if (locations.nonEmpty && status.isDefined) {
   val localDirs = locations.find { loc =>
-  if (loc.port != externalShuffleServicePort && loc.host == requesterHost) {
+// When the external shuffle service running on the same host is found among the block
+// locations then the block must be persisted on the disk. In this case the executorId
+// can be used to access this block even when the original executor is already stopped.
+loc.host == requesterHost &&
+  (loc.port == externalShuffleServicePort ||
 blockManagerInfo
   .get(loc)
   .flatMap(_.getStatus(blockId).map(_.storageLevel.useDisk))
-  .getOrElse(false)
-  } else {
-false
-  }
-  }.map(blockManagerInfo(_).localDirs)
+  .getOrElse(false))
+  }.map(bmId => executorIdLocalDirs(bmId.executorId))
 
 Review comment:
   `.map { bmId => ... }`





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-07-30 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r308934058
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
 ##
 @@ -51,12 +52,43 @@ private[spark] class IndexShuffleBlockResolver(
 
   private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle")
 
-  def getDataFile(shuffleId: Int, mapId: Int): File = {
-blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
+
+  def getDataFile(shuffleId: Int, mapId: Int): File = getDataFile(shuffleId, mapId, None)
+
+  /**
+   * Get the shuffle data file.
+   *
+   * When the dirs param is None then the disk manager's local directories are used and this way
 
 Review comment:
   This paragraph is really hard to follow. It's also unclear what creating directories has to do with this API.





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-07-30 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r308935805
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockResolver.scala
 ##
 @@ -31,10 +31,14 @@ trait ShuffleBlockResolver {
   type ShuffleId = Int
 
   /**
-   * Retrieve the data for the specified block. If the data for that block is not available,
-   * throws an unspecified exception.
+   * Retrieve the data for the specified block.
+   *
+   * When the dirs param is None then the disk manager's local directories are used and this way
+   * local blocks are read otherwise via the specified directories host-local blocks can be read.
 
 Review comment:
   Same thing.
   
   Needs at the very least some punctuation. e.g. "When the `dirs` parameter is `None` then use the disk manager's local directories. Otherwise, read from the specified directories."
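
The two cases in that suggested wording can be sketched as a single `Option` fallback. This is a simplified illustration, not the PR's implementation: `resolve` and `defaultDirs` are hypothetical, and the hash-based directory choice is a placeholder that does not reproduce Spark's on-disk layout.

```scala
import java.io.File

object DirsOptionSketch {
  // `defaultDirs` stands in for the disk manager's local directories.
  def resolve(defaultDirs: Array[String], dirs: Option[Array[String]], name: String): File = {
    // None: use the default directories. Some(d): read from the specified ones.
    val chosen = dirs.getOrElse(defaultDirs)
    // Placeholder placement rule: pick a directory from the name's hash.
    val idx = Math.floorMod(name.hashCode, chosen.length)
    new File(chosen(idx), name)
  }

  def main(args: Array[String]): Unit = {
    val own = Array("/tmp/own")
    println(resolve(own, None, "shuffle_0_0_0.data"))
    println(resolve(own, Some(Array("/tmp/other-executor")), "shuffle_0_0_0.data"))
  }
}
```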





[GitHub] [spark] vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host

2019-07-30 Thread GitBox
vanzin commented on a change in pull request #25299: [SPARK-27651][Core] Avoid the network when shuffle blocks are fetched from the same host
URL: https://github.com/apache/spark/pull/25299#discussion_r308941297
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala
 ##
 @@ -97,6 +97,15 @@ private[spark] object BlockManagerMessages {
   case class GetLocationsAndStatus(blockId: BlockId, requesterHost: String)
 extends ToBlockManagerMaster
 
+  case class GetLocalDirs(executorIds: Array[String]) extends ToBlockManagerMaster
+
+  /**
+   * The response message of `GetLocalDirs` request.
+   *
+   * @param localDirs follows the order of the block manager IDs used in the `GetLocalDirs` request.
 
 Review comment:
   Wouldn't it be better to return a map?
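
To make the trade-off concrete, the sketch below contrasts a positional response with a keyed one. Both case classes and both lookup helpers are hypothetical names for illustration; the PR's actual response class is not shown in the diff above.

```scala
object LocalDirsResponseSketch {
  // Positional response: only meaningful together with the request's executor order.
  case class LocalDirsByPosition(localDirs: Array[Array[String]])

  // Keyed response: each entry carries its own executor ID.
  case class LocalDirsByExecutor(localDirs: Map[String, Array[String]])

  // The caller has to keep the request around and zip it with the response.
  def lookupByPosition(
      executorIds: Array[String],
      resp: LocalDirsByPosition,
      id: String): Option[Array[String]] =
    executorIds.zip(resp.localDirs).collectFirst { case (`id`, dirs) => dirs }

  // The caller only needs the response.
  def lookupByKey(resp: LocalDirsByExecutor, id: String): Option[Array[String]] =
    resp.localDirs.get(id)

  def main(args: Array[String]): Unit = {
    val ids = Array("exec-1", "exec-2")
    val positional = LocalDirsByPosition(Array(Array("/tmp/a"), Array("/tmp/b")))
    val keyed = LocalDirsByExecutor(Map("exec-1" -> Array("/tmp/a"), "exec-2" -> Array("/tmp/b")))
    println(lookupByPosition(ids, positional, "exec-2").map(_.mkString(",")))
    println(lookupByKey(keyed, "exec-2").map(_.mkString(",")))
  }
}
```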

