[jira] [Updated] (SPARK-25429) SparkListenerBus inefficient due to 'LiveStageMetrics#accumulatorIds:Array[Long]' data structure

2018-09-14 Thread DENG FEI (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

DENG FEI updated SPARK-25429:
-
Description: 
{code:java}
private def updateStageMetrics(
    stageId: Int,
    attemptId: Int,
    taskId: Long,
    accumUpdates: Seq[AccumulableInfo],
    succeeded: Boolean): Unit = {
  Option(stageMetrics.get(stageId)).foreach { metrics =>
    if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) {
      return
    }

    val oldTaskMetrics = metrics.taskMetrics.get(taskId)
    if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
      return
    }

    val updates = accumUpdates
      .filter { acc => acc.update.isDefined && metrics.accumulatorIds.contains(acc.id) }
      .sortBy(_.id)

    if (updates.isEmpty) {
      return
    }

    val ids = new Array[Long](updates.size)
    val values = new Array[Long](updates.size)
    updates.zipWithIndex.foreach { case (acc, idx) =>
      ids(idx) = acc.id
      // In a live application, accumulators have Long values, but when reading from event
      // logs, they have String values. For now, assume all accumulators are Long and convert
      // accordingly.
      values(idx) = acc.update.get match {
        case s: String => s.toLong
        case l: Long => l
        case o => throw new IllegalArgumentException(s"Unexpected: $o")
      }
    }

    // TODO: storing metrics by task ID can cause metrics for the same task index to be
    // counted multiple times, for example due to speculation or re-attempts.
    metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, succeeded))
  }
}
{code}

'metrics.accumulatorIds.contains(acc.id)': if a large SQL application generates many accumulators, the Array#contains lookup is inefficient.
In practice the application may even time out while shutting down and be killed by the RM in YARN mode. A hashed lookup would avoid the linear scan; see the sketch below.
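
A minimal sketch of the alternative, assuming LiveStageMetrics could keep the accumulator IDs in a hashed set rather than an Array[Long]; the class and field names below only mirror the snippet above and this is not the actual patch:

{code:java}
// Stand-ins for the types in the snippet above; illustrative only.
case class AccumulableInfo(id: Long, update: Option[Any])

class LiveStageMetrics(val attemptId: Int, accumulatorIds: Array[Long]) {
  // Built once per stage: each accumulator update then pays O(1) for the
  // membership test instead of scanning the whole Array[Long].
  private val accumulatorIdSet: Set[Long] = accumulatorIds.toSet

  def isRelevantUpdate(acc: AccumulableInfo): Boolean =
    acc.update.isDefined && accumulatorIdSet.contains(acc.id)
}
{code}

With something like this, the filter in updateStageMetrics becomes accumUpdates.filter(metrics.isRelevantUpdate).sortBy(_.id) and no longer degrades as the number of SQL accumulators grows.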



  was:
{code:java}
private def updateStageMetrics(
  stageId: Int,
  attemptId: Int,
  taskId: Long,
  accumUpdates: Seq[AccumulableInfo],
  succeeded: Boolean): Unit = {
Option(stageMetrics.get(stageId)).foreach { metrics =>
  if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) {
return
  }

  val oldTaskMetrics = metrics.taskMetrics.get(taskId)
  if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
return
  }

  val updates = accumUpdates
.filter { acc => acc.update.isDefined && 
metrics.accumulatorIds.contains(acc.id) }
.sortBy(_.id)

  if (updates.isEmpty) {
return
  }

  val ids = new Array[Long](updates.size)
  val values = new Array[Long](updates.size)
  updates.zipWithIndex.foreach { case (acc, idx) =>
ids(idx) = acc.id
// In a live application, accumulators have Long values, but when 
reading from event
// logs, they have String values. For now, assume all accumulators are 
Long and covert
// accordingly.
values(idx) = acc.update.get match {
  case s: String => s.toLong
  case l: Long => l
  case o => throw new IllegalArgumentException(s"Unexpected: $o")
}
  }

  // TODO: storing metrics by task ID can cause metrics for the same task 
index to be
  // counted multiple times, for example due to speculation or re-attempts.
  metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, 
succeeded))
}
  }
{code}

'metrics.accumulatorIds.contains(acc.id)', if large SQL application generated 
many accumulator, 




> SparkListenerBus inefficient due to 
> 'LiveStageMetrics#accumulatorIds:Array[Long]' data structure
> 
>
> Key: SPARK-25429
> URL: https://issues.apache.org/jira/browse/SPARK-25429
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.1
>Reporter: DENG FEI
>Priority: Major
>
> {code:java}
> private def updateStageMetrics(
>   stageId: Int,
>   attemptId: Int,
>   taskId: Long,
>   accumUpdates: Seq[AccumulableInfo],
>   succeeded: Boolean): Unit = {
> Option(stageMetrics.get(stageId)).foreach { metrics =>
>   if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) {
> return
>   }
>   val oldTaskMetrics = metrics.taskMetrics.get(taskId)
>   if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
> return
>   }
>   val updates = accumUpdates
> .filter { acc => acc.update.isDefined && 
> metrics.accumulatorIds.contains(acc.id) }
> .sortBy(_.id)
>   if (updates.isEmpty) {
> return
>   }
>   val ids = new Array[Long](updates.size)
>   val values = new Array[Long](updates.size)
>   up

[jira] [Updated] (SPARK-25429) SparkListenerBus inefficient due to 'LiveStageMetrics#accumulatorIds:Array[Long]' data structure

2018-09-14 Thread DENG FEI (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

DENG FEI updated SPARK-25429:
-
Description: 
{code:java}
private def updateStageMetrics(
    stageId: Int,
    attemptId: Int,
    taskId: Long,
    accumUpdates: Seq[AccumulableInfo],
    succeeded: Boolean): Unit = {
  Option(stageMetrics.get(stageId)).foreach { metrics =>
    if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) {
      return
    }

    val oldTaskMetrics = metrics.taskMetrics.get(taskId)
    if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
      return
    }

    val updates = accumUpdates
      .filter { acc => acc.update.isDefined && metrics.accumulatorIds.contains(acc.id) }
      .sortBy(_.id)

    if (updates.isEmpty) {
      return
    }

    val ids = new Array[Long](updates.size)
    val values = new Array[Long](updates.size)
    updates.zipWithIndex.foreach { case (acc, idx) =>
      ids(idx) = acc.id
      // In a live application, accumulators have Long values, but when reading from event
      // logs, they have String values. For now, assume all accumulators are Long and convert
      // accordingly.
      values(idx) = acc.update.get match {
        case s: String => s.toLong
        case l: Long => l
        case o => throw new IllegalArgumentException(s"Unexpected: $o")
      }
    }

    // TODO: storing metrics by task ID can cause metrics for the same task index to be
    // counted multiple times, for example due to speculation or re-attempts.
    metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, succeeded))
  }
}
{code}

'metrics.accumulatorIds.contains(acc.id)': if a large SQL application generates many accumulators, 



> SparkListenerBus inefficient due to 
> 'LiveStageMetrics#accumulatorIds:Array[Long]' data structure
> 
>
> Key: SPARK-25429
> URL: https://issues.apache.org/jira/browse/SPARK-25429
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 2.3.1
>Reporter: DENG FEI
>Priority: Major
>
> {code:java}
> private def updateStageMetrics(
>   stageId: Int,
>   attemptId: Int,
>   taskId: Long,
>   accumUpdates: Seq[AccumulableInfo],
>   succeeded: Boolean): Unit = {
> Option(stageMetrics.get(stageId)).foreach { metrics =>
>   if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) {
> return
>   }
>   val oldTaskMetrics = metrics.taskMetrics.get(taskId)
>   if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
> return
>   }
>   val updates = accumUpdates
> .filter { acc => acc.update.isDefined && 
> metrics.accumulatorIds.contains(acc.id) }
> .sortBy(_.id)
>   if (updates.isEmpty) {
> return
>   }
>   val ids = new Array[Long](updates.size)
>   val values = new Array[Long](updates.size)
>   updates.zipWithIndex.foreach { case (acc, idx) =>
> ids(idx) = acc.id
> // In a live application, accumulators have Long values, but when 
> reading from event
> // logs, they have String values. For now, assume all accumulators 
> are Long and covert
> // accordingly.
> values(idx) = acc.update.get match {
>   case s: String => s.toLong
>   case l: Long => l
>   case o => throw new IllegalArgumentException(s"Unexpected: $o")
> }
>   }
>   // TODO: storing metrics by task ID can cause metrics for the same task 
> index to be
>   // counted multiple times, for example due to speculation or 
> re-attempts.
>   metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, 
> succeeded))
> }
>   }
> {code}
> 'metrics.accumulatorIds.contains(acc.id)', if large SQL application generated 
> many accumulator, 






[jira] [Created] (SPARK-25429) SparkListenerBus inefficient due to 'LiveStageMetrics#accumulatorIds:Array[Long]' data structure

2018-09-13 Thread DENG FEI (JIRA)
DENG FEI created SPARK-25429:


 Summary: SparkListenerBus inefficient due to 
'LiveStageMetrics#accumulatorIds:Array[Long]' data structure
 Key: SPARK-25429
 URL: https://issues.apache.org/jira/browse/SPARK-25429
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Affects Versions: 2.3.1
Reporter: DENG FEI









[jira] [Updated] (SPARK-25139) PythonRunner#WriterThread released block after TaskRunner finally block which invoke BlockManager#releaseAllLocksForTask

2018-08-16 Thread DENG FEI (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

DENG FEI updated SPARK-25139:
-
Description: 
We run PySpark Streaming on YARN, and the executor dies with the error below: the task released its block lock when it finished, but the Python writer thread had not yet done the real lock release.

Normally the task's cleanup is only a double check of the lock, but here it ran before the writer's release.

The executor trace log is below:
{noformat}
18/08/17 13:52:20 Executor task launch worker for task 137 DEBUG BlockManager: Getting local block input-0-1534485138800
18/08/17 13:52:20 Executor task launch worker for task 137 TRACE BlockInfoManager: Task 137 trying to acquire read lock for input-0-1534485138800
18/08/17 13:52:20 Executor task launch worker for task 137 TRACE BlockInfoManager: Task 137 acquired read lock for input-0-1534485138800
18/08/17 13:52:20 Executor task launch worker for task 137 DEBUG BlockManager: Level for block input-0-1534485138800 is StorageLevel(disk, memory, 1 replicas)
18/08/17 13:52:20 Executor task launch worker for task 137 INFO BlockManager: Found block input-0-1534485138800 locally
18/08/17 13:52:20 Executor task launch worker for task 137 INFO PythonRunner: Times: total = 8, boot = 3, init = 5, finish = 0
18/08/17 13:52:20 stdout writer for python TRACE BlockInfoManager: Task 137 releasing lock for input-0-1534485138800
18/08/17 13:52:20 Executor task launch worker for task 137 INFO Executor: 1 block locks were not released by TID = 137: [input-0-1534485138800]
18/08/17 13:52:20 stdout writer for python ERROR Utils: Uncaught exception in thread stdout writer for python
java.lang.AssertionError: assertion failed: Block input-0-1534485138800 is not locked for reading
 at scala.Predef$.assert(Predef.scala:170)
 at org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299)
 at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:769)
 at org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:540)
 at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
 at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
 at scala.collection.Iterator$class.foreach(Iterator.scala:893)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
 at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:213)
 at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:407)
 at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
 at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
 at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)
18/08/17 13:52:20 stdout writer for python ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout writer for python,5,main]
{noformat}

 

I think the TaskRunner should wait for the WriterThread after Task#run; see the sketch below.
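
A rough sketch of the "wait for the WriterThread" idea, with hypothetical names (WriterAwareTask, runWithWriter); the real TaskRunner/PythonRunner wiring is more involved and this is not the actual fix:

{code:java}
// Sketch only: have the task body wait for the Python writer thread before the
// TaskRunner's finally block calls BlockManager#releaseAllLocksForTask, so the
// writer releases its read lock on the input block itself first.
class WriterAwareTask(writerThread: Thread) {
  def runWithWriter(taskBody: => Unit): Unit = {
    try {
      taskBody  // the normal Task#run work
    } finally {
      // Bounded wait for the stdout writer to finish streaming the iterator
      // and drop its block locks before the executor force-releases them.
      writerThread.join(10000L)
    }
  }
}
{code}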

  was:
We run pyspark streaming on YARN, the executor will die caused by the error.

The Task released lock if finished, but the python writer haven't do real 
releasing lock.Normally the task just double check the lock, but it ran wrong 
in front.

The executor trace log is below:
 18/08/17 13:52:20 Executor task launch worker for task 137 DEBUG BlockManager: 
Getting local block input-0-1534485138800 18/08/17 13:52:20 Executor task 
launch worker for task 137 TRACE BlockInfoManager: Task 137 trying to acquire 
read lock for input-0-1534485138800 18/08/17 13:52:20 Executor task launch 
worker for task 137 TRACE BlockInfoManager: Task 137 acquired read lock for 
input-0-1534485138800 18/08/17 13:52:20 Executor task launch worker for task 
137 DEBUG BlockManager: Level for block input-0-1534485138800 is 
StorageLevel(disk, memory, 1 replicas) 18/08/17 13:52:20 Executor task launch 
worker for task 137 INFO BlockManager: Found block input-0-1534485138800 
locally 18/08/17 13:52:20 Executor task launch worker for task 137 INFO 
PythonRunner: Times: total = 8, boot = 3, init = 5, finish = 0 18/08/17 
13:52:20 stdout writer for python TRACE BlockInfoManager: Task 137 releasing 
lock for input-0-1534485138800 18/08/17 13:52:20 Executor task launch worker 
for task 137 INFO Executor: 1 block locks were not released by TID = 137: 
[input-0-1534485138800] 18/08/17 13:52:20 stdout writer for python ERROR Utils: 
Uncaught exception in thread stdout writer for python java.lang.AssertionError: 
assertion failed: Block input-0-1534485138800 is not locked for reading at 
scala.Predef$.assert(Predef.scala:170) at 
org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299) at 
org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:769) at 
org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:540)
 at 
org.apache

[jira] [Updated] (SPARK-25139) PythonRunner#WriterThread released block after TaskRunner finally block which invoke BlockManager#releaseAllLocksForTask

2018-08-16 Thread DENG FEI (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

DENG FEI updated SPARK-25139:
-
Description: 
We run PySpark Streaming on YARN, and the executor dies with the error below.

The task released its block lock when it finished, but the Python writer thread had not yet done the real lock release. Normally the task's cleanup is only a double check of the lock, but here it ran before the writer's release.

The executor trace log is below:
 18/08/17 13:52:20 Executor task launch worker for task 137 DEBUG BlockManager: 
Getting local block input-0-1534485138800 18/08/17 13:52:20 Executor task 
launch worker for task 137 TRACE BlockInfoManager: Task 137 trying to acquire 
read lock for input-0-1534485138800 18/08/17 13:52:20 Executor task launch 
worker for task 137 TRACE BlockInfoManager: Task 137 acquired read lock for 
input-0-1534485138800 18/08/17 13:52:20 Executor task launch worker for task 
137 DEBUG BlockManager: Level for block input-0-1534485138800 is 
StorageLevel(disk, memory, 1 replicas) 18/08/17 13:52:20 Executor task launch 
worker for task 137 INFO BlockManager: Found block input-0-1534485138800 
locally 18/08/17 13:52:20 Executor task launch worker for task 137 INFO 
PythonRunner: Times: total = 8, boot = 3, init = 5, finish = 0 18/08/17 
13:52:20 stdout writer for python TRACE BlockInfoManager: Task 137 releasing 
lock for input-0-1534485138800 18/08/17 13:52:20 Executor task launch worker 
for task 137 INFO Executor: 1 block locks were not released by TID = 137: 
[input-0-1534485138800] 18/08/17 13:52:20 stdout writer for python ERROR Utils: 
Uncaught exception in thread stdout writer for python java.lang.AssertionError: 
assertion failed: Block input-0-1534485138800 is not locked for reading at 
scala.Predef$.assert(Predef.scala:170) at 
org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299) at 
org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:769) at 
org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:540)
 at 
org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
 at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33) 
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at 
scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at 
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:213)
 at 
org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:407)
 at 
org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
 at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991) at 
org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)
 18/08/17 13:52:20 stdout writer for python ERROR 
SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout 
writer for python,5,main]

 

I think the TaskRunner should wait for the WriterThread after Task#run.

  was:
We run pyspark streaming on YARN, the executor will die caused by the error.

The Task released lock if finished, but the 

The executor trace log is below:
 18/08/17 13:52:20 Executor task launch worker for task 137 DEBUG BlockManager: 
Getting local block input-0-1534485138800 18/08/17 13:52:20 Executor task 
launch worker for task 137 TRACE BlockInfoManager: Task 137 trying to acquire 
read lock for input-0-1534485138800 18/08/17 13:52:20 Executor task launch 
worker for task 137 TRACE BlockInfoManager: Task 137 acquired read lock for 
input-0-1534485138800 18/08/17 13:52:20 Executor task launch worker for task 
137 DEBUG BlockManager: Level for block input-0-1534485138800 is 
StorageLevel(disk, memory, 1 replicas) 18/08/17 13:52:20 Executor task launch 
worker for task 137 INFO BlockManager: Found block input-0-1534485138800 
locally 18/08/17 13:52:20 Executor task launch worker for task 137 INFO 
PythonRunner: Times: total = 8, boot = 3, init = 5, finish = 0 18/08/17 
13:52:20 stdout writer for python TRACE BlockInfoManager: Task 137 releasing 
lock for input-0-1534485138800 18/08/17 13:52:20 Executor task launch worker 
for task 137 INFO Executor: 1 block locks were not released by TID = 137: 
[input-0-1534485138800] 18/08/17 13:52:20 stdout writer for python ERROR Utils: 
Uncaught exception in thread stdout writer for python java.lang.AssertionError: 
assertion failed: Block input-0-1534485138800 is not locked for reading at 
scala.Predef$.assert(Predef.scala:170) at 
org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299) at 
org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:769) at 
org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:540)
 at 
org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
 at 
org.apache.spark.util.CompletionIterato

[jira] [Updated] (SPARK-25139) PythonRunner#WriterThread released block after TaskRunner finally block which invoke BlockManager#releaseAllLocksForTask

2018-08-16 Thread DENG FEI (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

DENG FEI updated SPARK-25139:
-
Description: 
We run PySpark Streaming on YARN, and the executor dies with the error below.

The Task released lock if finished, but the 

The executor trace log is below:
 18/08/17 13:52:20 Executor task launch worker for task 137 DEBUG BlockManager: 
Getting local block input-0-1534485138800 18/08/17 13:52:20 Executor task 
launch worker for task 137 TRACE BlockInfoManager: Task 137 trying to acquire 
read lock for input-0-1534485138800 18/08/17 13:52:20 Executor task launch 
worker for task 137 TRACE BlockInfoManager: Task 137 acquired read lock for 
input-0-1534485138800 18/08/17 13:52:20 Executor task launch worker for task 
137 DEBUG BlockManager: Level for block input-0-1534485138800 is 
StorageLevel(disk, memory, 1 replicas) 18/08/17 13:52:20 Executor task launch 
worker for task 137 INFO BlockManager: Found block input-0-1534485138800 
locally 18/08/17 13:52:20 Executor task launch worker for task 137 INFO 
PythonRunner: Times: total = 8, boot = 3, init = 5, finish = 0 18/08/17 
13:52:20 stdout writer for python TRACE BlockInfoManager: Task 137 releasing 
lock for input-0-1534485138800 18/08/17 13:52:20 Executor task launch worker 
for task 137 INFO Executor: 1 block locks were not released by TID = 137: 
[input-0-1534485138800] 18/08/17 13:52:20 stdout writer for python ERROR Utils: 
Uncaught exception in thread stdout writer for python java.lang.AssertionError: 
assertion failed: Block input-0-1534485138800 is not locked for reading at 
scala.Predef$.assert(Predef.scala:170) at 
org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299) at 
org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:769) at 
org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:540)
 at 
org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
 at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33) 
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at 
scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at 
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:213)
 at 
org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:407)
 at 
org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
 at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991) at 
org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)
 18/08/17 13:52:20 stdout writer for python ERROR 
SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout 
writer for python,5,main]

I think the TaskRunner should wait for the WriterThread after Task#run.

  was:
We run pyspark streaming on YARN, the executor will die caused by the error.

The executor trace log is below:
18/08/17 13:52:20 Executor task launch worker for task 137 DEBUG BlockManager: 
Getting local block input-0-1534485138800 18/08/17 13:52:20 Executor task 
launch worker for task 137 TRACE BlockInfoManager: Task 137 trying to acquire 
read lock for input-0-1534485138800 18/08/17 13:52:20 Executor task launch 
worker for task 137 TRACE BlockInfoManager: Task 137 acquired read lock for 
input-0-1534485138800 18/08/17 13:52:20 Executor task launch worker for task 
137 DEBUG BlockManager: Level for block input-0-1534485138800 is 
StorageLevel(disk, memory, 1 replicas) 18/08/17 13:52:20 Executor task launch 
worker for task 137 INFO BlockManager: Found block input-0-1534485138800 
locally 18/08/17 13:52:20 Executor task launch worker for task 137 INFO 
PythonRunner: Times: total = 8, boot = 3, init = 5, finish = 0 18/08/17 
13:52:20 stdout writer for python TRACE BlockInfoManager: Task 137 releasing 
lock for input-0-1534485138800 18/08/17 13:52:20 Executor task launch worker 
for task 137 INFO Executor: 1 block locks were not released by TID = 137: 
[input-0-1534485138800] 18/08/17 13:52:20 stdout writer for python ERROR Utils: 
Uncaught exception in thread stdout writer for python java.lang.AssertionError: 
assertion failed: Block input-0-1534485138800 is not locked for reading at 
scala.Predef$.assert(Predef.scala:170) at 
org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299) at 
org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:769) at 
org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:540)
 at 
org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
 at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33) 
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at 
scala.collection.Iterator$class.foreach(Iterator.scala:893)

[jira] [Updated] (SPARK-25139) PythonRunner#WriterThread released block after TaskRunner finally block which invoke BlockManager#releaseAllLocksForTask

2018-08-16 Thread DENG FEI (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

DENG FEI updated SPARK-25139:
-
Description: 
We run PySpark Streaming on YARN, and the executor dies with the error below.

The executor trace log is below:
18/08/17 13:52:20 Executor task launch worker for task 137 DEBUG BlockManager: 
Getting local block input-0-1534485138800 18/08/17 13:52:20 Executor task 
launch worker for task 137 TRACE BlockInfoManager: Task 137 trying to acquire 
read lock for input-0-1534485138800 18/08/17 13:52:20 Executor task launch 
worker for task 137 TRACE BlockInfoManager: Task 137 acquired read lock for 
input-0-1534485138800 18/08/17 13:52:20 Executor task launch worker for task 
137 DEBUG BlockManager: Level for block input-0-1534485138800 is 
StorageLevel(disk, memory, 1 replicas) 18/08/17 13:52:20 Executor task launch 
worker for task 137 INFO BlockManager: Found block input-0-1534485138800 
locally 18/08/17 13:52:20 Executor task launch worker for task 137 INFO 
PythonRunner: Times: total = 8, boot = 3, init = 5, finish = 0 18/08/17 
13:52:20 stdout writer for python TRACE BlockInfoManager: Task 137 releasing 
lock for input-0-1534485138800 18/08/17 13:52:20 Executor task launch worker 
for task 137 INFO Executor: 1 block locks were not released by TID = 137: 
[input-0-1534485138800] 18/08/17 13:52:20 stdout writer for python ERROR Utils: 
Uncaught exception in thread stdout writer for python java.lang.AssertionError: 
assertion failed: Block input-0-1534485138800 is not locked for reading at 
scala.Predef$.assert(Predef.scala:170) at 
org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299) at 
org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:769) at 
org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:540)
 at 
org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
 at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33) 
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at 
scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at 
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:213)
 at 
org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:407)
 at 
org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
 at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991) at 
org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)
 18/08/17 13:52:20 stdout writer for python ERROR 
SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout 
writer for python,5,main]

I think the TaskRunner should wait for the WriterThread after Task#run.

  was:
We run pyspark streaming on YARN, the executor will die caused by the error.

The executor trace log is below:
{noformat}
18/08/17 13:52:20 Executor task launch worker for task 137 DEBUG BlockManager: 
Getting local block input-0-1534485138800 18/08/17 13:52:20 Executor task 
launch worker for task 137 TRACE BlockInfoManager: Task 137 trying to acquire 
read lock for input-0-1534485138800 18/08/17 13:52:20 Executor task launch 
worker for task 137 TRACE BlockInfoManager: Task 137 acquired read lock for 
input-0-1534485138800 18/08/17 13:52:20 Executor task launch worker for task 
137 DEBUG BlockManager: Level for block input-0-1534485138800 is 
StorageLevel(disk, memory, 1 replicas) 18/08/17 13:52:20 Executor task launch 
worker for task 137 INFO BlockManager: Found block input-0-1534485138800 
locally 18/08/17 13:52:20 Executor task launch worker for task 137 INFO 
PythonRunner: Times: total = 8, boot = 3, init = 5, finish = 0 18/08/17 
13:52:20 stdout writer for python TRACE BlockInfoManager: Task 137 releasing 
lock for input-0-1534485138800 18/08/17 13:52:20 Executor task launch worker 
for task 137 INFO Executor: 1 block locks were not released by TID = 137: 
[input-0-1534485138800] 18/08/17 13:52:20 stdout writer for python ERROR Utils: 
Uncaught exception in thread stdout writer for python java.lang.AssertionError: 
assertion failed: Block input-0-1534485138800 is not locked for reading at 
scala.Predef$.assert(Predef.scala:170) at 
org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299) at 
org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:769) at 
org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:540)
 at 
org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
 at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33) 
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at 
scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
scala.collection.AbstractIterat

[jira] [Created] (SPARK-25139) PythonRunner#WriterThread released block after TaskRunner finally block which invoke BlockManager#releaseAllLocksForTask

2018-08-16 Thread DENG FEI (JIRA)
DENG FEI created SPARK-25139:


 Summary: PythonRunner#WriterThread released block after TaskRunner 
finally block which  invoke BlockManager#releaseAllLocksForTask
 Key: SPARK-25139
 URL: https://issues.apache.org/jira/browse/SPARK-25139
 Project: Spark
  Issue Type: Bug
  Components: Block Manager
Affects Versions: 2.3.1
Reporter: DENG FEI


We run PySpark Streaming on YARN, and the executor dies with the error below.

The executor trace log is below:
{noformat}
18/08/17 13:52:20 Executor task launch worker for task 137 DEBUG BlockManager: 
Getting local block input-0-1534485138800 18/08/17 13:52:20 Executor task 
launch worker for task 137 TRACE BlockInfoManager: Task 137 trying to acquire 
read lock for input-0-1534485138800 18/08/17 13:52:20 Executor task launch 
worker for task 137 TRACE BlockInfoManager: Task 137 acquired read lock for 
input-0-1534485138800 18/08/17 13:52:20 Executor task launch worker for task 
137 DEBUG BlockManager: Level for block input-0-1534485138800 is 
StorageLevel(disk, memory, 1 replicas) 18/08/17 13:52:20 Executor task launch 
worker for task 137 INFO BlockManager: Found block input-0-1534485138800 
locally 18/08/17 13:52:20 Executor task launch worker for task 137 INFO 
PythonRunner: Times: total = 8, boot = 3, init = 5, finish = 0 18/08/17 
13:52:20 stdout writer for python TRACE BlockInfoManager: Task 137 releasing 
lock for input-0-1534485138800 18/08/17 13:52:20 Executor task launch worker 
for task 137 INFO Executor: 1 block locks were not released by TID = 137: 
[input-0-1534485138800] 18/08/17 13:52:20 stdout writer for python ERROR Utils: 
Uncaught exception in thread stdout writer for python java.lang.AssertionError: 
assertion failed: Block input-0-1534485138800 is not locked for reading at 
scala.Predef$.assert(Predef.scala:170) at 
org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299) at 
org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:769) at 
org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:540)
 at 
org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
 at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33) 
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at 
scala.collection.Iterator$class.foreach(Iterator.scala:893) at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at 
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:213)
 at 
org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:407)
 at 
org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
 at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991) at 
org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)
 18/08/17 13:52:20 stdout writer for python ERROR 
SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout 
writer for python,5,main]
{noformat}

I think the TaskRunner should wait for the WriterThread after Task#run.






[jira] [Updated] (SPARK-25070) BlockFetchingListener#onBlockFetchSuccess throw "java.util.NoSuchElementException: key not found: shuffle_8_68_113" on ShuffleBlockFetcherIterator caused stage hang lon

2018-08-09 Thread DENG FEI (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

DENG FEI updated SPARK-25070:
-
Description: 
The task fetches the shuffle block successfully, but onBlockFetchSuccess fails, which leaves the task hanging for a long time and makes speculation ineffective (a defensive variant of the success callback is sketched after the code excerpts below).

The log is below:

{noformat}
18/08/08 14:55:53 INFO ShuffleBlockFetcherIterator: Started 16 remote fetches in 16 ms

18/08/08 14:55:53 WARN TransportChannelHandler: Exception in connection from /xxx.xxx.xxx.xxx:7337
java.util.NoSuchElementException: key not found: shuffle_8_68_113
 at scala.collection.MapLike$class.default(MapLike.scala:228)
 at scala.collection.AbstractMap.default(Map.scala:59)
 at scala.collection.MapLike$class.apply(MapLike.scala:141)
 at scala.collection.AbstractMap.apply(Map.scala:59)
 at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1.onBlockFetchSuccess(ShuffleBlockFetcherIterator.scala:217)
 at org.apache.spark.network.shuffle.RetryingBlockFetcher$RetryingBlockFetchListener.onBlockFetchSuccess(RetryingBlockFetcher.java:204)
 at org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:97)
 at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:171)
 at

XX

18/08/08 14:55:53 INFO Executor: Finished task 44.0 in stage 14.0 (TID 1483). 3458 bytes result sent to driver
18/08/09 10:02:32 INFO Executor: Executor is trying to kill task 113.0 in stage 14.0 (TID 1552), reason: stage cancelled
{noformat}
{code:java}
val blockFetchingListener = new BlockFetchingListener {
  override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = {
    // Only add the buffer to results queue if the iterator is not zombie,
    // i.e. cleanup() has not been called yet.
    ShuffleBlockFetcherIterator.this.synchronized {
      try {
        if (!isZombie) {
          // Increment the ref count because we need to pass this to a different thread.
          // This needs to be released after use.
          buf.retain()
          remainingBlocks -= blockId
          results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf,
            remainingBlocks.isEmpty))
          logDebug("remainingBlocks: " + remainingBlocks)
        }
      } catch {
        case e: Throwable => onBlockFetchFailure(blockId, e)
      }
    }
    logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
  }

  override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
    logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
    results.put(new FailureFetchResult(BlockId(blockId), address, e))
  }
}
{code}
 

 
{code:java}
results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf,
  remainingBlocks.isEmpty))
{code}
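
One defensive variant of the success callback, sketched under the assumption that a late or duplicate callback is what makes sizeMap(blockId) miss; the names mirror the snippet above and this is not the actual fix:

{code:java}
// Sketch only: look the size up with Map#get so an unexpected blockId becomes an
// explicit failure (or can be ignored) instead of a NoSuchElementException thrown
// out of the Netty callback, which leaves the fetch result unreported.
override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = {
  ShuffleBlockFetcherIterator.this.synchronized {
    if (!isZombie) {
      sizeMap.get(blockId) match {
        case Some(size) =>
          buf.retain()
          remainingBlocks -= blockId
          results.put(new SuccessFetchResult(BlockId(blockId), address, size, buf,
            remainingBlocks.isEmpty))
        case None =>
          // Duplicate or unknown block: report it instead of crashing the handler.
          onBlockFetchFailure(blockId,
            new IllegalStateException(s"No size recorded for block $blockId"))
      }
    }
  }
}
{code}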
  

 

  was:
The task fetch shuffle block success, but failed onBlockFetchSuccess, lead the 
task hang long time and speculate as false.

The log is below:

18/08/08 14:55:53 INFO ShuffleBlockFetcherIterator: Started 16 remote fetches 
in 16 ms 18/08/08 14:55:53 WARN TransportChannelHandler: Exception in 
connection from /xxx.xxx.xxx.xxx:7337 java.util.NoSuchElementException: key not 
found: shuffle_8_68_113 at 
scala.collection.MapLike$class.default(MapLike.scala:228) at 
scala.collection.AbstractMap.default(Map.scala:59) at 
scala.collection.MapLike$class.apply(MapLike.scala:141) at 
scala.collection.AbstractMap.apply(Map.scala:59) at 
org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1.onBlockFetchSuccess(ShuffleBlockFetcherIterator.scala:217)
 at 
org.apache.spark.network.shuffle.RetryingBlockFetcher$RetryingBlockFetchListener.onBlockFetchSuccess(RetryingBlockFetcher.java:204)
 at 
org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:97)
 at 
org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:171)
 at 
org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:120)
 at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
 at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
 at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
 at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
 at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
 at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
 at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
 at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
 at 
org.apache.spark

[jira] [Created] (SPARK-25070) BlockFetchingListener#onBlockFetchSuccess throw "java.util.NoSuchElementException: key not found: shuffle_8_68_113" on ShuffleBlockFetcherIterator caused stage hang lon

2018-08-09 Thread DENG FEI (JIRA)
DENG FEI created SPARK-25070:


 Summary: BlockFetchingListener#onBlockFetchSuccess throw 
"java.util.NoSuchElementException: key not found: shuffle_8_68_113" on  
ShuffleBlockFetcherIterator caused stage hang long time
 Key: SPARK-25070
 URL: https://issues.apache.org/jira/browse/SPARK-25070
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.2.0
Reporter: DENG FEI


The task fetches the shuffle block successfully, but onBlockFetchSuccess fails, which leaves the task hanging for a long time and makes speculation ineffective.

The log is below:

18/08/08 14:55:53 INFO ShuffleBlockFetcherIterator: Started 16 remote fetches 
in 16 ms 18/08/08 14:55:53 WARN TransportChannelHandler: Exception in 
connection from /xxx.xxx.xxx.xxx:7337 java.util.NoSuchElementException: key not 
found: shuffle_8_68_113 at 
scala.collection.MapLike$class.default(MapLike.scala:228) at 
scala.collection.AbstractMap.default(Map.scala:59) at 
scala.collection.MapLike$class.apply(MapLike.scala:141) at 
scala.collection.AbstractMap.apply(Map.scala:59) at 
org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1.onBlockFetchSuccess(ShuffleBlockFetcherIterator.scala:217)
 at 
org.apache.spark.network.shuffle.RetryingBlockFetcher$RetryingBlockFetchListener.onBlockFetchSuccess(RetryingBlockFetcher.java:204)
 at 
org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:97)
 at 
org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:171)
 at 
org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:120)
 at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
 at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
 at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
 at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
 at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
 at 
io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
 at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
 at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
 at 
org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
 at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
 at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
 at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:962)
 at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
 at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528) 
at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:485)
 at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:399) at 
io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:371) at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
 at 
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
 at java.lang.Thread.run(Thread.java:745) 18/08/08 14:55:53 INFO Executor: 
Finished task 44.0 in stage 14.0 (TID 1483). 3458 bytes result sent to driver 
18/08/09 10:02:32 INFO Executor: Executor is trying to kill task 113.0 in stage 
14.0 (TID 1552), reason: stage cancelled
{code:java}
val blockFetchingListener = new BlockFetchingListener {
  override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = 
{
// Only add the buffer to results queue if the iterator is not zombie,
// i.e. cleanup() has not been called yet.
ShuffleBlockFetcherIterator.this.synchronized {
  try {
if (!isZombie) {
  // Increment the ref count because we need to pass this to a 
different thread.
  // This needs to be released after use.
  buf.retain()
  remainingBlocks -= blockId
  results.put(new SuccessFetchResult(BlockId(blockId), address, 
sizeMap(blockId), buf,
remainingBlocks.isEmpty))
  logDebug("remainingBlocks: " + remainingBlocks)
}
  } catch {
case e : Throwable => onBlockFetchFailure(blockId, e)
  }
}
logTrace("Got remote block " + blockId + " after " + 
Utils.getUsedTimeMs(startTime))
  }

  override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
logError(s"Failed to get block(s) from 
${req.address.host}:${req.address.port}"

[jira] [Resolved] (SPARK-25055) MessageWithHeader transfer ByteBuffer from Netty's CompositeByteBuf many times

2018-08-08 Thread DENG FEI (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-25055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

DENG FEI resolved SPARK-25055.
--
Resolution: Duplicate

> MessageWithHeader transfer ByteBuffer from Netty's CompositeByteBuf many times
> --
>
> Key: SPARK-25055
> URL: https://issues.apache.org/jira/browse/SPARK-25055
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager
>Affects Versions: 2.3.1
>Reporter: DENG FEI
>Priority: Major
>
> MessageWithHeader transfers the header and the body when they are ByteBufs. When fetching a
> remote block bigger than 'NIO_BUFFER_LIMIT', and because ChunkedByteBuffer#toNetty avoids
> consolidation, CompositeByteBuf.nioBuffer() allocates from the heap every time, which leads
> to fetch timeouts.






[jira] [Commented] (SPARK-25055) MessageWithHeader transfer ByteBuffer from Netty's CompositeByteBuf many times

2018-08-08 Thread DENG FEI (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-25055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16573154#comment-16573154
 ] 

DENG FEI commented on SPARK-25055:
--

{code:java}
private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOException {
  // Convert the (possibly composite) ByteBuf to a ByteBuffer once per call.
  ByteBuffer buffer = buf.nioBuffer();
  int totalWritten = 0;
  // Drain that single ByteBuffer completely, in NIO_BUFFER_LIMIT-sized writes,
  // so the conversion is not repeated for every chunk that is written.
  while (buffer.remaining() > 0) {
    int written = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
      target.write(buffer) : writeNioBuffer(target, buffer);
    buf.skipBytes(written);
    totalWritten += written;
  }
  return totalWritten;
}{code}

> MessageWithHeader transfer ByteBuffer from Netty's CompositeByteBuf many times
> --
>
> Key: SPARK-25055
> URL: https://issues.apache.org/jira/browse/SPARK-25055
> Project: Spark
>  Issue Type: Bug
>  Components: Block Manager
>Affects Versions: 2.3.1
>Reporter: DENG FEI
>Priority: Major
>
> MessageWithHeader transfers the header and the body when they are ByteBufs. When fetching a
> remote block bigger than 'NIO_BUFFER_LIMIT', and because ChunkedByteBuffer#toNetty avoids
> consolidation, CompositeByteBuf.nioBuffer() allocates from the heap every time, which leads
> to fetch timeouts.






[jira] [Created] (SPARK-25055) MessageWithHeader transfer ByteBuffer from Netty's CompositeByteBuf many times

2018-08-08 Thread DENG FEI (JIRA)
DENG FEI created SPARK-25055:


 Summary: MessageWithHeader transfer ByteBuffer from Netty's 
CompositeByteBuf many times
 Key: SPARK-25055
 URL: https://issues.apache.org/jira/browse/SPARK-25055
 Project: Spark
  Issue Type: Bug
  Components: Block Manager
Affects Versions: 2.3.1
Reporter: DENG FEI


MessageWithHeader transfers the header and the body when they are ByteBufs. When fetching a remote block bigger than 'NIO_BUFFER_LIMIT', and because ChunkedByteBuffer#toNetty avoids consolidation, CompositeByteBuf.nioBuffer() allocates from the heap every time, which leads to fetch timeouts. A small illustration follows below.
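
The cost being described can be illustrated with a small standalone snippet (assumed Netty 4.x behaviour: nioBuffer() on a composite buffer with more than one component copies all components into a freshly allocated heap buffer on every call):

{code:java}
import java.nio.ByteBuffer
import io.netty.buffer.{ByteBuf, Unpooled}

object CompositeCopyDemo extends App {
  // 32 x 1 MiB chunks wrapped without consolidation, roughly the shape that
  // ChunkedByteBuffer#toNetty produces for a large block.
  val chunks: Array[ByteBuffer] =
    Array.fill(32)(ByteBuffer.wrap(new Array[Byte](1024 * 1024)))
  val composite: ByteBuf = Unpooled.wrappedBuffer(chunks: _*)

  // Each call below is expected to merge all components into a brand-new heap
  // ByteBuffer, which is why calling nioBuffer() inside the transfer loop is costly.
  val copy1 = composite.nioBuffer()
  val copy2 = composite.nioBuffer()
  println(copy1 eq copy2)  // expected: false, i.e. two separate copies
}
{code}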






[jira] [Commented] (SPARK-23139) Read eventLog file with mixed encodings

2018-02-08 Thread DENG FEI (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356646#comment-16356646
 ] 

DENG FEI commented on SPARK-23139:
--

[~jiangxb1987] For example SparkListenerTaskEnd#TaskFailedReason#toErrorString: a user application can throw a customized message, and it will be encoded with _file.encoding_ or the node's environment.

[~vanzin] ASCII is the common base of all these charsets; I think it is enough for users to see what happened in the app history. A sketch of such a lenient read is below.
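
A sketch of the kind of lenient read being discussed, using a decoder that replaces malformed input instead of throwing; this is not the actual ReplayListenerBus change:

{code:java}
import java.io.{BufferedReader, FileInputStream, InputStreamReader}
import java.nio.charset.{CodingErrorAction, StandardCharsets}
import scala.collection.JavaConverters._

object LenientEventLogReader {
  // Sketch only: decode as UTF-8 but substitute U+FFFD for malformed or unmappable
  // bytes, so a single badly-encoded exception message cannot abort the whole replay.
  def lines(path: String): Iterator[String] = {
    val decoder = StandardCharsets.UTF_8.newDecoder()
      .onMalformedInput(CodingErrorAction.REPLACE)
      .onUnmappableCharacter(CodingErrorAction.REPLACE)
    val reader = new BufferedReader(
      new InputStreamReader(new FileInputStream(path), decoder))
    // Caller is responsible for closing the reader; this is a sketch, not production code.
    reader.lines().iterator().asScala
  }
}
{code}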

> Read eventLog file with mixed encodings
> ---
>
> Key: SPARK-23139
> URL: https://issues.apache.org/jira/browse/SPARK-23139
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: DENG FEI
>Priority: Major
>
> EventLog may contain mixed encodings such as custom exception message, but 
> caused to replay failure.
> java.nio.charset.MalformedInputException: Input length = 1
> at java.nio.charset.CoderResult.throwException(CoderResult.java:281)
>  at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:339)
>  at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
>  at java.io.InputStreamReader.read(InputStreamReader.java:184)
>  at java.io.BufferedReader.fill(BufferedReader.java:161)
>  at java.io.BufferedReader.readLine(BufferedReader.java:324)
>  at java.io.BufferedReader.readLine(BufferedReader.java:389)
>  at 
> scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:72)
>  at scala.collection.Iterator$$anon$21.hasNext(Iterator.scala:836)
>  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
>  at 
> org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:78)
>  at 
> org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:58)
>  at 
> org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$replay(FsHistoryProvider.scala:694)
>  at 
> org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$mergeApplicationListing(FsHistoryProvider.scala:507)
>  at 
> org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$checkForLogs$4$$anon$4.run(FsHistoryProvider.scala:399)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)






[jira] [Commented] (SPARK-23139) Read eventLog file with mixed encodings

2018-02-07 Thread DENG FEI (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356428#comment-16356428
 ] 

DENG FEI commented on SPARK-23139:
--

_ASCII_ is enough for the Spark event log.

And if we force writing with UTF-8, we should also force reading with UTF-8.

> Read eventLog file with mixed encodings
> ---
>
> Key: SPARK-23139
> URL: https://issues.apache.org/jira/browse/SPARK-23139
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: DENG FEI
>Priority: Major
>
> EventLog may contain mixed encodings such as custom exception message, but 
> caused to replay failure.
> java.nio.charset.MalformedInputException: Input length = 1
> at java.nio.charset.CoderResult.throwException(CoderResult.java:281)
>  at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:339)
>  at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
>  at java.io.InputStreamReader.read(InputStreamReader.java:184)
>  at java.io.BufferedReader.fill(BufferedReader.java:161)
>  at java.io.BufferedReader.readLine(BufferedReader.java:324)
>  at java.io.BufferedReader.readLine(BufferedReader.java:389)
>  at 
> scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:72)
>  at scala.collection.Iterator$$anon$21.hasNext(Iterator.scala:836)
>  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
>  at 
> org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:78)
>  at 
> org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:58)
>  at 
> org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$replay(FsHistoryProvider.scala:694)
>  at 
> org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$mergeApplicationListing(FsHistoryProvider.scala:507)
>  at 
> org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$checkForLogs$4$$anon$4.run(FsHistoryProvider.scala:399)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)






[jira] [Comment Edited] (SPARK-23139) Read eventLog file with mixed encodings

2018-02-06 Thread DENG FEI (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355022#comment-16355022
 ] 

DENG FEI edited comment on SPARK-23139 at 2/7/18 6:34 AM:
--

[~irashid] 

You're right, but one can change the default character set in _'spark.driver / 
executor.extraJavaOptions'_ by setting _'-Dfile.encoding = ***'._

This should not be limiting.


was (Author: deng fei):
[~irashid] 

You're right, but one can change the default character set in _'spark.driver / 
executor.extraJavaOptions'_ by setting _'-Dfile.encoding = ***'._

__This should not be limiting.

> Read eventLog file with mixed encodings
> ---
>
> Key: SPARK-23139
> URL: https://issues.apache.org/jira/browse/SPARK-23139
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: DENG FEI
>Priority: Major
>
> EventLog may contain mixed encodings such as custom exception message, but 
> caused to replay failure.
> java.nio.charset.MalformedInputException: Input length = 1
> at java.nio.charset.CoderResult.throwException(CoderResult.java:281)
>  at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:339)
>  at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
>  at java.io.InputStreamReader.read(InputStreamReader.java:184)
>  at java.io.BufferedReader.fill(BufferedReader.java:161)
>  at java.io.BufferedReader.readLine(BufferedReader.java:324)
>  at java.io.BufferedReader.readLine(BufferedReader.java:389)
>  at 
> scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:72)
>  at scala.collection.Iterator$$anon$21.hasNext(Iterator.scala:836)
>  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
>  at 
> org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:78)
>  at 
> org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:58)
>  at 
> org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$replay(FsHistoryProvider.scala:694)
>  at 
> org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$mergeApplicationListing(FsHistoryProvider.scala:507)
>  at 
> org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$checkForLogs$4$$anon$4.run(FsHistoryProvider.scala:399)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)






[jira] [Commented] (SPARK-23139) Read eventLog file with mixed encodings

2018-02-06 Thread DENG FEI (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355022#comment-16355022
 ] 

DENG FEI commented on SPARK-23139:
--

[~irashid] 

You're right, but one can change the default character set in _'spark.driver / 
executor.extraJavaOptions'_ by setting _'-Dfile.encoding = ***'._

__This should not be limiting.

> Read eventLog file with mixed encodings
> ---
>
> Key: SPARK-23139
> URL: https://issues.apache.org/jira/browse/SPARK-23139
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 2.2.0
>Reporter: DENG FEI
>Priority: Major
>
> EventLog may contain mixed encodings such as custom exception message, but 
> caused to replay failure.
> java.nio.charset.MalformedInputException: Input length = 1
> at java.nio.charset.CoderResult.throwException(CoderResult.java:281)
>  at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:339)
>  at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
>  at java.io.InputStreamReader.read(InputStreamReader.java:184)
>  at java.io.BufferedReader.fill(BufferedReader.java:161)
>  at java.io.BufferedReader.readLine(BufferedReader.java:324)
>  at java.io.BufferedReader.readLine(BufferedReader.java:389)
>  at 
> scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:72)
>  at scala.collection.Iterator$$anon$21.hasNext(Iterator.scala:836)
>  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
>  at 
> org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:78)
>  at 
> org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:58)
>  at 
> org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$replay(FsHistoryProvider.scala:694)
>  at 
> org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$mergeApplicationListing(FsHistoryProvider.scala:507)
>  at 
> org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$checkForLogs$4$$anon$4.run(FsHistoryProvider.scala:399)
>  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745)






[jira] [Created] (SPARK-23139) Read eventLog file with mixed encodings

2018-01-17 Thread DENG FEI (JIRA)
DENG FEI created SPARK-23139:


 Summary: Read eventLog file with mixed encodings
 Key: SPARK-23139
 URL: https://issues.apache.org/jira/browse/SPARK-23139
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.2.0
Reporter: DENG FEI


The event log may contain mixed encodings, such as a custom exception message, which causes replay to fail.

java.nio.charset.MalformedInputException: Input length = 1

at java.nio.charset.CoderResult.throwException(CoderResult.java:281)
 at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:339)
 at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
 at java.io.InputStreamReader.read(InputStreamReader.java:184)
 at java.io.BufferedReader.fill(BufferedReader.java:161)
 at java.io.BufferedReader.readLine(BufferedReader.java:324)
 at java.io.BufferedReader.readLine(BufferedReader.java:389)
 at scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:72)
 at scala.collection.Iterator$$anon$21.hasNext(Iterator.scala:836)
 at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
 at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:78)
 at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:58)
 at org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$replay(FsHistoryProvider.scala:694)
 at org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$mergeApplicationListing(FsHistoryProvider.scala:507)
 at org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$checkForLogs$4$$anon$4.run(FsHistoryProvider.scala:399)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)


