[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15505#discussion_r96106896

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -244,32 +245,45 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         // Launch tasks returned by a set of resource offers
         private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
           for (task <- tasks.flatten) {
    -        val serializedTask = TaskDescription.encode(task)
    -        if (serializedTask.limit >= maxRpcMessageSize) {
    -          scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
    -            try {
    -              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
    -                "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
    -                "spark.rpc.message.maxSize or using broadcast variables for large values."
    -              msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
    -              taskSetMgr.abort(msg)
    -            } catch {
    -              case e: Exception => logError("Exception in error callback", e)
    -            }
    -          }
    +        val serializedTask = try {
    +          TaskDescription.encode(task)
    +        } catch {
    +          case NonFatal(e) =>
    +            abortTaskSetManager(scheduler, task.taskId,
    +              s"Failed to serialize task ${task.taskId}, not attempting to retry it.", Some(e))
    +            null
             }
    -        else {
    +
    +        if (serializedTask != null && serializedTask.limit >= maxRpcMessageSize) {
    +          val msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
    +            "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
    +            "spark.rpc.message.maxSize or using broadcast variables for large values."
    +          abortTaskSetManager(scheduler, task.taskId,
    +            msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize))
    +        } else if (serializedTask != null) {
    +          if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) {
    --- End diff --

    There are two ways to fold this nested `if` into the `else if`.

    1. Duplicate the launch code, so that the following would appear twice:

    ``` scala
    val executorData = executorDataMap(task.executorId)
    executorData.freeCores -= scheduler.CPUS_PER_TASK
    logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} " +
      s" hostname: ${executorData.executorHost}.")
    executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    ```

    2. Use the return value of the `if` expression to avoid code duplication:

    ```scala
    val launchTask = if (serializedTask != null && serializedTask.limit >= maxRpcMessageSize) {
      false
    } else if (serializedTask != null &&
        serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) {
      true
    } else {
      true
    }
    if (launchTask) {
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} " +
        s" hostname: ${executorData.executorHost}.")
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
    ```

    The existing code is more concise than either of the above.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
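A third way to express the same branching, shown here only as a minimal sketch and not as code from the pull request, is to classify each serialized task once and then act on the result. The names `LaunchDecisionSketch`, `Decision`, and `decide` are hypothetical, and the thresholds are passed in as plain integers rather than read from Spark configuration.

```scala
import java.nio.ByteBuffer

object LaunchDecisionSketch {
  // Hypothetical outcome type; none of these names exist in Spark.
  sealed trait Decision
  case object SkipAlreadyAborted extends Decision // serialization failed earlier
  case object AbortTooLarge extends Decision      // exceeds the RPC message limit
  case object WarnAndLaunch extends Decision      // large, but still sendable
  case object Launch extends Decision

  // Classify a serialized task by size. `None` stands in for the `null`
  // that the diff uses when serialization threw.
  def decide(
      serialized: Option[ByteBuffer],
      maxRpcMessageSize: Int,
      warnThresholdBytes: Int): Decision = serialized match {
    case None => SkipAlreadyAborted
    case Some(buf) if buf.limit() >= maxRpcMessageSize => AbortTooLarge
    case Some(buf) if buf.limit() > warnThresholdBytes => WarnAndLaunch
    case Some(_) => Launch
  }
}
```

With this shape the launch code is written once, in the branch that handles `Launch` and `WarnAndLaunch`, at the cost of an extra type.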
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15505#discussion_r96108161

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
    @@ -244,32 +245,45 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         // Launch tasks returned by a set of resource offers
         private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
           for (task <- tasks.flatten) {
    -        val serializedTask = TaskDescription.encode(task)
    -        if (serializedTask.limit >= maxRpcMessageSize) {
    -          scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
    -            try {
    -              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
    -                "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
    -                "spark.rpc.message.maxSize or using broadcast variables for large values."
    -              msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
    -              taskSetMgr.abort(msg)
    -            } catch {
    -              case e: Exception => logError("Exception in error callback", e)
    -            }
    -          }
    +        val serializedTask = try {
    +          TaskDescription.encode(task)
    +        } catch {
    +          case NonFatal(e) =>
    +            abortTaskSetManager(scheduler, task.taskId,
    --- End diff --

    Yes, it's a big issue. We can first measure how much performance is lost if we have to create all the serialized tasks up front to make sure they all work. There may be no performance degradation.
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15505#discussion_r95492638

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
    @@ -52,7 +55,36 @@ private[spark] class TaskDescription(
         val addedFiles: Map[String, Long],
         val addedJars: Map[String, Long],
         val properties: Properties,
    -    val serializedTask: ByteBuffer) {
    +    private var serializedTask_ : ByteBuffer,
    +    private var task_ : Task[_] = null) extends Logging {
    +
    +  def this(
    +      taskId: Long,
    +      attemptNumber: Int,
    +      executorId: String,
    +      name: String,
    +      index: Int, // Index within this task's TaskSet
    +      addedFiles: Map[String, Long],
    +      addedJars: Map[String, Long],
    +      properties: Properties,
    +      task: Task[_]) {
    +    this(taskId, attemptNumber, executorId, name, index,
    +      addedFiles, addedJars, properties, null, task)
    +  }
    +
    +  lazy val serializedTask: ByteBuffer = {
    +    if (serializedTask_ == null) {
    +      serializedTask_ = try {
    +        ByteBuffer.wrap(Utils.serialize(task_))
    +      } catch {
    +        case NonFatal(e) =>
    +          val msg = s"Failed to serialize task $taskId, not attempting to retry it."
    +          logError(msg, e)
    +          throw new TaskNotSerializableException(e)
    +      }
    +    }
    +    serializedTask_
    --- End diff --

    OK, I agree with you. I will modify the code.
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15505#discussion_r95298151

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
    @@ -52,7 +55,36 @@ private[spark] class TaskDescription(
         val addedFiles: Map[String, Long],
         val addedJars: Map[String, Long],
         val properties: Properties,
    -    val serializedTask: ByteBuffer) {
    +    private var serializedTask_ : ByteBuffer,
    +    private var task_ : Task[_] = null) extends Logging {
    --- End diff --

    @squito I agree with you; I will modify it.

    @kayousterhout The main reason for this is to reduce the amount of code changes. Implementing what you suggest would require changing more files.
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15505#discussion_r95933009

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
    @@ -52,7 +55,43 @@ private[spark] class TaskDescription(
         val addedFiles: Map[String, Long],
         val addedJars: Map[String, Long],
         val properties: Properties,
    -    val serializedTask: ByteBuffer) {
    +    private var serializedTask_ : ByteBuffer) extends Logging {
    --- End diff --

    Another implementation: https://github.com/witgo/spark/commit/4fbf30a568ed61982e17757f9df3c35cb9d64871
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15505#discussion_r95753220

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
    @@ -52,7 +55,43 @@ private[spark] class TaskDescription(
         val addedFiles: Map[String, Long],
         val addedJars: Map[String, Long],
         val properties: Properties,
    -    val serializedTask: ByteBuffer) {
    +    private var serializedTask_ : ByteBuffer) extends Logging {
    --- End diff --

    How about this?

    ``` scala
    private[spark] class TaskDescription(
        val taskId: Long,
        val attemptNumber: Int,
        val executorId: String,
        val name: String,
        val index: Int, // Index within this task's TaskSet
        val addedFiles: Map[String, Long],
        val addedJars: Map[String, Long],
        val properties: Properties,
        private var serializedTask_ : ByteBuffer) extends Logging {

      def this(
          taskId: Long,
          attemptNumber: Int,
          executorId: String,
          name: String,
          index: Int, // Index within this task's TaskSet
          addedFiles: Map[String, Long],
          addedJars: Map[String, Long],
          properties: Properties,
          task: Task[_]) {
        this(taskId, attemptNumber, executorId, name, index,
          addedFiles, addedJars, properties, null.asInstanceOf[ByteBuffer])
        task_ = task
      }

      private var task_ : Task[_] = null

      private def serializedTask: ByteBuffer = {
        if (serializedTask_ == null) {
          // This is where we serialize the task on the driver before sending it to the executor.
          // This is not done when creating the TaskDescription so we can postpone this serialization
          // to later in the scheduling process -- particularly,
          // so it can happen in another thread by the CoarseGrainedSchedulerBackend.
          // On the executors, this will already be populated by decode
          serializedTask_ = try {
            ByteBuffer.wrap(Utils.serialize(task_))
          } catch {
            case NonFatal(e) =>
              val msg = s"Failed to serialize task $taskId, not attempting to retry it."
              logError(msg, e)
              throw new TaskNotSerializableException(e)
          }
        }
        serializedTask_
      }

      def getTask[_](loader: ClassLoader): Task[_] = {
        if (task_ == null) {
          task_ = Utils.deserialize(serializedTask, loader).asInstanceOf[Task[_]]
        }
        return task_
      }

      override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
    }
    ```
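The deferred-serialization idea in this proposal can be tried outside Spark with plain Java serialization. The following is a minimal sketch under stated assumptions: `LazySerializationSketch`, `LazyPayload`, and `SerializationFailed` are hypothetical names, `java.io.ObjectOutputStream` stands in for `Utils.serialize`, and the `serializeCalls` counter exists only to make the caching observable.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import java.nio.ByteBuffer
import scala.util.control.NonFatal

object LazySerializationSketch {
  // Hypothetical stand-in for TaskNotSerializableException.
  final class SerializationFailed(cause: Throwable)
    extends RuntimeException("Failed to serialize payload", cause)

  final class LazyPayload(payload: AnyRef) {
    private var cached: ByteBuffer = null
    var serializeCalls: Int = 0 // demonstration only

    // Serialize on first access, then reuse the cached bytes.
    def serialized: ByteBuffer = synchronized {
      if (cached == null) {
        serializeCalls += 1
        cached = try {
          val bytes = new ByteArrayOutputStream()
          val out = new ObjectOutputStream(bytes)
          out.writeObject(payload)
          out.close()
          ByteBuffer.wrap(bytes.toByteArray)
        } catch {
          case NonFatal(e) => throw new SerializationFailed(e)
        }
      }
      // Hand out a view so a caller's reads do not move the cached position.
      cached.duplicate()
    }
  }
}
```

Because nothing is serialized until `serialized` is first called, the work can happen on whichever thread asks for the bytes, which is the property the comment in the proposal describes.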
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15505#discussion_r95304052

    --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
    @@ -149,7 +149,12 @@ private[spark] object Utils extends Logging {
       /** Deserialize an object using Java serialization and the given ClassLoader */
       def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = {
    -    val bis = new ByteArrayInputStream(bytes)
    +    deserialize(ByteBuffer.wrap(bytes), loader)
    +  }
    +
    +  /** Deserialize an object using Java serialization and the given ClassLoader */
    +  def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
    +    val bis = new ByteBufferInputStream(bytes)
    --- End diff --

    I don't think there's a problem here. It is covered by many test cases.
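For readers who want to experiment with the overload pair in this diff outside Spark, here is a minimal, self-contained sketch. It is an approximation under assumptions: `BufferInputStream` is a hypothetical stand-in for Spark's internal `ByteBufferInputStream`, and `serialize` is a small helper added only so the example can round-trip a value.

```scala
import java.io.{ByteArrayOutputStream, InputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}
import java.nio.ByteBuffer

object ByteBufferDeserializeSketch {
  // Hypothetical stand-in for Spark's ByteBufferInputStream.
  final class BufferInputStream(buf: ByteBuffer) extends InputStream {
    override def read(): Int = if (buf.hasRemaining) buf.get() & 0xFF else -1

    override def read(dest: Array[Byte], off: Int, len: Int): Int = {
      if (len == 0) 0
      else if (!buf.hasRemaining) -1
      else {
        val n = math.min(len, buf.remaining())
        buf.get(dest, off, n)
        n
      }
    }
  }

  // Helper for the example only: plain Java serialization to a byte array.
  def serialize(value: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    bytes.toByteArray
  }

  // Deserialize from a ByteBuffer, resolving classes through `loader`.
  def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
    val in = new ObjectInputStream(new BufferInputStream(bytes)) {
      override def resolveClass(desc: ObjectStreamClass): Class[_] =
        Class.forName(desc.getName, false, loader)
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // The byte-array overload delegates to the ByteBuffer one, as in the diff.
  def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T =
    deserialize[T](ByteBuffer.wrap(bytes), loader)
}
```

Delegating the array overload to the buffer overload keeps a single code path, so tests that exercise either entry point cover both.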
[GitHub] spark issue #15505: [SPARK-18890][CORE] Move task serialization from the Tas...
Github user witgo commented on the issue:

    https://github.com/apache/spark/pull/15505

    @squito In local mode, performance is relatively less important; we only guarantee that there will be no performance degradation there.
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15505#discussion_r95305125

    --- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
    @@ -592,47 +579,6 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
         assert(manager.resourceOffer("execB", "host2", RACK_LOCAL).get.index === 1)
       }

    -  test("do not emit warning when serialized task is small") {
    --- End diff --

    Ok, I will add this test case back.
[GitHub] spark issue #15505: [SPARK-17931][CORE] taskScheduler has some unneeded seri...
Github user witgo commented on the issue:

    https://github.com/apache/spark/pull/15505

    @kayousterhout I agree with you, and will do as you say.

    @squito That is a good idea and worth a try. We can write a prototype to verify it.
[GitHub] spark issue #15505: [SPARK-17931][CORE] taskScheduler has some unneeded seri...
Github user witgo commented on the issue:

    https://github.com/apache/spark/pull/15505

    @kayousterhout Okay, I'll do the code revision this weekend.
[GitHub] spark issue #15505: [WIP][SPARK-18890][CORE] Multithreading serialization ta...
Github user witgo commented on the issue:

    https://github.com/apache/spark/pull/15505

    @kayousterhout OK, Done
[GitHub] spark pull request #15505: [SPARK-17931][CORE] taskScheduler has some unneed...
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15505#discussion_r93558564

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala ---
    @@ -59,6 +62,12 @@ private[spark] class LocalEndpoint(
       private val executor = new Executor(
         localExecutorId, localExecutorHostname, SparkEnv.get, userClassPath, isLocal = true)

    +  private val serializer = new ThreadLocal[SerializerInstance] {
    --- End diff --

    LocalEndpoint does not support multiple threads, so the ThreadLocal is not necessary.
[GitHub] spark pull request #15505: [SPARK-17931][CORE] taskScheduler has some unneed...
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15505#discussion_r93558472

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
    @@ -17,27 +17,179 @@

     package org.apache.spark.scheduler

    +import java.io._
     import java.nio.ByteBuffer
    +import java.util.Properties

    -import org.apache.spark.util.SerializableBuffer
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +import org.apache.spark.serializer.SerializerInstance
    +import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream}

     /**
      * Description of a task that gets passed onto executors to be executed, usually created by
      * [[TaskSetManager.resourceOffer]].
      */
    -private[spark] class TaskDescription(
    +private[spark] class TaskDescription private(
         val taskId: Long,
         val attemptNumber: Int,
         val executorId: String,
         val name: String,
    -    val index: Int,    // Index within this task's TaskSet
    -    _serializedTask: ByteBuffer)
    -  extends Serializable {
    +    val index: Int,
    --- End diff --

    Done
[GitHub] spark pull request #15505: [SPARK-17931][CORE] taskScheduler has some unneed...
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15505#discussion_r93558527

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
    @@ -17,27 +17,179 @@

     package org.apache.spark.scheduler

    +import java.io._
     import java.nio.ByteBuffer
    +import java.util.Properties

    -import org.apache.spark.util.SerializableBuffer
    +import scala.collection.JavaConverters._
    +import scala.collection.mutable
    +
    +import org.apache.spark.serializer.SerializerInstance
    +import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream}

     /**
      * Description of a task that gets passed onto executors to be executed, usually created by
      * [[TaskSetManager.resourceOffer]].
      */
    -private[spark] class TaskDescription(
    +private[spark] class TaskDescription private(
         val taskId: Long,
         val attemptNumber: Int,
         val executorId: String,
         val name: String,
    -    val index: Int,    // Index within this task's TaskSet
    -    _serializedTask: ByteBuffer)
    -  extends Serializable {
    +    val index: Int,
    +    val taskFiles: mutable.Map[String, Long],
    +    val taskJars: mutable.Map[String, Long],
    +    private var task_ : Task[_],
    +    private var taskBytes: InputStream,
    +    private var taskProps: Properties) {
    +
    +  def this(
    +      taskId: Long,
    +      attemptNumber: Int,
    +      executorId: String,
    +      name: String,
    +      index: Int,
    +      taskFiles: mutable.Map[String, Long],
    +      taskJars: mutable.Map[String, Long],
    +      task: Task[_]) {
    +    this(taskId, attemptNumber, executorId, name, index, taskFiles, taskJars, task,
    +      null.asInstanceOf[InputStream],
    +      null.asInstanceOf[Properties])
    +  }
    +
    +  @throws[IOException]
    +  def encode(serializer: SerializerInstance): ByteBuffer = {
    +    val out = new ByteBufferOutputStream(4096)
    +    encode(out, serializer)
    +    out.close()
    +    out.toByteBuffer
    +  }
    +
    +  @throws[IOException]
    +  def encode(outputStream: OutputStream, serializer: SerializerInstance): Unit = {
    +    val out = new DataOutputStream(outputStream)
    +    // Write taskId
    +    out.writeLong(taskId)
    +
    +    // Write attemptNumber
    +    out.writeInt(attemptNumber)
    +
    +    // Write executorId
    +    out.writeUTF(executorId)
    +
    +    // Write name
    +    out.writeUTF(name)

    -  // Because ByteBuffers are not serializable, wrap the task in a SerializableBuffer
    -  private val buffer = new SerializableBuffer(_serializedTask)
    +    // Write index
    +    out.writeInt(index)

    -  def serializedTask: ByteBuffer = buffer.value
    +    // Write taskFiles
    +    out.writeInt(taskFiles.size)
    +    for ((name, timestamp) <- taskFiles) {
    +      out.writeUTF(name)
    +      out.writeLong(timestamp)
    +    }
    +
    +    // Write taskJars
    +    out.writeInt(taskJars.size)
    +    for ((name, timestamp) <- taskJars) {
    +      out.writeUTF(name)
    +      out.writeLong(timestamp)
    +    }
    +
    +    // Write the task properties separately so it is available before full task deserialization.
    +    val taskProps = task_.localProperties
    +    val propertyNames = taskProps.stringPropertyNames
    +    out.writeInt(propertyNames.size())
    +    propertyNames.asScala.foreach { key =>
    +      val value = taskProps.getProperty(key)
    +      out.writeUTF(key)
    +      out.writeUTF(value)
    +    }
    +
    +    // Write the task itself and finish
    +    val serializeStream = serializer.serializeStream(out)
    +    serializeStream.writeValue(task_)
    +    serializeStream.flush()
    +  }
    +
    +  def task(ser: SerializerInstance): Task[_] = {
    +    if (task_ == null) {
    +      val deserializeStream = ser.deserializeStream(taskBytes)
    +      task_ = deserializeStream.readValue[Task[_]]()
    +      task_.localProperties = taskProps
    +      deserializeStream.close()
    +      taskProps = null
    +      taskBytes = null
    +    }
    +    task_
    --- End diff --

    The intent of this change is to move the deserialization into the TaskDescription class, so that taskBytes can be made private.
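The field-by-field layout that `encode` writes (fixed-width numbers, UTF strings, then a size-prefixed map) can be exercised in isolation. Below is a minimal round-trip sketch using only `java.io`; the `Header` case class, the `TaskHeaderCodecSketch` object, and the matching `decode` are hypothetical and cover only the leading metadata fields, not the task properties or the task body.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object TaskHeaderCodecSketch {
  // Hypothetical subset of the TaskDescription metadata.
  final case class Header(
      taskId: Long,
      attemptNumber: Int,
      executorId: String,
      name: String,
      index: Int,
      files: Map[String, Long])

  def encode(h: Header): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeLong(h.taskId)
    out.writeInt(h.attemptNumber)
    out.writeUTF(h.executorId)
    out.writeUTF(h.name)
    out.writeInt(h.index)
    // Size prefix first, so the reader knows how many entries follow.
    out.writeInt(h.files.size)
    for ((file, timestamp) <- h.files) {
      out.writeUTF(file)
      out.writeLong(timestamp)
    }
    out.flush()
    bytes.toByteArray
  }

  // Fields must be read back in exactly the order they were written.
  def decode(data: Array[Byte]): Header = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    val taskId = in.readLong()
    val attemptNumber = in.readInt()
    val executorId = in.readUTF()
    val name = in.readUTF()
    val index = in.readInt()
    val count = in.readInt()
    val files = (0 until count).map { _ =>
      val file = in.readUTF()
      val timestamp = in.readLong()
      file -> timestamp
    }.toMap
    Header(taskId, attemptNumber, executorId, name, index, files)
  }
}
```

Because the metadata sits in front of the serialized task, a reader can recover it without deserializing the task itself, which is what the "available before full task deserialization" comment in the diff relies on.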
[GitHub] spark pull request #15505: [SPARK-17931][CORE] taskScheduler has some unneed...
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15505#discussion_r93558450

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -993,6 +993,12 @@ class DAGScheduler(
               JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
           }

    +      if (taskBinaryBytes.length > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) {
    --- End diff --

    The original code seems to warn about the full task, including the RDD, shuffle dependency, closures, partition, and so on. I think the previous warning code was retained because #1498 did not completely modify it. In most cases, the serialized TaskDescription is smaller than the task's broadcast.
[GitHub] spark issue #15505: [SPARK-17931][CORE] taskScheduler has some unneeded seri...
Github user witgo commented on the issue:

    https://github.com/apache/spark/pull/15505

    @kayousterhout @squito I think Kay's approach is a good idea. We can merge #16053 first and keep the SPARK-18890-related code (including multi-threaded serialization of TaskDescription) in this PR.
[GitHub] spark pull request #15505: [SPARK-17931][CORE] taskScheduler has some unneed...
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15505#discussion_r93558401

    --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
    @@ -136,14 +136,10 @@ private[spark] class Executor(
       startDriverHeartbeater()

       def launchTask(
    -      context: ExecutorBackend,
    -      taskId: Long,
    -      attemptNumber: Int,
    -      taskName: String,
    -      serializedTask: ByteBuffer): Unit = {
    -    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    -      serializedTask)
    -    runningTasks.put(taskId, tr)
    +    context: ExecutorBackend,
    +    taskDesc: TaskDescription): Unit = {
    --- End diff --

    Done
[GitHub] spark pull request #15505: [SPARK-17931][CORE] taskScheduler has some unneed...
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15505#discussion_r93558431

    --- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
    @@ -232,6 +225,13 @@ private[spark] class Executor(
         }

         override def run(): Unit = {
    +      val taskId = taskDesc.taskId
    +      val attemptNumber = taskDesc.attemptNumber
    +      val taskName = taskDesc.name
    +      val taskFiles = taskDesc.taskFiles
    +      val taskJars = taskDesc.taskJars
    +      val taskProps = taskDesc.taskProperties
    --- End diff --

    If we use `taskDesc.taskId` directly, we need to modify a lot of code, e.g.:

    ```scala
    val errMsg =
      s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
        releasedLocks.mkString("[", ", ", "]")
    ```

    With that change, the line `s"${releasedLocks.size} block locks were not released by TID = ${taskDesc.taskId}:\n"` is longer than 100 characters.

    How about moving the code to:

    ```scala
    class TaskRunner(
        execBackend: ExecutorBackend,
        val taskDesc: TaskDescription)
      extends Runnable {

      val taskId = taskDesc.taskId
      val attemptNumber = taskDesc.attemptNumber
      val taskName = taskDesc.name
      val taskFiles = taskDesc.taskFiles
      val taskJars = taskDesc.taskJars
      val taskProps = taskDesc.taskProperties
    ```
[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17329#discussion_r107841105

    --- Diff: common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java ---
    @@ -37,13 +37,24 @@
      * A {@link ManagedBuffer} backed by a segment in a file.
      */
     public final class FileSegmentManagedBuffer extends ManagedBuffer {
    -  private final TransportConf conf;
    +  private final boolean lazyFileDescriptor;
    +  private final int memoryMapBytes;
       private final File file;
       private final long offset;
       private final long length;

       public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
    -    this.conf = conf;
    +    this(conf.lazyFileDescriptor(), conf.memoryMapBytes(), file, offset, length);
    +  }
    +
    +  public FileSegmentManagedBuffer(
    --- End diff --

    Yes, but the above code does not improve performance; the `FileSegmentManagedBuffer.convertToNetty` method is also called only once in the master branch.
[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17329#discussion_r108027203

    --- Diff: common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java ---
    @@ -37,13 +37,24 @@
      * A {@link ManagedBuffer} backed by a segment in a file.
      */
     public final class FileSegmentManagedBuffer extends ManagedBuffer {
    -  private final TransportConf conf;
    +  private final boolean lazyFileDescriptor;
    +  private final int memoryMapBytes;
       private final File file;
       private final long offset;
       private final long length;

       public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
    -    this.conf = conf;
    +    this(conf.lazyFileDescriptor(), conf.memoryMapBytes(), file, offset, length);
    +  }
    +
    +  public FileSegmentManagedBuffer(
    --- End diff --

    This branch [SPARK-19991_try2](https://github.com/witgo/spark/commits/SPARK-19991_try2) needs `244.45` s in my test
[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17329#discussion_r108032378 --- Diff: common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java --- @@ -37,13 +37,24 @@ * A {@link ManagedBuffer} backed by a segment in a file. */ public final class FileSegmentManagedBuffer extends ManagedBuffer { - private final TransportConf conf; + private final boolean lazyFileDescriptor; + private final int memoryMapBytes; private final File file; private final long offset; private final long length; public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) { -this.conf = conf; +this(conf.lazyFileDescriptor(), conf.memoryMapBytes(), file, offset, length); + } + + public FileSegmentManagedBuffer( --- End diff --

Suppose there are E executors in the cluster, and a shuffle has M map tasks and R reduce tasks.

In the master branch, the following will be created:
1. Up to M * R FileSegmentManagedBuffer instances
2. Up to 2 * M * R NoSuchElementException instances

In this PR, the following will be created:
1. Up to M * R FileSegmentManagedBuffer instances
2. Up to 2 * NoSuchElementException instances (ExternalShuffleBlockResolver and IndexShuffleBlockResolver are created once, when the executor starts, and they call the new constructor to create FileSegmentManagedBuffer instances)
[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...
Github user witgo closed the pull request at: https://github.com/apache/spark/pull/17329
[GitHub] spark pull request #17480: SPARK-20079: Re registration of AM hangs spark cl...
GitHub user witgo opened a pull request: https://github.com/apache/spark/pull/17480 SPARK-20079: Re registration of AM hangs spark cluster in yarn-client mode.

When tasks still need to be scheduled, `ExecutorAllocationManager` instances do not reset the `initializing` field.

## How was this patch tested?

Unit tests.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/witgo/spark SPARK-20079 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17480.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17480 commit b91dfeb4fea445727f6b5430aa947f35a287d56d Author: Guoqiang Li <wi...@qq.com> Date: 2017-03-30T14:17:49Z SPARK-20079: Re registration of AM hangs spark cluster in yarn-client mode.
[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17329#discussion_r107706851 --- Diff: common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java --- @@ -37,13 +37,24 @@ * A {@link ManagedBuffer} backed by a segment in a file. */ public final class FileSegmentManagedBuffer extends ManagedBuffer { - private final TransportConf conf; + private final boolean lazyFileDescriptor; + private final int memoryMapBytes; private final File file; private final long offset; private final long length; public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) { -this.conf = conf; +this(conf.lazyFileDescriptor(), conf.memoryMapBytes(), file, offset, length); + } + + public FileSegmentManagedBuffer( --- End diff --

Like the following code?

```java
public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
  this.lazyFileDescriptor = conf.lazyFileDescriptor();
  this.memoryMapBytes = conf.memoryMapBytes();
  this.file = file;
  this.offset = offset;
  this.length = length;
}
```

The calls `conf.lazyFileDescriptor()` and `conf.memoryMapBytes()` each create a NoSuchElementException instance.
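To make the cost argument in this thread concrete, here is a minimal, self-contained sketch. It is not Spark code: `CountingProvider`, the key `sketch.memoryMapBytes`, and both helper methods are hypothetical names invented for illustration. The sketch assumes, as the discussion describes, that looking up an unset key throws a `NoSuchElementException` which the default-value overload catches. It compares looking the key up on every use with looking it up once and reusing the value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

public class ConfigLookupSketch {

    /** Hypothetical provider: get(name) throws when the key is unset, as in the discussion. */
    static final class CountingProvider {
        private final Map<String, String> settings = new HashMap<>();
        private int exceptionsCreated = 0;

        String get(String name) {
            String value = settings.get(name);
            if (value == null) {
                exceptionsCreated++; // count every exception that gets allocated
                throw new NoSuchElementException(name);
            }
            return value;
        }

        /** Falls back to the default by catching the exception thrown by get(name). */
        String get(String name, String defaultValue) {
            try {
                return get(name);
            } catch (NoSuchElementException e) {
                return defaultValue;
            }
        }

        int exceptionsCreated() {
            return exceptionsCreated;
        }
    }

    /** Looks the unset key up on every use; returns how many exceptions were created. */
    public static int lookupPerUse(int uses) {
        CountingProvider provider = new CountingProvider();
        for (int i = 0; i < uses; i++) {
            Integer.parseInt(provider.get("sketch.memoryMapBytes", "2097152"));
        }
        return provider.exceptionsCreated();
    }

    /** Looks the unset key up once, keeps the result, and reuses it afterwards. */
    public static int lookupOnce(int uses) {
        CountingProvider provider = new CountingProvider();
        final int memoryMapBytes =
            Integer.parseInt(provider.get("sketch.memoryMapBytes", "2097152"));
        long total = 0;
        for (int i = 0; i < uses; i++) {
            total += memoryMapBytes; // reuse the cached value: no lookup, no exception
        }
        return provider.exceptionsCreated();
    }

    public static void main(String[] args) {
        System.out.println("per-use lookups: " + lookupPerUse(1000) + " exceptions");
        System.out.println("single lookup:   " + lookupOnce(1000) + " exceptions");
    }
}
```

The sketch only counts allocations; it does not measure time, so it says nothing about how large the saving is in a real shuffle.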
[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17329#discussion_r107572297 --- Diff: common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java --- @@ -37,13 +37,24 @@ * A {@link ManagedBuffer} backed by a segment in a file. */ public final class FileSegmentManagedBuffer extends ManagedBuffer { - private final TransportConf conf; + private final boolean lazyFileDescriptor; + private final int memoryMapBytes; private final File file; private final long offset; private final long length; public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) { -this.conf = conf; +this(conf.lazyFileDescriptor(), conf.memoryMapBytes(), file, offset, length); + } + + public FileSegmentManagedBuffer( --- End diff -- That will change a lot of code, right?
[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17329#discussion_r108049460 --- Diff: common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java --- @@ -37,13 +37,24 @@ * A {@link ManagedBuffer} backed by a segment in a file. */ public final class FileSegmentManagedBuffer extends ManagedBuffer { - private final TransportConf conf; + private final boolean lazyFileDescriptor; + private final int memoryMapBytes; private final File file; private final long offset; private final long length; public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) { -this.conf = conf; +this(conf.lazyFileDescriptor(), conf.memoryMapBytes(), file, offset, length); + } + + public FileSegmentManagedBuffer( --- End diff -- Sorry, I didn't get your idea. Can you write some code?
[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17329#discussion_r106781598 --- Diff: common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java --- @@ -37,13 +37,24 @@ * A {@link ManagedBuffer} backed by a segment in a file. */ public final class FileSegmentManagedBuffer extends ManagedBuffer { - private final TransportConf conf; + private final boolean lazyFileDescriptor; + private final int memoryMapBytes; private final File file; private final long offset; private final long length; public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) { -this.conf = conf; +this(conf.lazyFileDescriptor(), conf.memoryMapBytes(), file, offset, length); + } + + public FileSegmentManagedBuffer( --- End diff -- Oh, do you have a better idea?
[GitHub] spark issue #17329: [SPARK-19991]FileSegmentManagedBuffer performance improv...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/17329

```java
import java.util.Map;
import java.util.NoSuchElementException;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.network.util.ConfigProvider;

public class HadoopConfigProvider extends ConfigProvider {
  private final Configuration conf;

  public HadoopConfigProvider(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public String get(String name) {
    String value = conf.get(name);
    // When spark.storage.memoryMapThreshold or spark.shuffle.io.lazyFD is not set,
    // `value` is null here.
    if (value == null) {
      throw new NoSuchElementException(name);
    }
    return value;
  }

  @Override
  public Iterable<Map.Entry<String, String>> getAll() {
    return conf;
  }
}
```
[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...
GitHub user witgo opened a pull request: https://github.com/apache/spark/pull/17329 [SPARK-19991]FileSegmentManagedBuffer performance improvement

FileSegmentManagedBuffer performance improvement.

## What changes were proposed in this pull request?

When the configuration items `spark.storage.memoryMapThreshold` and `spark.shuffle.io.lazyFD` are not set, each call to the `FileSegmentManagedBuffer.nioByteBuffer` or `FileSegmentManagedBuffer.createInputStream` method creates a NoSuchElementException instance. This is a relatively expensive operation.

In the test case below, this PR improves performance by about 3.5%.

The test code:

```scala
(1 to 10).foreach { i =>
  val numPartition = 1
  val rdd = sc.parallelize(0 until numPartition).repartition(numPartition).flatMap { t =>
    (0 until numPartition).map(r => r * numPartition + t)
  }.repartition(numPartition)
  val serializeStart = System.currentTimeMillis()
  rdd.sum()
  val serializeFinish = System.currentTimeMillis()
  println(f"Test $i: ${(serializeFinish - serializeStart) / 1000D}%1.2f")
}
```

and the `spark-defaults.conf` file:

```
spark.master yarn-client
spark.executor.instances 20
spark.driver.memory 64g
spark.executor.memory 30g
spark.executor.cores 5
spark.default.parallelism 100
spark.sql.shuffle.partitions 100
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.driver.maxResultSize 0
spark.ui.enabled false
spark.driver.extraJavaOptions -XX:+UseG1GC -XX:+UseStringDeduplication -XX:G1HeapRegionSize=16M -XX:MetaspaceSize=512M
spark.executor.extraJavaOptions -XX:+UseG1GC -XX:+UseStringDeduplication -XX:G1HeapRegionSize=16M -XX:MetaspaceSize=256M
spark.cleaner.referenceTracking.blocking true
spark.cleaner.referenceTracking.blocking.shuffle true
```

The test results are as follows:

| [SPARK-19991](https://github.com/witgo/spark/tree/SPARK-19991) | https://github.com/apache/spark/commit/68ea290b3aa89b2a539d13ea2c18bdb5a651b2bf |
|---|---|
| 226.09 s | 235.21 s |

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running: $ git pull https://github.com/witgo/spark SPARK-19991 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17329.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17329 commit abcfc79991ecd1d5cef2cd1e275b872695ba19d9 Author: Guoqiang Li <liguoqia...@huawei.com> Date: 2017-03-17T03:19:37Z FileSegmentManagedBuffer performance improvement
[GitHub] spark pull request #17480: [SPARK-20079][Core][yarn] Re registration of AM h...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17480#discussion_r109575470 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -249,7 +249,9 @@ private[spark] class ExecutorAllocationManager( * yarn-client mode when AM re-registers after a failure. */ def reset(): Unit = synchronized { -initializing = true +if (maxNumExecutorsNeeded() == 0) { --- End diff -- Done.
[GitHub] spark pull request #17480: [SPARK-20079][Core][yarn] Re registration of AM h...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17480#discussion_r110796578 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -249,7 +249,14 @@ private[spark] class ExecutorAllocationManager( * yarn-client mode when AM re-registers after a failure. */ def reset(): Unit = synchronized { -initializing = true +/** + * When some tasks need to be scheduled and initial executor = 0, resetting the initializing + * field may cause it to not be set to false in yarn. + * SPARK-20079: https://issues.apache.org/jira/browse/SPARK-20079 + */ +if (maxNumExecutorsNeeded() == 0) { + initializing = true --- End diff --

@jerryshao Can you explain the following comments? I do not understand.

```scala
if (initializing) {
  // Do not change our target while we are still initializing,
  // Otherwise the first job may have to ramp up unnecessarily
  0
} else if (maxNeeded < numExecutorsTarget) {
```
[GitHub] spark issue #17480: [SPARK-20079][Core][yarn] Re registration of AM hangs sp...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/17480 @jerryshao Yes.
[GitHub] spark pull request #17480: [SPARK-20079][Core][yarn] Re registration of AM h...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17480#discussion_r53390 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -249,7 +249,14 @@ private[spark] class ExecutorAllocationManager( * yarn-client mode when AM re-registers after a failure. */ def reset(): Unit = synchronized { -initializing = true +/** + * When some tasks need to be scheduled and initial executor = 0, resetting the initializing + * field may cause it to not be set to false in yarn. + * SPARK-20079: https://issues.apache.org/jira/browse/SPARK-20079 + */ +if (maxNumExecutorsNeeded() == 0) { + initializing = true --- End diff -- OK. I've got it, thx.
[GitHub] spark pull request #17480: [SPARK-20079][Core][yarn] Re registration of AM h...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17480#discussion_r110361779 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -249,7 +249,14 @@ private[spark] class ExecutorAllocationManager( * yarn-client mode when AM re-registers after a failure. */ def reset(): Unit = synchronized { -initializing = true +/** + * When some tasks need to be scheduled, resetting the initializing field may cause + * it to not be set to false in yarn. --- End diff -- Currently this method will only be called in yarn-client mode when AM re-registers after a failure.
[GitHub] spark pull request #17480: [SPARK-20079][Core][yarn] Re registration of AM h...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17480#discussion_r110804557 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -249,7 +249,14 @@ private[spark] class ExecutorAllocationManager( * yarn-client mode when AM re-registers after a failure. */ def reset(): Unit = synchronized { -initializing = true +/** + * When some tasks need to be scheduled and initial executor = 0, resetting the initializing + * field may cause it to not be set to false in yarn. + * SPARK-20079: https://issues.apache.org/jira/browse/SPARK-20079 + */ +if (maxNumExecutorsNeeded() == 0) { + initializing = true --- End diff --

Shouldn't the following code have a similar effect?

```scala
numExecutorsTarget = initialNumExecutors // The default value is 0
numExecutorsToAdd = 1
```

The arguments passed to the `client.requestTotalExecutors` method are 1, 2, 4, 8, 16, ...
[GitHub] spark issue #17480: [SPARK-20079][Core][yarn] Re registration of AM hangs sp...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/17480 OK, I will do the work over the weekend.
[GitHub] spark pull request #17480: [SPARK-20079][Core][yarn] Re registration of AM h...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17480#discussion_r112825043 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -249,7 +249,6 @@ private[spark] class ExecutorAllocationManager( * yarn-client mode when AM re-registers after a failure. */ def reset(): Unit = synchronized { -initializing = true --- End diff -- @jerryshao @vanzin I think that deleting `initializing = true` is a good idea.
[GitHub] spark pull request #17116: [SPARK-18890][CORE](try 2) Move task serializatio...
Github user witgo closed the pull request at: https://github.com/apache/spark/pull/17116
[GitHub] spark issue #17139: [SPARK-18890][CORE](try 3) Move task serialization from ...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/17139 @kayousterhout The test report has been updated.
[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...
Github user witgo closed the pull request at: https://github.com/apache/spark/pull/15505
[GitHub] spark issue #15505: [SPARK-18890][CORE] Move task serialization from the Tas...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/15505 Yes, maybe multithreaded task serialization code would perform better. Let me close the PR.
[GitHub] spark issue #15505: [SPARK-18890][CORE] Move task serialization from the Tas...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/15505 The code in [SPARK-18890_20170303](https://github.com/witgo/spark/commits/SPARK-18890_20170303) is older, but the test case runs in 5.2 s.
[GitHub] spark issue #17139: [SPARK-18890][CORE](try 3) Move task serialization from ...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/17139 ping @kayousterhout @squito
[GitHub] spark issue #17139: [SPARK-18890][CORE](try 3) Move task serialization from ...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/17139 Added multi-threaded code for serializing `TaskDescription`.
[GitHub] spark issue #17567: [SPARK-19991][CORE][YARN] FileSegmentManagedBuffer perfo...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/17567 LGTM. Are there any performance test reports?
[GitHub] spark issue #17567: [SPARK-19991][CORE][YARN] FileSegmentManagedBuffer perfo...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/17567 OK, I see.
[GitHub] spark issue #17480: [SPARK-20079][Core][yarn] Re registration of AM hangs sp...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/17480

The `ExecutorAllocationManager.reset` method is called when the AM re-registers, and it sets the `ExecutorAllocationManager.initializing` field to true. While this field is true, the driver does not request new executors from the AM. Either of the following two events sets the field to false:

1. An executor has been idle for some time.
2. A new stage is submitted.

If the AM is killed and restarted after a stage has been submitted, neither event will occur:

1. When the AM is killed, YARN kills all running containers. All executors are lost, so no executor can become idle.
2. With no surviving executors, the current stage never completes, so the DAG scheduler never submits a new stage.
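The hang described above can be reproduced in a toy model. The following is a minimal sketch, not the real `ExecutorAllocationManager`: the class `AllocationResetSketch`, its methods, and the "one executor per pending task" rule are all simplifying assumptions made for illustration. The `guarded` flag stands in for the `maxNumExecutorsNeeded() == 0` check proposed in this PR's diff.

```java
public class AllocationResetSketch {
    private boolean initializing = true;
    private int pendingTasks = 0;

    /** Submitting a stage clears the flag (the second event in the comment above). */
    void onStageSubmitted(int tasks) {
        initializing = false;
        pendingTasks = tasks;
    }

    /** An executor idling past its timeout also clears the flag (the first event). */
    void onExecutorIdle() {
        initializing = false;
    }

    /**
     * Called when the AM re-registers. With keepFlagWhenWorkIsPending set, the flag
     * is only raised again if nothing is waiting to run.
     */
    void reset(boolean keepFlagWhenWorkIsPending) {
        if (!keepFlagWhenWorkIsPending || pendingTasks == 0) {
            initializing = true;
        }
    }

    /** Executors to request; assumption for the sketch: one executor per pending task. */
    int executorsToRequest() {
        return initializing ? 0 : pendingTasks;
    }

    /** Submits a stage with 4 tasks, restarts the AM, and reports what would be requested. */
    public static int requestedAfterAmRestart(boolean guarded) {
        AllocationResetSketch manager = new AllocationResetSketch();
        manager.onStageSubmitted(4);
        // All executors died with the AM, so neither flag-clearing event can fire again.
        manager.reset(guarded);
        return manager.executorsToRequest();
    }

    public static void main(String[] args) {
        System.out.println("unconditional reset requests: " + requestedAfterAmRestart(false));
        System.out.println("guarded reset requests:       " + requestedAfterAmRestart(true));
    }
}
```

In this model the unconditional reset leaves the manager requesting nothing while tasks are still pending, which corresponds to the hang; the guarded reset keeps requesting executors. Whether guarding or removing the assignment is the right fix in Spark itself is what the rest of this thread debates.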
[GitHub] spark issue #17480: [SPARK-20079][Core][yarn] Re registration of AM hangs sp...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/17480 @vanzin Sorry, I do not understand what you mean. Will you submit a new PR with your own idea? If so, I will close this PR.
[GitHub] spark pull request #17882: [WIP][SPARK-20079][yarn] Re registration of AM ha...
Github user witgo closed the pull request at: https://github.com/apache/spark/pull/17882
[GitHub] spark issue #17882: [WIP][SPARK-20079][yarn] Re registration of AM hangs spa...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/17882 I'm very sorry, I haven't taken the time to update this code recently. @vanzin , thank you for your work. I'll close this PR.
[GitHub] spark issue #14995: [Test Only][SPARK-6235][CORE]Address various 2G limits
Github user witgo commented on the issue: https://github.com/apache/spark/pull/14995 I have not done much testing, but I think it can be used in a production environment. The URL: https://github.com/witgo/spark/tree/SPARK-6235_Address_various_2G_limits
[GitHub] spark issue #18008: [SPARK-20776] Fix perf. problems in JobProgressListener ...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/18008 @JoshRosen , what's the tool in your screenshot?
[GitHub] spark issue #18008: [SPARK-20776] Fix perf. problems in JobProgressListener ...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/18008 @JoshRosen I see, thank you.
[GitHub] spark issue #17139: [SPARK-19486][CORE](try 3) Investigate using multiple th...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/17139 @jiangxb1987, yes. Do you have any questions?
[GitHub] spark issue #17882: [WIP][SPARK-20079][try 2][yarn] Re registration of AM ha...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/17882 @jerryshao @vanzin Would you take some time to review this PR?
[GitHub] spark pull request #17882: [SPARK-20079][yarn] Re registration of AM hangs s...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17882#discussion_r120652302 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala --- @@ -68,6 +68,8 @@ private[spark] abstract class YarnSchedulerBackend( // Flag to specify whether this schedulerBackend should be reset. private var shouldResetOnAmRegister = false + private var lastRequestExecutors = RequestExecutors(-1, -1, Map.empty, Set.empty) --- End diff -- Done.
[GitHub] spark issue #17882: [SPARK-20079][yarn] Re registration of AM hangs spark cl...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/17882 @vanzin Done.
[GitHub] spark pull request #17882: [SPARK-20079][yarn] Re registration of AM hangs s...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17882#discussion_r120644588 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala --- @@ -176,16 +179,6 @@ private[spark] abstract class YarnSchedulerBackend( } /** - * Reset the state of SchedulerBackend to the initial state. This is happened when AM is failed - * and re-registered itself to driver after a failure. The stale state in driver should be - * cleaned. - */ - override protected def reset(): Unit = { --- End diff -- In the current code, resetting the state of `ExecutorAllocationManager` is not correct. It causes the `RetrieveLastRequestExecutors` message to not work properly.
[GitHub] spark pull request #17882: [WIP][SPARK-20079][yarn] Re registration of AM ha...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/17882#discussion_r121972913 --- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala --- @@ -176,16 +179,6 @@ private[spark] abstract class YarnSchedulerBackend( } /** - * Reset the state of SchedulerBackend to the initial state. This is happened when AM is failed - * and re-registered itself to driver after a failure. The stale state in driver should be - * cleaned. - */ - override protected def reset(): Unit = { --- End diff -- I think `ExecutorAllocationManager#reset` is still necessary, but the following code should be removed:
```scala
initializing = true
numExecutorsTarget = initialNumExecutors
numExecutorsToAdd = 1
```
[GitHub] spark pull request #17882: [WIP][SPARK-20079][try 2][yarn] Re registration o...
GitHub user witgo opened a pull request: https://github.com/apache/spark/pull/17882
[WIP][SPARK-20079][try 2][yarn] Re registration of AM hangs spark cluster in yarn-client mode.
See #17480
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/witgo/spark SPARK-20079_try2
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17882.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17882
commit fd4c486725ae49d068e67088a185fb4cc229e21f Author: Guoqiang Li <wi...@qq.com> Date: 2017-03-30T14:17:49Z SPARK-20079: Re registration of AM hangs spark cluster in yarn-client mode.
commit fa27d7f6da49659c8aa6efe55a3e457e883eb17a Author: Guoqiang Li <wi...@qq.com> Date: 2017-04-04T04:33:45Z review commits
commit 6341d31c2961a08db2f36339f5ebe8c814eeb4c7 Author: Guoqiang Li <wi...@qq.com> Date: 2017-04-07T10:32:29Z review commits
commit e992df93e7222d5d2bd66d9a2c19984c9b241fd5 Author: Guoqiang Li <wi...@qq.com> Date: 2017-04-23T04:56:58Z Delete "initializing = true" in ExecutorAllocationManager.reset
commit 917cf43ffaaeb20347df3c7e480cb75ae87dca83 Author: Guoqiang Li <wi...@qq.com> Date: 2017-05-06T13:28:28Z Add msg: RetrieveLastAllocatedExecutorNumber
[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/21346#discussion_r195284967 --- Diff: common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java --- @@ -220,30 +196,91 @@ public long sendRpc(ByteBuffer message, RpcResponseCallback callback) { handler.addRpcRequest(requestId, callback); channel.writeAndFlush(new RpcRequest(requestId, new NioManagedBuffer(message))) -.addListener(future -> { - if (future.isSuccess()) { -long timeTaken = System.currentTimeMillis() - startTime; -if (logger.isTraceEnabled()) { - logger.trace("Sending request {} to {} took {} ms", requestId, -getRemoteAddress(channel), timeTaken); -} - } else { -String errorMsg = String.format("Failed to send RPC %s to %s: %s", requestId, - getRemoteAddress(channel), future.cause()); -logger.error(errorMsg, future.cause()); -handler.removeRpcRequest(requestId); -channel.close(); -try { - callback.onFailure(new IOException(errorMsg, future.cause())); -} catch (Exception e) { - logger.error("Uncaught exception in RPC response callback handler!", e); -} - } -}); + .addListener(new RpcChannelListener(startTime, requestId, callback)); + +return requestId; + } + + /** + * Send data to the remote end as a stream. This differs from stream() in that this is a request + * to *send* data to the remote end, not to receive it from the remote. + * + * @param meta meta data associated with the stream, which will be read completely on the + * receiving end before the stream itself. + * @param data this will be streamed to the remote end to allow for transferring large amounts + * of data without reading into memory. + * @param callback handles the reply -- onSuccess will only be called when both message and data + * are received successfully. 
+ */ + public long uploadStream( + ManagedBuffer meta, + ManagedBuffer data, + RpcResponseCallback callback) { +long startTime = System.currentTimeMillis(); +if (logger.isTraceEnabled()) { + logger.trace("Sending RPC to {}", getRemoteAddress(channel)); +} + +long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits()); --- End diff -- This `Math.abs(UUID.randomUUID().getLeastSignificantBits());` is repeated twice. Move it into a separate method.
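The refactoring suggested in the comment above could look roughly like the following. This is only a minimal sketch, not the code that ended up in the PR: the class name `RequestIds` and the method name `next` are made up for illustration, and only the expression `Math.abs(UUID.randomUUID().getLeastSignificantBits())` is taken from the diff.

```java
import java.util.UUID;

// Hypothetical holder class; in the PR the helper would more likely sit
// next to sendRpc and uploadStream as a private method.
final class RequestIds {
  private RequestIds() {}

  // Same expression as in the diff, written once so that both call sites share it.
  // Note: Math.abs(Long.MIN_VALUE) is still Long.MIN_VALUE, so the result is
  // non-negative except in that single, extremely unlikely case.
  static long next() {
    return Math.abs(UUID.randomUUID().getLeastSignificantBits());
  }
}

final class RequestIdDemo {
  public static void main(String[] args) {
    // Both call sites (a plain RPC and a stream upload) draw ids from the same helper.
    long rpcId = RequestIds.next();
    long uploadId = RequestIds.next();
    System.out.println("rpc request id: " + rpcId);
    System.out.println("upload request id: " + uploadId);
  }
}
```

Keeping the expression in one place also means that any later change to how request ids are generated only has to be made once.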
[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/21346#discussion_r195287202 --- Diff: common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.protocol; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import com.google.common.base.Objects; +import io.netty.buffer.ByteBuf; + +import org.apache.spark.network.buffer.ManagedBuffer; +import org.apache.spark.network.buffer.NettyManagedBuffer; + +/** + * An RPC with data that is sent outside of the frame, so it can be read as a stream. + */ +public final class UploadStream extends AbstractMessage implements RequestMessage { --- End diff -- Is it possible to merge UploadStream and RpcRequest into a single class?
[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/21451#discussion_r191628993 --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java --- @@ -38,15 +38,24 @@ * * This method will not be called in parallel for a single TransportClient (i.e., channel). * + * The rpc *might* included a data stream in streamData (eg. for uploading a large + * amount of data which should not be buffered in memory here). Any errors while handling the + * streamData will lead to failing this entire connection -- all other in-flight rpcs will fail. + * If stream data is not null, you *must* call streamData.registerStreamCallback + * before this method returns. + * * @param client A channel client which enables the handler to make requests back to the sender * of this RPC. This will always be the exact same object for a particular channel. * @param message The serialized bytes of the RPC. + * @param streamData StreamData if there is data which is meant to be read via a StreamCallback; + * otherwise it is null. * @param callback Callback which should be invoked exactly once upon success or failure of the * RPC. */ public abstract void receive( TransportClient client, ByteBuffer message, + StreamData streamData, --- End diff -- It's not necessary to add a parameter. Change the message parameter to InputStream.
[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/21451#discussion_r192279111 --- Diff: common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java --- @@ -38,15 +38,24 @@ * * This method will not be called in parallel for a single TransportClient (i.e., channel). * + * The rpc *might* included a data stream in streamData (eg. for uploading a large + * amount of data which should not be buffered in memory here). Any errors while handling the + * streamData will lead to failing this entire connection -- all other in-flight rpcs will fail. + * If stream data is not null, you *must* call streamData.registerStreamCallback + * before this method returns. + * * @param client A channel client which enables the handler to make requests back to the sender * of this RPC. This will always be the exact same object for a particular channel. * @param message The serialized bytes of the RPC. + * @param streamData StreamData if there is data which is meant to be read via a StreamCallback; + * otherwise it is null. * @param callback Callback which should be invoked exactly once upon success or failure of the * RPC. */ public abstract void receive( TransportClient client, ByteBuffer message, + StreamData streamData, --- End diff -- What about incorporating parameter `message` into parameter `streamData`?
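One way to read the suggestion above is to pass a single object that carries both the in-memory message and the optional streamed data, so that `receive` does not grow an extra parameter. The sketch below only illustrates that idea under stated assumptions: `RpcPayload`, `PayloadHandler` and `RpcPayloadDemo` are hypothetical names that do not exist in Spark, and a plain `InputStream` stands in for the `StreamData` type from the diff, whose real API is callback-based.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical value object (not part of Spark): carries the in-memory RPC
// message together with an optional stream of body data.
final class RpcPayload {
  private final ByteBuffer message;
  private final InputStream body; // null when the RPC has no streamed data

  RpcPayload(ByteBuffer message, InputStream body) {
    this.message = message;
    this.body = body;
  }

  // Return a duplicate so callers cannot move the original buffer's position.
  ByteBuffer message() { return message.duplicate(); }

  boolean hasBody() { return body != null; }

  InputStream body() { return body; }
}

// Hypothetical handler shape: one payload parameter instead of message + streamData.
interface PayloadHandler {
  String receive(RpcPayload payload);
}

final class RpcPayloadDemo {
  // Example handler that only reports what it was given.
  static final PayloadHandler DESCRIBE = payload ->
      payload.message().remaining() + " message bytes, streamed body: " + payload.hasBody();

  public static void main(String[] args) {
    ByteBuffer header = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
    InputStream body = new ByteArrayInputStream(new byte[] {1, 2, 3});
    System.out.println(DESCRIBE.receive(new RpcPayload(header, null)));
    System.out.println(DESCRIBE.receive(new RpcPayload(header, body)));
  }
}
```

The trade-off is that every handler has to unwrap the payload, even the common case that only ever sees a small in-memory message.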
[GitHub] spark issue #14658: [WIP][SPARK-5928][SPARK-6238] Remote Shuffle Blocks cann...
Github user witgo commented on the issue: https://github.com/apache/spark/pull/14658 Spark 2.2 has fixed this issue.