[jira] [Created] (SPARK-5316) DAGScheduler may make shuffleToMapStage leak if getParentStages fails

2015-01-19 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-5316:
---

 Summary: DAGScheduler may make shuffleToMapStage leak if getParentStages fails
 Key: SPARK-5316
 URL: https://issues.apache.org/jira/browse/SPARK-5316
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Reporter: YanTang Zhai
Priority: Minor


DAGScheduler may leak entries in shuffleToMapStage if getParentStages fails.
If getParentStages throws an exception, for example because an input path does not exist, DAGScheduler fails to handle the job submission, but getParentStages may already have put some records into shuffleToMapStage. These records are never cleaned up.
A simple job as follows:
{code:java}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // implicit pair-RDD conversions (Spark 1.x)
import org.apache.spark.rdd.PairRDDFunctions

val inputFile1 = ... // Input path does not exist when this job is submitted
val inputFile2 = ...
val outputFile = ...
val conf = new SparkConf()
val sc = new SparkContext(conf)
val rdd1 = sc.textFile(inputFile1)
.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _, 1)
val rdd2 = sc.textFile(inputFile2)
.flatMap(line => line.split(","))
.map(word => (word, 1))
.reduceByKey(_ + _, 1)
try {
  val rdd3 = new PairRDDFunctions(rdd1).join(rdd2, 1)
  rdd3.saveAsTextFile(outputFile)
} catch {
  case e: Exception =>
    e.printStackTrace()
}
// print the information of DAGScheduler's shuffleToMapStage to check
// whether it still has uncleaned records.
...
{code}






[jira] [Created] (SPARK-5163) Load properties from configuration file for example spark-defaults.conf when creating SparkConf object

2015-01-08 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-5163:
---

 Summary: Load properties from configuration file for example 
spark-defaults.conf when creating SparkConf object
 Key: SPARK-5163
 URL: https://issues.apache.org/jira/browse/SPARK-5163
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: YanTang Zhai
Priority: Minor


I create and run a Spark program that does not go through SparkSubmit.
When I create a SparkConf object with `new SparkConf()`, it does not automatically load properties from a configuration file such as spark-defaults.conf.
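A minimal sketch of a possible workaround, assuming a standard SPARK_HOME layout and the usual "key value" line format of spark-defaults.conf; the loadDefaults helper below is hypothetical, not an existing Spark API:
{code:java}
import java.io.File
import scala.io.Source
import org.apache.spark.SparkConf

// Hypothetical helper: copy key/value pairs from spark-defaults.conf into a SparkConf.
def loadDefaults(conf: SparkConf): SparkConf = {
  val file = new File(sys.env.getOrElse("SPARK_HOME", "."), "conf/spark-defaults.conf")
  if (file.isFile) {
    val source = Source.fromFile(file)
    try {
      source.getLines()
        .map(_.trim)
        .filter(line => line.nonEmpty && !line.startsWith("#"))
        .foreach { line =>
          // Lines look like "spark.master yarn" (key, whitespace, value).
          line.split("\\s+", 2) match {
            case Array(key, value) => conf.set(key, value)
            case _ => // ignore malformed lines
          }
        }
    } finally {
      source.close()
    }
  }
  conf
}

val conf = loadDefaults(new SparkConf())
{code}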






[jira] [Closed] (SPARK-5007) Try random port when startServiceOnPort to reduce the chance of port collision

2015-01-08 Thread YanTang Zhai (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-5007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

YanTang Zhai closed SPARK-5007.
---
Resolution: Won't Fix

> Try random port when startServiceOnPort to reduce the chance of port collision
> --
>
> Key: SPARK-5007
> URL: https://issues.apache.org/jira/browse/SPARK-5007
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Reporter: YanTang Zhai
>Priority: Minor
>
> When multiple Spark programs are submitted from the same node (a so-called springboard machine), the SparkUI ports (default 4040) are taken from 4040 up to 4056. Spark programs submitted later then fail because of SparkUI port collisions.
> The chance of port collision can be reduced by setting spark.ui.port or spark.port.maxRetries.
> However, I think it is better to try a random port in startServiceOnPort to reduce the chance of port collision.






[jira] [Commented] (SPARK-5007) Try random port when startServiceOnPort to reduce the chance of port collision

2015-01-08 Thread YanTang Zhai (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-5007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14270421#comment-14270421
 ] 

YanTang Zhai commented on SPARK-5007:
-

[~rxin] Oh, I see. Thank you very much.

> Try random port when startServiceOnPort to reduce the chance of port collision
> --
>
> Key: SPARK-5007
> URL: https://issues.apache.org/jira/browse/SPARK-5007
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Reporter: YanTang Zhai
>Priority: Minor
>
> When multiple Spark programs are submitted from the same node (a so-called springboard machine), the SparkUI ports (default 4040) are taken from 4040 up to 4056. Spark programs submitted later then fail because of SparkUI port collisions.
> The chance of port collision can be reduced by setting spark.ui.port or spark.port.maxRetries.
> However, I think it is better to try a random port in startServiceOnPort to reduce the chance of port collision.






[jira] [Created] (SPARK-5007) Try random port when startServiceOnPort to reduce the chance of port collision

2014-12-30 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-5007:
---

 Summary: Try random port when startServiceOnPort to reduce the 
chance of port collision
 Key: SPARK-5007
 URL: https://issues.apache.org/jira/browse/SPARK-5007
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: YanTang Zhai
Priority: Minor


When multiple Spark programs are submitted from the same node (a so-called springboard machine), the SparkUI ports (default 4040) are taken from 4040 up to 4056. Spark programs submitted later then fail because of SparkUI port collisions.
The chance of port collision can be reduced by setting spark.ui.port or spark.port.maxRetries.
However, I think it is better to try a random port in startServiceOnPort to reduce the chance of port collision.
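A minimal sketch of the idea, not Spark's actual Utils.startServiceOnPort: after a collision, retry on a random non-privileged port instead of the next sequential one. The helper name, retry count, and port range below are assumptions for illustration.
{code:java}
import java.net.{BindException, ServerSocket}
import scala.util.Random

// Hypothetical helper: start a service on a random non-privileged port,
// retrying on a fresh random port after each bind collision.
def startOnRandomPort[T](maxRetries: Int = 16)(startService: Int => T): T = {
  def attempt(n: Int): T = {
    val candidate = 1024 + Random.nextInt(65536 - 1024)
    try startService(candidate)
    catch {
      case _: BindException if n < maxRetries => attempt(n + 1)
    }
  }
  attempt(0)
}

// Example: open a plain ServerSocket on some random free port.
val socket = startOnRandomPort() { port => new ServerSocket(port) }
{code}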






[jira] [Commented] (SPARK-4962) Put TaskScheduler.start back in SparkContext to shorten cluster resources occupation period

2014-12-26 Thread YanTang Zhai (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14259041#comment-14259041
 ] 

YanTang Zhai commented on SPARK-4962:
-

The main purpose of this PR is to reduce resource waste on a busy cluster.
For example, a process initializes a SparkContext instance, reads a few files from HDFS or many records from PostgreSQL, and then calls an RDD's collect operation to submit a job.
When the SparkContext is initialized, an app is submitted to the cluster and some resources are held by this app.
These resources are not really used until a job is submitted by an RDD action.
The resources held in the period from initialization to actual use can be considered wasted.
If the app is submitted when the SparkContext is initialized, all of the resources needed by the app may be granted before the job runs.
Then the job can run efficiently without resource constraints.
On the contrary, if the app is submitted only when a job is submitted, the resources needed by the app may be granted at different times, so the job may not run as efficiently while some resources are still being acquired.
Thus I use a configuration parameter spark.scheduler.app.slowstart (default false) to let users make a trade-off between economy and efficiency.
There are 9 kinds of master URL and 6 kinds of SchedulerBackend.
LocalBackend and SimrSchedulerBackend do not need the deferred start, since it makes no difference for them.
SparkClusterSchedulerBackend (yarn-standalone or yarn-cluster) does not defer the start, since the app has already been submitted in advance by SparkSubmit.
CoarseMesosSchedulerBackend and MesosSchedulerBackend could defer the start.
YarnClientSchedulerBackend (yarn-client) could defer the start.
For now, this PR puts TaskScheduler.start back only for yarn-client mode.
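A hedged sketch of how such a flag could gate the deferred start. spark.scheduler.app.slowstart is only the parameter proposed here, not an existing Spark setting, and TaskSchedulerLike / DeferredStart below are illustrative stand-ins rather than Spark classes:
{code:java}
import org.apache.spark.SparkConf

// Stand-in for the real TaskScheduler, just to make the control flow concrete.
trait TaskSchedulerLike { def start(): Unit }

class DeferredStart(conf: SparkConf, taskScheduler: TaskSchedulerLike) {
  // spark.scheduler.app.slowstart is the flag proposed in this issue (not an existing setting).
  private val slowStart = conf.getBoolean("spark.scheduler.app.slowstart", false)
  private var started = false

  // Called from SparkContext initialization.
  def onSparkContextInit(): Unit = if (!slowStart) startOnce()

  // Called just before the first stages are submitted.
  def onFirstJobSubmitted(): Unit = if (slowStart) startOnce()

  private def startOnce(): Unit = if (!started) { taskScheduler.start(); started = true }
}
{code}
With slowstart disabled the behaviour is unchanged; with it enabled, cluster resources are requested only once a job actually runs.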

> Put TaskScheduler.start back in SparkContext to shorten cluster resources 
> occupation period
> ---
>
> Key: SPARK-4962
> URL: https://issues.apache.org/jira/browse/SPARK-4962
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Reporter: YanTang Zhai
>Priority: Minor
>
> When a SparkContext object is instantiated, the TaskScheduler is started and some resources are allocated from the cluster. However, these resources may not be used for a while, for example while DAGScheduler.JobSubmitted is still being processed. These resources are wasted during this period. Thus, we want to put TaskScheduler.start back to shorten the cluster resources occupation period, especially for a busy cluster.
> TaskScheduler could be started just before running stages.
> We could analyse and compare the resources occupation period before and after optimization.
> TaskScheduler.start execution time: [time1__]
> DAGScheduler.JobSubmitted (excluding HadoopRDD.getPartitions or TaskScheduler.start) execution time: [time2_]
> HadoopRDD.getPartitions execution time: [time3___]
> Stages execution time: [time4_]
> The cluster resources occupation period before optimization is [time2_][time3___][time4_].
> The cluster resources occupation period after optimization is [time3___][time4_].
> In summary, the cluster resources occupation period after optimization is less than before.
> If HadoopRDD.getPartitions could be put forward (SPARK-4961), the period may be shortened further, to [time4_].
> The resources saving is important for a busy cluster.






[jira] [Created] (SPARK-4962) Put TaskScheduler.start back in SparkContext to shorten cluster resources occupation period

2014-12-24 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-4962:
---

 Summary: Put TaskScheduler.start back in SparkContext to shorten 
cluster resources occupation period
 Key: SPARK-4962
 URL: https://issues.apache.org/jira/browse/SPARK-4962
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: YanTang Zhai
Priority: Minor


When a SparkContext object is instantiated, the TaskScheduler is started and some resources are allocated from the cluster. However, these resources may not be used for a while, for example while DAGScheduler.JobSubmitted is still being processed. These resources are wasted during this period. Thus, we want to put TaskScheduler.start back to shorten the cluster resources occupation period, especially for a busy cluster.
TaskScheduler could be started just before running stages.

We could analyse and compare the resources occupation period before and after optimization.

TaskScheduler.start execution time: [time1__]
DAGScheduler.JobSubmitted (excluding HadoopRDD.getPartitions or TaskScheduler.start) execution time: [time2_]
HadoopRDD.getPartitions execution time: [time3___]
Stages execution time: [time4_]

The cluster resources occupation period before optimization is [time2_][time3___][time4_].
The cluster resources occupation period after optimization is [time3___][time4_].
In summary, the cluster resources occupation period after optimization is less than before.
If HadoopRDD.getPartitions could be put forward (SPARK-4961), the period may be shortened further, to [time4_].
The resources saving is important for a busy cluster.






[jira] [Created] (SPARK-4961) Put HadoopRDD.getPartitions forward to reduce DAGScheduler.JobSubmitted processing time

2014-12-24 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-4961:
---

 Summary: Put HadoopRDD.getPartitions forward to reduce 
DAGScheduler.JobSubmitted processing time
 Key: SPARK-4961
 URL: https://issues.apache.org/jira/browse/SPARK-4961
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: YanTang Zhai
Priority: Minor


HadoopRDD.getPartitions is evaluated lazily, during DAGScheduler.JobSubmitted processing. If the input directory is large, getPartitions may take a long time.
For example, in our cluster it takes from 0.029s to 766.699s. While one JobSubmitted event is being processed, the others must wait. Thus, we want to put HadoopRDD.getPartitions forward to reduce DAGScheduler.JobSubmitted processing time, so that other JobSubmitted events do not have to wait as long. A HadoopRDD object could compute its partitions when it is instantiated.

We could analyse and compare the execution time before and after optimization.
TaskScheduler.start execution time: [time1__]
DAGScheduler.JobSubmitted (excluding HadoopRDD.getPartitions or TaskScheduler.start) execution time: [time2_]
HadoopRDD.getPartitions execution time: [time3___]
Stages execution time: [time4_]
(1) The app has only one job
(a)
The execution time of the job before optimization is [time1__][time2_][time3___][time4_].
The execution time of the job after optimization is [time1__][time3___][time2_][time4_].

In summary, if the app has only one job, the total execution time is the same before and after optimization.
(2) The app has 4 jobs
(a) Before optimization,
job1 execution time is [time2_][time3___][time4_],
job2 execution time is [time2__][time3___][time4_],
job3 execution time is [time2][time3___][time4_],
job4 execution time is [time2_][time3___][time4_].
After optimization,
job1 execution time is [time3___][time2_][time4_],
job2 execution time is [time3___][time2__][time4_],
job3 execution time is [time3___][time2_][time4_],
job4 execution time is [time3___][time2__][time4_].
In summary, if the app has multiple jobs, the average execution time after optimization is less than before.
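A minimal sketch of forcing partition computation eagerly from user code, under the assumption that accessing rdd.partitions is what triggers HadoopRDD.getPartitions (and the corresponding input-split listing); the input path is hypothetical:
{code:java}
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("eager-partitions"))
val rdd = sc.textFile("hdfs:///some/large/input/dir")   // hypothetical path

// Force getPartitions now (listing input splits on HDFS), so that the expensive
// work does not happen later inside DAGScheduler's JobSubmitted handling.
val numPartitions = rdd.partitions.length
println(s"Input has $numPartitions partitions")
{code}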






[jira] [Created] (SPARK-4946) Using AkkaUtils.askWithReply in MapOutputTracker.askTracker to reduce the chance of the communicating problem

2014-12-23 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-4946:
---

 Summary: Using AkkaUtils.askWithReply in 
MapOutputTracker.askTracker to reduce the chance of the communicating problem
 Key: SPARK-4946
 URL: https://issues.apache.org/jira/browse/SPARK-4946
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: YanTang Zhai
Priority: Minor


Use AkkaUtils.askWithReply in MapOutputTracker.askTracker to reduce the chance of communication problems.
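For background, the point of askWithReply over a bare Akka ask is that it retries the request instead of failing after a single lost message. A generic sketch of that idea (not Spark's actual AkkaUtils code, written directly against the plain Akka ask pattern) follows:
{code:java}
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout

// Resend the message up to maxAttempts times so that a single dropped reply
// does not fail the caller outright.
def askWithRetries[T](actor: ActorRef, message: Any, maxAttempts: Int = 3,
                      askTimeout: FiniteDuration = 30.seconds): T = {
  implicit val timeout: Timeout = Timeout(askTimeout)
  var lastError: Throwable = null
  for (_ <- 1 to maxAttempts) {
    try {
      return Await.result(actor ? message, askTimeout).asInstanceOf[T]
    } catch {
      case e: Exception => lastError = e
    }
  }
  throw new RuntimeException(s"Ask of $actor failed after $maxAttempts attempts", lastError)
}
{code}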






[jira] [Created] (SPARK-4723) To abort the stages which have attempted some times

2014-12-03 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-4723:
---

 Summary: To abort the stages which have attempted some times
 Key: SPARK-4723
 URL: https://issues.apache.org/jira/browse/SPARK-4723
 Project: Spark
  Issue Type: Improvement
Reporter: YanTang Zhai
Priority: Minor


For various reasons, a stage may be attempted many times. A threshold could be added so that stages that have been attempted more times than the threshold are aborted.
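A self-contained sketch of the proposed threshold; "spark.stage.maxAttempts" is a hypothetical setting name, and in practice the check would live in the scheduler's stage-failure handling, where the real attempt counter is available:
{code:java}
import org.apache.spark.SparkConf

val conf = new SparkConf()
// Hypothetical configuration: maximum number of attempts before a stage is aborted.
val maxStageAttempts = conf.getInt("spark.stage.maxAttempts", 8)

def shouldAbort(stageId: Int, attempts: Int): Boolean = attempts > maxStageAttempts

if (shouldAbort(stageId = 3, attempts = 9)) {
  println(s"Stage 3 exceeded $maxStageAttempts attempts; abort it instead of resubmitting")
}
{code}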






[jira] [Created] (SPARK-4693) PruningPredicates may be wrong if predicates contains an empty AttributeSet() references

2014-12-02 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-4693:
---

 Summary: PruningPredicates may be wrong if predicates contains an 
empty AttributeSet() references
 Key: SPARK-4693
 URL: https://issues.apache.org/jira/browse/SPARK-4693
 Project: Spark
  Issue Type: Bug
  Components: SQL
Reporter: YanTang Zhai
Priority: Minor


The sql "select * from spark_test::for_test where abs(20141202) is not null" 
has predicates=List(IS NOT NULL 
HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFAbs(20141202)) and 
partitionKeyIds=AttributeSet(). PruningPredicates is List(IS NOT NULL 
HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFAbs(20141202)). Then the 
exception "java.lang.IllegalArgumentException: requirement failed: Partition 
pruning predicates only supported for partitioned tables." is thrown.
The sql "select * from spark_test::for_test_partitioned_table where 
abs(20141202) is not null and type_id=11 and platform = 3" with partitioned key 
insert_date has predicates=List(IS NOT NULL 
HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFAbs(20141202), (type_id#12 = 
11), (platform#8 = 3)) and partitionKeyIds=AttributeSet(insert_date#24). 
PruningPredicates is List(IS NOT NULL 
HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFAbs(20141202)).
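For context, a hedged sketch of where the misclassification happens: predicates whose references are a subset of the partition keys are treated as pruning predicates, and an empty reference set is trivially a subset of anything, so a UDF-only predicate such as "abs(20141202) IS NOT NULL" slips through. Requiring non-empty references would exclude it (the predicates/partitionKeyIds names below mirror the values quoted above and assume the surrounding planner code):
{code:java}
// Sketch of the classification with an extra non-empty check.
val (pruningPredicates, otherPredicates) = predicates.partition { predicate =>
  predicate.references.nonEmpty &&
    predicate.references.subsetOf(partitionKeyIds)
}
{code}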






[jira] [Created] (SPARK-4692) Support ! boolean logic operator like NOT

2014-12-02 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-4692:
---

 Summary: Support ! boolean logic operator like NOT
 Key: SPARK-4692
 URL: https://issues.apache.org/jira/browse/SPARK-4692
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Reporter: YanTang Zhai
Priority: Minor


select * from for_test where !(col1 > col2)






[jira] [Created] (SPARK-4677) Add hadoop input time in task webui

2014-12-01 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-4677:
---

 Summary: Add hadoop input time in task webui
 Key: SPARK-4677
 URL: https://issues.apache.org/jira/browse/SPARK-4677
 Project: Spark
  Issue Type: Improvement
  Components: Web UI
Reporter: YanTang Zhai
Priority: Minor


Add a Hadoop input time metric to the task web UI, like GC Time, to explicitly show the time a task spends reading its input data.






[jira] [Created] (SPARK-4676) JavaSchemaRDD.schema may throw NullType MatchError if sql has null

2014-12-01 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-4676:
---

 Summary: JavaSchemaRDD.schema may throw NullType MatchError if sql 
has null
 Key: SPARK-4676
 URL: https://issues.apache.org/jira/browse/SPARK-4676
 Project: Spark
  Issue Type: Bug
  Components: SQL
Reporter: YanTang Zhai


{code:java}
val jsc = new org.apache.spark.api.java.JavaSparkContext(sc)
val jhc = new org.apache.spark.sql.hive.api.java.JavaHiveContext(jsc)
val nrdd = jhc.hql("select null from spark_test.for_test")
println(nrdd.schema)
{code}
Then the error is thrown as follows:
{code:java}
scala.MatchError: NullType (of class org.apache.spark.sql.catalyst.types.NullType$)
at org.apache.spark.sql.types.util.DataTypeConversions$.asJavaDataType(DataTypeConversions.scala:43)
{code}






[jira] [Created] (SPARK-4401) RuleExecutor correctly logs trace iteration num

2014-11-14 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-4401:
---

 Summary: RuleExecutor correctly logs trace iteration num
 Key: SPARK-4401
 URL: https://issues.apache.org/jira/browse/SPARK-4401
 Project: Spark
  Issue Type: Bug
  Components: SQL
Reporter: YanTang Zhai


RuleExecutor logs the wrong iteration number in its trace message, as follows:
{code:java}
if (curPlan.fastEquals(lastPlan)) {
  logTrace(s"Fixed point reached for batch ${batch.name} after $iteration iterations.")
  continue = false
}
{code}
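For context: the loop increments `iteration` before this check, so the message reports one iteration too many. A possible fix (a sketch of the idea, not necessarily the committed patch) is to log `iteration - 1`:
{code:java}
if (curPlan.fastEquals(lastPlan)) {
  // iteration has already been incremented past the fixed point, hence the - 1.
  logTrace(
    s"Fixed point reached for batch ${batch.name} after ${iteration - 1} iterations.")
  continue = false
}
{code}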






[jira] [Resolved] (SPARK-4334) Utils.startServiceOnPort should check whether the tryPort is less than 1024

2014-11-10 Thread YanTang Zhai (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-4334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

YanTang Zhai resolved SPARK-4334.
-
Resolution: Duplicate

> Utils.startServiceOnPort should check whether the tryPort is less than 1024
> ---
>
> Key: SPARK-4334
> URL: https://issues.apache.org/jira/browse/SPARK-4334
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Reporter: YanTang Zhai
>Priority: Minor
>
> Utils.startServiceOnPort should check whether the tryPort is less than 1024.
> If the next attempted port is less than 1024, a SocketException with "Permission denied" will be thrown.






[jira] [Created] (SPARK-4334) Utils.startServiceOnPort should check whether the tryPort is less than 1024

2014-11-10 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-4334:
---

 Summary: Utils.startServiceOnPort should check whether the tryPort 
is less than 1024
 Key: SPARK-4334
 URL: https://issues.apache.org/jira/browse/SPARK-4334
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Reporter: YanTang Zhai
Priority: Minor


Utils.startServiceOnPort should check whether the tryPort is less than 1024.
If the next attempted port is less than 1024, a SocketException with "Permission denied" will be thrown.






[jira] [Commented] (SPARK-4316) Utils.isBindCollision misjudges at Non-English environment

2014-11-10 Thread YanTang Zhai (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-4316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14205810#comment-14205810
 ] 

YanTang Zhai commented on SPARK-4316:
-

Thanks @Sean Owen

> Utils.isBindCollision misjudges at Non-English environment
> --
>
> Key: SPARK-4316
> URL: https://issues.apache.org/jira/browse/SPARK-4316
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Reporter: YanTang Zhai
>Priority: Minor
>
> export LANG=zh_CN.utf8
> export LC_ALL=zh_CN.utf8
> Then Utils.isBindCollision misjudges the failure, since BindException's message is the localized "地址已在使用" ("Address already in use" in Chinese) rather than "Address already in use".






[jira] [Created] (SPARK-4316) Utils.isBindCollision misjudges at Non-English environment

2014-11-10 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-4316:
---

 Summary: Utils.isBindCollision misjudges at Non-English environment
 Key: SPARK-4316
 URL: https://issues.apache.org/jira/browse/SPARK-4316
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Reporter: YanTang Zhai
Priority: Minor


export LANG=zh_CN.utf8
export LC_ALL=zh_CN.utf8
Then Utils.isBindCollision misjudges the failure, since BindException's message is the localized "地址已在使用" ("Address already in use" in Chinese) rather than "Address already in use".






[jira] [Created] (SPARK-4273) Providing ExternalSet to avoid OOM when count(distinct)

2014-11-06 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-4273:
---

 Summary: Providing ExternalSet to avoid OOM when count(distinct)
 Key: SPARK-4273
 URL: https://issues.apache.org/jira/browse/SPARK-4273
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core, SQL
Reporter: YanTang Zhai
Priority: Minor


A task may OOM during count(distinct) if it needs to process many records. CombineSetsAndCountFunction puts all records into an OpenHashSet; if it fetches many records, it may occupy a large amount of memory.
I think a data structure ExternalSet, similar to ExternalAppendOnlyMap, could be provided to spill OpenHashSet data to disk when its size exceeds some threshold.
For example, OpenHashSet1 (ohs1) holds [d, b, c, a]. It is spilled to file1 sorted by hashCode, so file1 contains [a, b, c, d]. The procedure can be sketched as follows:
ohs1 [d, b, c, a] => [a, b, c, d] => file1
ohs2 [e, f, g, a] => [a, e, f, g] => file2
ohs3 [e, h, i, g] => [e, g, h, i] => file3
ohs4 [j, h, a] => [a, h, j] => sortedSet
When reading the output, all keys with the same hashCode are put into one OpenHashSet, and then that OpenHashSet's iterator is consumed. The procedure can be sketched as follows:
file1 -> a -> ohsA; file2 -> a -> ohsA; sortedSet -> a -> ohsA; ohsA -> a;
file1 -> b -> ohsB; ohsB -> b;
file1 -> c -> ohsC; ohsC -> c;
file1 -> d -> ohsD; ohsD -> d;
file2 -> e -> ohsE; file3 -> e -> ohsE; ohsE -> e;
...

I think using the ExternalSet could avoid OOM during count(distinct). Comments are welcome.
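A compact sketch of the spill idea, with an in-memory stand-in for the on-disk sorted files; it only illustrates the spill bookkeeping, not a real implementation:
{code:java}
import scala.collection.mutable

// Distinct-count sketch: spill the working set to a "sorted file" whenever it
// grows past a threshold, then deduplicate across all spills when reading back.
class ExternalSetSketch[T](spillThreshold: Int)(implicit ord: Ordering[T]) {
  private var current = mutable.HashSet.empty[T]
  private val spills = mutable.ArrayBuffer.empty[Vector[T]] // stands in for sorted spill files

  def insert(elem: T): Unit = {
    current += elem
    if (current.size >= spillThreshold) {
      spills += current.toVector.sorted   // "write a sorted spill file"
      current = mutable.HashSet.empty[T]
    }
  }

  // Merge all spills plus the in-memory set, deduplicating across them.
  def distinctCount: Long = (spills.flatten ++ current).toSet.size.toLong
}

val set = new ExternalSetSketch[String](spillThreshold = 4)
Seq("d", "b", "c", "a", "e", "f", "g", "a", "j", "h", "a").foreach(set.insert)
println(set.distinctCount)   // 9 distinct elements
{code}
A real ExternalSet would merge the sorted spill files streamingly, grouping keys with equal hash codes as described above, instead of collecting everything into one in-memory set.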






[jira] [Created] (SPARK-4009) HiveTableScan should use makeRDDForTable instead of makeRDDForPartitionedTable for partitioned table when partitionPruningPred is None

2014-10-20 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-4009:
---

 Summary: HiveTableScan should use makeRDDForTable instead of 
makeRDDForPartitionedTable for partitioned table when partitionPruningPred is 
None
 Key: SPARK-4009
 URL: https://issues.apache.org/jira/browse/SPARK-4009
 Project: Spark
  Issue Type: Bug
  Components: SQL
Reporter: YanTang Zhai


HiveTableScan should use makeRDDForTable instead of makeRDDForPartitionedTable for a partitioned table when partitionPruningPred is None.
If a table has many partitions (for example, more than 20 thousand) but only a little data (for example, less than 512 MB), a SQL query on the table produces a very large number of RDDs (roughly one per partition), and the job submission fails with a Java stack overflow.






[jira] [Updated] (SPARK-3545) Put HadoopRDD.getPartitions forward and put TaskScheduler.start back in SparkContext to reduce DAGScheduler.JobSubmitted processing time and shorten cluster resources occ

2014-09-16 Thread YanTang Zhai (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

YanTang Zhai updated SPARK-3545:

Description: 
We have two problems:
(1) HadoopRDD.getPartitions is evaluated lazily, during DAGScheduler.JobSubmitted processing. If the input directory is large, getPartitions may take a long time.
For example, in our cluster it takes from 0.029s to 766.699s. While one JobSubmitted event is being processed, the others must wait. Thus, we want to put HadoopRDD.getPartitions forward to reduce DAGScheduler.JobSubmitted processing time, so that other JobSubmitted events do not have to wait as long. A HadoopRDD object could compute its partitions when it is instantiated.
(2) When a SparkContext object is instantiated, the TaskScheduler is started and some resources are allocated from the cluster. However, these resources may not be used for a while, for example while DAGScheduler.JobSubmitted is still being processed. These resources are wasted during this period. Thus, we want to put TaskScheduler.start back to shorten the cluster resources occupation period, especially for a busy cluster.
TaskScheduler could be started just before running stages.

We could analyse and compare the execution time before and after optimization.
TaskScheduler.start execution time: [time1__]
DAGScheduler.JobSubmitted (excluding HadoopRDD.getPartitions or TaskScheduler.start) execution time: [time2_]
HadoopRDD.getPartitions execution time: [time3___]
Stages execution time: [time4_]
(1) The app has only one job
(a)
The execution time of the job before optimization is [time1__][time2_][time3___][time4_].
The execution time of the job after optimization is [time3___][time2_][time1__][time4_].
(b)
The cluster resources occupation period before optimization is [time2_][time3___][time4_].
The cluster resources occupation period after optimization is [time4_].
In summary, if the app has only one job, the total execution time is the same before and after optimization, while the cluster resources occupation period after optimization is less than before.
(2) The app has 4 jobs
(a) Before optimization,
job1 execution time is [time2_][time3___][time4_],
job2 execution time is [time2__][time3___][time4_],
job3 execution time is [time2][time3___][time4_],
job4 execution time is [time2_][time3___][time4_].
After optimization,
job1 execution time is [time3___][time2_][time1__][time4_],
job2 execution time is [time3___][time2__][time4_],
job3 execution time is [time3___][time2_][time4_],
job4 execution time is [time3___][time2__][time4_].
In summary, if the app has multiple jobs, the average execution time after optimization is less than before, and the cluster resources occupation period after optimization is less than before.

  was:
We have two problems:
(1) HadoopRDD.getPartitions is evaluated lazily, during DAGScheduler.JobSubmitted processing. If the input directory is large, getPartitions may take a long time.
For example, in our cluster it takes from 0.029s to 766.699s. While one JobSubmitted event is being processed, the others must wait. Thus, we want to put HadoopRDD.getPartitions forward to reduce DAGScheduler.JobSubmitted processing time, so that other JobSubmitted events do not have to wait as long. A HadoopRDD object could compute its partitions when it is instantiated.
(2) When a SparkContext object is instantiated, the TaskScheduler is started and some resources are allocated from the cluster. However, these resources may not be used for a while, for example while DAGScheduler.JobSubmitted is still being processed. These resources are wasted during this period. Thus, we want to put TaskScheduler.start back to shorten the cluster resources occupation period, especially for a busy cluster.
TaskScheduler could be started just before running stages.

We could analyse and compare the execution time before and after optimization.
TaskScheduler.start execution time: [time1__]
DAGScheduler.JobSubmitted (excluding HadoopRDD.getPartitions or TaskScheduler.start) execution time: [time2_]
HadoopRDD.getPartitions execution time: [time3___]
Stages execution time: [time4_]
(1) The app has only one job
(a)
The execution time of the job before optimization is [time1__][time2_][time3___][time4_].
The execution time of the job after optimization is [time3___][time2_][time1__][time4_].
(b)
The cluster resources occupation period before optimization is [time2_][time3___][time4_].
The cluster resources occupation period after optimization is [time4_].
In summary, if the app has only one job, the total execution time is the same before and after optimization, while the cluster resources occupation period after optimization is less than before.
(2) The app has 4 jobs
(a) Before optimization,
job1 execution time is

[jira] [Updated] (SPARK-3545) Put HadoopRDD.getPartitions forward and put TaskScheduler.start back in SparkContext to reduce DAGScheduler.JobSubmitted processing time and shorten cluster resources occ

2014-09-16 Thread YanTang Zhai (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

YanTang Zhai updated SPARK-3545:

Description: 
We have two problems:
(1) HadoopRDD.getPartitions is evaluated lazily, during DAGScheduler.JobSubmitted processing. If the input directory is large, getPartitions may take a long time.
For example, in our cluster it takes from 0.029s to 766.699s. While one JobSubmitted event is being processed, the others must wait. Thus, we want to put HadoopRDD.getPartitions forward to reduce DAGScheduler.JobSubmitted processing time, so that other JobSubmitted events do not have to wait as long. A HadoopRDD object could compute its partitions when it is instantiated.
(2) When a SparkContext object is instantiated, the TaskScheduler is started and some resources are allocated from the cluster. However, these resources may not be used for a while, for example while DAGScheduler.JobSubmitted is still being processed. These resources are wasted during this period. Thus, we want to put TaskScheduler.start back to shorten the cluster resources occupation period, especially for a busy cluster.
TaskScheduler could be started just before running stages.

We could analyse and compare the execution time before and after optimization.
TaskScheduler.start execution time: [time1__]
DAGScheduler.JobSubmitted (excluding HadoopRDD.getPartitions or TaskScheduler.start) execution time: [time2_]
HadoopRDD.getPartitions execution time: [time3___]
Stages execution time: [time4_]
(1) The app has only one job
(a)
The execution time of the job before optimization is [time1__][time2_][time3___][time4_].
The execution time of the job after optimization is [time3___][time2_][time1__][time4_].
(b)
The cluster resources occupation period before optimization is [time2_][time3___][time4_].
The cluster resources occupation period after optimization is [time4_].
In summary, if the app has only one job, the total execution time is the same before and after optimization, while the cluster resources occupation period after optimization is less than before.
(2) The app has 4 jobs
(a) Before optimization,
job1 execution time is [time2_][time3___][time4_],
job2 execution time is [time2][time3___][time4_],
job3 execution time is [time2][time3___][time4_],
job4 execution time is [time2___][time3___][time4_].
After optimization,
job1 execution time is [time3___][time2_][time1__][time4_],
job2 execution time is [time3___][time2___][time4_],
job3 execution time is [time3___][time2_][time4_],
job4 execution time is [time3___][time2__][time4_].
In summary, if the app has multiple jobs, the average execution time after optimization is less than before, and the cluster resources occupation period after optimization is less than before.

  was:
We have two problems:
(1) HadoopRDD.getPartitions is evaluated lazily, during DAGScheduler.JobSubmitted processing. If the input directory is large, getPartitions may take a long time.
For example, in our cluster it takes from 0.029s to 766.699s. While one JobSubmitted event is being processed, the others must wait. Thus, we want to put HadoopRDD.getPartitions forward to reduce DAGScheduler.JobSubmitted processing time, so that other JobSubmitted events do not have to wait as long. A HadoopRDD object could compute its partitions when it is instantiated.
(2) When a SparkContext object is instantiated, the TaskScheduler is started and some resources are allocated from the cluster. However, these resources may not be used for a while, for example while DAGScheduler.JobSubmitted is still being processed. These resources are wasted during this period. Thus, we want to put TaskScheduler.start back to shorten the cluster resources occupation period, especially for a busy cluster.
TaskScheduler could be started just before running stages.

We could analyse and compare the execution time before and after optimization.
TaskScheduler.start execution time: [time1__]
DAGScheduler.JobSubmitted (excluding HadoopRDD.getPartitions or TaskScheduler.start) execution time: [time2_]
HadoopRDD.getPartitions execution time: [time3___]
Stages execution time: [time4_]
(1) The app has only one job
(a)
The execution time of the job before optimization is [time1__][time2_][time3___][time4_].
The execution time of the job after optimization is [time3___][time2_][time1__][time4_].
(b)
The cluster resources occupation period before optimization is [time2_][time3___][time4_].
The cluster resources occupation period after optimization is [time4_].
In summary, if the app has only one job, the total execution time is the same before and after optimization, while the cluster resources occupation period after optimization is less than before.
(2) The app has 4 jobs
(a) Before optimization,

[jira] [Updated] (SPARK-3545) Put HadoopRDD.getPartitions forward and put TaskScheduler.start back in SparkContext to reduce DAGScheduler.JobSubmitted processing time and shorten cluster resources occ

2014-09-16 Thread YanTang Zhai (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

YanTang Zhai updated SPARK-3545:

Description: 
We have two problems:
(1) HadoopRDD.getPartitions is evaluated lazily, during DAGScheduler.JobSubmitted processing. If the input directory is large, getPartitions may take a long time.
For example, in our cluster it takes from 0.029s to 766.699s. While one JobSubmitted event is being processed, the others must wait. Thus, we want to put HadoopRDD.getPartitions forward to reduce DAGScheduler.JobSubmitted processing time, so that other JobSubmitted events do not have to wait as long. A HadoopRDD object could compute its partitions when it is instantiated.
(2) When a SparkContext object is instantiated, the TaskScheduler is started and some resources are allocated from the cluster. However, these resources may not be used for a while, for example while DAGScheduler.JobSubmitted is still being processed. These resources are wasted during this period. Thus, we want to put TaskScheduler.start back to shorten the cluster resources occupation period, especially for a busy cluster.
TaskScheduler could be started just before running stages.

We could analyse and compare the execution time before and after optimization.
TaskScheduler.start execution time: [time1__]
DAGScheduler.JobSubmitted (excluding HadoopRDD.getPartitions or TaskScheduler.start) execution time: [time2_]
HadoopRDD.getPartitions execution time: [time3___]
Stages execution time: [time4_]
(1) The app has only one job
(a)
The execution time of the job before optimization is [time1__][time2_][time3___][time4_].
The execution time of the job after optimization is [time3___][time2_][time1__][time4_].
(b)
The cluster resources occupation period before optimization is [time2_][time3___][time4_].
The cluster resources occupation period after optimization is [time4_].
In summary, if the app has only one job, the total execution time is the same before and after optimization, while the cluster resources occupation period after optimization is less than before.
(2) The app has 4 jobs
(a) Before optimization,
job1 execution time is [time2_][time3___][time4_],
job2 execution time is [time2][time3___][time4_],
job3 execution time is [time2][time3___][time4_],
job4 execution time is [time2___][time3___][time4_].
After optimization,
job1 execution time is [time3___][time2_][time1__][time4_],
job2 execution time is [time3___][time2___][time4_],
job3 execution time is [time3___][time2_][time4_],
job4 execution time is [time3___][time2__][time4_].
In summary, if the app has multiple jobs, the average execution time after optimization is less than before, and the cluster resources occupation period after optimization is less than before.

  was:
We have two problems:
(1) HadoopRDD.getPartitions is evaluated lazily, during DAGScheduler.JobSubmitted processing. If the input directory is large, getPartitions may take a long time.
For example, in our cluster it takes from 0.029s to 766.699s. While one JobSubmitted event is being processed, the others must wait. Thus, we want to put HadoopRDD.getPartitions forward to reduce DAGScheduler.JobSubmitted processing time, so that other JobSubmitted events do not have to wait as long. A HadoopRDD object could compute its partitions when it is instantiated.
(2) When a SparkContext object is instantiated, the TaskScheduler is started and some resources are allocated from the cluster. However, these resources may not be used for a while, for example while DAGScheduler.JobSubmitted is still being processed. These resources are wasted during this period. Thus, we want to put TaskScheduler.start back to shorten the cluster resources occupation period, especially for a busy cluster.
TaskScheduler could be started just before running stages.

We could analyse and compare the execution time before and after optimization.
TaskScheduler.start execution time: [time1__]
DAGScheduler.JobSubmitted (excluding HadoopRDD.getPartitions or TaskScheduler.start) execution time: [time2_]
HadoopRDD.getPartitions execution time: [time3___]
Stages execution time: [time4_]
(1) The app has only one job
(a)
The execution time of the job before optimization is [time1__][time2_][time3___][time4_].
The execution time of the job after optimization is [time3___][time2_][time1__][time4_].
(b)
The cluster resources occupation period before optimization is [time2_][time3___][time4_].
The cluster resources occupation period after optimization is [time4_].
In summary, if the app has only one job, the total execution time is the same before and after optimization, while the cluster resources occupation period after optimization is less than before.
(2) The app has 4 jobs
(a) Before optimization,

[jira] [Updated] (SPARK-3545) Put HadoopRDD.getPartitions forward and put TaskScheduler.start back in SparkContext to reduce DAGScheduler.JobSubmitted processing time and shorten cluster resources occ

2014-09-16 Thread YanTang Zhai (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

YanTang Zhai updated SPARK-3545:

Description: 
We have two problems:
(1) HadoopRDD.getPartitions is evaluated lazily, during DAGScheduler.JobSubmitted processing. If the input directory is large, getPartitions may take a long time.
For example, in our cluster it takes from 0.029s to 766.699s. While one JobSubmitted event is being processed, the others must wait. Thus, we want to put HadoopRDD.getPartitions forward to reduce DAGScheduler.JobSubmitted processing time, so that other JobSubmitted events do not have to wait as long. A HadoopRDD object could compute its partitions when it is instantiated.
(2) When a SparkContext object is instantiated, the TaskScheduler is started and some resources are allocated from the cluster. However, these resources may not be used for a while, for example while DAGScheduler.JobSubmitted is still being processed. These resources are wasted during this period. Thus, we want to put TaskScheduler.start back to shorten the cluster resources occupation period, especially for a busy cluster.
TaskScheduler could be started just before running stages.

We could analyse and compare the execution time before and after optimization.
TaskScheduler.start execution time: [time1__]
DAGScheduler.JobSubmitted (excluding HadoopRDD.getPartitions or TaskScheduler.start) execution time: [time2_]
HadoopRDD.getPartitions execution time: [time3___]
Stages execution time: [time4_]
(1) The app has only one job
(a)
The execution time of the job before optimization is [time1__][time2_][time3___][time4_].
The execution time of the job after optimization is [time3___][time2_][time1__][time4_].
(b)
The cluster resources occupation period before optimization is [time2_][time3___][time4_].
The cluster resources occupation period after optimization is [time4_].
In summary, if the app has only one job, the total execution time is the same before and after optimization, while the cluster resources occupation period after optimization is less than before.
(2) The app has 4 jobs
(a) Before optimization,
job1 execution time is [time2_][time3___][time4_],
job2 execution time is [time2][time3___][time4_],
job3 execution time is [time2][time3___][time4_],
job4 execution time is [time2___][time3___][time4_].
After optimization,
job1 execution time is [time3___][time2_][time1__][time4_],
job2 execution time is [time3___][time2___][time4_],
job3 execution time is [time3___][time2_][time4_],
job4 execution time is [time3___][time2__][time4_].
In summary, if the app has multiple jobs, the average execution time after optimization is less than before, and the cluster resources occupation period after optimization is less than before.

  was:
We have two problems:
(1) HadoopRDD.getPartitions is evaluated lazily, during DAGScheduler.JobSubmitted processing. If the input directory is large, getPartitions may take a long time.
For example, in our cluster it takes from 0.029s to 766.699s. While one JobSubmitted event is being processed, the others must wait. Thus, we want to put HadoopRDD.getPartitions forward to reduce DAGScheduler.JobSubmitted processing time, so that other JobSubmitted events do not have to wait as long. A HadoopRDD object could compute its partitions when it is instantiated.
(2) When a SparkContext object is instantiated, the TaskScheduler is started and some resources are allocated from the cluster. However, these resources may not be used for a while, for example while DAGScheduler.JobSubmitted is still being processed. These resources are wasted during this period. Thus, we want to put TaskScheduler.start back to shorten the cluster resources occupation period, especially for a busy cluster.
TaskScheduler could be started just before running stages.

We could analyse and compare the execution time before and after optimization.
TaskScheduler.start execution time: [time1__]
DAGScheduler.JobSubmitted (excluding HadoopRDD.getPartitions or TaskScheduler.start) execution time: [time2_]
HadoopRDD.getPartitions execution time: [time3___]
Stages execution time: [time4_]
(1) The app has only one job
(a)
The execution time of the job before optimization is [time1__][time2_][time3___][time4_].
The execution time of the job after optimization is [time3___][time2_][time1__][time4_].
(b)
The cluster resources occupation period before optimization is [time2_][time3___][time4_].
The cluster resources occupation period after optimization is [time4_].
In summary, if the app has only one job, the total execution time is the same before and after optimization, while the cluster resources occupation period after optimization is less than before.

[jira] [Updated] (SPARK-3545) Put HadoopRDD.getPartitions forward and put TaskScheduler.start back in SparkContext to reduce DAGScheduler.JobSubmitted processing time and shorten cluster resources occ

2014-09-16 Thread YanTang Zhai (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

YanTang Zhai updated SPARK-3545:

Description: 
We have two problems:
(1) HadoopRDD.getPartitions is evaluated lazily, during DAGScheduler.JobSubmitted processing. If the input directory is large, getPartitions may take a long time.
For example, in our cluster it takes from 0.029s to 766.699s. While one JobSubmitted event is being processed, the others must wait. Thus, we want to put HadoopRDD.getPartitions forward to reduce DAGScheduler.JobSubmitted processing time, so that other JobSubmitted events do not have to wait as long. A HadoopRDD object could compute its partitions when it is instantiated.
(2) When a SparkContext object is instantiated, the TaskScheduler is started and some resources are allocated from the cluster. However, these resources may not be used for a while, for example while DAGScheduler.JobSubmitted is still being processed. These resources are wasted during this period. Thus, we want to put TaskScheduler.start back to shorten the cluster resources occupation period, especially for a busy cluster.
TaskScheduler could be started just before running stages.

We could analyse and compare the execution time before and after optimization.
TaskScheduler.start execution time: [time1__]
DAGScheduler.JobSubmitted (excluding HadoopRDD.getPartitions or TaskScheduler.start) execution time: [time2_]
HadoopRDD.getPartitions execution time: [time3___]
Stages execution time: [time4_]
(1) The app has only one job
(a)
The execution time of the job before optimization is [time1__][time2_][time3___][time4_].
The execution time of the job after optimization is [time3___][time2_][time1__][time4_].
(b)
The cluster resources occupation period before optimization is [time2_][time3___][time4_].
The cluster resources occupation period after optimization is [time4_].
In summary, if the app has only one job, the total execution time is the same before and after optimization, while the cluster resources occupation period after optimization is less than before.
(2) The app has 4 jobs
(a) Before optimization,
job1 execution time is [time2_][time3___][time4_],
job2 execution time is [time2][time3___][time4_],
job3 execution time is [time2][time3___][time4_],
job4 execution time is [time2___][time3___][time4_].
After optimization,
job1 execution time is [time3___][time2_][time1__][time4_],
job2 execution time is [time3___][time2___][time4_],
job3 execution time is [time3___][time2_][time4_],
job4 execution time is [time3___][time2__][time4_].
In summary, if the app has multiple jobs, the average execution time after optimization is less than before, and the cluster resources occupation period after optimization is less than before.

  was:
We have two problems:
(1) HadoopRDD.getPartitions is evaluated lazily, during DAGScheduler.JobSubmitted processing. If the input directory is large, getPartitions may take a long time.
For example, in our cluster it takes from 0.029s to 766.699s. While one JobSubmitted event is being processed, the others must wait. Thus, we want to put HadoopRDD.getPartitions forward to reduce DAGScheduler.JobSubmitted processing time, so that other JobSubmitted events do not have to wait as long. A HadoopRDD object could compute its partitions when it is instantiated.
(2) When a SparkContext object is instantiated, the TaskScheduler is started and some resources are allocated from the cluster. However, these resources may not be used for a while, for example while DAGScheduler.JobSubmitted is still being processed. These resources are wasted during this period. Thus, we want to put TaskScheduler.start back to shorten the cluster resources occupation period, especially for a busy cluster.
TaskScheduler could be started just before running stages.

We could analyse and compare the execution time before and after optimization.
TaskScheduler.start execution time: [time1__]
DAGScheduler.JobSubmitted (excluding HadoopRDD.getPartitions or TaskScheduler.start) execution time: [time2_]
HadoopRDD.getPartitions execution time: [time3___]
Stages execution time: [time4_]
(1) The app has only one job
(a)
The execution time of the job before optimization is [time1__][time2_][time3___][time4_].
The execution time of the job after optimization is [time3___][time2_][time1__][time4_].
(b)
The cluster resources occupation period before optimization is [time2_][time3___][time4_].
The cluster resources occupation period after optimization is [time4_].
In summary, if the app has only one job, the total execution time is the same before and after optimization, while the cluster resources occupation period after optimization is less than before.

[jira] [Updated] (SPARK-3545) Put HadoopRDD.getPartitions forward and put TaskScheduler.start back in SparkContext to reduce DAGScheduler.JobSubmitted processing time and shorten cluster resources occ

2014-09-16 Thread YanTang Zhai (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-3545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

YanTang Zhai updated SPARK-3545:

Description: 
We have two problems:
(1) HadoopRDD.getPartitions is evaluated lazily, during DAGScheduler.JobSubmitted processing. If the input directory is large, getPartitions may take a long time.
For example, in our cluster it takes from 0.029s to 766.699s. While one JobSubmitted event is being processed, the others must wait. Thus, we want to put HadoopRDD.getPartitions forward to reduce DAGScheduler.JobSubmitted processing time, so that other JobSubmitted events do not have to wait as long. A HadoopRDD object could compute its partitions when it is instantiated.
(2) When a SparkContext object is instantiated, the TaskScheduler is started and some resources are allocated from the cluster. However, these resources may not be used for a while, for example while DAGScheduler.JobSubmitted is still being processed. These resources are wasted during this period. Thus, we want to put TaskScheduler.start back to shorten the cluster resources occupation period, especially for a busy cluster.
TaskScheduler could be started just before running stages.

We could analyse and compare the execution time before and after optimization.
TaskScheduler.start execution time: [time1__]
DAGScheduler.JobSubmitted (excluding HadoopRDD.getPartitions or TaskScheduler.start) execution time: [time2_]
HadoopRDD.getPartitions execution time: [time3___]
Stages execution time: [time4_]
(1) The app has only one job
(a)
The execution time of the job before optimization is [time1__][time2_][time3___][time4_].
The execution time of the job after optimization is [time3___][time2_][time1__][time4_].
(b)
The cluster resources occupation period before optimization is [time2_][time3___][time4_].
The cluster resources occupation period after optimization is [time4_].
In summary, if the app has only one job, the total execution time is the same before and after optimization, while the cluster resources occupation period after optimization is less than before.
(2) The app has 4 jobs
(a) Before optimization,
job1 execution time is [time2_][time3___][time4_],
job2 execution time is [time2][time3___][time4_],
job3 execution time is [time2][time3___][time4_],
job4 execution time is [time2___][time3___][time4_].
After optimization,
job1 execution time is [time3___][time2_][time1__][time4_],
job2 execution time is [time3___][time2___][time4_],
job3 execution time is [time3___][time2_][time4_],
job4 execution time is [time3___][time2__][time4_].
In summary, if the app has multiple jobs, the average execution time after optimization is less than before, and the cluster resources occupation period after optimization is less than before.

  was:
We have two problems:
(1) HadoopRDD.getPartitions is evaluated lazily, during DAGScheduler.JobSubmitted processing. If the input directory is large, getPartitions may take a long time.
For example, in our cluster it takes from 0.029s to 766.699s. While one JobSubmitted event is being processed, the others must wait. Thus, we want to put HadoopRDD.getPartitions forward to reduce DAGScheduler.JobSubmitted processing time, so that other JobSubmitted events do not have to wait as long. A HadoopRDD object could compute its partitions when it is instantiated.
(2) When a SparkContext object is instantiated, the TaskScheduler is started and some resources are allocated from the cluster. However, these resources may not be used for a while, for example while DAGScheduler.JobSubmitted is still being processed. These resources are wasted during this period. Thus, we want to put TaskScheduler.start back to shorten the cluster resources occupation period, especially for a busy cluster.
TaskScheduler could be started just before running stages.

We could analyse and compare the execution time before and after optimization.
TaskScheduler.start execution time: [time1__]
DAGScheduler.JobSubmitted (excluding HadoopRDD.getPartitions or TaskScheduler.start) execution time: [time2_]
HadoopRDD.getPartitions execution time: [time3___]
Stages execution time: [time4_]
(1) The app has only one job
(a) The execution time of the job before optimization is [time1__][time2_][time3___][time4_].
The execution time of the job after optimization is [time3___][time2_][time1__][time4_].
(b) The cluster resources occupation period before optimization is [time2_][time3___][time4_].
The cluster resources occupation period after optimization is [time4_].
In summary, if the app has only one job, the total execution time is the same before and after optimization, while the cluster resources occupation period after optimization is less than before.
(2) The app has 4 jobs
(a) Before optimization,

[jira] [Created] (SPARK-3545) Put HadoopRDD.getPartitions forward and put TaskScheduler.start back in SparkContext to reduce DAGScheduler.JobSubmitted processing time and shorten cluster resources occ

2014-09-16 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-3545:
---

 Summary: Put HadoopRDD.getPartitions forward and put 
TaskScheduler.start back in SparkContext to reduce DAGScheduler.JobSubmitted 
processing time and shorten cluster resources occupation period
 Key: SPARK-3545
 URL: https://issues.apache.org/jira/browse/SPARK-3545
 Project: Spark
  Issue Type: Improvement
Reporter: YanTang Zhai
Priority: Minor


We have two problems:
(1) HadoopRDD.getPartitions is lazily evaluated inside DAGScheduler.JobSubmitted. 
If the input dir is large, getPartitions may take a long time; in our cluster it 
ranges from 0.029s to 766.699s. While one JobSubmitted event is being processed, 
the others have to wait. Thus, we want to move HadoopRDD.getPartitions forward to 
reduce DAGScheduler.JobSubmitted processing time, so that other JobSubmitted 
events don't need to wait as long. The HadoopRDD object could compute its 
partitions when it is instantiated.
(2) When the SparkContext object is instantiated, the TaskScheduler is started and 
some resources are allocated from the cluster. However, these resources may not be 
used for the moment, for example while DAGScheduler.JobSubmitted is processing, 
and they are wasted during this period. Thus, we want to move TaskScheduler.start 
back to shorten the cluster resources occupation period, especially for a busy 
cluster. The TaskScheduler could be started just before running stages.

We could analyse and compare the execution time before and after optimization.
TaskScheduler.start execution time: [time1__]
DAGScheduler.JobSubmitted (excluding HadoopRDD.getPartitions or 
TaskScheduler.start) execution time: [time2_]
HadoopRDD.getPartitions execution time: [time3___]
Stages execution time: [time4_]
(1) The app has only one job
(a) The execution time of the job before optimization is 
[time1__][time2_][time3___][time4_].
The execution time of the job after optimization is  
[time3___][time2_][time1__][time4_].
(b) The cluster resources occupation period before optimization is 
[time2_][time3___][time4_].
The cluster resources occupation period after optimization is  [time4_].
In summary, if the app has only one job, the total execution time is the same 
before and after optimization, while the cluster resources occupation period 
after optimization is less than before.
(2) The app has 4 jobs
(a) Before optimization,
job1 execution time is [time2_][time3___][time4_],
job2 execution time is [time2][time3___][time4_],
job3 execution time is [time2][time3___][time4_],
job4 execution time is [time2___][time3___][time4_].
After optimization,
job1 execution time is [time3___][time2_][time1__][time4_],
job2 execution time is [time3___][time2___][time4_],
job3 execution time is [time3___][time2_][time4_],
job4 execution time is [time3___][time2__][time4_].
In summary, if the app has multiple jobs, the average execution time after 
optimization is less than before, and the cluster resources occupation period 
after optimization is less than before.
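A minimal sketch of the first idea, assuming a running Spark application; the input path and object name below are hypothetical, and only the getPartitions part is illustrated (delaying TaskScheduler.start is internal to SparkContext). Touching rdd.partitions on the submitting thread forces the slow input listing to run there, instead of inside the DAGScheduler event loop when the job is submitted:
{code:java}
import org.apache.spark.{SparkConf, SparkContext}

object EagerPartitions {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("eager-partitions"))
    val rdd = sc.textFile("hdfs://namenode/path/to/large/input") // hypothetical path
    rdd.partitions // eagerly computes and caches the partition array on this thread
    rdd.count()    // the later JobSubmitted event no longer pays the listing cost
    sc.stop()
  }
}
{code}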



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-3148) Update global variables of HttpBroadcast so that multiple SparkContexts can coexist

2014-08-20 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-3148:
---

 Summary: Update global variables of HttpBroadcast so that multiple 
SparkContexts can coexist
 Key: SPARK-3148
 URL: https://issues.apache.org/jira/browse/SPARK-3148
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: YanTang Zhai
Priority: Minor


Update global variables of HttpBroadcast so that multiple SparkContexts can 
coexist



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-3067) JobProgressPage could not show Fair Scheduler Pools section sometimes

2014-08-15 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-3067:
---

 Summary: JobProgressPage could not show Fair Scheduler Pools 
section sometimes
 Key: SPARK-3067
 URL: https://issues.apache.org/jira/browse/SPARK-3067
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Reporter: YanTang Zhai
Priority: Minor


JobProgressPage could not show Fair Scheduler Pools section sometimes.
SparkContext starts the web UI and then calls postEnvironmentUpdate. If 
JobProgressPage is accessed between the web UI starting and postEnvironmentUpdate, 
the lazy val isFairScheduler is evaluated to false and cached, so the Fair 
Scheduler Pools section never displays again.
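A self-contained sketch (not Spark code) of the race just described: a lazy val caches whatever it sees on first access, so a read that happens before the environment update pins the value forever, while a def would re-evaluate it:
{code:java}
object LazyValRace {
  var schedulingMode: Option[String] = None        // set later, like postEnvironmentUpdate

  lazy val isFairLazy: Boolean = schedulingMode.contains("FAIR")
  def isFairDef: Boolean = schedulingMode.contains("FAIR")

  def main(args: Array[String]): Unit = {
    println(isFairLazy)            // false: first access caches the pre-update value
    schedulingMode = Some("FAIR")  // the "environment update" arrives afterwards
    println(isFairLazy)            // still false, cached
    println(isFairDef)             // true, re-evaluated
  }
}
{code}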



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-3003) FailedStage could not be cancelled by DAGScheduler when cancelJob or cancelStage

2014-08-13 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-3003:
---

 Summary: FailedStage could not be cancelled by DAGScheduler when 
cancelJob or cancelStage
 Key: SPARK-3003
 URL: https://issues.apache.org/jira/browse/SPARK-3003
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Reporter: YanTang Zhai
Priority: Minor


When a stage changes from running to failed, DAGScheduler cannot cancel it during 
cancelJob or cancelStage, because failJobAndIndependentStages only cancels running 
stages and posts SparkListenerStageCompleted for them.
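A toy model of the gap (illustrative sets only, not DAGScheduler code): if cancellation only sweeps the running stages, a stage that has already moved to the failed set is never cancelled or marked completed:
{code:java}
object CancelModel {
  def main(args: Array[String]): Unit = {
    val running = scala.collection.mutable.Set("stage-1", "stage-2")
    val failed  = scala.collection.mutable.Set("stage-3") // already flipped from running to failed

    // cancellation as described above: only the running set is swept
    running.clear()
    println(s"left behind after cancel: $failed")          // Set(stage-3)

    // proposed behaviour: sweep the job's failed stages as well
    failed.clear()
    println(s"after also sweeping failed stages: $failed") // Set()
  }
}
{code}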



--
This message was sent by Atlassian JIRA
(v6.2#6252)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-2643) Stages web ui has ERROR when pool name is None

2014-08-08 Thread YanTang Zhai (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-2643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

YanTang Zhai updated SPARK-2643:


Description: 
14/07/23 16:01:44 WARN servlet.ServletHandler: /stages/
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:313)
at scala.None$.get(Option.scala:311)
at 
org.apache.spark.ui.jobs.StageTableBase.stageRow(StageTable.scala:132)
at 
org.apache.spark.ui.jobs.StageTableBase.org$apache$spark$ui$jobs$StageTableBase$$renderStageRow(StageTable.scala:150)
at 
org.apache.spark.ui.jobs.StageTableBase$$anonfun$toNodeSeq$1.apply(StageTable.scala:52)
at 
org.apache.spark.ui.jobs.StageTableBase$$anonfun$toNodeSeq$1.apply(StageTable.scala:52)
at 
org.apache.spark.ui.jobs.StageTableBase$$anonfun$stageTable$1.apply(StageTable.scala:61)
at 
org.apache.spark.ui.jobs.StageTableBase$$anonfun$stageTable$1.apply(StageTable.scala:61)
at 
scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:376)
at 
scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:376)
at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
at 
scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:980)
at 
scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:980)
at 
scala.collection.immutable.StreamIterator$LazyCell.v$lzycompute(Stream.scala:969)
at 
scala.collection.immutable.StreamIterator$LazyCell.v(Stream.scala:969)
at scala.collection.immutable.StreamIterator.hasNext(Stream.scala:974)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.xml.NodeBuffer.$amp$plus(NodeBuffer.scala:38)
at scala.xml.NodeBuffer.$amp$plus(NodeBuffer.scala:40)
at 
org.apache.spark.ui.jobs.StageTableBase.stageTable(StageTable.scala:60)
at 
org.apache.spark.ui.jobs.StageTableBase.toNodeSeq(StageTable.scala:52)
at 
org.apache.spark.ui.jobs.JobProgressPage.render(JobProgressPage.scala:91)
at org.apache.spark.ui.WebUI$$anonfun$attachPage$1.apply(WebUI.scala:65)
at org.apache.spark.ui.WebUI$$anonfun$attachPage$1.apply(WebUI.scala:65)
at org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:70)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)
at 
org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:684)
at 
org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501)
at 
org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086)
at 
org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:428)
at 
org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020)
at 
org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
at 
org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:255)
at 
org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
at org.eclipse.jetty.server.Server.handle(Server.java:370)
at 
org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494)
at 
org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:971)
at 
org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:1033)
at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:644)
at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
at 
org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
at 
org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:667)
at 
org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52)
at 
org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
at 
org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
at java.lang.Thread.run(Thread.java:744)

14/07/23 16:01:44 WARN server.AbstractHttpConnection: /stages/
java.lang.NoSuchMethodError: 
javax.servlet.http.HttpServletRequest.isAsyncStarted()Z
at 
org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:583)
at 
org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086)
at 
org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:428)
at 
org.eclipse.jetty.server.handler.ContextHandler.doScope(Cont
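The first warning above is a None.get raised while rendering the stage's pool name in StageTable. A minimal sketch of the defensive rendering that avoids it; the names StageRow and poolCell are illustrative, not the actual Spark code:
{code:java}
object PoolNameCell {
  final case class StageRow(poolName: Option[String])

  def poolCell(row: StageRow): String =
    row.poolName.getOrElse("(default)")   // render a placeholder instead of calling None.get

  def main(args: Array[String]): Unit =
    println(poolCell(StageRow(None)))     // prints "(default)" instead of throwing
}
{code}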


[jira] [Created] (SPARK-2715) ExternalAppendOnlyMap adds max limit of times and max limit of disk bytes written for spilling

2014-07-27 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-2715:
---

 Summary: ExternalAppendOnlyMap adds max limit of times and max 
limit of disk bytes written for spilling
 Key: SPARK-2715
 URL: https://issues.apache.org/jira/browse/SPARK-2715
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: YanTang Zhai
Priority: Minor


ExternalAppendOnlyMap adds a maximum number of spill times and a maximum number of 
disk bytes written for spilling. Therefore, a task with heavy data skew can be made 
to fail fast instead of running for a long time.
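A hedged sketch of the proposed limits; the class name and thresholds are made up for illustration and are not real Spark configuration. The idea is to count spills and bytes written, and fail fast once either cap is exceeded:
{code:java}
final class SpillGuard(maxSpills: Int, maxBytesWritten: Long) {
  private var spills = 0
  private var bytesWritten = 0L

  /** Call after each spill; throws once either cap is exceeded so the task fails fast. */
  def recordSpill(written: Long): Unit = {
    spills += 1
    bytesWritten += written
    require(spills <= maxSpills && bytesWritten <= maxBytesWritten,
      s"task exceeded spill limits: $spills spills, $bytesWritten bytes written")
  }
}
{code}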



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Created] (SPARK-2714) DAGScheduler logs jobid when runJob finishes

2014-07-27 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-2714:
---

 Summary: DAGScheduler logs jobid when runJob finishes
 Key: SPARK-2714
 URL: https://issues.apache.org/jira/browse/SPARK-2714
 Project: Spark
  Issue Type: Documentation
  Components: Spark Core
Reporter: YanTang Zhai
Priority: Minor


DAGScheduler logs jobid when runJob finishes



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2647) DAGScheduler plugs others when processing one JobSubmitted event

2014-07-23 Thread YanTang Zhai (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14071755#comment-14071755
 ] 

YanTang Zhai commented on SPARK-2647:
-

I've created PR: https://github.com/apache/spark/pull/1548. Please help to 
review. Thanks.

> DAGScheduler plugs others when processing one JobSubmitted event
> 
>
> Key: SPARK-2647
> URL: https://issues.apache.org/jira/browse/SPARK-2647
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core
>Reporter: YanTang Zhai
>
> If a few of jobs are submitted, DAGScheduler plugs others when processing one 
> JobSubmitted event.
> For example ont JobSubmitted event is processed as follows and costs much time
> "spark-akka.actor.default-dispatcher-67" daemon prio=10 
> tid=0x7f75ec001000 nid=0x7dd6 in Object.wait() [0x7f76063e1000]
>java.lang.Thread.State: WAITING (on object monitor)
>   at java.lang.Object.wait(Native Method)
>   at java.lang.Object.wait(Object.java:503)
>   at org.apache.hadoopcdh3.ipc.Client.call(Client.java:1130)
>   - locked <0x000783b17330> (a org.apache.hadoopcdh3.ipc.Client$Call)
>   at org.apache.hadoopcdh3.ipc.RPC$Invoker.invoke(RPC.java:241)
>   at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
>   at sun.reflect.GeneratedMethodAccessor86.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.apache.hadoopcdh3.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:83)
>   at 
> org.apache.hadoopcdh3.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:60)
>   at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
>   at 
> org.apache.hadoopcdh3.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1472)
>   at 
> org.apache.hadoopcdh3.hdfs.DFSClient.getBlockLocations(DFSClient.java:1498)
>   at 
> org.apache.hadoopcdh3.hdfs.Cdh3DistributedFileSystem$1.doCall(Cdh3DistributedFileSystem.java:208)
>   at 
> org.apache.hadoopcdh3.hdfs.Cdh3DistributedFileSystem$1.doCall(Cdh3DistributedFileSystem.java:204)
>   at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>   at 
> org.apache.hadoopcdh3.hdfs.Cdh3DistributedFileSystem.getFileBlockLocations(Cdh3DistributedFileSystem.java:204)
>   at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1812)
>   at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1797)
>   at 
> org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:233)
>   at 
> StorageEngineClient.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:141)
>   at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:172)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>   at scala.Option.getOrElse(Option.scala:120)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
>   at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>   at scala.Option.getOrElse(Option.scala:120)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>   at scala.Option.getOrElse(Option.scala:120)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
>   at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:54)
>   at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:54)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:54)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
>   at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
>   at scala.Option.getOrElse(Option.scala:120)
>   at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.getPa

[jira] [Created] (SPARK-2647) DAGScheduler plugs others when processing one JobSubmitted event

2014-07-23 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-2647:
---

 Summary: DAGScheduler plugs others when processing one 
JobSubmitted event
 Key: SPARK-2647
 URL: https://issues.apache.org/jira/browse/SPARK-2647
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: YanTang Zhai


If several jobs are submitted, DAGScheduler blocks the others while it processes one 
JobSubmitted event.
For example, one JobSubmitted event is processed as follows and costs much time:
"spark-akka.actor.default-dispatcher-67" daemon prio=10 tid=0x7f75ec001000 
nid=0x7dd6 in Object.wait() [0x7f76063e1000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:503)
at org.apache.hadoopcdh3.ipc.Client.call(Client.java:1130)
- locked <0x000783b17330> (a org.apache.hadoopcdh3.ipc.Client$Call)
at org.apache.hadoopcdh3.ipc.RPC$Invoker.invoke(RPC.java:241)
at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
at sun.reflect.GeneratedMethodAccessor86.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at 
org.apache.hadoopcdh3.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:83)
at 
org.apache.hadoopcdh3.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:60)
at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source)
at 
org.apache.hadoopcdh3.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1472)
at 
org.apache.hadoopcdh3.hdfs.DFSClient.getBlockLocations(DFSClient.java:1498)
at 
org.apache.hadoopcdh3.hdfs.Cdh3DistributedFileSystem$1.doCall(Cdh3DistributedFileSystem.java:208)
at 
org.apache.hadoopcdh3.hdfs.Cdh3DistributedFileSystem$1.doCall(Cdh3DistributedFileSystem.java:204)
at 
org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at 
org.apache.hadoopcdh3.hdfs.Cdh3DistributedFileSystem.getFileBlockLocations(Cdh3DistributedFileSystem.java:204)
at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1812)
at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1797)
at 
org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:233)
at 
StorageEngineClient.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:141)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:172)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:54)
at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:54)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:54)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:202)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions
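The thread dump above shows the event-processing thread blocked inside HadoopRDD.getPartitions. A toy model of the resulting head-of-line blocking, using a plain single-threaded executor rather than Spark code; the job names and timings are made up:
{code:java}
import java.util.concurrent.{Executors, TimeUnit}

object SingleEventLoop {
  def main(args: Array[String]): Unit = {
    val loop = Executors.newSingleThreadExecutor()
    val t0 = System.nanoTime()
    def submit(name: String, workMs: Long): Unit = loop.execute(new Runnable {
      override def run(): Unit = {
        Thread.sleep(workMs)                 // stands in for a slow getPartitions listing
        val ms = (System.nanoTime() - t0) / 1e6
        println(f"$name handled after $ms%.0f ms")
      }
    })
    submit("job-1", 2000)                    // slow input listing
    submit("job-2", 10)                      // cheap, but still waits behind job-1
    loop.shutdown()
    loop.awaitTermination(10, TimeUnit.SECONDS)
  }
}
{code}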

[jira] [Updated] (SPARK-2643) Stages web ui has ERROR when pool name is None

2014-07-23 Thread YanTang Zhai (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-2643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

YanTang Zhai updated SPARK-2643:


Component/s: Web UI

> Stages web ui has ERROR when pool name is None
> --
>
> Key: SPARK-2643
> URL: https://issues.apache.org/jira/browse/SPARK-2643
> Project: Spark
>  Issue Type: Bug
>  Components: Web UI
>Reporter: YanTang Zhai
>Priority: Minor
>
> 14/07/23 16:01:44 WARN servlet.ServletHandler: /stages/
> java.util.NoSuchElementException: None.get
> at scala.None$.get(Option.scala:313)
> at scala.None$.get(Option.scala:311)
> at 
> org.apache.spark.ui.jobs.StageTableBase.stageRow(StageTable.scala:132)
> at 
> org.apache.spark.ui.jobs.StageTableBase.org$apache$spark$ui$jobs$StageTableBase$$renderStageRow(StageTable.scala:150)
> at 
> org.apache.spark.ui.jobs.StageTableBase$$anonfun$toNodeSeq$1.apply(StageTable.scala:52)
> at 
> org.apache.spark.ui.jobs.StageTableBase$$anonfun$toNodeSeq$1.apply(StageTable.scala:52)
> at 
> org.apache.spark.ui.jobs.StageTableBase$$anonfun$stageTable$1.apply(StageTable.scala:61)
> at 
> org.apache.spark.ui.jobs.StageTableBase$$anonfun$stageTable$1.apply(StageTable.scala:61)
> at 
> scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:376)
> at 
> scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:376)
> at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
> at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
> at 
> scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:980)
> at 
> scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:980)
> at 
> scala.collection.immutable.StreamIterator$LazyCell.v$lzycompute(Stream.scala:969)
> at 
> scala.collection.immutable.StreamIterator$LazyCell.v(Stream.scala:969)
> at scala.collection.immutable.StreamIterator.hasNext(Stream.scala:974)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.xml.NodeBuffer.$amp$plus(NodeBuffer.scala:38)
> at scala.xml.NodeBuffer.$amp$plus(NodeBuffer.scala:40)
> at 
> org.apache.spark.ui.jobs.StageTableBase.stageTable(StageTable.scala:60)
> at 
> org.apache.spark.ui.jobs.StageTableBase.toNodeSeq(StageTable.scala:52)
> at 
> org.apache.spark.ui.jobs.JobProgressPage.render(JobProgressPage.scala:91)
> at 
> org.apache.spark.ui.WebUI$$anonfun$attachPage$1.apply(WebUI.scala:65)
> at 
> org.apache.spark.ui.WebUI$$anonfun$attachPage$1.apply(WebUI.scala:65)
> at org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:70)
> at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
> at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)
> at 
> org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:684)
> at 
> org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501)
> at 
> org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086)
> at 
> org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:428)
> at 
> org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020)
> at 
> org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
> at 
> org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:255)
> at 
> org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
> at org.eclipse.jetty.server.Server.handle(Server.java:370)
> at 
> org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494)
> at 
> org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:971)
> at 
> org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:1033)
> at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:644)
> at 
> org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
> at 
> org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
> at 
> org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:667)
> at 
> org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52)
> at 
> org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
> at 
> org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
> 

[jira] [Created] (SPARK-2643) Stages web ui has ERROR when pool name is None

2014-07-23 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-2643:
---

 Summary: Stages web ui has ERROR when pool name is None
 Key: SPARK-2643
 URL: https://issues.apache.org/jira/browse/SPARK-2643
 Project: Spark
  Issue Type: Bug
Reporter: YanTang Zhai
Priority: Minor


14/07/23 16:01:44 WARN servlet.ServletHandler: /stages/
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:313)
at scala.None$.get(Option.scala:311)
at 
org.apache.spark.ui.jobs.StageTableBase.stageRow(StageTable.scala:132)
at 
org.apache.spark.ui.jobs.StageTableBase.org$apache$spark$ui$jobs$StageTableBase$$renderStageRow(StageTable.scala:150)
at 
org.apache.spark.ui.jobs.StageTableBase$$anonfun$toNodeSeq$1.apply(StageTable.scala:52)
at 
org.apache.spark.ui.jobs.StageTableBase$$anonfun$toNodeSeq$1.apply(StageTable.scala:52)
at 
org.apache.spark.ui.jobs.StageTableBase$$anonfun$stageTable$1.apply(StageTable.scala:61)
at 
org.apache.spark.ui.jobs.StageTableBase$$anonfun$stageTable$1.apply(StageTable.scala:61)
at 
scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:376)
at 
scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:376)
at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085)
at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077)
at 
scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:980)
at 
scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:980)
at 
scala.collection.immutable.StreamIterator$LazyCell.v$lzycompute(Stream.scala:969)
at 
scala.collection.immutable.StreamIterator$LazyCell.v(Stream.scala:969)
at scala.collection.immutable.StreamIterator.hasNext(Stream.scala:974)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.xml.NodeBuffer.$amp$plus(NodeBuffer.scala:38)
at scala.xml.NodeBuffer.$amp$plus(NodeBuffer.scala:40)
at 
org.apache.spark.ui.jobs.StageTableBase.stageTable(StageTable.scala:60)
at 
org.apache.spark.ui.jobs.StageTableBase.toNodeSeq(StageTable.scala:52)
at 
org.apache.spark.ui.jobs.JobProgressPage.render(JobProgressPage.scala:91)
at org.apache.spark.ui.WebUI$$anonfun$attachPage$1.apply(WebUI.scala:65)
at org.apache.spark.ui.WebUI$$anonfun$attachPage$1.apply(WebUI.scala:65)
at org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:70)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:820)
at 
org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:684)
at 
org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501)
at 
org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086)
at 
org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:428)
at 
org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020)
at 
org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135)
at 
org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:255)
at 
org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116)
at org.eclipse.jetty.server.Server.handle(Server.java:370)
at 
org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494)
at 
org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:971)
at 
org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:1033)
at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:644)
at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:235)
at 
org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82)
at 
org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:667)
at 
org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52)
at 
org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608)
at 
org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543)
at java.lang.Thread.run(Thread.java:744)

14/07/23 16:01:44 WARN server.AbstractHttpConnection: /stages/
java.lang.NoSuchMethodError: 
javax.servlet.http.HttpServletRequest.isAsyncStarted()Z
at 
org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:583)
at 
org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086)
a

[jira] [Created] (SPARK-2642) Add jobId in web UI

2014-07-23 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-2642:
---

 Summary: Add jobId in web UI
 Key: SPARK-2642
 URL: https://issues.apache.org/jira/browse/SPARK-2642
 Project: Spark
  Issue Type: Improvement
  Components: Web UI
Reporter: YanTang Zhai
Priority: Minor


The web UI only shows stage ids at present, so multiple stages cannot be explicitly 
shown as belonging to the same job.
The job id will be added to the web UI.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Created] (SPARK-2556) Multiple SparkContexts can coexist in one process

2014-07-17 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-2556:
---

 Summary: Multiple SparkContexts can coexist in one process
 Key: SPARK-2556
 URL: https://issues.apache.org/jira/browse/SPARK-2556
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: YanTang Zhai
Priority: Minor


Multiple SparkContexts cannot coexist in one process at present, since different 
SparkContexts share the same global variables. These global variables and objects 
will be made local (per instance) so that multiple SparkContexts can coexist.
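A minimal sketch of the idea, with made-up class names rather than real Spark classes: state held in a companion object is one per JVM and is clobbered when a second context starts, while state held on the instance lets two contexts coexist:
{code:java}
final class BroadcastServer(val port: Int)

object GlobalState {                         // one per JVM: a second context clobbers it
  var server: BroadcastServer = null
}

final class IsolatedContext(port: Int) {     // per-instance state instead
  private val server = new BroadcastServer(port)
  def describe: String = s"broadcast server on port ${server.port}"
}

object TwoContexts {
  def main(args: Array[String]): Unit = {
    GlobalState.server = new BroadcastServer(33001)
    GlobalState.server = new BroadcastServer(33002)   // the first context's server is lost
    println(GlobalState.server.port)                  // only 33002 survives

    val a = new IsolatedContext(33001)
    val b = new IsolatedContext(33002)
    println(a.describe)                               // broadcast server on port 33001
    println(b.describe)                               // broadcast server on port 33002
  }
}
{code}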



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2325) Utils.getLocalDir had better check the directory and choose a good one instead of choosing the first one directly

2014-07-03 Thread YanTang Zhai (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14052075#comment-14052075
 ] 

YanTang Zhai commented on SPARK-2325:
-

I've created PR: https://github.com/apache/spark/pull/1281. Please help to 
review. Thanks.

> Utils.getLocalDir had better check the directory and choose a good one 
> instead of choosing the first one directly
> -
>
> Key: SPARK-2325
> URL: https://issues.apache.org/jira/browse/SPARK-2325
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Reporter: YanTang Zhai
>
> If the first directory of spark.local.dir is bad, application will exit with 
> the exception:
> Exception in thread "main" java.io.IOException: Failed to create a temp 
> directory (under /data1/sparkenv/local) after 10 attempts!
> at org.apache.spark.util.Utils$.createTempDir(Utils.scala:258)
> at 
> org.apache.spark.broadcast.HttpBroadcast$.createServer(HttpBroadcast.scala:154)
> at 
> org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:127)
> at 
> org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcastFactory.scala:31)
> at 
> org.apache.spark.broadcast.BroadcastManager.initialize(BroadcastManager.scala:48)
> at 
> org.apache.spark.broadcast.BroadcastManager.(BroadcastManager.scala:35)
> at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218)
> at org.apache.spark.SparkContext.(SparkContext.scala:202)
> at JobTaskJoin$.main(JobTaskJoin.scala:9)
> at JobTaskJoin.main(JobTaskJoin.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:601)
> at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Utils.getLocalDir had better check the directory and choose a good one 
> instead of choosing the first one directly. For example, spark.local.dir is 
> /data1/sparkenv/local,/data2/sparkenv/local. The disk data1 is bad while the 
> disk data2 is good, we could choose the data2 not data1.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Closed] (SPARK-1525) TaskSchedulerImpl should decrease availableCpus by spark.task.cpus not 1

2014-07-01 Thread YanTang Zhai (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-1525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

YanTang Zhai closed SPARK-1525.
---

Resolution: Fixed

> TaskSchedulerImpl should decrease availableCpus by spark.task.cpus not 1
> 
>
> Key: SPARK-1525
> URL: https://issues.apache.org/jira/browse/SPARK-1525
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Reporter: YanTang Zhai
>Priority: Minor
>
> TaskSchedulerImpl decreases availableCpus by 1 in resourceOffers process 
> always even though spark.task.cpus is more than 1, which will schedule more 
> tasks to some node when spark.task.cpus is more than 1.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2324) SparkContext should not exit directly when spark.local.dir is a list of multiple paths and one of them has error

2014-07-01 Thread YanTang Zhai (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14048745#comment-14048745
 ] 

YanTang Zhai commented on SPARK-2324:
-

I've created PR: https://github.com/apache/spark/pull/1274. Please help to 
review. Thanks.

> SparkContext should not exit directly when spark.local.dir is a list of 
> multiple paths and one of them has error
> 
>
> Key: SPARK-2324
> URL: https://issues.apache.org/jira/browse/SPARK-2324
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Reporter: YanTang Zhai
>
> The spark.local.dir is configured as a list of multiple paths as follows 
> /data1/sparkenv/local,/data2/sparkenv/local. If the disk data2 of the driver 
> node has error, the application will exit since DiskBlockManager exits 
> directly at createLocalDirs. If the disk data2 of the worker node has error, 
> the executor will exit either.
> DiskBlockManager should not exit directly at createLocalDirs if one of 
> spark.local.dir has error. Since spark.local.dir has multiple paths, a 
> problem should not affect the overall situation.
> I think DiskBlockManager could ignore the bad directory at createLocalDirs.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Created] (SPARK-2326) DiskBlockManager could add DiskChecker function for kicking off bad directories

2014-06-30 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-2326:
---

 Summary: DiskBlockManager could add DiskChecker function for 
kicking off bad directories
 Key: SPARK-2326
 URL: https://issues.apache.org/jira/browse/SPARK-2326
 Project: Spark
  Issue Type: Bug
Reporter: YanTang Zhai


If a disk failure happens while the Spark cluster is running, DiskBlockManager 
should kick out the bad directories automatically. DiskBlockManager could add a 
DiskChecker-style function, as sketched below.
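A hedged sketch of such a probe (illustrative only, not Spark code): a directory is considered healthy if a small file can be created and deleted in it, and unhealthy directories would be dropped from the rotation of local dirs:
{code:java}
import java.io.File

object DiskChecker {
  /** A directory is healthy if a small probe file can be created and deleted in it. */
  def isHealthy(dir: File): Boolean =
    try {
      val probe = File.createTempFile("diskcheck", ".tmp", dir)
      probe.delete()                // true when the directory is writable
    } catch {
      case _: Exception => false    // creation failed: treat the directory as bad
    }
}
{code}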



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Created] (SPARK-2325) Utils.getLocalDir had better check the directory and choose a good one instead of choosing the first one directly

2014-06-30 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-2325:
---

 Summary: Utils.getLocalDir had better check the directory and 
choose a good one instead of choosing the first one directly
 Key: SPARK-2325
 URL: https://issues.apache.org/jira/browse/SPARK-2325
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Reporter: YanTang Zhai


If the first directory of spark.local.dir is bad, application will exit with 
the exception:
Exception in thread "main" java.io.IOException: Failed to create a temp 
directory (under /data1/sparkenv/local) after 10 attempts!
at org.apache.spark.util.Utils$.createTempDir(Utils.scala:258)
at 
org.apache.spark.broadcast.HttpBroadcast$.createServer(HttpBroadcast.scala:154)
at 
org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:127)
at 
org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcastFactory.scala:31)
at 
org.apache.spark.broadcast.BroadcastManager.initialize(BroadcastManager.scala:48)
at 
org.apache.spark.broadcast.BroadcastManager.(BroadcastManager.scala:35)
at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218)
at org.apache.spark.SparkContext.(SparkContext.scala:202)
at JobTaskJoin$.main(JobTaskJoin.scala:9)
at JobTaskJoin.main(JobTaskJoin.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Utils.getLocalDir had better check each directory and choose a good one instead of 
directly choosing the first one. For example, if spark.local.dir is 
/data1/sparkenv/local,/data2/sparkenv/local and the disk data1 is bad while the 
disk data2 is good, we could choose data2 instead of data1.
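A minimal sketch of the proposed behaviour; the helper name is made up, not a real Spark method. It probes each directory in spark.local.dir and returns the first usable one instead of always taking the head of the list:
{code:java}
import java.io.File

object LocalDirs {
  /** Return the first configured directory that exists (or can be created) and is writable. */
  def firstUsableLocalDir(localDirs: String): Option[File] =
    localDirs.split(",").iterator
      .map(p => new File(p.trim))
      .find(d => (d.isDirectory || d.mkdirs()) && d.canWrite)

  def main(args: Array[String]): Unit =
    // with a broken /data1 this falls through to /data2 instead of failing
    println(firstUsableLocalDir("/data1/sparkenv/local,/data2/sparkenv/local"))
}
{code}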



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Created] (SPARK-2324) SparkContext should not exit directly when spark.local.dir is a list of multiple paths and one of them has error

2014-06-30 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-2324:
---

 Summary: SparkContext should not exit directly when 
spark.local.dir is a list of multiple paths and one of them has error
 Key: SPARK-2324
 URL: https://issues.apache.org/jira/browse/SPARK-2324
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Reporter: YanTang Zhai


spark.local.dir is configured as a list of multiple paths, for example 
/data1/sparkenv/local,/data2/sparkenv/local. If the disk data2 of the driver node 
has an error, the application will exit since DiskBlockManager exits directly in 
createLocalDirs. If the disk data2 of a worker node has an error, the executor will 
exit as well.
DiskBlockManager should not exit directly in createLocalDirs if one of the 
spark.local.dir paths has an error; since spark.local.dir has multiple paths, a 
problem with one of them should not affect the overall situation.
I think DiskBlockManager could ignore the bad directory in createLocalDirs.
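A sketch under assumptions (not DiskBlockManager itself, and the object name is made up): try every configured local dir, keep the ones that can be created, and only fail when none are usable, so one bad disk does not bring the whole executor down:
{code:java}
import java.io.File
import scala.util.Try

object CreateLocalDirs {
  /** Keep every configured dir that can be used; fail only when none are usable. */
  def createLocalDirs(dirs: Seq[String]): Seq[File] = {
    val usable = dirs.flatMap { d =>
      Try {
        val f = new File(d)
        if (f.isDirectory || f.mkdirs()) Some(f) else None
      }.toOption.flatten
    }
    require(usable.nonEmpty,
      s"none of the configured local dirs are usable: ${dirs.mkString(",")}")
    usable
  }
}
{code}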



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Commented] (SPARK-2290) Worker should directly use its own sparkHome instead of appDesc.sparkHome when LaunchExecutor

2014-06-27 Thread YanTang Zhai (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-2290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14045691#comment-14045691
 ] 

YanTang Zhai commented on SPARK-2290:
-

I've created PR: https://github.com/apache/spark/pull/1244. Please help to 
review, thanks.

> Worker should directly use its own sparkHome instead of appDesc.sparkHome 
> when LaunchExecutor
> -
>
> Key: SPARK-2290
> URL: https://issues.apache.org/jira/browse/SPARK-2290
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Reporter: YanTang Zhai
>Priority: Minor
>
> The client path is /data/home/spark/test/spark-1.0.0 while the worker deploy 
> path is /data/home/spark/spark-1.0.0 which is different from the client path. 
> Then an application is launched using the ./bin/spark-submit --class 
> JobTaskJoin --master spark://172.25.38.244:7077 --executor-memory 128M 
> ../jobtaskjoin_2.10-1.0.0.jar. However the application is failed because an 
> exception occurs at 
> java.io.IOException: Cannot run program 
> "/data/home/spark/test/spark-1.0.0-bin-0.20.2-cdh3u3/bin/compute-classpath.sh"
>  (in directory "."): error=2, No such file or directory
> at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
> at org.apache.spark.util.Utils$.executeAndGetOutput(Utils.scala:759)
> at 
> org.apache.spark.deploy.worker.CommandUtils$.buildJavaOpts(CommandUtils.scala:72)
> at 
> org.apache.spark.deploy.worker.CommandUtils$.buildCommandSeq(CommandUtils.scala:37)
> at 
> org.apache.spark.deploy.worker.ExecutorRunner.getCommandSeq(ExecutorRunner.scala:109)
> at 
> org.apache.spark.deploy.worker.ExecutorRunner.fetchAndRunExecutor(ExecutorRunner.scala:124)
> at 
> org.apache.spark.deploy.worker.ExecutorRunner$$anon$1.run(ExecutorRunner.scala:58)
> Caused by: java.io.IOException: error=2, No such file or directory
> at java.lang.UNIXProcess.forkAndExec(Native Method)
> at java.lang.UNIXProcess.(UNIXProcess.java:135)
> at java.lang.ProcessImpl.start(ProcessImpl.java:130)
> at java.lang.ProcessBuilder.start(ProcessBuilder.java:1021)
> ... 6 more
> Therefore, I think worker should not use appDesc.sparkHome when 
> LaunchExecutor, Contrarily, worker could use its own sparkHome directly.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Created] (SPARK-2290) Worker should directly use its own sparkHome instead of appDesc.sparkHome when LaunchExecutor

2014-06-26 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-2290:
---

 Summary: Worker should directly use its own sparkHome instead of 
appDesc.sparkHome when LaunchExecutor
 Key: SPARK-2290
 URL: https://issues.apache.org/jira/browse/SPARK-2290
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Reporter: YanTang Zhai
Priority: Minor


The client path is /data/home/spark/test/spark-1.0.0 while the worker deploy path is 
/data/home/spark/spark-1.0.0, which is different from the client path. An 
application is then launched using ./bin/spark-submit --class JobTaskJoin --master 
spark://172.25.38.244:7077 --executor-memory 128M ../jobtaskjoin_2.10-1.0.0.jar. 
However, the application fails because an exception occurs:
java.io.IOException: Cannot run program 
"/data/home/spark/test/spark-1.0.0-bin-0.20.2-cdh3u3/bin/compute-classpath.sh" 
(in directory "."): error=2, No such file or directory
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
at org.apache.spark.util.Utils$.executeAndGetOutput(Utils.scala:759)
at 
org.apache.spark.deploy.worker.CommandUtils$.buildJavaOpts(CommandUtils.scala:72)
at 
org.apache.spark.deploy.worker.CommandUtils$.buildCommandSeq(CommandUtils.scala:37)
at 
org.apache.spark.deploy.worker.ExecutorRunner.getCommandSeq(ExecutorRunner.scala:109)
at 
org.apache.spark.deploy.worker.ExecutorRunner.fetchAndRunExecutor(ExecutorRunner.scala:124)
at 
org.apache.spark.deploy.worker.ExecutorRunner$$anon$1.run(ExecutorRunner.scala:58)
Caused by: java.io.IOException: error=2, No such file or directory
at java.lang.UNIXProcess.forkAndExec(Native Method)
at java.lang.UNIXProcess.(UNIXProcess.java:135)
at java.lang.ProcessImpl.start(ProcessImpl.java:130)
at java.lang.ProcessBuilder.start(ProcessBuilder.java:1021)
... 6 more

Therefore, I think the worker should not use appDesc.sparkHome when handling 
LaunchExecutor; instead, the worker could use its own sparkHome directly.



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Created] (SPARK-1525) TaskSchedulerImpl should decrease availableCpus by spark.task.cpus not 1

2014-04-17 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-1525:
---

 Summary: TaskSchedulerImpl should decrease availableCpus by 
spark.task.cpus not 1
 Key: SPARK-1525
 URL: https://issues.apache.org/jira/browse/SPARK-1525
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Reporter: YanTang Zhai
Priority: Minor


TaskSchedulerImpl always decreases availableCpus by 1 in the resourceOffers process, 
even when spark.task.cpus is greater than 1, which schedules too many tasks onto a 
node.
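A toy sketch of the fix, not TaskSchedulerImpl itself: with spark.task.cpus set to 2, the offer loop must subtract 2 per launched task; subtracting 1 would launch four tasks on a four-core offer and oversubscribe the node:
{code:java}
object ResourceOfferSketch {
  def main(args: Array[String]): Unit = {
    val cpusPerTask = 2            // spark.task.cpus
    var availableCpus = 4
    var launched = 0
    while (availableCpus >= cpusPerTask) {
      launched += 1
      availableCpus -= cpusPerTask // the reported bug subtracted 1 here
    }
    println(s"launched $launched tasks on 4 cores with spark.task.cpus=$cpusPerTask") // 2
  }
}
{code}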



--
This message was sent by Atlassian JIRA
(v6.2#6252)


[jira] [Created] (SPARK-1524) TaskSetManager'd better not schedule tasks which has no preferred executorId using PROCESS_LOCAL in the first search process

2014-04-17 Thread YanTang Zhai (JIRA)
YanTang Zhai created SPARK-1524:
---

 Summary: TaskSetManager'd better not schedule tasks which has no 
preferred executorId using PROCESS_LOCAL in the first search process
 Key: SPARK-1524
 URL: https://issues.apache.org/jira/browse/SPARK-1524
 Project: Spark
  Issue Type: Improvement
  Components: Spark Core
Reporter: YanTang Zhai
Priority: Minor


ShuffleMapTask is constructed with a TaskLocation that has only a host, not a 
(host, executorID) pair, in DAGScheduler.
When TaskSetManager schedules a ShuffleMapTask that has no preferred executorId 
using a specific execId host and the PROCESS_LOCAL locality level, no tasks match 
the given locality constraint in the first search pass.
We also find that the host used by the scheduler is a hostname while the host used 
by TaskLocation is an IP in our cluster. The two hosts do not match, which leaves 
the pendingTasksForHost HashMap empty and makes the task-finding process behave 
against our expectation.



--
This message was sent by Atlassian JIRA
(v6.2#6252)