[jira] [Updated] (SPARK-29619) Add retry times when reading the daemon port.

2019-11-13 Thread jiaan.geng (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-29619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jiaan.geng updated SPARK-29619:
---
Description: This ticket is related to 
https://issues.apache.org/jira/browse/SPARK-29885 and adds a retry mechanism.

> Add retry times when reading the daemon port.
> -
>
> Key: SPARK-29619
> URL: https://issues.apache.org/jira/browse/SPARK-29619
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: jiaan.geng
>Priority: Major
>
> This ticket is related to https://issues.apache.org/jira/browse/SPARK-29885 
> and adds a retry mechanism.
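
A minimal sketch of what such a retry mechanism could look like, assuming the daemon is simply started again whenever its stdout ends before a port is written. The names `DaemonPortRetry`, `readPortWithRetry`, `startDaemon`, and `attemptsLeft` are hypothetical, and `IllegalStateException` stands in for Spark's `SparkException` so the example is self-contained; this is not the change made for this ticket.

```scala
import java.io.{DataInputStream, EOFException, InputStream}

// Hypothetical helper, not part of Spark.
object DaemonPortRetry {

  /** Reads one big-endian Int (the port) from the daemon's stdout. */
  def readPort(in: InputStream): Int = new DataInputStream(in).readInt()

  /**
   * Calls `startDaemon` to obtain a fresh stdout stream and tries to read the
   * port from it. If the stream ends early (EOFException), tries again until
   * `attemptsLeft` is used up, then fails with the last EOFException as cause.
   */
  @annotation.tailrec
  def readPortWithRetry(startDaemon: () => InputStream, attemptsLeft: Int): Int = {
    require(attemptsLeft >= 1, "attemptsLeft must be at least 1")
    val result: Either[EOFException, Int] =
      try Right(readPort(startDaemon()))
      catch { case e: EOFException => Left(e) }
    result match {
      case Right(port) => port
      case Left(_) if attemptsLeft > 1 =>
        readPortWithRetry(startDaemon, attemptsLeft - 1)
      case Left(e) =>
        // Stand-in for SparkException in this self-contained sketch.
        throw new IllegalStateException("No port number in daemon's stdout", e)
    }
  }
}
```

For example, `DaemonPortRetry.readPortWithRetry(() => pb.start().getInputStream, 3)` would start the daemon up to three times. A real change would probably also need to clean up each failed process before retrying.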



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-29619) Add retry times when reading the daemon port.

2019-11-13 Thread jiaan.geng (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-29619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jiaan.geng updated SPARK-29619:
---
Summary: Add retry times when reading the daemon port.  (was: Improve the 
exception message when reading the daemon port and add retry times.)

> Add retry times when reading the daemon port.
> -
>
> Key: SPARK-29619
> URL: https://issues.apache.org/jira/browse/SPARK-29619
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Affects Versions: 3.0.0
>Reporter: jiaan.geng
>Priority: Major
>
> In a production environment, my PySpark application threw an exception with 
> the following message:
> {code:java}
> 19/10/28 16:15:03 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
> org.apache.spark.SparkException: No port number in pyspark.daemon's stdout
>  at 
> org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:204)
>  at 
> org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:122)
>  at 
> org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:95)
>  at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
>  at 
> org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:108)
>  at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>  at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
>  at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
>  at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
>  at 
> org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
>  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
>  at 
> org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
>  at 
> org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
>  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
>  at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>  at org.apache.spark.scheduler.Task.run(Task.scala:121)
>  at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>  at java.lang.Thread.run(Thread.java:745){code}
>  
> At first, I suspected that many ports on the physical node were occupied by 
> a large number of processes.
> But I found that the total number of ports in use was only 671.
>  
> {code:java}
> [yarn@r1115 ~]$ netstat -a | wc -l
> 671
> {code}
> I checked the code of PythonWorkerFactory at line 204 and found:
> {code:java}
> daemon = pb.start()
> val in = new DataInputStream(daemon.getInputStream)
> try {
>   daemonPort = in.readInt()
> } catch {
>   case _: EOFException =>
>     throw new SparkException(s"No port number in $daemonModule's stdout")
> }
> {code}
> I added some logging code here:
> {code:java}
> logError(s"Meet EOFException, daemon is alive: ${daemon.isAlive()}")
> logError(s"Exit value: ${daemon.exitValue()}")
> {code}
> Then I reproduced the exception; its message is below:
> {code:java}
> 19/10/28 16:15:03 ERROR PythonWorkerFactory: Meet EOFException, daemon is 
> alive: false
> 19/10/28 16:15:03 ERROR PythonWorkerFactory: Exit value: 139
> 19/10/28 16:15:03 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
> org.apache.spark.SparkException: No port number in pyspark.daemon's stdout
>  at 
> org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:206)
>  at 
> org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:122)
>  at 
> org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:95)
>  at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
>  at

[jira] [Updated] (SPARK-29619) Add retry times when reading the daemon port.

2019-11-13 Thread jiaan.geng (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-29619?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jiaan.geng updated SPARK-29619:
---
Description: (was: In a production environment, my PySpark application threw 
an exception with the following message:
{code:java}
19/10/28 16:15:03 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: No port number in pyspark.daemon's stdout
 at 
org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:204)
 at 
org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:122)
 at 
org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:95)
 at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
 at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:108)
 at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
 at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
 at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
 at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
 at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
 at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
 at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:882)
 at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
 at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
 at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
 at org.apache.spark.scheduler.Task.run(Task.scala:121)
 at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
 at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745){code}
 

At first, I suspected that many ports on the physical node were occupied by a 
large number of processes.

But I found that the total number of ports in use was only 671.

 
{code:java}
[yarn@r1115 ~]$ netstat -a | wc -l
671
{code}
I checked the code of PythonWorkerFactory at line 204 and found:
{code:java}
daemon = pb.start()
val in = new DataInputStream(daemon.getInputStream)
try {
  daemonPort = in.readInt()
} catch {
  case _: EOFException =>
    throw new SparkException(s"No port number in $daemonModule's stdout")
}
{code}
I added some logging code here:
{code:java}
logError(s"Meet EOFException, daemon is alive: ${daemon.isAlive()}")
logError(s"Exit value: ${daemon.exitValue()}")
{code}
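
The exit value logged by this snippet can hint at why the daemon died. By common Unix convention (an assumption about the platform, not something stated in the ticket), a value above 128 means the process was killed by signal `value - 128`, so the 139 in the log that follows would correspond to signal 11, which is SIGSEGV on Linux. A minimal sketch, with the hypothetical names `ExitValue` and `describe`:

```scala
// Hypothetical helper, not part of Spark.
object ExitValue {

  /**
   * Interprets a process exit value using the Unix convention that values
   * above 128 encode "terminated by signal (value - 128)".
   */
  def describe(exitValue: Int): String =
    if (exitValue > 128) s"terminated by signal ${exitValue - 128}"
    else s"exited with status $exitValue"
}
```

Under that assumption, `ExitValue.describe(139)` yields "terminated by signal 11".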
Then I reproduced the exception; its message is below:
{code:java}
19/10/28 16:15:03 ERROR PythonWorkerFactory: Meet EOFException, daemon is 
alive: false
19/10/28 16:15:03 ERROR PythonWorkerFactory: Exit value: 139
19/10/28 16:15:03 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: No port number in pyspark.daemon's stdout
 at 
org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:206)
 at 
org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:122)
 at 
org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:95)
 at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
 at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:108)
 at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
 at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:337)
 at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:335)
 at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1182)
 at 
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1156)
 at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1091)
 at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1156)
 at 
or