[jira] [Updated] (SPARK-25429) SparkListenerBus inefficient due to 'LiveStageMetrics#accumulatorIds:Array[Long]' data structure
[ https://issues.apache.org/jira/browse/SPARK-25429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] DENG FEI updated SPARK-25429:

Description:
{code:java}
private def updateStageMetrics(
    stageId: Int,
    attemptId: Int,
    taskId: Long,
    accumUpdates: Seq[AccumulableInfo],
    succeeded: Boolean): Unit = {
  Option(stageMetrics.get(stageId)).foreach { metrics =>
    if (metrics.attemptId != attemptId || metrics.accumulatorIds.isEmpty) {
      return
    }

    val oldTaskMetrics = metrics.taskMetrics.get(taskId)
    if (oldTaskMetrics != null && oldTaskMetrics.succeeded) {
      return
    }

    val updates = accumUpdates
      .filter { acc => acc.update.isDefined && metrics.accumulatorIds.contains(acc.id) }
      .sortBy(_.id)

    if (updates.isEmpty) {
      return
    }

    val ids = new Array[Long](updates.size)
    val values = new Array[Long](updates.size)
    updates.zipWithIndex.foreach { case (acc, idx) =>
      ids(idx) = acc.id
      // In a live application, accumulators have Long values, but when reading from event
      // logs, they have String values. For now, assume all accumulators are Long and convert
      // accordingly.
      values(idx) = acc.update.get match {
        case s: String => s.toLong
        case l: Long => l
        case o => throw new IllegalArgumentException(s"Unexpected: $o")
      }
    }

    // TODO: storing metrics by task ID can cause metrics for the same task index to be
    // counted multiple times, for example due to speculation or re-attempts.
    metrics.taskMetrics.put(taskId, new LiveTaskMetrics(ids, values, succeeded))
  }
}
{code}
In 'metrics.accumulatorIds.contains(acc.id)', a large SQL application can generate many accumulators, and the linear Array#contains scan makes the listener inefficient. In practice the application may even time out while quitting and get killed by the RM in YARN mode.
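A minimal sketch of the suggested data-structure change, assuming a simplified LiveStageMetrics (the real AppStatusListener class carries more state): build a hash set of the accumulator IDs once per stage attempt, so each membership test in the filter above is an O(1) lookup instead of an O(n) Array#contains scan per accumulator update.
{code:java}
import scala.collection.immutable.HashSet

// Simplified stand-in for LiveStageMetrics, for illustration only.
private class LiveStageMetrics(
    val attemptId: Int,
    accumulatorIds: Array[Long]) {
  // Built once per stage attempt; every contains() is then an O(1) hash lookup.
  val accumulatorIdSet: Set[Long] = HashSet(accumulatorIds: _*)
}
{code}
At the hot call site the filter would become acc.update.isDefined && metrics.accumulatorIdSet.contains(acc.id), leaving the rest of updateStageMetrics unchanged.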
[jira] [Created] (SPARK-25429) SparkListenerBus inefficient due to 'LiveStageMetrics#accumulatorIds:Array[Long]' data structure
DENG FEI created SPARK-25429:
Summary: SparkListenerBus inefficient due to 'LiveStageMetrics#accumulatorIds:Array[Long]' data structure
Key: SPARK-25429
URL: https://issues.apache.org/jira/browse/SPARK-25429
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 2.3.1
Reporter: DENG FEI
[jira] [Updated] (SPARK-25139) PythonRunner#WriterThread released block after TaskRunner finally block which invoke BlockManager#releaseAllLocksForTask
[ https://issues.apache.org/jira/browse/SPARK-25139?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] DENG FEI updated SPARK-25139:

Description:
We run pyspark streaming on YARN, and the executor dies because of the following error: the task released the lock when it finished, but the Python writer had not yet done the real lock release. Normally the task's release is just a double check of the lock, but here things ran in the wrong order.
The executor trace log is below:
{noformat}
18/08/17 13:52:20 Executor task launch worker for task 137 DEBUG BlockManager: Getting local block input-0-1534485138800
18/08/17 13:52:20 Executor task launch worker for task 137 TRACE BlockInfoManager: Task 137 trying to acquire read lock for input-0-1534485138800
18/08/17 13:52:20 Executor task launch worker for task 137 TRACE BlockInfoManager: Task 137 acquired read lock for input-0-1534485138800
18/08/17 13:52:20 Executor task launch worker for task 137 DEBUG BlockManager: Level for block input-0-1534485138800 is StorageLevel(disk, memory, 1 replicas)
18/08/17 13:52:20 Executor task launch worker for task 137 INFO BlockManager: Found block input-0-1534485138800 locally
18/08/17 13:52:20 Executor task launch worker for task 137 INFO PythonRunner: Times: total = 8, boot = 3, init = 5, finish = 0
18/08/17 13:52:20 stdout writer for python TRACE BlockInfoManager: Task 137 releasing lock for input-0-1534485138800
18/08/17 13:52:20 Executor task launch worker for task 137 INFO Executor: 1 block locks were not released by TID = 137: [input-0-1534485138800]
18/08/17 13:52:20 stdout writer for python ERROR Utils: Uncaught exception in thread stdout writer for python
java.lang.AssertionError: assertion failed: Block input-0-1534485138800 is not locked for reading
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299)
at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:769)
at org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:540)
at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:213)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:407)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)
18/08/17 13:52:20 stdout writer for python ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout writer for python,5,main]
{noformat}
I think we should wait for the WriterThread after Task#run.
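A sketch of the proposed ordering, with hypothetical names (runTask, writerThread, releaseAllLocksForTask stand in for the real executor internals, which are elided here): join the Python writer thread after the task body runs but before the task-level lock cleanup, so the writer's own lock release always comes first.
{code:java}
// Hypothetical sketch of the proposed fix, not the actual Spark code.
def runTask(task: Runnable, writerThread: Thread, releaseAllLocksForTask: () => Unit): Unit = {
  try {
    task.run()
    // Proposed: wait for the python writer to finish (and release its read
    // locks) before the task-level cleanup below runs.
    writerThread.join()
  } finally {
    // Without the join above, this can release the writer's lock first, and
    // the writer's later unlock() hits the "not locked for reading" assert.
    releaseAllLocksForTask()
  }
}
{code}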
[jira] [Created] (SPARK-25139) PythonRunner#WriterThread released block after TaskRunner finally block which invoke BlockManager#releaseAllLocksForTask
DENG FEI created SPARK-25139:
Summary: PythonRunner#WriterThread released block after TaskRunner finally block which invoke BlockManager#releaseAllLocksForTask
Key: SPARK-25139
URL: https://issues.apache.org/jira/browse/SPARK-25139
Project: Spark
Issue Type: Bug
Components: Block Manager
Affects Versions: 2.3.1
Reporter: DENG FEI

We run pyspark streaming on YARN, and the executor dies because of the following error. The executor trace log is below:
{noformat}
18/08/17 13:52:20 Executor task launch worker for task 137 DEBUG BlockManager: Getting local block input-0-1534485138800
18/08/17 13:52:20 Executor task launch worker for task 137 TRACE BlockInfoManager: Task 137 trying to acquire read lock for input-0-1534485138800
18/08/17 13:52:20 Executor task launch worker for task 137 TRACE BlockInfoManager: Task 137 acquired read lock for input-0-1534485138800
18/08/17 13:52:20 Executor task launch worker for task 137 DEBUG BlockManager: Level for block input-0-1534485138800 is StorageLevel(disk, memory, 1 replicas)
18/08/17 13:52:20 Executor task launch worker for task 137 INFO BlockManager: Found block input-0-1534485138800 locally
18/08/17 13:52:20 Executor task launch worker for task 137 INFO PythonRunner: Times: total = 8, boot = 3, init = 5, finish = 0
18/08/17 13:52:20 stdout writer for python TRACE BlockInfoManager: Task 137 releasing lock for input-0-1534485138800
18/08/17 13:52:20 Executor task launch worker for task 137 INFO Executor: 1 block locks were not released by TID = 137: [input-0-1534485138800]
18/08/17 13:52:20 stdout writer for python ERROR Utils: Uncaught exception in thread stdout writer for python
java.lang.AssertionError: assertion failed: Block input-0-1534485138800 is not locked for reading
at scala.Predef$.assert(Predef.scala:170)
at org.apache.spark.storage.BlockInfoManager.unlock(BlockInfoManager.scala:299)
at org.apache.spark.storage.BlockManager.releaseLock(BlockManager.scala:769)
at org.apache.spark.storage.BlockManager$$anonfun$1.apply$mcV$sp(BlockManager.scala:540)
at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:213)
at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:407)
at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)
18/08/17 13:52:20 stdout writer for python ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout writer for python,5,main]
{noformat}
I think we should wait for the WriterThread after Task#run.
[jira] [Updated] (SPARK-25070) BlockFetchingListener#onBlockFetchSuccess throw "java.util.NoSuchElementException: key not found: shuffle_8_68_113" on ShuffleBlockFetcherIterator caused stage hang long time
[ https://issues.apache.org/jira/browse/SPARK-25070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] DENG FEI updated SPARK-25070:

Description:
The task fetched its shuffle block successfully, but onBlockFetchSuccess then failed, which left the task hanging for a long time and made speculation treat it as not needed.
The log is below:
{noformat}
18/08/08 14:55:53 INFO ShuffleBlockFetcherIterator: Started 16 remote fetches in 16 ms
18/08/08 14:55:53 WARN TransportChannelHandler: Exception in connection from /xxx.xxx.xxx.xxx:7337
java.util.NoSuchElementException: key not found: shuffle_8_68_113
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:59)
at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1.onBlockFetchSuccess(ShuffleBlockFetcherIterator.scala:217)
at org.apache.spark.network.shuffle.RetryingBlockFetcher$RetryingBlockFetchListener.onBlockFetchSuccess(RetryingBlockFetcher.java:204)
at org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:97)
at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:171)
at XX
18/08/08 14:55:53 INFO Executor: Finished task 44.0 in stage 14.0 (TID 1483). 3458 bytes result sent to driver
18/08/09 10:02:32 INFO Executor: Executor is trying to kill task 113.0 in stage 14.0 (TID 1552), reason: stage cancelled
{noformat}
{code:java}
val blockFetchingListener = new BlockFetchingListener {
  override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = {
    // Only add the buffer to results queue if the iterator is not zombie,
    // i.e. cleanup() has not been called yet.
    ShuffleBlockFetcherIterator.this.synchronized {
      try {
        if (!isZombie) {
          // Increment the ref count because we need to pass this to a different thread.
          // This needs to be released after use.
          buf.retain()
          remainingBlocks -= blockId
          results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId),
            buf, remainingBlocks.isEmpty))
          logDebug("remainingBlocks: " + remainingBlocks)
        }
      } catch {
        case e: Throwable => onBlockFetchFailure(blockId, e)
      }
    }
    logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
  }

  override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
    logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
    results.put(new FailureFetchResult(BlockId(blockId), address, e))
  }
}
{code}
The stack trace points at the sizeMap lookup in this call:
{code:java}
results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId), buf, remainingBlocks.isEmpty))
{code}
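A defensive sketch of one possible mitigation, under the assumption that the listener can fire for a block id that is no longer in sizeMap: look the size up with get and turn a miss into an explicit failure result, so the consumer fails fast instead of hanging until stage cancellation. The types below are simplified stand-ins, not the real ShuffleBlockFetcherIterator.
{code:java}
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

final case class FetchResult(blockId: String, size: Long, lastInBatch: Boolean)

class FetchBookkeeping(sizeMap: Map[String, Long]) {
  private val remainingBlocks = mutable.Set(sizeMap.keys.toSeq: _*)
  // Left = failure surfaced to the consumer; Right = successful fetch.
  val results = new LinkedBlockingQueue[Either[Throwable, FetchResult]]()

  def onBlockFetchSuccess(blockId: String): Unit = synchronized {
    sizeMap.get(blockId) match {
      case Some(size) =>
        remainingBlocks -= blockId
        results.put(Right(FetchResult(blockId, size, remainingBlocks.isEmpty)))
      case None =>
        // Fail fast instead of throwing inside the network handler, which
        // would leave the consumer blocked on results.take() forever.
        results.put(Left(new IllegalStateException(s"Untracked block $blockId")))
    }
  }
}
{code}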
[jira] [Created] (SPARK-25070) BlockFetchingListener#onBlockFetchSuccess throw "java.util.NoSuchElementException: key not found: shuffle_8_68_113" on ShuffleBlockFetcherIterator caused stage hang long time
DENG FEI created SPARK-25070:
Summary: BlockFetchingListener#onBlockFetchSuccess throw "java.util.NoSuchElementException: key not found: shuffle_8_68_113" on ShuffleBlockFetcherIterator caused stage hang long time
Key: SPARK-25070
URL: https://issues.apache.org/jira/browse/SPARK-25070
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 2.2.0
Reporter: DENG FEI

The task fetched its shuffle block successfully, but onBlockFetchSuccess then failed, which left the task hanging for a long time and made speculation treat it as not needed.
The log is below:
{noformat}
18/08/08 14:55:53 INFO ShuffleBlockFetcherIterator: Started 16 remote fetches in 16 ms
18/08/08 14:55:53 WARN TransportChannelHandler: Exception in connection from /xxx.xxx.xxx.xxx:7337
java.util.NoSuchElementException: key not found: shuffle_8_68_113
at scala.collection.MapLike$class.default(MapLike.scala:228)
at scala.collection.AbstractMap.default(Map.scala:59)
at scala.collection.MapLike$class.apply(MapLike.scala:141)
at scala.collection.AbstractMap.apply(Map.scala:59)
at org.apache.spark.storage.ShuffleBlockFetcherIterator$$anon$1.onBlockFetchSuccess(ShuffleBlockFetcherIterator.scala:217)
at org.apache.spark.network.shuffle.RetryingBlockFetcher$RetryingBlockFetchListener.onBlockFetchSuccess(RetryingBlockFetcher.java:204)
at org.apache.spark.network.shuffle.OneForOneBlockFetcher$ChunkCallback.onSuccess(OneForOneBlockFetcher.java:97)
at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:171)
at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:120)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:266)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:292)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:278)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:962)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:485)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:399)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:371)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
at java.lang.Thread.run(Thread.java:745)
18/08/08 14:55:53 INFO Executor: Finished task 44.0 in stage 14.0 (TID 1483). 3458 bytes result sent to driver
18/08/09 10:02:32 INFO Executor: Executor is trying to kill task 113.0 in stage 14.0 (TID 1552), reason: stage cancelled
{noformat}
{code:java}
val blockFetchingListener = new BlockFetchingListener {
  override def onBlockFetchSuccess(blockId: String, buf: ManagedBuffer): Unit = {
    // Only add the buffer to results queue if the iterator is not zombie,
    // i.e. cleanup() has not been called yet.
    ShuffleBlockFetcherIterator.this.synchronized {
      try {
        if (!isZombie) {
          // Increment the ref count because we need to pass this to a different thread.
          // This needs to be released after use.
          buf.retain()
          remainingBlocks -= blockId
          results.put(new SuccessFetchResult(BlockId(blockId), address, sizeMap(blockId),
            buf, remainingBlocks.isEmpty))
          logDebug("remainingBlocks: " + remainingBlocks)
        }
      } catch {
        case e: Throwable => onBlockFetchFailure(blockId, e)
      }
    }
    logTrace("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
  }

  override def onBlockFetchFailure(blockId: String, e: Throwable): Unit = {
    logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
    results.put(new FailureFetchResult(BlockId(blockId), address, e))
  }
}
{code}
[jira] [Resolved] (SPARK-25055) MessageWithHeader transfer ByteBuffer from Netty's CompositeByteBuf many times
[ https://issues.apache.org/jira/browse/SPARK-25055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] DENG FEI resolved SPARK-25055.

Resolution: Duplicate
[jira] [Commented] (SPARK-25055) MessageWithHeader transfer ByteBuffer from Netty's CompositeByteBuf many times
[ https://issues.apache.org/jira/browse/SPARK-25055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16573154#comment-16573154 ] DENG FEI commented on SPARK-25055:
{code:java}
private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOException {
  ByteBuffer buffer = buf.nioBuffer();
  int totalWritten = 0;
  while (buffer.remaining() > 0) {
    int written = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
      target.write(buffer) : writeNioBuffer(target, buffer);
    buf.skipBytes(written);
    totalWritten += written;
  }
  return totalWritten;
}
{code}
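For context, a common remedy (sketched here in Scala and hedged — the threshold value and object name are illustrative, not the actual patch that resolved the duplicate issue): cap how many bytes are handed to the channel per call, so nioBuffer(index, length) only materializes a bounded slice and never consolidates the whole CompositeByteBuf into one heap allocation.
{code:java}
import java.nio.channels.WritableByteChannel
import io.netty.buffer.ByteBuf

object BoundedCopy {
  // Illustrative threshold; the real NIO_BUFFER_LIMIT lives elsewhere in Spark.
  val NIO_BUFFER_LIMIT: Int = 256 * 1024

  // Copy at most NIO_BUFFER_LIMIT bytes per call; callers loop until done.
  def copyByteBuf(buf: ByteBuf, target: WritableByteChannel): Int = {
    val length = math.min(buf.readableBytes(), NIO_BUFFER_LIMIT)
    val buffer = buf.nioBuffer(buf.readerIndex(), length)
    val written = target.write(buffer)
    buf.skipBytes(written)
    written
  }
}
{code}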
[jira] [Created] (SPARK-25055) MessageWithHeader transfer ByteBuffer from Netty's CompositeByteBuf many times
DENG FEI created SPARK-25055:
Summary: MessageWithHeader transfer ByteBuffer from Netty's CompositeByteBuf many times
Key: SPARK-25055
URL: https://issues.apache.org/jira/browse/SPARK-25055
Project: Spark
Issue Type: Bug
Components: Block Manager
Affects Versions: 2.3.1
Reporter: DENG FEI

MessageWithHeader transfers the header and body when they are ByteBufs. When fetching a remote block larger than 'NIO_BUFFER_LIMIT', because ChunkedByteBuffer#toNetty avoids consolidation, CompositeByteBuf.nioBuffer will allocate from the heap every time, leading to fetch timeouts.
[jira] [Commented] (SPARK-23139) Read eventLog file with mixed encodings
[ https://issues.apache.org/jira/browse/SPARK-23139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356646#comment-16356646 ] DENG FEI commented on SPARK-23139:

[~jiangxb1987] For example, through SparkListenerTaskEnd#TaskFailedReason#toErrorString a user app can throw a customized message, which will be encoded with _file.encoding_ or the node's environment. [~vanzin] ASCII is the base of all these charsets; I think it is enough for users to see in the app history what happened.
[jira] [Commented] (SPARK-23139) Read eventLog file with mixed encodings
[ https://issues.apache.org/jira/browse/SPARK-23139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356428#comment-16356428 ] DENG FEI commented on SPARK-23139:

_ASCII_ is enough for the Spark event log. And if we force writing with UTF-8, we should also force reading with UTF-8.
[jira] [Comment Edited] (SPARK-23139) Read eventLog file with mixed encodings
[ https://issues.apache.org/jira/browse/SPARK-23139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16355022#comment-16355022 ] DENG FEI edited comment on SPARK-23139 at 2/7/18 6:34 AM:

[~irashid] You're right, but one can change the default character set in _'spark.driver / executor.extraJavaOptions'_ by setting _'-Dfile.encoding = ***'._ This should not be limiting.
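For concreteness, this is the kind of override being described; the charset value is illustrative, since the comment elides it as '***':
{noformat}
# spark-defaults.conf (illustrative value only)
spark.driver.extraJavaOptions=-Dfile.encoding=GBK
spark.executor.extraJavaOptions=-Dfile.encoding=GBK
{noformat}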
[jira] [Created] (SPARK-23139) Read eventLog file with mixed encodings
DENG FEI created SPARK-23139:
Summary: Read eventLog file with mixed encodings
Key: SPARK-23139
URL: https://issues.apache.org/jira/browse/SPARK-23139
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 2.2.0
Reporter: DENG FEI

EventLog files may contain mixed encodings, such as custom exception messages, which cause replay failures:
{noformat}
java.nio.charset.MalformedInputException: Input length = 1
at java.nio.charset.CoderResult.throwException(CoderResult.java:281)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:339)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:161)
at java.io.BufferedReader.readLine(BufferedReader.java:324)
at java.io.BufferedReader.readLine(BufferedReader.java:389)
at scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:72)
at scala.collection.Iterator$$anon$21.hasNext(Iterator.scala:836)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:78)
at org.apache.spark.scheduler.ReplayListenerBus.replay(ReplayListenerBus.scala:58)
at org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$replay(FsHistoryProvider.scala:694)
at org.apache.spark.deploy.history.FsHistoryProvider.org$apache$spark$deploy$history$FsHistoryProvider$$mergeApplicationListing(FsHistoryProvider.scala:507)
at org.apache.spark.deploy.history.FsHistoryProvider$$anonfun$checkForLogs$4$$anon$4.run(FsHistoryProvider.scala:399)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{noformat}
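A sketch of one way to make replay robust to such logs, assuming the reader standardizes on UTF-8 (eventLogLines is a hypothetical helper, not the real FsHistoryProvider API): decode with CodingErrorAction.REPLACE so a malformed byte becomes U+FFFD instead of a MalformedInputException that aborts the whole replay.
{code:java}
import java.io.InputStream
import java.nio.charset.{CodingErrorAction, StandardCharsets}
import scala.io.{Codec, Source}

// Hypothetical lenient reader for event log replay.
def eventLogLines(in: InputStream): Iterator[String] = {
  val codec = Codec(StandardCharsets.UTF_8)
    .onMalformedInput(CodingErrorAction.REPLACE)
    .onUnmappableCharacter(CodingErrorAction.REPLACE)
  Source.fromInputStream(in)(codec).getLines()
}
{code}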