[GitHub] [spark] viirya commented on a change in pull request #30492: [SPARK-33545][CORE] Support Fallback Storage during Worker decommission

2020-12-01 Thread GitBox


viirya commented on a change in pull request #30492:
URL: https://github.com/apache/spark/pull/30492#discussion_r533159510



##
File path: core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
##
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.io.DataInputStream
+import java.nio.ByteBuffer
+
+import scala.concurrent.Future
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH
+import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
+import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
+import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
+import org.apache.spark.util.Utils
+
+/**
+ * A fallback storage used by storage decommissioners.
+ */
+private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
+  require(conf.contains("spark.app.id"))
+  require(conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined)
+
+  private val fallbackPath = new Path(conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get)
+  private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+  private val fallbackFileSystem = FileSystem.get(fallbackPath.toUri, hadoopConf)
+  private val appId = conf.getAppId
+
+  // Visible for testing
+  def copy(
+      shuffleBlockInfo: ShuffleBlockInfo,
+      bm: BlockManager): Unit = {
+    val shuffleId = shuffleBlockInfo.shuffleId
+    val mapId = shuffleBlockInfo.mapId
+
+    bm.migratableResolver match {
+      case r: IndexShuffleBlockResolver =>
+        val indexFile = r.getIndexFile(shuffleId, mapId)
+
+        if (indexFile.exists()) {
+          fallbackFileSystem.copyFromLocalFile(
+            new Path(indexFile.getAbsolutePath),
+            new Path(fallbackPath, s"$appId/$shuffleId/${indexFile.getName}"))
+
+          val dataFile = r.getDataFile(shuffleId, mapId)
+          if (dataFile.exists()) {
+            fallbackFileSystem.copyFromLocalFile(
+              new Path(dataFile.getAbsolutePath),
+              new Path(fallbackPath, s"$appId/$shuffleId/${dataFile.getName}"))
+          }
+
+          // Report block statuses
+          val reduceId = NOOP_REDUCE_ID
+          val indexBlockId = ShuffleIndexBlockId(shuffleId, mapId, reduceId)
+          FallbackStorage.reportBlockStatus(bm, indexBlockId, indexFile.length)
+          if (dataFile.exists) {
+            val dataBlockId = ShuffleDataBlockId(shuffleId, mapId, reduceId)
+            FallbackStorage.reportBlockStatus(bm, dataBlockId, dataFile.length)
+          }
+        }
+      case r =>
+        logWarning(s"Unsupported Resolver: ${r.getClass.getName}")
+    }
+  }
+
+  def exists(shuffleId: Int, filename: String): Boolean = {
+    fallbackFileSystem.exists(new Path(fallbackPath, s"$appId/$shuffleId/$filename"))
+  }
+}
+
+class NoopRpcEndpointRef(conf: SparkConf) extends RpcEndpointRef(conf) {
+  import scala.concurrent.ExecutionContext.Implicits.global
+  override def address: RpcAddress = null
+  override def name: String = "fallback"
+  override def send(message: Any): Unit = {}
+  override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
+    Future{true.asInstanceOf[T]}
+  }
+}
+
+object FallbackStorage extends Logging {
+  /** We use one block manager id as a place holder. */
+  val FALLBACK_BLOCK_MANAGER_ID: BlockManagerId = BlockManagerId("fallback", "remote", 7337)
+
+  def getFallbackStorage(conf: SparkConf): Option[FallbackStorage] = {
+    if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
+      Some(new FallbackStorage(conf))
+    } else {
+      None
+    }
+  }
+
+  /** Register the fallback block manager and its RPC endpoint. */
+  def registerBlockManagerIfNeeded(master: BlockManagerMaster, conf: SparkConf): Unit = {
+    if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
+  

[GitHub] [spark] viirya commented on a change in pull request #30492: [SPARK-33545][CORE] Support Fallback Storage during Worker decommission

2020-11-30 Thread GitBox


viirya commented on a change in pull request #30492:
URL: https://github.com/apache/spark/pull/30492#discussion_r532917274



##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -471,6 +471,16 @@ package object config {
 "cache block replication should be positive.")
   .createWithDefaultString("30s")
 
+  private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH =

Review comment:
   Oh okay.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] viirya commented on a change in pull request #30492: [SPARK-33545][CORE] Support Fallback Storage during Worker decommission

2020-11-30 Thread GitBox


viirya commented on a change in pull request #30492:
URL: https://github.com/apache/spark/pull/30492#discussion_r532894938



##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -471,6 +471,16 @@ package object config {
 "cache block replication should be positive.")
   .createWithDefaultString("30s")
 
+  private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH =

Review comment:
   Could you add a simple note to the doc? E.g., "Fallback storage is 
registered during SparkContext initialization, so this must be enabled before 
creating SparkContext."
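
A minimal user-side sketch of what that note implies, assuming the final config key is `spark.storage.decommission.fallbackStorage.path` and the usual decommission flags; the bucket path below is a placeholder:

import org.apache.spark.{SparkConf, SparkContext}

// The fallback block manager is registered while the SparkContext starts up,
// so the path must already be present in the conf at that point.
val conf = new SparkConf()
  .setAppName("decommission-demo")
  .set("spark.decommission.enabled", "true")
  .set("spark.storage.decommission.enabled", "true")
  .set("spark.storage.decommission.shuffleBlocks.enabled", "true")
  .set("spark.storage.decommission.fallbackStorage.path", "s3a://my-bucket/spark-fallback/")
val sc = new SparkContext(conf)  // changing the fallback path after this point has no effect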





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] viirya commented on a change in pull request #30492: [SPARK-33545][CORE] Support Fallback Storage during Worker decommission

2020-11-30 Thread GitBox


viirya commented on a change in pull request #30492:
URL: https://github.com/apache/spark/pull/30492#discussion_r532892675



##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -471,6 +471,16 @@ package object config {
 "cache block replication should be positive.")
   .createWithDefaultString("30s")
 
+  private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH =

Review comment:
   Oh I see. Seems we don't have static config for core.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] viirya commented on a change in pull request #30492: [SPARK-33545][CORE] Support Fallback Storage during Worker decommission

2020-11-30 Thread GitBox


viirya commented on a change in pull request #30492:
URL: https://github.com/apache/spark/pull/30492#discussion_r532888694



##
File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
##
@@ -471,6 +471,16 @@ package object config {
 "cache block replication should be positive.")
   .createWithDefaultString("30s")
 
+  private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH =

Review comment:
   Because we only register fallback storage during SparkContext initialization, I think 
it doesn't make sense to change this config after creating SparkContext, so this sounds 
like a static SQL config?
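
For reference, a sketch of how an optional core config entry of this kind is typically declared in `org.apache.spark.internal.config`; the key name and doc text here are illustrative, not necessarily the PR's final wording:

  private[spark] val STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH =
    ConfigBuilder("spark.storage.decommission.fallbackStorage.path")
      .doc("The location (e.g. s3a://spark-storage/) used as fallback storage " +
        "during block manager decommissioning. Leaving it unset disables the feature.")
      .version("3.1.0")
      .stringConf
      .createOptional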





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] viirya commented on a change in pull request #30492: [SPARK-33545][CORE] Support Fallback Storage during Worker decommission

2020-11-30 Thread GitBox


viirya commented on a change in pull request #30492:
URL: https://github.com/apache/spark/pull/30492#discussion_r532883713



##
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##
@@ -627,7 +627,16 @@ private[spark] class BlockManager(
   override def getLocalBlockData(blockId: BlockId): ManagedBuffer = {
     if (blockId.isShuffle) {
       logDebug(s"Getting local shuffle block ${blockId}")
-      shuffleManager.shuffleBlockResolver.getBlockData(blockId)
+      try {
+        shuffleManager.shuffleBlockResolver.getBlockData(blockId)
+      } catch {
+        case e: IOException =>
+          if (conf.get(config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
+            FallbackStorage.read(conf, blockId)
+          } else {
+            throw e
+          }

Review comment:
   Makes sense. Thanks @holdenk 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] viirya commented on a change in pull request #30492: [SPARK-33545][CORE] Support Fallback Storage during Worker decommission

2020-11-26 Thread GitBox


viirya commented on a change in pull request #30492:
URL: https://github.com/apache/spark/pull/30492#discussion_r531319996



##
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##
@@ -627,7 +627,16 @@ private[spark] class BlockManager(
   override def getLocalBlockData(blockId: BlockId): ManagedBuffer = {
     if (blockId.isShuffle) {
       logDebug(s"Getting local shuffle block ${blockId}")
-      shuffleManager.shuffleBlockResolver.getBlockData(blockId)
+      try {
+        shuffleManager.shuffleBlockResolver.getBlockData(blockId)
+      } catch {
+        case e: IOException =>
+          if (conf.get(config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
+            FallbackStorage.read(conf, blockId)
+          } else {
+            throw e
+          }

Review comment:
   Is there any check whether `FALLBACK_BLOCK_MANAGER_ID` actually has the blocks before 
calling `FallbackStorage.read`? It looks like if `getBlockData` throws an `IOException`, 
the block manager will just go read from the fallback storage.
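
A hypothetical sketch of the kind of guard this comment asks about, consulting the master for the block's locations before falling back. `master.getLocations` is an existing `BlockManagerMaster` API; the helper itself and its wiring into `getLocalBlockData` are illustrative only, not code from the PR:

// Illustrative only: assumes access to `conf` and `master` inside BlockManager.
private def readFromFallbackIfRegistered(blockId: BlockId, e: IOException): ManagedBuffer = {
  val fallbackConfigured =
    conf.get(config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined
  // Only fall back when the master actually lists the fallback block manager as a location.
  val fallbackHasBlock =
    master.getLocations(blockId).contains(FallbackStorage.FALLBACK_BLOCK_MANAGER_ID)
  if (fallbackConfigured && fallbackHasBlock) {
    FallbackStorage.read(conf, blockId)
  } else {
    throw e
  }
}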





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] viirya commented on a change in pull request #30492: [SPARK-33545][CORE] Support Fallback Storage during Worker decommission

2020-11-26 Thread GitBox


viirya commented on a change in pull request #30492:
URL: https://github.com/apache/spark/pull/30492#discussion_r531174946



##
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##
@@ -627,7 +627,16 @@ private[spark] class BlockManager(
   override def getLocalBlockData(blockId: BlockId): ManagedBuffer = {
     if (blockId.isShuffle) {
       logDebug(s"Getting local shuffle block ${blockId}")
-      shuffleManager.shuffleBlockResolver.getBlockData(blockId)
+      try {
+        shuffleManager.shuffleBlockResolver.getBlockData(blockId)
+      } catch {
+        case e: IOException =>
+          if (conf.get(config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
+            FallbackStorage.read(conf, blockId)
+          } else {
+            throw e
+          }

Review comment:
   Can you explain the reasoning here? So once any executor cannot get a local shuffle 
block, it will always try to read from the fallback storage?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] viirya commented on a change in pull request #30492: [SPARK-33545][CORE] Support Fallback Storage during Worker decommission

2020-11-26 Thread GitBox


viirya commented on a change in pull request #30492:
URL: https://github.com/apache/spark/pull/30492#discussion_r531172201



##
File path: core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.io.DataInputStream
+import java.nio.ByteBuffer
+
+import scala.concurrent.Future
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH
+import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
+import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
+import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
+import org.apache.spark.util.Utils
+
+/**
+ * A fallback storage used by storage decommissioners.
+ */
+private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
+  require(conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined)
+
+  private val fallbackPath = new Path(conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get)
+  private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+  private val fallbackFileSystem = FileSystem.get(fallbackPath.toUri, hadoopConf)
+
+  // Visible for testing
+  def copy(
+      shuffleBlockInfo: ShuffleBlockInfo,
+      bm: BlockManager): Unit = {
+    val shuffleId = shuffleBlockInfo.shuffleId
+    val mapId = shuffleBlockInfo.mapId
+
+    bm.migratableResolver match {
+      case r: IndexShuffleBlockResolver =>
+        val indexFile = r.getIndexFile(shuffleId, mapId)
+
+        if (indexFile.exists()) {
+          fallbackFileSystem.copyFromLocalFile(
+            new Path(indexFile.getAbsolutePath),
+            new Path(fallbackPath, s"$shuffleId/${indexFile.getName}"))
+
+          val dataFile = r.getDataFile(shuffleId, mapId)
+          if (dataFile.exists()) {
+            fallbackFileSystem.copyFromLocalFile(
+              new Path(dataFile.getAbsolutePath),
+              new Path(fallbackPath, s"$shuffleId/${dataFile.getName}"))
+          }
+
+          // Report block statuses
+          val reduceId = NOOP_REDUCE_ID
+          val indexBlockId = ShuffleIndexBlockId(shuffleId, mapId, reduceId)
+          FallbackStorage.reportBlockStatus(bm, indexBlockId, indexFile.length)
+          if (dataFile.exists) {
+            val dataBlockId = ShuffleDataBlockId(shuffleId, mapId, reduceId)
+            FallbackStorage.reportBlockStatus(bm, dataBlockId, dataFile.length)
+          }
+        }
+      case r =>
+        logWarning(s"Unsupported Resolver: ${r.getClass.getName}")
+    }
+  }
+
+  def exists(shuffleId: Int, filename: String): Boolean = {

Review comment:
   Is this only for test?
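
If it is, a test-style usage might look like the following sketch; the names are illustrative and `fallbackStorage` is assumed to be an instance of the class above:

// Hypothetical assertion in a suite verifying that the index file was copied.
assert(fallbackStorage.exists(shuffleId = 0, filename = "shuffle_0_0_0.index"))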





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] viirya commented on a change in pull request #30492: [SPARK-33545][CORE] Support Fallback Storage during Worker decommission

2020-11-26 Thread GitBox


viirya commented on a change in pull request #30492:
URL: https://github.com/apache/spark/pull/30492#discussion_r531171533



##
File path: core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.io.DataInputStream
+import java.nio.ByteBuffer
+
+import scala.concurrent.Future
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH
+import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
+import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
+import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
+import org.apache.spark.util.Utils
+
+/**
+ * A fallback storage used by storage decommissioners.
+ */
+private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
+  require(conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined)
+
+  private val fallbackPath = new Path(conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get)
+  private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+  private val fallbackFileSystem = FileSystem.get(fallbackPath.toUri, hadoopConf)
+
+  // Visible for testing
+  def copy(
+      shuffleBlockInfo: ShuffleBlockInfo,
+      bm: BlockManager): Unit = {
+    val shuffleId = shuffleBlockInfo.shuffleId
+    val mapId = shuffleBlockInfo.mapId
+
+    bm.migratableResolver match {
+      case r: IndexShuffleBlockResolver =>
+        val indexFile = r.getIndexFile(shuffleId, mapId)
+
+        if (indexFile.exists()) {
+          fallbackFileSystem.copyFromLocalFile(
+            new Path(indexFile.getAbsolutePath),
+            new Path(fallbackPath, s"$shuffleId/${indexFile.getName}"))
+
+          val dataFile = r.getDataFile(shuffleId, mapId)
+          if (dataFile.exists()) {
+            fallbackFileSystem.copyFromLocalFile(

Review comment:
   The `copyFromLocalFile` API docs don't say: when the remote file already exists, does 
it overwrite or throw an exception?
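
For what it's worth, Hadoop's `FileSystem` also exposes an overload that makes the behaviour explicit, `copyFromLocalFile(delSrc, overwrite, src, dst)`. A small standalone sketch, with placeholder paths:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Placeholder paths; the point is only the explicit delSrc/overwrite flags.
val src = new Path("/tmp/blockmgr/shuffle_0_0_0.index")
val dst = new Path("s3a://bucket/fallback/0/shuffle_0_0_0.index")
val fs = FileSystem.get(dst.toUri, new Configuration())

// delSrc = false keeps the local file; overwrite = true replaces any existing remote copy.
fs.copyFromLocalFile(false, true, src, dst)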





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] viirya commented on a change in pull request #30492: [SPARK-33545][CORE] Support Fallback Storage during Worker decommission

2020-11-26 Thread GitBox


viirya commented on a change in pull request #30492:
URL: https://github.com/apache/spark/pull/30492#discussion_r531170833



##
File path: core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.io.DataInputStream
+import java.nio.ByteBuffer
+
+import scala.concurrent.Future
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH
+import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
+import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
+import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
+import org.apache.spark.util.Utils
+
+/**
+ * A fallback storage used by storage decommissioners.
+ */
+private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
+  require(conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined)
+
+  private val fallbackPath = new Path(conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get)
+  private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+  private val fallbackFileSystem = FileSystem.get(fallbackPath.toUri, hadoopConf)
+
+  // Visible for testing
+  def copy(
+      shuffleBlockInfo: ShuffleBlockInfo,
+      bm: BlockManager): Unit = {
+    val shuffleId = shuffleBlockInfo.shuffleId
+    val mapId = shuffleBlockInfo.mapId
+
+    bm.migratableResolver match {
+      case r: IndexShuffleBlockResolver =>
+        val indexFile = r.getIndexFile(shuffleId, mapId)
+
+        if (indexFile.exists()) {
+          fallbackFileSystem.copyFromLocalFile(
+            new Path(indexFile.getAbsolutePath),
+            new Path(fallbackPath, s"$shuffleId/${indexFile.getName}"))

Review comment:
   Is it possible for one `fallbackPath` to be shared between multiple Spark 
applications? If so, will this path conflict between apps?
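
A sketch of the per-application namespacing this question points at (a later revision in this thread does something similar by prefixing the destination with the application id); `conf`, `fallbackPath`, `shuffleId` and `indexFile` are the values from the surrounding diff:

// Namespace the destination by application id so two apps sharing one
// fallbackPath cannot clobber each other's shuffle files.
val appId = conf.getAppId  // requires spark.app.id to be set
val dst = new Path(fallbackPath, s"$appId/$shuffleId/${indexFile.getName}")
fallbackFileSystem.copyFromLocalFile(new Path(indexFile.getAbsolutePath), dst)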





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] viirya commented on a change in pull request #30492: [SPARK-33545][CORE] Support Fallback Storage during Worker decommission

2020-11-26 Thread GitBox


viirya commented on a change in pull request #30492:
URL: https://github.com/apache/spark/pull/30492#discussion_r531163858



##
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##
@@ -114,6 +115,8 @@ private[storage] class BlockManagerDecommissioner(
           // driver a no longer referenced RDD with shuffle files.
           if (bm.migratableResolver.getMigrationBlocks(shuffleBlockInfo).isEmpty) {
             logWarning(s"Skipping block ${shuffleBlockInfo}, block deleted.")
+          } else if (fallbackStorage.isDefined) {
+            fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))

Review comment:
   Oh I see. You only put the fallback storage into the peers when there are no 
other peer executors.
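
An illustrative sketch of the behaviour described above (not the PR's exact code); `getPeers` is an existing `BlockManager` method, and `fallbackStorage` is the optional fallback handle held by the decommissioner:

// Offer the fallback block manager as a migration target only when no live peers remain.
val peers = bm.getPeers(forceFetch = false)
val targets =
  if (peers.isEmpty && fallbackStorage.isDefined) {
    Seq(FallbackStorage.FALLBACK_BLOCK_MANAGER_ID)
  } else {
    peers
  }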





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] viirya commented on a change in pull request #30492: [SPARK-33545][CORE] Support Fallback Storage during Worker decommission

2020-11-25 Thread GitBox


viirya commented on a change in pull request #30492:
URL: https://github.com/apache/spark/pull/30492#discussion_r530820010



##
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##
@@ -114,6 +115,8 @@ private[storage] class BlockManagerDecommissioner(
           // driver a no longer referenced RDD with shuffle files.
           if (bm.migratableResolver.getMigrationBlocks(shuffleBlockInfo).isEmpty) {
             logWarning(s"Skipping block ${shuffleBlockInfo}, block deleted.")
+          } else if (fallbackStorage.isDefined) {
+            fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))

Review comment:
   When is the best time to move a block to fallback storage during decommissioning? 
With the current change, it just chooses the fallback storage once it encounters a 
failure on the first try.
   
   Should we let retrying do the work until reaching 
`maxReplicationFailuresForDecommission` and then move to fallback storage? 
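
A hypothetical sketch of the alternative being suggested, where fallback storage is used only after the per-block failure budget is exhausted; `retryCount` and `migrateToPeer` are illustrative names, not from the PR:

// Keep retrying peer migration first; copy to fallback storage only as a last resort.
if (retryCount < maxReplicationFailuresForDecommission) {
  migrateToPeer(shuffleBlockInfo)                          // normal peer-to-peer attempt
} else {
  fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))    // last-resort copy to fallback storage
}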





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] viirya commented on a change in pull request #30492: [SPARK-33545][CORE] Support Fallback Storage during Worker decommission

2020-11-24 Thread GitBox


viirya commented on a change in pull request #30492:
URL: https://github.com/apache/spark/pull/30492#discussion_r529968023



##
File path: core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
##
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.io.{DataInputStream, InputStream, IOException}
+import java.nio.ByteBuffer
+
+import scala.concurrent.Future
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH
+import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcTimeout}
+import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleBlockInfo}
+import org.apache.spark.shuffle.IndexShuffleBlockResolver.NOOP_REDUCE_ID
+import org.apache.spark.util.Utils
+
+/**
+ * A fallback storage used by storage decommissioners.
+ */
+private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
+  require(conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined)
+
+  private val fallbackPath = new Path(conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get)
+  private val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+  private val fallbackFileSystem = FileSystem.get(fallbackPath.toUri, hadoopConf)
+
+  // Visible for testing
+  def copy(
+      shuffleBlockInfo: ShuffleBlockInfo,
+      bm: BlockManager): Unit = {
+    val shuffleId = shuffleBlockInfo.shuffleId
+    val mapId = shuffleBlockInfo.mapId
+
+    bm.migratableResolver match {
+      case r: IndexShuffleBlockResolver =>
+        val indexFile = r.getIndexFile(shuffleId, mapId)
+
+        if (indexFile.exists()) {
+          fallbackFileSystem.copyFromLocalFile(
+            new Path(indexFile.getAbsolutePath),
+            new Path(fallbackPath, s"$shuffleId/${indexFile.getName}"))
+
+          val dataFile = r.getDataFile(shuffleId, mapId)
+          if (dataFile.exists()) {
+            fallbackFileSystem.copyFromLocalFile(
+              new Path(dataFile.getAbsolutePath),
+              new Path(fallbackPath, s"$shuffleId/${dataFile.getName}"))
+          }
+
+          // Report block statuses
+          val reduceId = NOOP_REDUCE_ID
+          val indexBlockId = ShuffleIndexBlockId(shuffleId, mapId, reduceId)
+          FallbackStorage.reportBlockStatus(bm, indexBlockId, indexFile.length)
+          if (dataFile.exists) {
+            val dataBlockId = ShuffleDataBlockId(shuffleId, mapId, reduceId)
+            FallbackStorage.reportBlockStatus(bm, dataBlockId, dataFile.length)
+          }
+        }
+      case r =>
+        logWarning(s"Unsupported Resolver: ${r.getClass.getName}")
+    }
+  }
+
+  def exists(shuffleId: Int, filename: String): Boolean = {
+    fallbackFileSystem.exists(new Path(fallbackPath, s"$shuffleId/$filename"))
+  }
+}
+
+class NoopRpcEndpointRef(conf: SparkConf) extends RpcEndpointRef(conf) {
+  import scala.concurrent.ExecutionContext.Implicits.global
+  override def address: RpcAddress = null
+  override def name: String = "fallback"
+  override def send(message: Any): Unit = {}
+  override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
+    Future{true.asInstanceOf[T]}
+  }
+}
+
+object FallbackStorage extends Logging {
+  /** We use one block manager id as a place holder. */
+  val FALLBACK_BLOCK_MANAGER_ID: BlockManagerId = BlockManagerId("fallback", "remote", 7337)
+
+  def getFallbackStorage(conf: SparkConf): Option[FallbackStorage] = {
+    if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
+      Some(new FallbackStorage(conf))
+    } else {
+      None
+    }
+  }
+
+  /** Register the fallback block manager and its RPC endpoint. */
+  def registerBlockManagerIfNeeded(master: BlockManagerMaster, conf: SparkConf): Unit = {
+    if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
+      master.registerBlockManager(
+