[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-13 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r96106896
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -244,32 +245,45 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     // Launch tasks returned by a set of resource offers
     private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
       for (task <- tasks.flatten) {
-        val serializedTask = TaskDescription.encode(task)
-        if (serializedTask.limit >= maxRpcMessageSize) {
-          scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
-            try {
-              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
-                "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
-                "spark.rpc.message.maxSize or using broadcast variables for large values."
-              msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
-              taskSetMgr.abort(msg)
-            } catch {
-              case e: Exception => logError("Exception in error callback", e)
-            }
-          }
+        val serializedTask = try {
+          TaskDescription.encode(task)
+        } catch {
+          case NonFatal(e) =>
+            abortTaskSetManager(scheduler, task.taskId,
+              s"Failed to serialize task ${task.taskId}, not attempting to retry it.", Some(e))
+            null
         }
-        else {
+
+        if (serializedTask != null && serializedTask.limit >= maxRpcMessageSize) {
+          val msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
+            "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
+            "spark.rpc.message.maxSize or using broadcast variables for large values."
+          abortTaskSetManager(scheduler, task.taskId,
+            msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize))
+        } else if (serializedTask != null) {
+          if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) {
--- End diff --

There are two ways this nested `if` can be combined into the `else if`.

1. Duplicate the following code so it appears twice:

``` scala
val executorData = executorDataMap(task.executorId)
executorData.freeCores -= scheduler.CPUS_PER_TASK

logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} " +
  s"hostname: ${executorData.executorHost}.")

executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
```
2. Use the value of the `if` expression to avoid the code duplication:

```scala
val launchTask = if (serializedTask != null && serializedTask.limit >= maxRpcMessageSize) {
  false
} else if (serializedTask != null &&
    serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) {
  true
} else {
  true
}
if (launchTask) {
  val executorData = executorDataMap(task.executorId)
  executorData.freeCores -= scheduler.CPUS_PER_TASK

  logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} " +
    s"hostname: ${executorData.executorHost}.")

  executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
```
The existing code is more concise than the above.
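
For comparison only, a hedged sketch of a third shape one could imagine (not code from this PR; `encodeSafely` is a hypothetical helper that aborts the task set and returns `None` on serialization failure or on an oversized task):

```scala
// hypothetical sketch: encodeSafely(task): Option[ByteBuffer] is assumed to
// abort the task set and return None on serialization failure or oversize
encodeSafely(task).foreach { serializedTask =>
  if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) {
    logWarning(s"Task ${task.taskId} is large (${serializedTask.limit / 1024} KB); " +
      s"the recommended maximum is ${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
  }
  val executorData = executorDataMap(task.executorId)
  executorData.freeCores -= scheduler.CPUS_PER_TASK
  executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
```

This keeps the launch code in one place without the `launchTask` flag, at the cost of introducing a helper.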



[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-13 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r96108161
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---
@@ -244,32 +245,45 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     // Launch tasks returned by a set of resource offers
     private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
       for (task <- tasks.flatten) {
-        val serializedTask = TaskDescription.encode(task)
-        if (serializedTask.limit >= maxRpcMessageSize) {
-          scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
-            try {
-              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
-                "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
-                "spark.rpc.message.maxSize or using broadcast variables for large values."
-              msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
-              taskSetMgr.abort(msg)
-            } catch {
-              case e: Exception => logError("Exception in error callback", e)
-            }
-          }
+        val serializedTask = try {
+          TaskDescription.encode(task)
+        } catch {
+          case NonFatal(e) =>
+            abortTaskSetManager(scheduler, task.taskId,
--- End diff --

Yes, it's a big issue.
We should first verify how much performance is lost by having to create all the serialized tasks up front to make sure they all work. Maybe there is no performance degradation at all.
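
A rough way to measure it (a minimal sketch under assumed conditions; `payload` merely stands in for a real `Task[_]`, and `Utils` is Spark's private[spark] utility object, so this would run inside the org.apache.spark package):

```scala
import java.nio.ByteBuffer
import org.apache.spark.util.Utils

// time n Java-serialization round trips of a stand-in payload; the absolute
// numbers are only indicative of the per-task encode cost being discussed
val payload = Array.fill(1024)(0.toByte)
val n = 10000
val start = System.nanoTime()
val buffers: Seq[ByteBuffer] = (1 to n).map(_ => ByteBuffer.wrap(Utils.serialize(payload)))
val elapsedMs = (System.nanoTime() - start) / 1e6
println(f"serialized $n payloads in $elapsedMs%.1f ms (${buffers.head.limit} bytes each)")
```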



[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-10 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r95492638
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -52,7 +55,36 @@ private[spark] class TaskDescription(
     val addedFiles: Map[String, Long],
     val addedJars: Map[String, Long],
     val properties: Properties,
-    val serializedTask: ByteBuffer) {
+    private var serializedTask_ : ByteBuffer,
+    private var task_ : Task[_] = null) extends Logging {
+
+  def this(
+      taskId: Long,
+      attemptNumber: Int,
+      executorId: String,
+      name: String,
+      index: Int, // Index within this task's TaskSet
+      addedFiles: Map[String, Long],
+      addedJars: Map[String, Long],
+      properties: Properties,
+      task: Task[_]) {
+    this(taskId, attemptNumber, executorId, name, index,
+      addedFiles, addedJars, properties, null, task)
+  }
+
+  lazy val serializedTask: ByteBuffer = {
+    if (serializedTask_ == null) {
+      serializedTask_ = try {
+        ByteBuffer.wrap(Utils.serialize(task_))
+      } catch {
+        case NonFatal(e) =>
+          val msg = s"Failed to serialize task $taskId, not attempting to retry it."
+          logError(msg, e)
+          throw new TaskNotSerializableException(e)
+      }
+    }
+    serializedTask_
--- End diff --

OK, I agree with you; I will modify the code.



[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-09 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r95298151
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -52,7 +55,36 @@ private[spark] class TaskDescription(
     val addedFiles: Map[String, Long],
     val addedJars: Map[String, Long],
     val properties: Properties,
-    val serializedTask: ByteBuffer) {
+    private var serializedTask_ : ByteBuffer,
+    private var task_ : Task[_] = null) extends Logging {
--- End diff --

@squito
I agree with you; I will modify it.

@kayousterhout
The main reason for this approach is to reduce the size of the code change; implementing what you suggest would require modifying many more files.




[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-12 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r95933009
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -52,7 +55,43 @@ private[spark] class TaskDescription(
     val addedFiles: Map[String, Long],
     val addedJars: Map[String, Long],
     val properties: Properties,
-    val serializedTask: ByteBuffer) {
+    private var serializedTask_ : ByteBuffer) extends Logging {
--- End diff --

Another implementation:

https://github.com/witgo/spark/commit/4fbf30a568ed61982e17757f9df3c35cb9d64871



[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-12 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r95753220
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -52,7 +55,43 @@ private[spark] class TaskDescription(
     val addedFiles: Map[String, Long],
     val addedJars: Map[String, Long],
     val properties: Properties,
-    val serializedTask: ByteBuffer) {
+    private var serializedTask_ : ByteBuffer) extends Logging {
--- End diff --

How about this?

``` scala
private[spark] class TaskDescription(
    val taskId: Long,
    val attemptNumber: Int,
    val executorId: String,
    val name: String,
    val index: Int, // Index within this task's TaskSet
    val addedFiles: Map[String, Long],
    val addedJars: Map[String, Long],
    val properties: Properties,
    private var serializedTask_ : ByteBuffer) extends Logging {

  def this(
      taskId: Long,
      attemptNumber: Int,
      executorId: String,
      name: String,
      index: Int, // Index within this task's TaskSet
      addedFiles: Map[String, Long],
      addedJars: Map[String, Long],
      properties: Properties,
      task: Task[_]) {
    this(taskId, attemptNumber, executorId, name, index,
      addedFiles, addedJars, properties, null.asInstanceOf[ByteBuffer])
    task_ = task
  }

  private var task_ : Task[_] = null

  private def serializedTask: ByteBuffer = {
    if (serializedTask_ == null) {
      // This is where we serialize the task on the driver before sending it to the executor.
      // This is not done when creating the TaskDescription so we can postpone this serialization
      // to later in the scheduling process -- particularly,
      // so it can happen in another thread by the CoarseGrainedSchedulerBackend.
      // On the executors, this will already be populated by decode.
      serializedTask_ = try {
        ByteBuffer.wrap(Utils.serialize(task_))
      } catch {
        case NonFatal(e) =>
          val msg = s"Failed to serialize task $taskId, not attempting to retry it."
          logError(msg, e)
          throw new TaskNotSerializableException(e)
      }
    }
    serializedTask_
  }

  def getTask(loader: ClassLoader): Task[_] = {
    if (task_ == null) {
      task_ = Utils.deserialize(serializedTask, loader).asInstanceOf[Task[_]]
    }
    task_
  }

  override def toString: String = "TaskDescription(TID=%d, index=%d)".format(taskId, index)
}
```
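
A usage sketch of the two construction paths above (hypothetical values: `myTask` is an existing `Task[_]` and `decoded` is a TaskDescription produced by the decode step on the executor):

```scala
import java.util.Properties

// driver side: build from a live Task; serialization is deferred until
// serializedTask is first forced (e.g. when the scheduler backend encodes it)
val desc = new TaskDescription(taskId = 0L, attemptNumber = 0, executorId = "exec-1",
  name = "task 0.0 in stage 0.0", index = 0, addedFiles = Map.empty[String, Long],
  addedJars = Map.empty[String, Long], properties = new Properties(), task = myTask)

// executor side: serializedTask_ was populated by decode, so the Task object
// is only materialized on first access
val task: Task[_] = decoded.getTask(Thread.currentThread().getContextClassLoader)
```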



[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-09 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r95304052
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -149,7 +149,12 @@ private[spark] object Utils extends Logging {
 
   /** Deserialize an object using Java serialization and the given ClassLoader */
   def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = {
-    val bis = new ByteArrayInputStream(bytes)
+    deserialize(ByteBuffer.wrap(bytes), loader)
+  }
+
+  /** Deserialize an object using Java serialization and the given ClassLoader */
+  def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
+    val bis = new ByteBufferInputStream(bytes)
--- End diff --

I don't think there's a problem here; it is covered by many test cases.
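
For illustration, a round trip through both overloads (a hedged sketch; it assumes code living in the org.apache.spark package, since `Utils` is private[spark]):

```scala
import java.nio.ByteBuffer
import org.apache.spark.util.Utils

// both overloads should produce the same object; the ByteBuffer version just
// avoids copying the buffer into an Array[Byte] first
val bytes: Array[Byte] = Utils.serialize("hello")
val viaArray: String = Utils.deserialize(bytes, getClass.getClassLoader)
val viaBuffer: String = Utils.deserialize(ByteBuffer.wrap(bytes), getClass.getClassLoader)
assert(viaArray == viaBuffer)
```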



[GitHub] spark issue #15505: [SPARK-18890][CORE] Move task serialization from the Tas...

2017-01-09 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/15505
  
@squito In local mode, performance is relatively less important; we only need to guarantee that there is no performance degradation there.



[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-01-09 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r95305125
  
--- Diff: core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala ---
@@ -592,47 +579,6 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
     assert(manager.resourceOffer("execB", "host2", RACK_LOCAL).get.index === 1)
   }

-  test("do not emit warning when serialized task is small") {
--- End diff --

Ok, I will add this test case back.



[GitHub] spark issue #15505: [SPARK-17931][CORE] taskScheduler has some unneeded seri...

2016-11-29 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/15505
  
@kayousterhout I agree with you and will do as you say.
@squito That's a good idea and worth a try. We can write a prototype to verify it.



[GitHub] spark issue #15505: [SPARK-17931][CORE] taskScheduler has some unneeded seri...

2017-01-06 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/15505
  
@kayousterhout 
Okay, I'll do the code revision this weekend.



[GitHub] spark issue #15505: [WIP][SPARK-18890][CORE] Multithreading serialization ta...

2017-01-08 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/15505
  
@kayousterhout  OK, Done



[GitHub] spark pull request #15505: [SPARK-17931][CORE] taskScheduler has some unneed...

2016-12-21 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r93558564
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/local/LocalSchedulerBackend.scala ---
@@ -59,6 +62,12 @@ private[spark] class LocalEndpoint(
   private val executor = new Executor(
     localExecutorId, localExecutorHostname, SparkEnv.get, userClassPath, isLocal = true)

+  private val serializer = new ThreadLocal[SerializerInstance] {
--- End diff --

LocalEndpoint does not support multiple threads, so the ThreadLocal is not necessary.
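
If so, something like this would be enough (a minimal sketch, assuming LocalEndpoint really handles all its RPC messages on one dispatcher thread):

```scala
// a single instance suffices when all messages are processed on one thread
private val serializer: SerializerInstance = SparkEnv.get.closureSerializer.newInstance()
```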



[GitHub] spark pull request #15505: [SPARK-17931][CORE] taskScheduler has some unneed...

2016-12-21 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r93558472
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -17,27 +17,179 @@

 package org.apache.spark.scheduler

+import java.io._
 import java.nio.ByteBuffer
+import java.util.Properties

-import org.apache.spark.util.SerializableBuffer
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.serializer.SerializerInstance
+import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream}

 /**
  * Description of a task that gets passed onto executors to be executed, usually created by
  * [[TaskSetManager.resourceOffer]].
  */
-private[spark] class TaskDescription(
+private[spark] class TaskDescription private(
     val taskId: Long,
     val attemptNumber: Int,
     val executorId: String,
     val name: String,
-    val index: Int,    // Index within this task's TaskSet
-    _serializedTask: ByteBuffer)
-  extends Serializable {
+    val index: Int,
--- End diff --

Done



[GitHub] spark pull request #15505: [SPARK-17931][CORE] taskScheduler has some unneed...

2016-12-21 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r93558527
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala ---
@@ -17,27 +17,179 @@

 package org.apache.spark.scheduler

+import java.io._
 import java.nio.ByteBuffer
+import java.util.Properties

-import org.apache.spark.util.SerializableBuffer
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.serializer.SerializerInstance
+import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream}

 /**
  * Description of a task that gets passed onto executors to be executed, usually created by
  * [[TaskSetManager.resourceOffer]].
  */
-private[spark] class TaskDescription(
+private[spark] class TaskDescription private(
     val taskId: Long,
     val attemptNumber: Int,
     val executorId: String,
     val name: String,
-    val index: Int,    // Index within this task's TaskSet
-    _serializedTask: ByteBuffer)
-  extends Serializable {
+    val index: Int,
+    val taskFiles: mutable.Map[String, Long],
+    val taskJars: mutable.Map[String, Long],
+    private var task_ : Task[_],
+    private var taskBytes: InputStream,
+    private var taskProps: Properties) {
+
+  def this(
+      taskId: Long,
+      attemptNumber: Int,
+      executorId: String,
+      name: String,
+      index: Int,
+      taskFiles: mutable.Map[String, Long],
+      taskJars: mutable.Map[String, Long],
+      task: Task[_]) {
+    this(taskId, attemptNumber, executorId, name, index, taskFiles, taskJars, task,
+      null.asInstanceOf[InputStream],
+      null.asInstanceOf[Properties])
+  }
+
+  @throws[IOException]
+  def encode(serializer: SerializerInstance): ByteBuffer = {
+    val out = new ByteBufferOutputStream(4096)
+    encode(out, serializer)
+    out.close()
+    out.toByteBuffer
+  }
+
+  @throws[IOException]
+  def encode(outputStream: OutputStream, serializer: SerializerInstance): Unit = {
+    val out = new DataOutputStream(outputStream)
+    // Write taskId
+    out.writeLong(taskId)
+
+    // Write attemptNumber
+    out.writeInt(attemptNumber)
+
+    // Write executorId
+    out.writeUTF(executorId)
+
+    // Write name
+    out.writeUTF(name)

-  // Because ByteBuffers are not serializable, wrap the task in a SerializableBuffer
-  private val buffer = new SerializableBuffer(_serializedTask)
+    // Write index
+    out.writeInt(index)

-  def serializedTask: ByteBuffer = buffer.value
+    // Write taskFiles
+    out.writeInt(taskFiles.size)
+    for ((name, timestamp) <- taskFiles) {
+      out.writeUTF(name)
+      out.writeLong(timestamp)
+    }
+
+    // Write taskJars
+    out.writeInt(taskJars.size)
+    for ((name, timestamp) <- taskJars) {
+      out.writeUTF(name)
+      out.writeLong(timestamp)
+    }
+
+    // Write the task properties separately so it is available before full task deserialization.
+    val taskProps = task_.localProperties
+    val propertyNames = taskProps.stringPropertyNames
+    out.writeInt(propertyNames.size())
+    propertyNames.asScala.foreach { key =>
+      val value = taskProps.getProperty(key)
+      out.writeUTF(key)
+      out.writeUTF(value)
+    }
+
+    // Write the task itself and finish
+    val serializeStream = serializer.serializeStream(out)
+    serializeStream.writeValue(task_)
+    serializeStream.flush()
+  }
+
+  def task(ser: SerializerInstance): Task[_] = {
+    if (task_ == null) {
+      val deserializeStream = ser.deserializeStream(taskBytes)
+      task_ = deserializeStream.readValue[Task[_]]()
+      task_.localProperties = taskProps
+      deserializeStream.close()
+      taskProps = null
+      taskBytes = null
+    }
+    task_
--- End diff --

The intent of this change is to move the deserialization operation into the TaskDescription class, so that taskBytes can be made private.
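
A round-trip sketch of the resulting API (hypothetical usage; `ser` is a SerializerInstance, `desc` a driver-side TaskDescription, and `decoded` the executor-side copy rebuilt by the corresponding decode step):

```scala
// driver: write the header fields plus the serialized task into one buffer
val bytes: java.nio.ByteBuffer = desc.encode(ser)

// executor: the header is read eagerly, while the Task itself stays in
// taskBytes until first requested -- keeping taskBytes private to the class
val task: Task[_] = decoded.task(ser)
```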




[GitHub] spark pull request #15505: [SPARK-17931][CORE] taskScheduler has some unneed...

2016-12-21 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r93558450
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
@@ -993,6 +993,12 @@ class DAGScheduler(
         JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
       }

+      if (taskBinaryBytes.length > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024) {
--- End diff --

 
The original code warned about the size of the full task, including the RDD, shuffle dependencies, closures, partitions, and so on.
I think the previous warning code was retained only because #1498 did not completely remove it.
In most cases, the serialized TaskDescription is smaller than the task's broadcast.



[GitHub] spark issue #15505: [SPARK-17931][CORE] taskScheduler has some unneeded seri...

2016-12-21 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/15505
  
@kayousterhout @squito 
I think Kay's approach is a good idea.   
We can merge #16053 first; the SPARK-18890-related code (including multi-threaded TaskDescription serialization) can stay in this PR.



[GitHub] spark pull request #15505: [SPARK-17931][CORE] taskScheduler has some unneed...

2016-12-21 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r93558401
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -136,14 +136,10 @@ private[spark] class Executor(
   startDriverHeartbeater()

   def launchTask(
-      context: ExecutorBackend,
-      taskId: Long,
-      attemptNumber: Int,
-      taskName: String,
-      serializedTask: ByteBuffer): Unit = {
-    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
-      serializedTask)
-    runningTasks.put(taskId, tr)
+      context: ExecutorBackend,
+      taskDesc: TaskDescription): Unit = {
--- End diff --

Done



[GitHub] spark pull request #15505: [SPARK-17931][CORE] taskScheduler has some unneed...

2016-12-21 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/15505#discussion_r93558431
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -232,6 +225,13 @@ private[spark] class Executor(
     }

     override def run(): Unit = {
+      val taskId = taskDesc.taskId
+      val attemptNumber = taskDesc.attemptNumber
+      val taskName = taskDesc.name
+      val taskFiles = taskDesc.taskFiles
+      val taskJars = taskDesc.taskJars
+      val taskProps = taskDesc.taskProperties
--- End diff --

If we use `taskDesc.taskId` directly, we need to modify a lot of code, e.g.:
```scala
val errMsg =
  s"${releasedLocks.size} block locks were not released by TID = $taskId:\n" +
    releasedLocks.mkString("[", ", ", "]")
```
With `s"${releasedLocks.size} block locks were not released by TID = ${taskDesc.taskId}:\n"` the line would be longer than the 100-character limit.

How about moving the assignments into the class body:

```scala
class TaskRunner(
    execBackend: ExecutorBackend,
    val taskDesc: TaskDescription)
  extends Runnable {

  val taskId = taskDesc.taskId
  val attemptNumber = taskDesc.attemptNumber
  val taskName = taskDesc.name
  val taskFiles = taskDesc.taskFiles
  val taskJars = taskDesc.taskJars
  val taskProps = taskDesc.taskProperties
```




[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...

2017-03-24 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17329#discussion_r107841105
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java ---
@@ -37,13 +37,24 @@
  * A {@link ManagedBuffer} backed by a segment in a file.
  */
 public final class FileSegmentManagedBuffer extends ManagedBuffer {
-  private final TransportConf conf;
+  private final boolean lazyFileDescriptor;
+  private final int memoryMapBytes;
   private final File file;
   private final long offset;
   private final long length;

   public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
-    this.conf = conf;
+    this(conf.lazyFileDescriptor(), conf.memoryMapBytes(), file, offset, length);
+  }
+
+  public FileSegmentManagedBuffer(
--- End diff --

Yes, but the above code does not improve performance; the `FileSegmentManagedBuffer.convertToNetty` method is also called only once in the master branch code.



[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...

2017-03-24 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17329#discussion_r108027203
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java ---
@@ -37,13 +37,24 @@
  * A {@link ManagedBuffer} backed by a segment in a file.
  */
 public final class FileSegmentManagedBuffer extends ManagedBuffer {
-  private final TransportConf conf;
+  private final boolean lazyFileDescriptor;
+  private final int memoryMapBytes;
   private final File file;
   private final long offset;
   private final long length;

   public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
-    this.conf = conf;
+    this(conf.lazyFileDescriptor(), conf.memoryMapBytes(), file, offset, length);
+  }
+
+  public FileSegmentManagedBuffer(
--- End diff --

The [SPARK-19991_try2](https://github.com/witgo/spark/commits/SPARK-19991_try2) branch took 244.45 s in my test.



[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...

2017-03-25 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17329#discussion_r108032378
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java ---
@@ -37,13 +37,24 @@
  * A {@link ManagedBuffer} backed by a segment in a file.
  */
 public final class FileSegmentManagedBuffer extends ManagedBuffer {
-  private final TransportConf conf;
+  private final boolean lazyFileDescriptor;
+  private final int memoryMapBytes;
   private final File file;
   private final long offset;
   private final long length;

   public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
-    this.conf = conf;
+    this(conf.lazyFileDescriptor(), conf.memoryMapBytes(), file, offset, length);
+  }
+
+  public FileSegmentManagedBuffer(
--- End diff --

Suppose there are E executors in the cluster, and a shuffle has M map tasks and R reduce tasks. The master branch creates:

1. Up to M * R FileSegmentManagedBuffer instances
2. Up to 2 * M * R NoSuchElementException instances

With this PR:

1. Up to M * R FileSegmentManagedBuffer instances
2. Up to 2 NoSuchElementException instances per executor (ExternalShuffleBlockResolver and IndexShuffleBlockResolver are created once when an executor starts, and they call the new constructor to create FileSegmentManagedBuffer instances)

For example, with M = R = 1000, master may construct up to 2,000,000 exception instances for the shuffle, versus only a couple per executor with this PR.



[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...

2017-03-30 Thread witgo
Github user witgo closed the pull request at:

https://github.com/apache/spark/pull/17329



[GitHub] spark pull request #17480: SPARK-20079: Re registration of AM hangs spark cl...

2017-03-30 Thread witgo
GitHub user witgo opened a pull request:

https://github.com/apache/spark/pull/17480

SPARK-20079: Re registration of AM hangs spark cluster in yarn-client mode.

With this change, when there are tasks that need to be scheduled, `ExecutorAllocationManager` instances do not reset the `initializing` field.

## How was this patch tested?
Unit tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/witgo/spark SPARK-20079

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17480.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17480


commit b91dfeb4fea445727f6b5430aa947f35a287d56d
Author: Guoqiang Li <wi...@qq.com>
Date:   2017-03-30T14:17:49Z

SPARK-20079: Re registration of AM hangs spark cluster in yarn-client mode.





[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...

2017-03-23 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17329#discussion_r107706851
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java ---
@@ -37,13 +37,24 @@
  * A {@link ManagedBuffer} backed by a segment in a file.
  */
 public final class FileSegmentManagedBuffer extends ManagedBuffer {
-  private final TransportConf conf;
+  private final boolean lazyFileDescriptor;
+  private final int memoryMapBytes;
   private final File file;
   private final long offset;
   private final long length;

   public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
-    this.conf = conf;
+    this(conf.lazyFileDescriptor(), conf.memoryMapBytes(), file, offset, length);
+  }
+
+  public FileSegmentManagedBuffer(
--- End diff --

Like the following code?

```java
public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
  this.lazyFileDescriptor = conf.lazyFileDescriptor();
  this.memoryMapBytes = conf.memoryMapBytes();
  this.file = file;
  this.offset = offset;
  this.length = length;
}
```

Each call to `conf.lazyFileDescriptor()` or `conf.memoryMapBytes()` creates a NoSuchElementException instance.



[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...

2017-03-22 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17329#discussion_r107572297
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java ---
@@ -37,13 +37,24 @@
  * A {@link ManagedBuffer} backed by a segment in a file.
  */
 public final class FileSegmentManagedBuffer extends ManagedBuffer {
-  private final TransportConf conf;
+  private final boolean lazyFileDescriptor;
+  private final int memoryMapBytes;
   private final File file;
   private final long offset;
   private final long length;

   public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
-    this.conf = conf;
+    this(conf.lazyFileDescriptor(), conf.memoryMapBytes(), file, offset, length);
+  }
+
+  public FileSegmentManagedBuffer(
--- End diff --

That will change a lot of code, right?



[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...

2017-03-25 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17329#discussion_r108049460
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java ---
@@ -37,13 +37,24 @@
  * A {@link ManagedBuffer} backed by a segment in a file.
  */
 public final class FileSegmentManagedBuffer extends ManagedBuffer {
-  private final TransportConf conf;
+  private final boolean lazyFileDescriptor;
+  private final int memoryMapBytes;
   private final File file;
   private final long offset;
   private final long length;

   public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
-    this.conf = conf;
+    this(conf.lazyFileDescriptor(), conf.memoryMapBytes(), file, offset, length);
+  }
+
+  public FileSegmentManagedBuffer(
--- End diff --

Sorry, I didn't get your idea. Can you sketch it in code?



[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...

2017-03-18 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17329#discussion_r106781598
  
--- Diff: common/network-common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java ---
@@ -37,13 +37,24 @@
  * A {@link ManagedBuffer} backed by a segment in a file.
  */
 public final class FileSegmentManagedBuffer extends ManagedBuffer {
-  private final TransportConf conf;
+  private final boolean lazyFileDescriptor;
+  private final int memoryMapBytes;
   private final File file;
   private final long offset;
   private final long length;

   public FileSegmentManagedBuffer(TransportConf conf, File file, long offset, long length) {
-    this.conf = conf;
+    this(conf.lazyFileDescriptor(), conf.memoryMapBytes(), file, offset, length);
+  }
+
+  public FileSegmentManagedBuffer(
--- End diff --

Oh, do you have a better idea?



[GitHub] spark issue #17329: [SPARK-19991]FileSegmentManagedBuffer performance improv...

2017-03-17 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/17329
  
```java
public class HadoopConfigProvider extends ConfigProvider {
  private final Configuration conf;

  public HadoopConfigProvider(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public String get(String name) {
    String value = conf.get(name);
    // When spark.storage.memoryMapThreshold or spark.shuffle.io.lazyFD is not
    // set, conf.get(name) returns null, so we throw.
    if (value == null) {
      throw new NoSuchElementException(name);
    }
    return value;
  }

  @Override
  public Iterable<Map.Entry<String, String>> getAll() {
    return conf;
  }
}
```
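
For context, a simplified model of why this costs something (a sketch, not Spark's actual code): the default-value lookup swallows the exception, but a `NoSuchElementException`, stack trace included, is still constructed on every miss.

```scala
import java.util.NoSuchElementException

// simplified model of the lookup path: `underlying` stands in for the Hadoop
// Configuration; every miss allocates an exception even though it is swallowed
class ConfigModel(underlying: Map[String, String]) {
  def get(name: String): String =
    underlying.getOrElse(name, throw new NoSuchElementException(name))

  def get(name: String, defaultValue: String): String =
    try get(name) catch { case _: NoSuchElementException => defaultValue }
}

// e.g. new ConfigModel(Map.empty).get("spark.shuffle.io.lazyFD", "true")
// pays the exception-construction cost on every call
```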





[GitHub] spark pull request #17329: [SPARK-19991]FileSegmentManagedBuffer performance...

2017-03-16 Thread witgo
GitHub user witgo opened a pull request:

https://github.com/apache/spark/pull/17329

[SPARK-19991]FileSegmentManagedBuffer performance improvement

FileSegmentManagedBuffer performance improvement.


## What changes were proposed in this pull request?

When the configuration items `spark.storage.memoryMapThreshold` and `spark.shuffle.io.lazyFD` are not set, each call to the `FileSegmentManagedBuffer.nioByteBuffer` or `FileSegmentManagedBuffer.createInputStream` method creates a NoSuchElementException instance, which is a relatively time-consuming operation.

In the use case below, this PR improves performance by about 3.5%.

The test code:

``` scala
(1 to 10).foreach { i =>
  val numPartition = 1
  val rdd = sc.parallelize(0 until numPartition).repartition(numPartition).flatMap { t =>
    (0 until numPartition).map(r => r * numPartition + t)
  }.repartition(numPartition)
  val serializeStart = System.currentTimeMillis()
  rdd.sum()
  val serializeFinish = System.currentTimeMillis()
  println(f"Test $i: ${(serializeFinish - serializeStart) / 1000D}%1.2f")
}
```

and `spark-defaults.conf` file:

```
spark.master                      yarn-client
spark.executor.instances          20
spark.driver.memory               64g
spark.executor.memory             30g
spark.executor.cores              5
spark.default.parallelism         100
spark.sql.shuffle.partitions      100
spark.serializer                  org.apache.spark.serializer.KryoSerializer
spark.driver.maxResultSize        0
spark.ui.enabled                  false
spark.driver.extraJavaOptions     -XX:+UseG1GC -XX:+UseStringDeduplication -XX:G1HeapRegionSize=16M -XX:MetaspaceSize=512M
spark.executor.extraJavaOptions   -XX:+UseG1GC -XX:+UseStringDeduplication -XX:G1HeapRegionSize=16M -XX:MetaspaceSize=256M
spark.cleaner.referenceTracking.blocking          true
spark.cleaner.referenceTracking.blocking.shuffle  true
```

The test results are as follows:

| [SPARK-19991](https://github.com/witgo/spark/tree/SPARK-19991) | [68ea290 (apache/spark)](https://github.com/apache/spark/commit/68ea290b3aa89b2a539d13ea2c18bdb5a651b2bf) |
| --- | --- |
| 226.09 s | 235.21 s |

## How was this patch tested?

Existing tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/witgo/spark SPARK-19991

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17329.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17329


commit abcfc79991ecd1d5cef2cd1e275b872695ba19d9
Author: Guoqiang Li <liguoqia...@huawei.com>
Date:   2017-03-17T03:19:37Z

FileSegmentManagedBuffer performance improvement





[GitHub] spark pull request #17480: [SPARK-20079][Core][yarn] Re registration of AM h...

2017-04-03 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17480#discussion_r109575470
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -249,7 +249,9 @@ private[spark] class ExecutorAllocationManager(
    * yarn-client mode when AM re-registers after a failure.
    */
   def reset(): Unit = synchronized {
-    initializing = true
+    if (maxNumExecutorsNeeded() == 0) {
--- End diff --

Done.



[GitHub] spark pull request #17480: [SPARK-20079][Core][yarn] Re registration of AM h...

2017-04-10 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17480#discussion_r110796578
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -249,7 +249,14 @@ private[spark] class ExecutorAllocationManager(
    * yarn-client mode when AM re-registers after a failure.
    */
   def reset(): Unit = synchronized {
-    initializing = true
+    /**
+     * When some tasks need to be scheduled and initial executor = 0, resetting the initializing
+     * field may cause it to not be set to false in yarn.
+     * SPARK-20079: https://issues.apache.org/jira/browse/SPARK-20079
+     */
+    if (maxNumExecutorsNeeded() == 0) {
+      initializing = true
--- End diff --

@jerryshao Can you explain the following comment? I don't understand it.
```scala
 if (initializing) {
  // Do not change our target while we are still initializing,
  // Otherwise the first job may have to ramp up unnecessarily
  0
} else if (maxNeeded < numExecutorsTarget) {
```



[GitHub] spark issue #17480: [SPARK-20079][Core][yarn] Re registration of AM hangs sp...

2017-03-31 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/17480
  
@jerryshao  Yes.



[GitHub] spark pull request #17480: [SPARK-20079][Core][yarn] Re registration of AM h...

2017-04-12 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17480#discussion_r53390
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -249,7 +249,14 @@ private[spark] class ExecutorAllocationManager(
    * yarn-client mode when AM re-registers after a failure.
    */
   def reset(): Unit = synchronized {
-    initializing = true
+    /**
+     * When some tasks need to be scheduled and initial executor = 0, resetting the initializing
+     * field may cause it to not be set to false in yarn.
+     * SPARK-20079: https://issues.apache.org/jira/browse/SPARK-20079
+     */
+    if (maxNumExecutorsNeeded() == 0) {
+      initializing = true
--- End diff --

OK. I've got it, thx.



[GitHub] spark pull request #17480: [SPARK-20079][Core][yarn] Re registration of AM h...

2017-04-07 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17480#discussion_r110361779
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -249,7 +249,14 @@ private[spark] class ExecutorAllocationManager(
    * yarn-client mode when AM re-registers after a failure.
    */
   def reset(): Unit = synchronized {
-    initializing = true
+    /**
+     * When some tasks need to be scheduled, resetting the initializing field may cause
+     * it to not be set to false in yarn.
--- End diff --

Currently this method will only be called in yarn-client mode when AM 
re-registers after a failure.



[GitHub] spark pull request #17480: [SPARK-20079][Core][yarn] Re registration of AM h...

2017-04-10 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17480#discussion_r110804557
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -249,7 +249,14 @@ private[spark] class ExecutorAllocationManager(
    * yarn-client mode when AM re-registers after a failure.
    */
   def reset(): Unit = synchronized {
-    initializing = true
+    /**
+     * When some tasks need to be scheduled and initial executor = 0, resetting the initializing
+     * field may cause it to not be set to false in yarn.
+     * SPARK-20079: https://issues.apache.org/jira/browse/SPARK-20079
+     */
+    if (maxNumExecutorsNeeded() == 0) {
+      initializing = true
--- End diff --

Wouldn't the following code have a similar effect?

```scala
numExecutorsTarget = initialNumExecutors // The default value is 0
numExecutorsToAdd = 1
```
The arguments passed to the `client.requestTotalExecutors` method are then 1, 2, 4, 8, 16, ...



[GitHub] spark issue #17480: [SPARK-20079][Core][yarn] Re registration of AM hangs sp...

2017-04-19 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/17480
  
OK, I will do the work over the weekend.



[GitHub] spark pull request #17480: [SPARK-20079][Core][yarn] Re registration of AM h...

2017-04-22 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17480#discussion_r112825043
  
--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -249,7 +249,6 @@ private[spark] class ExecutorAllocationManager(
    * yarn-client mode when AM re-registers after a failure.
    */
   def reset(): Unit = synchronized {
-    initializing = true
--- End diff --

@jerryshao @vanzin
I think deleting `initializing = true` is a good idea.



[GitHub] spark pull request #17116: [SPARK-18890][CORE](try 2) Move task serializatio...

2017-03-04 Thread witgo
Github user witgo closed the pull request at:

https://github.com/apache/spark/pull/17116





[GitHub] spark issue #17139: [SPARK-18890][CORE](try 3) Move task serialization from ...

2017-03-08 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/17139
  
@kayousterhout The test report has been updated.





[GitHub] spark pull request #15505: [SPARK-18890][CORE] Move task serialization from ...

2017-03-02 Thread witgo
Github user witgo closed the pull request at:

https://github.com/apache/spark/pull/15505





[GitHub] spark issue #15505: [SPARK-18890][CORE] Move task serialization from the Tas...

2017-03-02 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/15505
  
Yes, a multi-threaded task serialization approach might give better performance. Let me close this PR.





[GitHub] spark issue #15505: [SPARK-18890][CORE] Move task serialization from the Tas...

2017-03-02 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/15505
  

[SPARK-18890_20170303](https://github.com/witgo/spark/commits/SPARK-18890_20170303)'s code is older, but its test case running time is 5.2 s.





[GitHub] spark issue #17139: [SPARK-18890][CORE](try 3) Move task serialization from ...

2017-03-07 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/17139
  
ping @kayousterhout  @squito





[GitHub] spark issue #17139: [SPARK-18890][CORE](try 3) Move task serialization from ...

2017-03-07 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/17139
  

Added multi-threaded code for serializing `TaskDescription`.
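As a rough illustration of the approach, a minimal sketch assuming a fixed four-thread pool (the pool size, object name, and method name are mine for illustration, not the PR's actual code):

```scala
import java.nio.ByteBuffer
import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

import org.apache.spark.scheduler.TaskDescription

object ParallelEncodeSketch {
  // Dedicated pool so encoding large tasks does not block the scheduler loop.
  private val pool = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))

  def encodeAll(tasks: Seq[TaskDescription]): Seq[ByteBuffer] = {
    implicit val ec: ExecutionContext = pool
    // Serialize every TaskDescription concurrently, then collect the buffers.
    val encoded = tasks.map(t => Future(TaskDescription.encode(t)))
    Await.result(Future.sequence(encoded), Duration.Inf)
  }
}
```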





[GitHub] spark issue #17567: [SPARK-19991][CORE][YARN] FileSegmentManagedBuffer perfo...

2017-04-07 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/17567
  
LGTM.  
Are there any performance test reports?





[GitHub] spark issue #17567: [SPARK-19991][CORE][YARN] FileSegmentManagedBuffer perfo...

2017-04-08 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/17567
  
 OK, I see.





[GitHub] spark issue #17480: [SPARK-20079][Core][yarn] Re registration of AM hangs sp...

2017-03-31 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/17480
  
The `ExecutorAllocationManager.reset` method is called when the AM re-registers, and it sets the `ExecutorAllocationManager.initializing` field to true. While this field is true, the driver does not request any new executors from the AM. Only two events set the field back to false:

1. An executor has been idle for some time.
2. A new stage is submitted.

If the AM is killed and restarted after a stage has already been submitted, neither event can occur, as the sketch below illustrates:
1. When the AM is killed, YARN kills all running containers, so every executor is lost and none can sit idle.
2. With no surviving executors, the current stage can never complete, so the DAG scheduler never submits a new stage.
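To make the hang concrete, here is a minimal sketch of the state machine just described (simplified pseudocode in Scala, not the actual `ExecutorAllocationManager` logic):

```scala
object HangSketch {
  var initializing = true // set by reset() when the AM re-registers

  // Per the two cases above, these are the only transitions back to false.
  def onEvent(newStageSubmitted: Boolean, executorIdledOut: Boolean): Unit = {
    if (newStageSubmitted || executorIdledOut) {
      initializing = false // the driver may request executors again
    }
  }

  // After an AM restart, YARN has killed every container, so both inputs
  // stay false, `initializing` never flips, and the application hangs.
  onEvent(newStageSubmitted = false, executorIdledOut = false)
}
```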





[GitHub] spark issue #17480: [SPARK-20079][Core][yarn] Re registration of AM hangs sp...

2017-04-18 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/17480
  
@vanzin 
Sorry, I do not understand what you mean. Will you submit a new PR with your own ideas? If so, I will close this PR.





[GitHub] spark pull request #17882: [WIP][SPARK-20079][yarn] Re registration of AM ha...

2017-07-26 Thread witgo
Github user witgo closed the pull request at:

https://github.com/apache/spark/pull/17882





[GitHub] spark issue #17882: [WIP][SPARK-20079][yarn] Re registration of AM hangs spa...

2017-07-26 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/17882
  
I'm very sorry, I haven't taken the time to update this code recently.
@vanzin , thank you for your work. I'll close this PR.





[GitHub] spark issue #14995: [Test Only][SPARK-6235][CORE]Address various 2G limits

2017-07-01 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/14995
  
I have not done much testing, but I think it can be used in a production environment. The URL: 
https://github.com/witgo/spark/tree/SPARK-6235_Address_various_2G_limits





[GitHub] spark issue #18008: [SPARK-20776] Fix perf. problems in JobProgressListener ...

2017-05-17 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/18008
  
@JoshRosen , what's the tool in your screenshot?





[GitHub] spark issue #18008: [SPARK-20776] Fix perf. problems in JobProgressListener ...

2017-05-17 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/18008
  
@JoshRosen I see. Thank you.





[GitHub] spark issue #17139: [SPARK-19486][CORE](try 3) Investigate using multiple th...

2017-06-21 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/17139
  
@jiangxb1987, yes. Do you have any questions?





[GitHub] spark issue #17882: [WIP][SPARK-20079][try 2][yarn] Re registration of AM ha...

2017-06-02 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/17882
  
@jerryshao @vanzin 
Would you take some time to review this PR?





[GitHub] spark pull request #17882: [SPARK-20079][yarn] Re registration of AM hangs s...

2017-06-07 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17882#discussion_r120652302
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
 ---
@@ -68,6 +68,8 @@ private[spark] abstract class YarnSchedulerBackend(
   // Flag to specify whether this schedulerBackend should be reset.
   private var shouldResetOnAmRegister = false
 
+  private var lastRequestExecutors = RequestExecutors(-1, -1, Map.empty, Set.empty)
--- End diff --

Done.





[GitHub] spark issue #17882: [SPARK-20079][yarn] Re registration of AM hangs spark cl...

2017-06-07 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/17882
  
@vanzin Done.





[GitHub] spark pull request #17882: [SPARK-20079][yarn] Re registration of AM hangs s...

2017-06-07 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17882#discussion_r120644588
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
 ---
@@ -176,16 +179,6 @@ private[spark] abstract class YarnSchedulerBackend(
   }
 
   /**
-   * Reset the state of SchedulerBackend to the initial state. This is 
happened when AM is failed
-   * and re-registered itself to driver after a failure. The stale state 
in driver should be
-   * cleaned.
-   */
-  override protected def reset(): Unit = {
--- End diff --

From the current code, resetting the state of `ExecutorAllocationManager` is not correct: it causes the `RetrieveLastRequestExecutors` message to not work properly (see the sketch below).
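For context, a self-contained sketch of the re-sync idea this message refers to. The message and field names come from this thread's diffs; the case-class fields and the handler shape are assumptions on my part, not the PR's exact code:

```scala
object ReSyncSketch {
  // Shape assumed from the diff's RequestExecutors(-1, -1, Map.empty, Set.empty).
  case class RequestExecutors(
      requestedTotal: Int,
      localityAwareTasks: Int,
      hostToLocalTaskCount: Map[String, Int],
      nodeBlacklist: Set[String])

  case object RetrieveLastRequestExecutors

  // The backend caches the last request it sent to the AM...
  var lastRequestExecutors = RequestExecutors(-1, -1, Map.empty, Set.empty)

  // ...so a re-registered AM can retrieve it instead of starting from a
  // freshly reset, empty ExecutorAllocationManager state.
  def handle(msg: Any): Any = msg match {
    case RetrieveLastRequestExecutors => lastRequestExecutors
  }
}
```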





[GitHub] spark pull request #17882: [WIP][SPARK-20079][yarn] Re registration of AM ha...

2017-06-14 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/17882#discussion_r121972913
  
--- Diff: 
resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
 ---
@@ -176,16 +179,6 @@ private[spark] abstract class YarnSchedulerBackend(
   }
 
   /**
-   * Reset the state of SchedulerBackend to the initial state. This is 
happened when AM is failed
-   * and re-registered itself to driver after a failure. The stale state 
in driver should be
-   * cleaned.
-   */
-  override protected def reset(): Unit = {
--- End diff --

I think `ExecutorAllocationManager#reset` is still necessary, but the 
following code should be removed
```scala
initializing = true
numExecutorsTarget = initialNumExecutors
numExecutorsToAdd = 1
```
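If I read the Spark 2.x source correctly (an assumption worth double-checking against the actual class), `reset()` would then keep only its per-executor cleanup, roughly:

```scala
def reset(): Unit = synchronized {
  // initializing, numExecutorsTarget and numExecutorsToAdd are left
  // untouched, per the suggestion above; only stale per-executor state goes.
  executorsPendingToRemove.clear()
  removeTimes.clear()
}
```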





[GitHub] spark pull request #17882: [WIP][SPARK-20079][try 2][yarn] Re registration o...

2017-05-06 Thread witgo
GitHub user witgo opened a pull request:

https://github.com/apache/spark/pull/17882

[WIP][SPARK-20079][try 2][yarn] Re registration of AM hangs spark cluster 
in yarn-client mode.

See #17480 

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/witgo/spark SPARK-20079_try2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17882.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17882


commit fd4c486725ae49d068e67088a185fb4cc229e21f
Author: Guoqiang Li <wi...@qq.com>
Date:   2017-03-30T14:17:49Z

SPARK-20079: Re registration of AM hangs spark cluster in yarn-client mode.

commit fa27d7f6da49659c8aa6efe55a3e457e883eb17a
Author: Guoqiang Li <wi...@qq.com>
Date:   2017-04-04T04:33:45Z

review commits

commit 6341d31c2961a08db2f36339f5ebe8c814eeb4c7
Author: Guoqiang Li <wi...@qq.com>
Date:   2017-04-07T10:32:29Z

review commits

commit e992df93e7222d5d2bd66d9a2c19984c9b241fd5
Author: Guoqiang Li <wi...@qq.com>
Date:   2017-04-23T04:56:58Z

Delete "initializing = true" in ExecutorAllocationManager.reset

commit 917cf43ffaaeb20347df3c7e480cb75ae87dca83
Author: Guoqiang Li <wi...@qq.com>
Date:   2017-05-06T13:28:28Z

Add msg: RetrieveLastAllocatedExecutorNumber







[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-13 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r195284967
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java
 ---
@@ -220,30 +196,91 @@ public long sendRpc(ByteBuffer message, 
RpcResponseCallback callback) {
 handler.addRpcRequest(requestId, callback);
 
 channel.writeAndFlush(new RpcRequest(requestId, new 
NioManagedBuffer(message)))
-.addListener(future -> {
-  if (future.isSuccess()) {
-long timeTaken = System.currentTimeMillis() - startTime;
-if (logger.isTraceEnabled()) {
-  logger.trace("Sending request {} to {} took {} ms", 
requestId,
-getRemoteAddress(channel), timeTaken);
-}
-  } else {
-String errorMsg = String.format("Failed to send RPC %s to %s: 
%s", requestId,
-  getRemoteAddress(channel), future.cause());
-logger.error(errorMsg, future.cause());
-handler.removeRpcRequest(requestId);
-channel.close();
-try {
-  callback.onFailure(new IOException(errorMsg, 
future.cause()));
-} catch (Exception e) {
-  logger.error("Uncaught exception in RPC response callback 
handler!", e);
-}
-  }
-});
+  .addListener(new RpcChannelListener(startTime, requestId, callback));
+
+return requestId;
+  }
+
+  /**
+   * Send data to the remote end as a stream.   This differs from stream() 
in that this is a request
+   * to *send* data to the remote end, not to receive it from the remote.
+   *
+   * @param meta meta data associated with the stream, which will be read 
completely on the
+   * receiving end before the stream itself.
+   * @param data this will be streamed to the remote end to allow for 
transferring large amounts
+   * of data without reading into memory.
+   * @param callback handles the reply -- onSuccess will only be called 
when both message and data
+   * are received successfully.
+   */
+  public long uploadStream(
+  ManagedBuffer meta,
+  ManagedBuffer data,
+  RpcResponseCallback callback) {
+long startTime = System.currentTimeMillis();
+if (logger.isTraceEnabled()) {
+  logger.trace("Sending RPC to {}", getRemoteAddress(channel));
+}
+
+long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
--- End diff --

This expression, `Math.abs(UUID.randomUUID().getLeastSignificantBits())`, is repeated twice. Move it into a separate method, as sketched below.
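Something like the following, shown as a Scala sketch for consistency with the other examples here (the actual `TransportClient` is Java, and the helper name is an assumption):

```scala
import java.util.UUID

object RequestIds {
  // Single home for request-id generation; both sendRpc and uploadStream
  // would call this instead of duplicating the expression.
  def nextRequestId(): Long = math.abs(UUID.randomUUID().getLeastSignificantBits)
}
```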





[GitHub] spark pull request #21346: [SPARK-6237][NETWORK] Network-layer changes to al...

2018-06-14 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/21346#discussion_r195287202
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/protocol/UploadStream.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.protocol;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NettyManagedBuffer;
+
+/**
+ * An RPC with data that is sent outside of the frame, so it can be read 
as a stream.
+ */
+public final class UploadStream extends AbstractMessage implements 
RequestMessage {
--- End diff --

Is it possible to merge `UploadStream` and `RpcRequest` into a single class?





[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...

2018-05-29 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/21451#discussion_r191628993
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java
 ---
@@ -38,15 +38,24 @@
*
* This method will not be called in parallel for a single 
TransportClient (i.e., channel).
*
+   * The rpc *might* included a data stream in streamData 
(eg. for uploading a large
+   * amount of data which should not be buffered in memory here).  Any 
errors while handling the
+   * streamData will lead to failing this entire connection -- all other 
in-flight rpcs will fail.
+   * If stream data is not null, you *must* call 
streamData.registerStreamCallback
+   * before this method returns.
+   *
* @param client A channel client which enables the handler to make 
requests back to the sender
*   of this RPC. This will always be the exact same object 
for a particular channel.
* @param message The serialized bytes of the RPC.
+   * @param streamData StreamData if there is data which is meant to be 
read via a StreamCallback;
+   *   otherwise it is null.
* @param callback Callback which should be invoked exactly once upon 
success or failure of the
* RPC.
*/
   public abstract void receive(
   TransportClient client,
   ByteBuffer message,
+  StreamData streamData,
--- End diff --

It's not necessary to add a parameter. Change the `message` parameter to an `InputStream` instead; a sketch follows.
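A minimal Scala sketch of that alternative signature (the real `RpcHandler` is Java, and `RpcHandlerSketch` is a made-up name for illustration only):

```scala
import java.io.InputStream

import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}

trait RpcHandlerSketch {
  // One payload parameter: a small RPC arrives fully buffered, while a large
  // upload streams through the same InputStream, with no extra argument.
  def receive(client: TransportClient, message: InputStream, callback: RpcResponseCallback): Unit
}
```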





[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...

2018-05-31 Thread witgo
Github user witgo commented on a diff in the pull request:

https://github.com/apache/spark/pull/21451#discussion_r192279111
  
--- Diff: 
common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java
 ---
@@ -38,15 +38,24 @@
*
* This method will not be called in parallel for a single 
TransportClient (i.e., channel).
*
+   * The rpc *might* included a data stream in streamData 
(eg. for uploading a large
+   * amount of data which should not be buffered in memory here).  Any 
errors while handling the
+   * streamData will lead to failing this entire connection -- all other 
in-flight rpcs will fail.
+   * If stream data is not null, you *must* call 
streamData.registerStreamCallback
+   * before this method returns.
+   *
* @param client A channel client which enables the handler to make 
requests back to the sender
*   of this RPC. This will always be the exact same object 
for a particular channel.
* @param message The serialized bytes of the RPC.
+   * @param streamData StreamData if there is data which is meant to be 
read via a StreamCallback;
+   *   otherwise it is null.
* @param callback Callback which should be invoked exactly once upon 
success or failure of the
* RPC.
*/
   public abstract void receive(
   TransportClient client,
   ByteBuffer message,
+  StreamData streamData,
--- End diff --

What about folding the `message` parameter into `streamData` instead?





[GitHub] spark issue #14658: [WIP][SPARK-5928][SPARK-6238] Remote Shuffle Blocks cann...

2017-12-26 Thread witgo
Github user witgo commented on the issue:

https://github.com/apache/spark/pull/14658
  
Spark 2.2 has fixed this issue.




