[jira] [Created] (SPARK-5316) DAGScheduler may make shuffleToMapStage leak if getParentStages fails
YanTang Zhai created SPARK-5316: --- Summary: DAGScheduler may make shuffleToMapStage leak if getParentStages fails Key: SPARK-5316 URL: https://issues.apache.org/jira/browse/SPARK-5316 Project: Spark Issue Type: Bug Components: Spark Core Reporter: YanTang Zhai Priority: Minor

DAGScheduler may leak shuffleToMapStage entries if getParentStages fails. If getParentStages throws an exception, for example because an input path does not exist, DAGScheduler fails to handle the job submission, but getParentStages may already have put records into shuffleToMapStage. Those records are never cleaned up. A simple job that reproduces this:

{code:scala}
val inputFile1 = ... // Input path does not exist when this job is submitted
val inputFile2 = ...
val outputFile = ...
val conf = new SparkConf()
val sc = new SparkContext(conf)
val rdd1 = sc.textFile(inputFile1)
  .flatMap(line => line.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _, 1)
val rdd2 = sc.textFile(inputFile2)
  .flatMap(line => line.split(","))
  .map(word => (word, 1))
  .reduceByKey(_ + _, 1)
try {
  val rdd3 = new PairRDDFunctions(rdd1).join(rdd2, 1)
  rdd3.saveAsTextFile(outputFile)
} catch {
  case e: Exception => logError("Job submission failed", e)
}
// Print DAGScheduler's shuffleToMapStage here to check
// whether it still holds uncleaned records.
...
{code}
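A fix along these lines would need the registrations made during getParentStages to be rolled back when stage creation fails. The following is a hedged toy sketch of that rollback idea in plain Scala, not DAGScheduler's actual code; registerParentsOrRollback and the String stage stand-in are illustrative names.

{code:scala}
import scala.collection.mutable

// Toy stand-in for DAGScheduler's shuffleToMapStage (shuffleId -> stage)
val shuffleToMapStage = mutable.HashMap[Int, String]()

// Register parent stages one by one; on failure, roll back what was added
def registerParentsOrRollback(shuffleIds: Seq[Int], makeStage: Int => String): Unit = {
  val added = mutable.ArrayBuffer[Int]()
  try {
    shuffleIds.foreach { id =>
      shuffleToMapStage(id) = makeStage(id) // may throw, e.g. missing input path
      added += id
    }
  } catch {
    case e: Exception =>
      added.foreach(shuffleToMapStage.remove) // no leaked records on failure
      throw e
  }
}
{code}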
[jira] [Created] (SPARK-5163) Load properties from a configuration file such as spark-defaults.conf when creating SparkConf object
YanTang Zhai created SPARK-5163: --- Summary: Load properties from a configuration file such as spark-defaults.conf when creating SparkConf object Key: SPARK-5163 URL: https://issues.apache.org/jira/browse/SPARK-5163 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: YanTang Zhai Priority: Minor

I create and run a Spark program that does not use SparkSubmit. When I create a SparkConf object with `new SparkConf()`, it does not automatically load properties from a configuration file such as spark-defaults.conf.
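Until something like this lands, a user-side workaround is to load the file manually before constructing SparkConf, which reads spark.* system properties by default. A minimal sketch, assuming the usual SPARK_HOME/conf location (an assumption), and noting that spark-defaults.conf's whitespace-separated pairs happen to parse as java.util.Properties:

{code:scala}
import java.io.FileInputStream
import java.util.Properties
import scala.collection.JavaConverters._

// Path is an assumption; adjust to wherever spark-defaults.conf lives
val path = sys.env.getOrElse("SPARK_HOME", ".") + "/conf/spark-defaults.conf"
val props = new Properties()
val in = new FileInputStream(path)
try props.load(in) finally in.close()

// Publish entries as system properties; new SparkConf() picks up spark.*
props.asScala.foreach { case (k, v) => sys.props(k) = v.trim }
{code}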
[jira] [Closed] (SPARK-5007) Try random port when startServiceOnPort to reduce the chance of port collision
[ https://issues.apache.org/jira/browse/SPARK-5007?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] YanTang Zhai closed SPARK-5007. --- Resolution: Won't Fix

> Try random port when startServiceOnPort to reduce the chance of port collision
> --
>
> Key: SPARK-5007
> URL: https://issues.apache.org/jira/browse/SPARK-5007
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Reporter: YanTang Zhai
> Priority: Minor
>
> When multiple Spark programs are submitted from the same node (a so-called springboard machine), the ports of their SparkUIs range from 4040 (the default) to 4056. Spark programs submitted later then fail because of SparkUI port collisions.
> The chance of a collision can be reduced by setting spark.ui.port or spark.port.maxRetries.
> However, I think it is better to try a random port in startServiceOnPort to reduce the chance of port collision.
[jira] [Commented] (SPARK-5007) Try random port when startServiceOnPort to reduce the chance of port collision
[ https://issues.apache.org/jira/browse/SPARK-5007?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14270421#comment-14270421 ] YanTang Zhai commented on SPARK-5007: - [~rxin] Oh, I see. Thank you very much.

> Try random port when startServiceOnPort to reduce the chance of port collision
> --
>
> Key: SPARK-5007
> URL: https://issues.apache.org/jira/browse/SPARK-5007
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Reporter: YanTang Zhai
> Priority: Minor
>
> When multiple Spark programs are submitted from the same node (a so-called springboard machine), the ports of their SparkUIs range from 4040 (the default) to 4056. Spark programs submitted later then fail because of SparkUI port collisions.
> The chance of a collision can be reduced by setting spark.ui.port or spark.port.maxRetries.
> However, I think it is better to try a random port in startServiceOnPort to reduce the chance of port collision.
[jira] [Created] (SPARK-5007) Try random port when startServiceOnPort to reduce the chance of port collision
YanTang Zhai created SPARK-5007: --- Summary: Try random port when startServiceOnPort to reduce the chance of port collision Key: SPARK-5007 URL: https://issues.apache.org/jira/browse/SPARK-5007 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: YanTang Zhai Priority: Minor

When multiple Spark programs are submitted from the same node (a so-called springboard machine), the ports of their SparkUIs range from 4040 (the default) to 4056. Spark programs submitted later then fail because of SparkUI port collisions. The chance of a collision can be reduced by setting spark.ui.port or spark.port.maxRetries. However, I think it is better to try a random port in startServiceOnPort to reduce the chance of port collision.
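A hedged sketch of the proposal, not Spark's actual Utils.startServiceOnPort: draw a fresh random port from the non-privileged range on each retry instead of probing startPort, startPort + 1, and so on.

{code:scala}
import java.net.{BindException, ServerSocket}
import scala.util.Random

def startServiceOnRandomPort(maxRetries: Int): ServerSocket = {
  var lastError: Throwable = new BindException("no attempt made")
  for (_ <- 0 to maxRetries) {
    val tryPort = 1024 + Random.nextInt(65536 - 1024) // random non-privileged port
    try return new ServerSocket(tryPort)
    catch { case e: BindException => lastError = e }  // collision: try another
  }
  throw lastError
}
{code}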
[jira] [Commented] (SPARK-4962) Put TaskScheduler.start back in SparkContext to shorten cluster resources occupation period
[ https://issues.apache.org/jira/browse/SPARK-4962?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14259041#comment-14259041 ] YanTang Zhai commented on SPARK-4962: - The main purpose of this PR is to reduce resource waste on a busy cluster. For example, a process initializes a SparkContext instance, reads a few files from HDFS or many records from PostgreSQL, and then calls an RDD's collect operation to submit a job. When the SparkContext is initialized, an app is submitted to the cluster and some resources are held by this app. These resources are not actually used until a job is submitted by an RDD action. The resources held from initialization until actual use can be considered wasted. If the app is submitted when the SparkContext is initialized, all of the resources it needs may be granted before the job runs; the job can then run efficiently without resource constraints. On the contrary, if the app is submitted when the job is submitted, the resources it needs may be granted at different times, and the job may run less efficiently while some resources are still being requested. Thus I use a configuration parameter spark.scheduler.app.slowstart (default false) to let the user make the tradeoff between economy and efficiency. There are 9 kinds of master URL and 6 kinds of SchedulerBackend. LocalBackend and SimrSchedulerBackend do not need deferred starting since it makes no difference for them. SparkClusterSchedulerBackend (yarn-standalone or yarn-cluster) does not defer starting since the app has already been submitted by SparkSubmit. CoarseMesosSchedulerBackend and MesosSchedulerBackend could defer starting. YarnClientSchedulerBackend (yarn-client) could defer starting. This PR defers TaskScheduler.start only for yarn-client mode initially.

> Put TaskScheduler.start back in SparkContext to shorten cluster resources occupation period
> ---
>
> Key: SPARK-4962
> URL: https://issues.apache.org/jira/browse/SPARK-4962
> Project: Spark
> Issue Type: Improvement
> Components: Spark Core
> Reporter: YanTang Zhai
> Priority: Minor
>
> When a SparkContext object is instantiated, the TaskScheduler is started and some resources are allocated from the cluster. However, these resources may not be used immediately, for example while DAGScheduler.JobSubmitted is still being processed. These resources are wasted during that period. Thus, we want to put TaskScheduler.start back to shorten the cluster-resource occupation period, especially for a busy cluster. The TaskScheduler could be started just before running stages.
> We can analyse and compare the resource occupation period before and after the optimization:
> TaskScheduler.start execution time: [time1__]
> DAGScheduler.JobSubmitted (excluding HadoopRDD.getPartitions or TaskScheduler.start) execution time: [time2_]
> HadoopRDD.getPartitions execution time: [time3___]
> Stages execution time: [time4_]
> The cluster-resource occupation period before optimization is [time2_][time3___][time4_].
> The cluster-resource occupation period after optimization is [time3___][time4_].
> In summary, the cluster-resource occupation period after optimization is shorter than before.
> If HadoopRDD.getPartitions could also be put forward (SPARK-4961), the period could be shortened further, to [time4_].
> This resource saving is important for a busy cluster.
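For reference, the opt-in described above would look like this from the user side; note that the spark.scheduler.app.slowstart key exists only in this PR's proposal, not in released Spark.

{code:scala}
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("slow-start-demo")
  .set("spark.scheduler.app.slowstart", "true") // proposed key: economy over efficiency
{code}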
[jira] [Created] (SPARK-4962) Put TaskScheduler.start back in SparkContext to shorten cluster resources occupation period
YanTang Zhai created SPARK-4962: --- Summary: Put TaskScheduler.start back in SparkContext to shorten cluster resources occupation period Key: SPARK-4962 URL: https://issues.apache.org/jira/browse/SPARK-4962 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: YanTang Zhai Priority: Minor

When a SparkContext object is instantiated, the TaskScheduler is started and some resources are allocated from the cluster. However, these resources may not be used immediately, for example while DAGScheduler.JobSubmitted is still being processed. These resources are wasted during that period. Thus, we want to put TaskScheduler.start back to shorten the cluster-resource occupation period, especially for a busy cluster. The TaskScheduler could be started just before running stages.

We can analyse and compare the resource occupation period before and after the optimization:
TaskScheduler.start execution time: [time1__]
DAGScheduler.JobSubmitted (excluding HadoopRDD.getPartitions or TaskScheduler.start) execution time: [time2_]
HadoopRDD.getPartitions execution time: [time3___]
Stages execution time: [time4_]

The cluster-resource occupation period before optimization is [time2_][time3___][time4_]. The cluster-resource occupation period after optimization is [time3___][time4_]. In summary, the cluster-resource occupation period after optimization is shorter than before. If HadoopRDD.getPartitions could also be put forward (SPARK-4961), the period could be shortened further, to [time4_]. This resource saving is important for a busy cluster.
[jira] [Created] (SPARK-4961) Put HadoopRDD.getPartitions forward to reduce DAGScheduler.JobSubmitted processing time
YanTang Zhai created SPARK-4961: --- Summary: Put HadoopRDD.getPartitions forward to reduce DAGScheduler.JobSubmitted processing time Key: SPARK-4961 URL: https://issues.apache.org/jira/browse/SPARK-4961 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: YanTang Zhai Priority: Minor

HadoopRDD.getPartitions is computed lazily, inside DAGScheduler's JobSubmitted handling. If the input directory is large, getPartitions may take a long time; in our cluster it ranges from 0.029s to 766.699s. While one JobSubmitted event is being processed, the others must wait. Thus, we want to put HadoopRDD.getPartitions forward to reduce DAGScheduler.JobSubmitted processing time, so that other JobSubmitted events do not have to wait as long. A HadoopRDD object could compute its partitions when it is instantiated.

We can analyse and compare the execution time before and after the optimization:
TaskScheduler.start execution time: [time1__]
DAGScheduler.JobSubmitted (excluding HadoopRDD.getPartitions or TaskScheduler.start) execution time: [time2_]
HadoopRDD.getPartitions execution time: [time3___]
Stages execution time: [time4_]

(1) The app has only one job
(a) The execution time of the job before optimization is [time1__][time2_][time3___][time4_]. The execution time of the job after optimization is [time1__][time3___][time2_][time4_].
In summary, if the app has only one job, the total execution time is the same before and after optimization.
(2) The app has 4 jobs
(a) Before optimization, job1's execution time is [time2_][time3___][time4_], job2's is [time2__][time3___][time4_], job3's is [time2___][time3___][time4_], and job4's is [time2____][time3___][time4_]. After optimization, job1's execution time is [time3___][time2_][time4_], job2's is [time3___][time2__][time4_], job3's is [time3___][time2___][time4_], and job4's is [time3___][time2____][time4_].
In summary, if the app has multiple jobs, the average execution time after optimization is less than before.
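A user-side sketch of the same idea (not the proposed internal change): touch rdd.partitions on the caller's thread right after creating the RDD, so the input splits are already computed by the time the DAGScheduler event loop handles JobSubmitted. The path and app name are illustrative.

{code:scala}
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("eager-partitions"))
val rdd = sc.textFile("hdfs://namenode:8020/large/input/dir")
rdd.partitions // forces HadoopRDD.getPartitions (input-split listing) eagerly
{code}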
[jira] [Created] (SPARK-4946) Using AkkaUtils.askWithReply in MapOutputTracker.askTracker to reduce the chance of communication failures
YanTang Zhai created SPARK-4946: --- Summary: Using AkkaUtils.askWithReply in MapOutputTracker.askTracker to reduce the chance of communication failures Key: SPARK-4946 URL: https://issues.apache.org/jira/browse/SPARK-4946 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: YanTang Zhai Priority: Minor

Use AkkaUtils.askWithReply in MapOutputTracker.askTracker to reduce the chance of communication failures.
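A sketch of the ask-with-retry semantics that askWithReply provides; AkkaUtils' real signature may differ, and this only illustrates retrying a failed or timed-out ask instead of giving up on the first attempt.

{code:scala}
import scala.concurrent.Await
import scala.concurrent.duration.FiniteDuration
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout

def askWithRetry[T](actor: ActorRef, message: Any,
                    attempts: Int, timeout: FiniteDuration): T = {
  implicit val t: Timeout = Timeout(timeout)
  var lastError: Throwable = null
  for (_ <- 1 to attempts) {
    try {
      return Await.result(actor ? message, timeout).asInstanceOf[T] // reply or throw
    } catch { case e: Exception => lastError = e }                  // retry on failure
  }
  throw new RuntimeException(s"ask failed after $attempts attempts", lastError)
}
{code}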
[jira] [Created] (SPARK-4723) Abort stages that have been attempted too many times
YanTang Zhai created SPARK-4723: --- Summary: Abort stages that have been attempted too many times Key: SPARK-4723 URL: https://issues.apache.org/jira/browse/SPARK-4723 Project: Spark Issue Type: Improvement Reporter: YanTang Zhai Priority: Minor

For various reasons, a stage may be attempted many times. A threshold could be added so that stages which have been attempted more times than the threshold are aborted.
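A standalone toy of the proposed threshold, with illustrative names (the real change would live in DAGScheduler's resubmission path): retry a failing stage-like action, but give up after maxAttempts instead of retrying forever.

{code:scala}
def runWithAttemptLimit(maxAttempts: Int)(attempt: Int => Boolean): Either[String, Int] = {
  for (n <- 1 to maxAttempts) {
    if (attempt(n)) return Right(n)                   // succeeded on attempt n
  }
  Left(s"aborted after $maxAttempts failed attempts") // abort instead of looping
}

// Example: an action that only succeeds on its third attempt
// runWithAttemptLimit(5)(n => n == 3)  => Right(3)
// runWithAttemptLimit(2)(n => n == 3)  => Left("aborted after 2 failed attempts")
{code}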
[jira] [Created] (SPARK-4693) PruningPredicates may be wrong if a predicate's references are an empty AttributeSet()
YanTang Zhai created SPARK-4693: --- Summary: PruningPredicates may be wrong if a predicate's references are an empty AttributeSet() Key: SPARK-4693 URL: https://issues.apache.org/jira/browse/SPARK-4693 Project: Spark Issue Type: Bug Components: SQL Reporter: YanTang Zhai Priority: Minor

The SQL "select * from spark_test::for_test where abs(20141202) is not null" has predicates=List(IS NOT NULL HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFAbs(20141202)) and partitionKeyIds=AttributeSet(). PruningPredicates is List(IS NOT NULL HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFAbs(20141202)). Then the exception "java.lang.IllegalArgumentException: requirement failed: Partition pruning predicates only supported for partitioned tables." is thrown.
The SQL "select * from spark_test::for_test_partitioned_table where abs(20141202) is not null and type_id=11 and platform = 3", on a table with partition key insert_date, has predicates=List(IS NOT NULL HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFAbs(20141202), (type_id#12 = 11), (platform#8 = 3)) and partitionKeyIds=AttributeSet(insert_date#24). PruningPredicates is List(IS NOT NULL HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFAbs(20141202)), i.e. a predicate that references no partition key is wrongly classified as a pruning predicate.
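A toy model of the misclassification, with illustrative names: an empty reference set is vacuously a subset of any partitionKeyIds, so a partition-free predicate like abs(20141202) IS NOT NULL lands among the pruning predicates; requiring nonEmpty references keeps it out.

{code:scala}
case class Pred(name: String, references: Set[String])

def splitPredicates(predicates: Seq[Pred],
                    partitionKeyIds: Set[String]): (Seq[Pred], Seq[Pred]) =
  predicates.partition(p =>
    p.references.nonEmpty && p.references.subsetOf(partitionKeyIds)) // nonEmpty is the fix

// splitPredicates(Seq(Pred("udf", Set.empty), Pred("date", Set("insert_date"))),
//                 Set("insert_date"))
// => (List(Pred(date, ...)), List(Pred(udf, ...)))
{code}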
[jira] [Created] (SPARK-4692) Support ! boolean logic operator like NOT
YanTang Zhai created SPARK-4692: --- Summary: Support ! boolean logic operator like NOT Key: SPARK-4692 URL: https://issues.apache.org/jira/browse/SPARK-4692 Project: Spark Issue Type: Improvement Components: SQL Reporter: YanTang Zhai Priority: Minor

{code:sql}
select * from for_test where !(col1 > col2)
{code}
[jira] [Created] (SPARK-4677) Add hadoop input time in task webui
YanTang Zhai created SPARK-4677: --- Summary: Add hadoop input time in task webui Key: SPARK-4677 URL: https://issues.apache.org/jira/browse/SPARK-4677 Project: Spark Issue Type: Improvement Components: Web UI Reporter: YanTang Zhai Priority: Minor

Add Hadoop input time to the task web UI, like GC Time, to explicitly show the time a task spends reading input data.
[jira] [Created] (SPARK-4676) JavaSchemaRDD.schema may throw NullType MatchError if sql has null
YanTang Zhai created SPARK-4676: --- Summary: JavaSchemaRDD.schema may throw NullType MatchError if sql has null Key: SPARK-4676 URL: https://issues.apache.org/jira/browse/SPARK-4676 Project: Spark Issue Type: Bug Components: SQL Reporter: YanTang Zhai

{code:scala}
val jsc = new org.apache.spark.api.java.JavaSparkContext(sc)
val jhc = new org.apache.spark.sql.hive.api.java.JavaHiveContext(jsc)
val nrdd = jhc.hql("select null from spark_test.for_test")
println(nrdd.schema)
{code}

Then the error is thrown as follows:

{code}
scala.MatchError: NullType (of class org.apache.spark.sql.catalyst.types.NullType$)
        at org.apache.spark.sql.types.util.DataTypeConversions$.asJavaDataType(DataTypeConversions.scala:43)
{code}
[jira] [Created] (SPARK-4401) RuleExecutor correctly logs trace iteration num
YanTang Zhai created SPARK-4401: --- Summary: RuleExecutor correctly logs trace iteration num Key: SPARK-4401 URL: https://issues.apache.org/jira/browse/SPARK-4401 Project: Spark Issue Type: Bug Components: SQL Reporter: YanTang Zhai

RuleExecutor logs the wrong iteration count in its trace message:

{code:scala}
if (curPlan.fastEquals(lastPlan)) {
  logTrace(s"Fixed point reached for batch ${batch.name} after $iteration iterations.")
  continue = false
}
{code}
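In RuleExecutor's loop, the counter is incremented before this check runs, so by the time the fixed point is detected the counter is one past the converging pass. A runnable toy of that loop shape, suggesting iteration - 1 as the likely fix (the real loop body is elided):

{code:scala}
var plan = 17          // stand-in for the current logical plan
var lastPlan = -1
var iteration = 1
var continue = true
while (continue) {
  lastPlan = plan
  plan = math.max(plan / 2, 1) // stand-in for applying the rule batch
  iteration += 1               // bumped before the fixed-point check below
  if (plan == lastPlan) {      // fastEquals stand-in
    println(s"Fixed point reached after ${iteration - 1} iterations.")
    continue = false
  }
}
{code}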
[jira] [Resolved] (SPARK-4334) Utils.startServiceOnPort should check whether the tryPort is less than 1024
[ https://issues.apache.org/jira/browse/SPARK-4334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] YanTang Zhai resolved SPARK-4334. - Resolution: Duplicate

> Utils.startServiceOnPort should check whether the tryPort is less than 1024
> ---
>
> Key: SPARK-4334
> URL: https://issues.apache.org/jira/browse/SPARK-4334
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Reporter: YanTang Zhai
> Priority: Minor
>
> Utils.startServiceOnPort should check whether the tryPort is less than 1024.
> If the next attempted port is less than 1024, a SocketException with "Permission denied" will be thrown.
[jira] [Created] (SPARK-4334) Utils.startServiceOnPort should check whether the tryPort is less than 1024
YanTang Zhai created SPARK-4334: --- Summary: Utils.startServiceOnPort should check whether the tryPort is less than 1024 Key: SPARK-4334 URL: https://issues.apache.org/jira/browse/SPARK-4334 Project: Spark Issue Type: Bug Components: Spark Core Reporter: YanTang Zhai Priority: Minor

Utils.startServiceOnPort should check whether the tryPort is less than 1024. If the next attempted port is less than 1024, a SocketException with "Permission denied" will be thrown.
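A sketch of the guard with illustrative names: derive each retry port so it always stays inside [1024, 65535], never drifting into the privileged range.

{code:scala}
def nextTryPort(startPort: Int, attempt: Int): Int =
  ((startPort + attempt - 1024) % (65536 - 1024)) + 1024 // wraps within [1024, 65535]

// e.g. nextTryPort(65535, 2) == 1025 rather than the invalid 65537,
// and the result can never fall below 1024 ("Permission denied").
{code}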
[jira] [Commented] (SPARK-4316) Utils.isBindCollision misjudges in a non-English environment
[ https://issues.apache.org/jira/browse/SPARK-4316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14205810#comment-14205810 ] YanTang Zhai commented on SPARK-4316: - Thanks @Sean Owen

> Utils.isBindCollision misjudges in a non-English environment
> --
>
> Key: SPARK-4316
> URL: https://issues.apache.org/jira/browse/SPARK-4316
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Reporter: YanTang Zhai
> Priority: Minor
>
> export LANG=zh_CN.utf8
> export LC_ALL=zh_CN.utf8
> With these settings, Utils.isBindCollision misjudges, since the BindException's message is "地址已在使用" rather than "Address already in use".
[jira] [Created] (SPARK-4316) Utils.isBindCollision misjudges in a non-English environment
YanTang Zhai created SPARK-4316: --- Summary: Utils.isBindCollision misjudges in a non-English environment Key: SPARK-4316 URL: https://issues.apache.org/jira/browse/SPARK-4316 Project: Spark Issue Type: Bug Components: Spark Core Reporter: YanTang Zhai Priority: Minor

{code}
export LANG=zh_CN.utf8
export LC_ALL=zh_CN.utf8
{code}

With these settings, Utils.isBindCollision misjudges, since the BindException's message is "地址已在使用" rather than "Address already in use".
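A locale-independent sketch, not Utils.isBindCollision verbatim: match on the exception type (walking causes) instead of comparing against the English message text.

{code:scala}
import java.net.BindException

def isBindCollision(exception: Throwable): Boolean = exception match {
  case _: BindException        => true                        // "地址已在使用" matches too
  case e if e.getCause != null => isBindCollision(e.getCause) // unwrap nested causes
  case _                       => false
}
{code}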
[jira] [Created] (SPARK-4273) Providing ExternalSet to avoid OOM when computing count(distinct)
YanTang Zhai created SPARK-4273: --- Summary: Providing ExternalSet to avoid OOM when computing count(distinct) Key: SPARK-4273 URL: https://issues.apache.org/jira/browse/SPARK-4273 Project: Spark Issue Type: Improvement Components: Spark Core, SQL Reporter: YanTang Zhai Priority: Minor

A task may OOM during count(distinct) if it has to process many records. CombineSetsAndCountFunction puts all records into an OpenHashSet; if it fetches many records, it may occupy a large amount of memory. I think a data structure ExternalSet, analogous to ExternalAppendOnlyMap, could be provided to store OpenHashSet data on disk when its capacity exceeds some threshold. For example, OpenHashSet1 (ohs1) holds [d, b, c, a]. It is spilled to file1 sorted by hashCode, so file1 contains [a, b, c, d]. The procedure can be sketched as follows:
ohs1 [d, b, c, a] => [a, b, c, d] => file1
ohs2 [e, f, g, a] => [a, e, f, g] => file2
ohs3 [e, h, i, g] => [e, g, h, i] => file3
ohs4 [j, h, a] => [a, h, j] => sortedSet
On output, all keys with the same hashCode are put into one OpenHashSet, and that OpenHashSet's iterator is then consumed:
file1 -> a -> ohsA; file2 -> a -> ohsA; sortedSet -> a -> ohsA; ohsA -> a;
file1 -> b -> ohsB; ohsB -> b;
file1 -> c -> ohsC; ohsC -> c;
file1 -> d -> ohsD; ohsD -> d;
file2 -> e -> ohsE; file3 -> e -> ohsE; ohsE -> e;
...
I think using the ExternalSet could avoid OOM in count(distinct). Comments are welcome.
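A heavily simplified, runnable toy of the ExternalSet idea, with illustrative names; a real implementation would spill sorted runs to disk files, whereas in-memory Seqs stand in for the spill files here.

{code:scala}
import scala.collection.mutable

class ToySpillableSet[T](threshold: Int)(implicit ord: Ordering[T]) {
  private val current = mutable.HashSet[T]()          // the in-memory OpenHashSet
  private val spills  = mutable.ArrayBuffer[Seq[T]]() // sorted runs (file1..fileN)

  def insert(elem: T): Unit = {
    current += elem
    if (current.size >= threshold) {   // capacity exceeded: spill a sorted run
      spills += current.toSeq.sorted
      current.clear()
    }
  }

  def iterator: Iterator[T] =          // merge all runs, deduplicating on output
    (spills.flatten ++ current).distinct.sorted.iterator
}

// val s = new ToySpillableSet[String](4)
// Seq("d", "b", "c", "a", "e", "f", "g", "a").foreach(s.insert)
// s.iterator.toList  => List(a, b, c, d, e, f, g)
{code}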
[jira] [Created] (SPARK-4009) HiveTableScan should use makeRDDForTable instead of makeRDDForPartitionedTable for partitioned table when partitionPruningPred is None
YanTang Zhai created SPARK-4009: --- Summary: HiveTableScan should use makeRDDForTable instead of makeRDDForPartitionedTable for partitioned table when partitionPruningPred is None Key: SPARK-4009 URL: https://issues.apache.org/jira/browse/SPARK-4009 Project: Spark Issue Type: Bug Components: SQL Reporter: YanTang Zhai

HiveTableScan should use makeRDDForTable instead of makeRDDForPartitionedTable for a partitioned table when partitionPruningPred is None. If a table has many partitions (for example, more than 20 thousand) but little data (for example, less than 512MB), a SQL query against it produces a huge number of RDDs (one per partition), and the job submission fails with a java.lang.StackOverflowError.
[jira] [Updated] (SPARK-3545) Put HadoopRDD.getPartitions forward and put TaskScheduler.start back in SparkContext to reduce DAGScheduler.JobSubmitted processing time and shorten cluster resources occupation period
[ https://issues.apache.org/jira/browse/SPARK-3545?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] YanTang Zhai updated SPARK-3545: Description:

We have two problems:
(1) HadoopRDD.getPartitions is computed lazily, inside DAGScheduler's JobSubmitted handling. If the input directory is large, getPartitions may take a long time; in our cluster it ranges from 0.029s to 766.699s. While one JobSubmitted event is being processed, the others must wait. Thus, we want to put HadoopRDD.getPartitions forward to reduce DAGScheduler.JobSubmitted processing time, so that other JobSubmitted events do not have to wait as long. A HadoopRDD object could compute its partitions when it is instantiated.
(2) When a SparkContext object is instantiated, the TaskScheduler is started and some resources are allocated from the cluster. However, these resources may not be used immediately, for example while DAGScheduler.JobSubmitted is still being processed. These resources are wasted during that period. Thus, we want to put TaskScheduler.start back to shorten the cluster-resource occupation period, especially for a busy cluster. The TaskScheduler could be started just before running stages.

We can analyse and compare the execution time before and after the optimization:
TaskScheduler.start execution time: [time1__]
DAGScheduler.JobSubmitted (excluding HadoopRDD.getPartitions or TaskScheduler.start) execution time: [time2_]
HadoopRDD.getPartitions execution time: [time3___]
Stages execution time: [time4_]

(1) The app has only one job
(a) The execution time of the job before optimization is [time1__][time2_][time3___][time4_]. The execution time of the job after optimization is [time3___][time2_][time1__][time4_].
(b) The cluster-resource occupation period before optimization is [time2_][time3___][time4_]. The cluster-resource occupation period after optimization is [time4_].
In summary, if the app has only one job, the total execution time is the same before and after optimization, while the cluster-resource occupation period after optimization is shorter.
(2) The app has 4 jobs
(a) Before optimization, job1's execution time is [time2_][time3___][time4_], job2's is [time2__][time3___][time4_], job3's is [time2___][time3___][time4_], and job4's is [time2____][time3___][time4_]. After optimization, job1's execution time is [time3___][time2_][time1__][time4_], job2's is [time3___][time2__][time4_], job3's is [time3___][time2___][time4_], and job4's is [time3___][time2____][time4_].
In summary, if the app has multiple jobs, the average execution time after optimization is less than before, and the cluster-resource occupation period after optimization is also shorter.
[jira] [Created] (SPARK-3545) Put HadoopRDD.getPartitions forward and put TaskScheduler.start back in SparkContext to reduce DAGScheduler.JobSubmitted processing time and shorten cluster resources occupation period
YanTang Zhai created SPARK-3545: --- Summary: Put HadoopRDD.getPartitions forward and put TaskScheduler.start back in SparkContext to reduce DAGScheduler.JobSubmitted processing time and shorten cluster resources occupation period Key: SPARK-3545 URL: https://issues.apache.org/jira/browse/SPARK-3545 Project: Spark Issue Type: Improvement Reporter: YanTang Zhai Priority: Minor

We have two problems:
(1) HadoopRDD.getPartitions is computed lazily, inside DAGScheduler's JobSubmitted handling. If the input directory is large, getPartitions may take a long time; in our cluster it ranges from 0.029s to 766.699s. While one JobSubmitted event is being processed, the others must wait. Thus, we want to put HadoopRDD.getPartitions forward to reduce DAGScheduler.JobSubmitted processing time, so that other JobSubmitted events do not have to wait as long. A HadoopRDD object could compute its partitions when it is instantiated.
(2) When a SparkContext object is instantiated, the TaskScheduler is started and some resources are allocated from the cluster. However, these resources may not be used immediately, for example while DAGScheduler.JobSubmitted is still being processed. These resources are wasted during that period. Thus, we want to put TaskScheduler.start back to shorten the cluster-resource occupation period, especially for a busy cluster. The TaskScheduler could be started just before running stages.

We can analyse and compare the execution time before and after the optimization:
TaskScheduler.start execution time: [time1__]
DAGScheduler.JobSubmitted (excluding HadoopRDD.getPartitions or TaskScheduler.start) execution time: [time2_]
HadoopRDD.getPartitions execution time: [time3___]
Stages execution time: [time4_]

(1) The app has only one job
(a) The execution time of the job before optimization is [time1__][time2_][time3___][time4_]. The execution time of the job after optimization is [time3___][time2_][time1__][time4_].
(b) The cluster-resource occupation period before optimization is [time2_][time3___][time4_]. The cluster-resource occupation period after optimization is [time4_].
In summary, if the app has only one job, the total execution time is the same before and after optimization, while the cluster-resource occupation period after optimization is shorter.
(2) The app has 4 jobs
(a) Before optimization, job1's execution time is [time2_][time3___][time4_], job2's is [time2__][time3___][time4_], job3's is [time2___][time3___][time4_], and job4's is [time2____][time3___][time4_]. After optimization, job1's execution time is [time3___][time2_][time1__][time4_], job2's is [time3___][time2__][time4_], job3's is [time3___][time2___][time4_], and job4's is [time3___][time2____][time4_].
In summary, if the app has multiple jobs, the average execution time after optimization is less than before, and the cluster-resource occupation period after optimization is also shorter.
[jira] [Created] (SPARK-3148) Update global variables of HttpBroadcast so that multiple SparkContexts can coexist
YanTang Zhai created SPARK-3148: --- Summary: Update global variables of HttpBroadcast so that multiple SparkContexts can coexist Key: SPARK-3148 URL: https://issues.apache.org/jira/browse/SPARK-3148 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: YanTang Zhai Priority: Minor Update global variables of HttpBroadcast so that multiple SparkContexts can coexist -- This message was sent by Atlassian JIRA (v6.2#6252) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-3067) JobProgressPage could not show Fair Scheduler Pools section sometimes
YanTang Zhai created SPARK-3067: --- Summary: JobProgressPage could not show Fair Scheduler Pools section sometimes Key: SPARK-3067 URL: https://issues.apache.org/jira/browse/SPARK-3067 Project: Spark Issue Type: Bug Components: Spark Core Reporter: YanTang Zhai Priority: Minor JobProgressPage sometimes cannot show the Fair Scheduler Pools section. SparkContext starts the web UI and only then calls postEnvironmentUpdate. If JobProgressPage is accessed between the web UI starting and postEnvironmentUpdate, the lazy val isFairScheduler evaluates to false and is cached, so the Fair Scheduler Pools section never displays afterwards. -- This message was sent by Atlassian JIRA (v6.2#6252) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
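The failure mode follows from how a Scala lazy val works: it is evaluated once on first access and the result is cached permanently. A self-contained sketch of the race, with illustrative names rather than Spark's actual fields:
{code:java}
object LazyValRace {
  var schedulingMode: Option[String] = None // set later, like postEnvironmentUpdate

  lazy val isFairScheduler: Boolean = schedulingMode.exists(_ == "FAIR")

  def main(args: Array[String]): Unit = {
    println(isFairScheduler)      // accessed too early: evaluates and caches false
    schedulingMode = Some("FAIR") // the "environment update" arrives afterwards
    println(isFairScheduler)      // still false: a lazy val never re-evaluates
  }
}
{code}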
[jira] [Created] (SPARK-3003) FailedStage could not be cancelled by DAGScheduler when cancelJob or cancelStage
YanTang Zhai created SPARK-3003: --- Summary: FailedStage could not be cancelled by DAGScheduler when cancelJob or cancelStage Key: SPARK-3003 URL: https://issues.apache.org/jira/browse/SPARK-3003 Project: Spark Issue Type: Bug Components: Spark Core Reporter: YanTang Zhai Priority: Minor When a stage changes from running to failed, DAGScheduler cannot cancel it via cancelJob or cancelStage, because failJobAndIndependentStages only cancels running stages and posts SparkListenerStageCompleted for them. -- This message was sent by Atlassian JIRA (v6.2#6252) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
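One plausible shape of a fix, sketched here with simplified types rather than DAGScheduler's real ones, is to treat stages in the failed set the same way as running stages during cancellation:
{code:java}
import scala.collection.mutable

// Simplified sketch; not the actual DAGScheduler code.
val runningStages = mutable.HashSet[Int]()
val failedStages  = mutable.HashSet[Int]()

def cancelIndependentStages(stagesOfJob: Seq[Int]): Unit =
  for (stageId <- stagesOfJob)
    // Remove from either set, so a failed stage cannot survive cancellation.
    if (runningStages.remove(stageId) || failedStages.remove(stageId))
      println(s"stage $stageId cancelled; posting SparkListenerStageCompleted")
{code}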
[jira] [Updated] (SPARK-2643) Stages web ui has ERROR when pool name is None
[ https://issues.apache.org/jira/browse/SPARK-2643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] YanTang Zhai updated SPARK-2643: Description: 14/07/23 16:01:44 WARN servlet.ServletHandler: /stages/ java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:313) at scala.None$.get(Option.scala:311) at org.apache.spark.ui.jobs.StageTableBase.stageRow(StageTable.scala:132) at org.apache.spark.ui.jobs.StageTableBase.org$apache$spark$ui$jobs$StageTableBase$$renderStageRow(StageTable.scala:150) at org.apache.spark.ui.jobs.StageTableBase$$anonfun$toNodeSeq$1.apply(StageTable.scala:52) at org.apache.spark.ui.jobs.StageTableBase$$anonfun$toNodeSeq$1.apply(StageTable.scala:52) at org.apache.spark.ui.jobs.StageTableBase$$anonfun$stageTable$1.apply(StageTable.scala:61) at org.apache.spark.ui.jobs.StageTableBase$$anonfun$stageTable$1.apply(StageTable.scala:61) at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:376) at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:376) at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085) at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077) at scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:980) at scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:980) at scala.collection.immutable.StreamIterator$LazyCell.v$lzycompute(Stream.scala:969) at scala.collection.immutable.StreamIterator$LazyCell.v(Stream.scala:969) at scala.collection.immutable.StreamIterator.hasNext(Stream.scala:974) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.xml.NodeBuffer.$amp$plus(NodeBuffer.scala:38) at scala.xml.NodeBuffer.$amp$plus(NodeBuffer.scala:40) at org.apache.spark.ui.jobs.StageTableBase.stageTable(StageTable.scala:60) at org.apache.spark.ui.jobs.StageTableBase.toNodeSeq(StageTable.scala:52) at org.apache.spark.ui.jobs.JobProgressPage.render(JobProgressPage.scala:91) at org.apache.spark.ui.WebUI$$anonfun$attachPage$1.apply(WebUI.scala:65) at org.apache.spark.ui.WebUI$$anonfun$attachPage$1.apply(WebUI.scala:65) at org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:70) at javax.servlet.http.HttpServlet.service(HttpServlet.java:707) at javax.servlet.http.HttpServlet.service(HttpServlet.java:820) at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:684) at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501) at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086) at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:428) at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020) at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135) at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:255) at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116) at org.eclipse.jetty.server.Server.handle(Server.java:370) at org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494) at org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:971) at org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:1033) at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:644) at 
org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:235) at org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82) at org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:667) at org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52) at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608) at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543) at java.lang.Thread.run(Thread.java:744) 14/07/23 16:01:44 WARN server.AbstractHttpConnection: /stages/ java.lang.NoSuchMethodError: javax.servlet.http.HttpServletRequest.isAsyncStarted()Z at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:583) at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086) at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:428) at org.eclipse.jetty.server.handler.ContextHandler.doScope(Cont
[jira] [Created] (SPARK-2715) ExternalAppendOnlyMap adds max limit of times and max limit of disk bytes written for spilling
YanTang Zhai created SPARK-2715: --- Summary: ExternalAppendOnlyMap adds max limit of times and max limit of disk bytes written for spilling Key: SPARK-2715 URL: https://issues.apache.org/jira/browse/SPARK-2715 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: YanTang Zhai Priority: Minor ExternalAppendOnlyMap could add a maximum limit on the number of spills and a maximum limit on the total disk bytes written when spilling. With such limits, a task with heavy data skew could fail fast instead of running for a long time. -- This message was sent by Atlassian JIRA (v6.2#6252)
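A sketch of what such a spill budget could look like; the limit values and names here are hypothetical, not actual Spark configuration keys:
{code:java}
// Hypothetical limits; not real Spark settings.
val maxSpills     = 100
val maxSpillBytes = 10L << 30 // 10 GiB

var spills       = 0
var spilledSoFar = 0L

def checkSpillBudget(bytesAboutToSpill: Long): Unit = {
  spills += 1
  spilledSoFar += bytesAboutToSpill
  // Fail the skewed task fast instead of spilling indefinitely.
  if (spills > maxSpills || spilledSoFar > maxSpillBytes)
    throw new RuntimeException(
      s"spill budget exceeded: $spills spills, $spilledSoFar bytes written")
}
{code}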
[jira] [Created] (SPARK-2714) DAGScheduler logs jobid when runJob finishes
YanTang Zhai created SPARK-2714: --- Summary: DAGScheduler logs jobid when runJob finishes Key: SPARK-2714 URL: https://issues.apache.org/jira/browse/SPARK-2714 Project: Spark Issue Type: Documentation Components: Spark Core Reporter: YanTang Zhai Priority: Minor DAGScheduler should log the job id when runJob finishes. -- This message was sent by Atlassian JIRA (v6.2#6252)
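The change itself is essentially one line; a sketch, assuming the logInfo helper from Spark's Logging trait is in scope:
{code:java}
// Sketch only; assumes Spark 1.x's org.apache.spark.Logging trait.
class DAGSchedulerSketch extends org.apache.spark.Logging {
  def runJob(jobId: Int): Unit = {
    // ... block until the job completes ...
    logInfo(s"Job $jobId finished") // surface the job id on completion
  }
}
{code}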
[jira] [Commented] (SPARK-2647) DAGScheduler plugs others when processing one JobSubmitted event
[ https://issues.apache.org/jira/browse/SPARK-2647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14071755#comment-14071755 ] YanTang Zhai commented on SPARK-2647: - I've created PR: https://github.com/apache/spark/pull/1548. Please help to review. Thanks. > DAGScheduler plugs others when processing one JobSubmitted event > > > Key: SPARK-2647 > URL: https://issues.apache.org/jira/browse/SPARK-2647 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Reporter: YanTang Zhai > > If a few jobs are submitted, DAGScheduler plugs others when processing one > JobSubmitted event. > For example, one JobSubmitted event is processed as follows and takes much time > "spark-akka.actor.default-dispatcher-67" daemon prio=10 > tid=0x7f75ec001000 nid=0x7dd6 in Object.wait() [0x7f76063e1000] >java.lang.Thread.State: WAITING (on object monitor) > at java.lang.Object.wait(Native Method) > at java.lang.Object.wait(Object.java:503) > at org.apache.hadoopcdh3.ipc.Client.call(Client.java:1130) > - locked <0x000783b17330> (a org.apache.hadoopcdh3.ipc.Client$Call) > at org.apache.hadoopcdh3.ipc.RPC$Invoker.invoke(RPC.java:241) > at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source) > at sun.reflect.GeneratedMethodAccessor86.invoke(Unknown Source) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.apache.hadoopcdh3.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:83) > at > org.apache.hadoopcdh3.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:60) > at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source) > at > org.apache.hadoopcdh3.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1472) > at > org.apache.hadoopcdh3.hdfs.DFSClient.getBlockLocations(DFSClient.java:1498) > at > org.apache.hadoopcdh3.hdfs.Cdh3DistributedFileSystem$1.doCall(Cdh3DistributedFileSystem.java:208) > at > org.apache.hadoopcdh3.hdfs.Cdh3DistributedFileSystem$1.doCall(Cdh3DistributedFileSystem.java:204) > at > org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) > at > org.apache.hadoopcdh3.hdfs.Cdh3DistributedFileSystem.getFileBlockLocations(Cdh3DistributedFileSystem.java:204) > at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1812) > at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1797) > at > org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:233) > at > StorageEngineClient.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:141) > at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:172) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) > at scala.Option.getOrElse(Option.scala:120) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) > at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) > at scala.Option.getOrElse(Option.scala:120) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) > at > org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) > at scala.Option.getOrElse(Option.scala:120) > at 
org.apache.spark.rdd.RDD.partitions(RDD.scala:202) > at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:54) > at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:54) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.immutable.List.foreach(List.scala:318) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.AbstractTraversable.map(Traversable.scala:105) > at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:54) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) > at scala.Option.getOrElse(Option.scala:120) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) > at > org.apache.spark.rdd.MapPartitionsRDD.getPa
[jira] [Created] (SPARK-2647) DAGScheduler plugs others when processing one JobSubmitted event
YanTang Zhai created SPARK-2647: --- Summary: DAGScheduler plugs others when processing one JobSubmitted event Key: SPARK-2647 URL: https://issues.apache.org/jira/browse/SPARK-2647 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: YanTang Zhai If a few jobs are submitted, DAGScheduler plugs others when processing one JobSubmitted event. For example, one JobSubmitted event is processed as follows and takes much time "spark-akka.actor.default-dispatcher-67" daemon prio=10 tid=0x7f75ec001000 nid=0x7dd6 in Object.wait() [0x7f76063e1000] java.lang.Thread.State: WAITING (on object monitor) at java.lang.Object.wait(Native Method) at java.lang.Object.wait(Object.java:503) at org.apache.hadoopcdh3.ipc.Client.call(Client.java:1130) - locked <0x000783b17330> (a org.apache.hadoopcdh3.ipc.Client$Call) at org.apache.hadoopcdh3.ipc.RPC$Invoker.invoke(RPC.java:241) at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source) at sun.reflect.GeneratedMethodAccessor86.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoopcdh3.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:83) at org.apache.hadoopcdh3.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:60) at com.sun.proxy.$Proxy11.getBlockLocations(Unknown Source) at org.apache.hadoopcdh3.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1472) at org.apache.hadoopcdh3.hdfs.DFSClient.getBlockLocations(DFSClient.java:1498) at org.apache.hadoopcdh3.hdfs.Cdh3DistributedFileSystem$1.doCall(Cdh3DistributedFileSystem.java:208) at org.apache.hadoopcdh3.hdfs.Cdh3DistributedFileSystem$1.doCall(Cdh3DistributedFileSystem.java:204) at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) at org.apache.hadoopcdh3.hdfs.Cdh3DistributedFileSystem.getFileBlockLocations(Cdh3DistributedFileSystem.java:204) at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1812) at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:1797) at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:233) at StorageEngineClient.CombineFileInputFormat.getSplits(CombineFileInputFormat.java:141) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:172) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.rdd.MappedRDD.getPartitions(MappedRDD.scala:28) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:54) at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:54) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at 
scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:54) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:204) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:202) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:202) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions
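The trace shows the event-loop thread itself blocked inside HadoopRDD.getPartitions. A complementary direction, sketched here with a plain thread pool purely as an illustration (not any actual Spark patch), is to resolve partitions off the single event loop and only then post the JobSubmitted event:
{code:java}
import java.util.concurrent.Executors

// Sketch: decouple slow partition resolution from the single event loop.
val prepPool = Executors.newFixedThreadPool(4)

def submitJob(prepare: () => Unit, postJobSubmitted: () => Unit): Unit =
  prepPool.submit(new Runnable {
    override def run(): Unit = {
      prepare()          // e.g. force rdd.partitions, which may hit HDFS
      postJobSubmitted() // only now hand the job to the event loop
    }
  })
{code}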
[jira] [Updated] (SPARK-2643) Stages web ui has ERROR when pool name is None
[ https://issues.apache.org/jira/browse/SPARK-2643?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] YanTang Zhai updated SPARK-2643: Component/s: Web UI > Stages web ui has ERROR when pool name is None > -- > > Key: SPARK-2643 > URL: https://issues.apache.org/jira/browse/SPARK-2643 > Project: Spark > Issue Type: Bug > Components: Web UI >Reporter: YanTang Zhai >Priority: Minor > > 14/07/23 16:01:44 WARN servlet.ServletHandler: /stages/ > java.util.NoSuchElementException: None.get > at scala.None$.get(Option.scala:313) > at scala.None$.get(Option.scala:311) > at > org.apache.spark.ui.jobs.StageTableBase.stageRow(StageTable.scala:132) > at > org.apache.spark.ui.jobs.StageTableBase.org$apache$spark$ui$jobs$StageTableBase$$renderStageRow(StageTable.scala:150) > at > org.apache.spark.ui.jobs.StageTableBase$$anonfun$toNodeSeq$1.apply(StageTable.scala:52) > at > org.apache.spark.ui.jobs.StageTableBase$$anonfun$toNodeSeq$1.apply(StageTable.scala:52) > at > org.apache.spark.ui.jobs.StageTableBase$$anonfun$stageTable$1.apply(StageTable.scala:61) > at > org.apache.spark.ui.jobs.StageTableBase$$anonfun$stageTable$1.apply(StageTable.scala:61) > at > scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:376) > at > scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:376) > at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085) > at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077) > at > scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:980) > at > scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:980) > at > scala.collection.immutable.StreamIterator$LazyCell.v$lzycompute(Stream.scala:969) > at > scala.collection.immutable.StreamIterator$LazyCell.v(Stream.scala:969) > at scala.collection.immutable.StreamIterator.hasNext(Stream.scala:974) > at scala.collection.Iterator$class.foreach(Iterator.scala:727) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) > at scala.xml.NodeBuffer.$amp$plus(NodeBuffer.scala:38) > at scala.xml.NodeBuffer.$amp$plus(NodeBuffer.scala:40) > at > org.apache.spark.ui.jobs.StageTableBase.stageTable(StageTable.scala:60) > at > org.apache.spark.ui.jobs.StageTableBase.toNodeSeq(StageTable.scala:52) > at > org.apache.spark.ui.jobs.JobProgressPage.render(JobProgressPage.scala:91) > at > org.apache.spark.ui.WebUI$$anonfun$attachPage$1.apply(WebUI.scala:65) > at > org.apache.spark.ui.WebUI$$anonfun$attachPage$1.apply(WebUI.scala:65) > at org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:70) > at javax.servlet.http.HttpServlet.service(HttpServlet.java:707) > at javax.servlet.http.HttpServlet.service(HttpServlet.java:820) > at > org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:684) > at > org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501) > at > org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086) > at > org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:428) > at > org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020) > at > org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135) > at > org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:255) > at > org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116) > at org.eclipse.jetty.server.Server.handle(Server.java:370) > at > 
org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494) > at > org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:971) > at > org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:1033) > at org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:644) > at > org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:235) > at > org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82) > at > org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:667) > at > org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52) > at > org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608) > at > org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543) >
[jira] [Created] (SPARK-2643) Stages web ui has ERROR when pool name is None
YanTang Zhai created SPARK-2643: --- Summary: Stages web ui has ERROR when pool name is None Key: SPARK-2643 URL: https://issues.apache.org/jira/browse/SPARK-2643 Project: Spark Issue Type: Bug Reporter: YanTang Zhai Priority: Minor 14/07/23 16:01:44 WARN servlet.ServletHandler: /stages/ java.util.NoSuchElementException: None.get at scala.None$.get(Option.scala:313) at scala.None$.get(Option.scala:311) at org.apache.spark.ui.jobs.StageTableBase.stageRow(StageTable.scala:132) at org.apache.spark.ui.jobs.StageTableBase.org$apache$spark$ui$jobs$StageTableBase$$renderStageRow(StageTable.scala:150) at org.apache.spark.ui.jobs.StageTableBase$$anonfun$toNodeSeq$1.apply(StageTable.scala:52) at org.apache.spark.ui.jobs.StageTableBase$$anonfun$toNodeSeq$1.apply(StageTable.scala:52) at org.apache.spark.ui.jobs.StageTableBase$$anonfun$stageTable$1.apply(StageTable.scala:61) at org.apache.spark.ui.jobs.StageTableBase$$anonfun$stageTable$1.apply(StageTable.scala:61) at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:376) at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:376) at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1085) at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1077) at scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:980) at scala.collection.immutable.StreamIterator$$anonfun$next$1.apply(Stream.scala:980) at scala.collection.immutable.StreamIterator$LazyCell.v$lzycompute(Stream.scala:969) at scala.collection.immutable.StreamIterator$LazyCell.v(Stream.scala:969) at scala.collection.immutable.StreamIterator.hasNext(Stream.scala:974) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.xml.NodeBuffer.$amp$plus(NodeBuffer.scala:38) at scala.xml.NodeBuffer.$amp$plus(NodeBuffer.scala:40) at org.apache.spark.ui.jobs.StageTableBase.stageTable(StageTable.scala:60) at org.apache.spark.ui.jobs.StageTableBase.toNodeSeq(StageTable.scala:52) at org.apache.spark.ui.jobs.JobProgressPage.render(JobProgressPage.scala:91) at org.apache.spark.ui.WebUI$$anonfun$attachPage$1.apply(WebUI.scala:65) at org.apache.spark.ui.WebUI$$anonfun$attachPage$1.apply(WebUI.scala:65) at org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:70) at javax.servlet.http.HttpServlet.service(HttpServlet.java:707) at javax.servlet.http.HttpServlet.service(HttpServlet.java:820) at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:684) at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501) at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086) at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:428) at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1020) at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:135) at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:255) at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:116) at org.eclipse.jetty.server.Server.handle(Server.java:370) at org.eclipse.jetty.server.AbstractHttpConnection.handleRequest(AbstractHttpConnection.java:494) at org.eclipse.jetty.server.AbstractHttpConnection.headerComplete(AbstractHttpConnection.java:971) at org.eclipse.jetty.server.AbstractHttpConnection$RequestHandler.headerComplete(AbstractHttpConnection.java:1033) at 
org.eclipse.jetty.http.HttpParser.parseNext(HttpParser.java:644) at org.eclipse.jetty.http.HttpParser.parseAvailable(HttpParser.java:235) at org.eclipse.jetty.server.AsyncHttpConnection.handle(AsyncHttpConnection.java:82) at org.eclipse.jetty.io.nio.SelectChannelEndPoint.handle(SelectChannelEndPoint.java:667) at org.eclipse.jetty.io.nio.SelectChannelEndPoint$1.run(SelectChannelEndPoint.java:52) at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:608) at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:543) at java.lang.Thread.run(Thread.java:744) 14/07/23 16:01:44 WARN server.AbstractHttpConnection: /stages/ java.lang.NoSuchMethodError: javax.servlet.http.HttpServletRequest.isAsyncStarted()Z at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:583) at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086) a
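The trace bottoms out in None.get inside StageTable.stageRow, i.e. code that assumes every stage has a pool name. A defensive sketch with illustrative types; the real fix would live in the stage-row rendering:
{code:java}
case class StageRowSketch(poolName: Option[String])

// Avoid None.get: render a fallback label when the pool name is absent.
def poolCell(row: StageRowSketch): String =
  row.poolName.getOrElse("(none)")

println(poolCell(StageRowSketch(None)))         // "(none)" instead of a crash
println(poolCell(StageRowSketch(Some("prod")))) // "prod"
{code}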
[jira] [Created] (SPARK-2642) Add jobId in web UI
YanTang Zhai created SPARK-2642: --- Summary: Add jobId in web UI Key: SPARK-2642 URL: https://issues.apache.org/jira/browse/SPARK-2642 Project: Spark Issue Type: Improvement Components: Web UI Reporter: YanTang Zhai Priority: Minor The web UI shows only stage ids at present, so multiple stages cannot be explicitly shown as belonging to the same job. The job id will be added to the web UI. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Created] (SPARK-2556) Multiple SparkContexts can coexist in one process
YanTang Zhai created SPARK-2556: --- Summary: Multiple SparkContexts can coexist in one process Key: SPARK-2556 URL: https://issues.apache.org/jira/browse/SPARK-2556 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: YanTang Zhai Priority: Minor Multiple SparkContexts cannot coexist in one process at present, since different SparkContexts share the same global variables. These global variables and objects will be made context-local so that multiple SparkContexts can coexist. -- This message was sent by Atlassian JIRA (v6.2#6252)
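The mechanical shape of such a change, sketched with hypothetical names rather than Spark's real internals, is to turn JVM-wide singletons into state owned by each context instance:
{code:java}
// Before (sketch): one JVM-wide slot, so a second context clobbers the first.
object GlobalStateSketch { var uiPort: Int = 0 }

// After (sketch): each context owns its own copy of the former globals.
class SparkContextSketch(val uiPort: Int)

val a = new SparkContextSketch(4040)
val b = new SparkContextSketch(4041) // coexists; nothing shared to trample
println(s"${a.uiPort}, ${b.uiPort}")
{code}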
[jira] [Commented] (SPARK-2325) Utils.getLocalDir had better check the directory and choose a good one instead of choosing the first one directly
[ https://issues.apache.org/jira/browse/SPARK-2325?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14052075#comment-14052075 ] YanTang Zhai commented on SPARK-2325: - I've created PR: https://github.com/apache/spark/pull/1281. Please help to review. Thanks. > Utils.getLocalDir had better check the directory and choose a good one > instead of choosing the first one directly > - > > Key: SPARK-2325 > URL: https://issues.apache.org/jira/browse/SPARK-2325 > Project: Spark > Issue Type: Bug > Components: Spark Core >Reporter: YanTang Zhai > > If the first directory of spark.local.dir is bad, the application will exit with > the exception: > Exception in thread "main" java.io.IOException: Failed to create a temp > directory (under /data1/sparkenv/local) after 10 attempts! > at org.apache.spark.util.Utils$.createTempDir(Utils.scala:258) > at > org.apache.spark.broadcast.HttpBroadcast$.createServer(HttpBroadcast.scala:154) > at > org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:127) > at > org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcastFactory.scala:31) > at > org.apache.spark.broadcast.BroadcastManager.initialize(BroadcastManager.scala:48) > at > org.apache.spark.broadcast.BroadcastManager.(BroadcastManager.scala:35) > at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218) > at org.apache.spark.SparkContext.(SparkContext.scala:202) > at JobTaskJoin$.main(JobTaskJoin.scala:9) > at JobTaskJoin.main(JobTaskJoin.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:601) > at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292) > at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55) > at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) > Utils.getLocalDir had better check the directory and choose a good one > instead of choosing the first one directly. For example, spark.local.dir is > /data1/sparkenv/local,/data2/sparkenv/local. If the disk data1 is bad while the > disk data2 is good, we should choose data2, not data1. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Closed] (SPARK-1525) TaskSchedulerImpl should decrease availableCpus by spark.task.cpus not 1
[ https://issues.apache.org/jira/browse/SPARK-1525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] YanTang Zhai closed SPARK-1525. --- Resolution: Fixed > TaskSchedulerImpl should decrease availableCpus by spark.task.cpus not 1 > > > Key: SPARK-1525 > URL: https://issues.apache.org/jira/browse/SPARK-1525 > Project: Spark > Issue Type: Bug > Components: Spark Core >Reporter: YanTang Zhai >Priority: Minor > > TaskSchedulerImpl always decreases availableCpus by 1 in the resourceOffers > process, even when spark.task.cpus is more than 1, which schedules too many > tasks onto a node in that case. -- This message was sent by Atlassian JIRA (v6.2#6252) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-2324) SparkContext should not exit directly when spark.local.dir is a list of multiple paths and one of them has error
[ https://issues.apache.org/jira/browse/SPARK-2324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14048745#comment-14048745 ] YanTang Zhai commented on SPARK-2324: - I've created PR: https://github.com/apache/spark/pull/1274. Please help to review. Thanks. > SparkContext should not exit directly when spark.local.dir is a list of > multiple paths and one of them has error > > > Key: SPARK-2324 > URL: https://issues.apache.org/jira/browse/SPARK-2324 > Project: Spark > Issue Type: Bug > Components: Spark Core >Reporter: YanTang Zhai > > The spark.local.dir is configured as a list of multiple paths, for example > /data1/sparkenv/local,/data2/sparkenv/local. If the disk data2 of the driver > node has an error, the application exits because DiskBlockManager exits > directly in createLocalDirs. If the disk data2 of a worker node has an error, > the executor exits as well. > DiskBlockManager should not exit directly in createLocalDirs if one of the > spark.local.dir paths has an error. Since spark.local.dir has multiple paths, > a problem with one of them should not affect the whole application. > I think DiskBlockManager could ignore the bad directory in createLocalDirs. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Created] (SPARK-2326) DiskBlockManager could add DiskChecker function for kicking off bad directories
YanTang Zhai created SPARK-2326: --- Summary: DiskBlockManager could add DiskChecker function for kicking off bad directories Key: SPARK-2326 URL: https://issues.apache.org/jira/browse/SPARK-2326 Project: Spark Issue Type: Bug Reporter: YanTang Zhai If a disk failure happens while the Spark cluster is running, DiskBlockManager should exclude the bad directories automatically. DiskBlockManager could add a DiskChecker function for this. -- This message was sent by Atlassian JIRA (v6.2#6252)
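A sketch of what such a checker might do, loosely modeled on the DiskChecker idea in Hadoop: a directory counts as healthy only if a probe file can be created, written, and removed in it:
{code:java}
import java.io.File
import java.nio.file.Files

def isDirHealthy(dir: File): Boolean =
  try {
    val probe = File.createTempFile("probe", ".tmp", dir)
    Files.write(probe.toPath, Array[Byte](1, 2, 3)) // exercise a real write
    probe.delete()
  } catch {
    case _: java.io.IOException => false // any I/O failure marks the dir bad
  }
{code}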
[jira] [Created] (SPARK-2325) Utils.getLocalDir had better check the directory and choose a good one instead of choosing the first one directly
YanTang Zhai created SPARK-2325: --- Summary: Utils.getLocalDir had better check the directory and choose a good one instead of choosing the first one directly Key: SPARK-2325 URL: https://issues.apache.org/jira/browse/SPARK-2325 Project: Spark Issue Type: Bug Components: Spark Core Reporter: YanTang Zhai If the first directory of spark.local.dir is bad, the application will exit with the exception: Exception in thread "main" java.io.IOException: Failed to create a temp directory (under /data1/sparkenv/local) after 10 attempts! at org.apache.spark.util.Utils$.createTempDir(Utils.scala:258) at org.apache.spark.broadcast.HttpBroadcast$.createServer(HttpBroadcast.scala:154) at org.apache.spark.broadcast.HttpBroadcast$.initialize(HttpBroadcast.scala:127) at org.apache.spark.broadcast.HttpBroadcastFactory.initialize(HttpBroadcastFactory.scala:31) at org.apache.spark.broadcast.BroadcastManager.initialize(BroadcastManager.scala:48) at org.apache.spark.broadcast.BroadcastManager.(BroadcastManager.scala:35) at org.apache.spark.SparkEnv$.create(SparkEnv.scala:218) at org.apache.spark.SparkContext.(SparkContext.scala:202) at JobTaskJoin$.main(JobTaskJoin.scala:9) at JobTaskJoin.main(JobTaskJoin.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:292) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:55) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Utils.getLocalDir had better check the directory and choose a good one instead of choosing the first one directly. For example, spark.local.dir is /data1/sparkenv/local,/data2/sparkenv/local. If the disk data1 is bad while the disk data2 is good, we should choose data2, not data1. -- This message was sent by Atlassian JIRA (v6.2#6252)
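A minimal sketch of the proposed selection, assuming a comma-separated spark.local.dir value like the one above; the helper mirrors Utils.getLocalDir in spirit but is simplified:
{code:java}
import java.io.File

def getLocalDir(localDirs: String): String =
  localDirs.split(",")
    .find { path =>                  // first *usable* dir, not just the first
      val dir = new File(path)
      dir.isDirectory && dir.canWrite
    }
    .getOrElse(throw new java.io.IOException(
      s"No usable directory among: $localDirs"))

// e.g. getLocalDir("/data1/sparkenv/local,/data2/sparkenv/local") picks
// data2 when data1 is bad and data2 is good.
{code}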
[jira] [Created] (SPARK-2324) SparkContext should not exit directly when spark.local.dir is a list of multiple paths and one of them has error
YanTang Zhai created SPARK-2324: --- Summary: SparkContext should not exit directly when spark.local.dir is a list of multiple paths and one of them has error Key: SPARK-2324 URL: https://issues.apache.org/jira/browse/SPARK-2324 Project: Spark Issue Type: Bug Components: Spark Core Reporter: YanTang Zhai The spark.local.dir is configured as a list of multiple paths, for example /data1/sparkenv/local,/data2/sparkenv/local. If the disk data2 of the driver node has an error, the application exits because DiskBlockManager exits directly in createLocalDirs. If the disk data2 of a worker node has an error, the executor exits as well. DiskBlockManager should not exit directly in createLocalDirs if one of the spark.local.dir paths has an error. Since spark.local.dir has multiple paths, a problem with one of them should not affect the whole application. I think DiskBlockManager could ignore the bad directory in createLocalDirs. -- This message was sent by Atlassian JIRA (v6.2#6252)
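A sketch of the suggested behavior in createLocalDirs, with simplified names: keep whichever directories can actually be created and fail only when none survive, rather than exiting on the first bad disk:
{code:java}
import java.io.File

def createLocalDirs(configured: Seq[String]): Seq[File] = {
  val usable = configured.flatMap { path =>
    val dir = new File(path)
    if (dir.isDirectory || dir.mkdirs()) Some(dir)
    else { System.err.println(s"ignoring bad local dir: $path"); None }
  }
  // Only give up when every configured path is unusable.
  require(usable.nonEmpty, "all configured spark.local.dir paths are bad")
  usable
}
{code}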
[jira] [Commented] (SPARK-2290) Worker should directly use its own sparkHome instead of appDesc.sparkHome when LaunchExecutor
[ https://issues.apache.org/jira/browse/SPARK-2290?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14045691#comment-14045691 ] YanTang Zhai commented on SPARK-2290: - I've created PR: https://github.com/apache/spark/pull/1244. Please help to review, thanks. > Worker should directly use its own sparkHome instead of appDesc.sparkHome > when LaunchExecutor > - > > Key: SPARK-2290 > URL: https://issues.apache.org/jira/browse/SPARK-2290 > Project: Spark > Issue Type: Bug > Components: Spark Core >Reporter: YanTang Zhai >Priority: Minor > > The client path is /data/home/spark/test/spark-1.0.0 while the worker deploy > path is /data/home/spark/spark-1.0.0, which is different from the client path. > Then an application is launched using ./bin/spark-submit --class > JobTaskJoin --master spark://172.25.38.244:7077 --executor-memory 128M > ../jobtaskjoin_2.10-1.0.0.jar. However, the application fails because an > exception occurs: > java.io.IOException: Cannot run program > "/data/home/spark/test/spark-1.0.0-bin-0.20.2-cdh3u3/bin/compute-classpath.sh" > (in directory "."): error=2, No such file or directory > at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029) > at org.apache.spark.util.Utils$.executeAndGetOutput(Utils.scala:759) > at > org.apache.spark.deploy.worker.CommandUtils$.buildJavaOpts(CommandUtils.scala:72) > at > org.apache.spark.deploy.worker.CommandUtils$.buildCommandSeq(CommandUtils.scala:37) > at > org.apache.spark.deploy.worker.ExecutorRunner.getCommandSeq(ExecutorRunner.scala:109) > at > org.apache.spark.deploy.worker.ExecutorRunner.fetchAndRunExecutor(ExecutorRunner.scala:124) > at > org.apache.spark.deploy.worker.ExecutorRunner$$anon$1.run(ExecutorRunner.scala:58) > Caused by: java.io.IOException: error=2, No such file or directory > at java.lang.UNIXProcess.forkAndExec(Native Method) > at java.lang.UNIXProcess.(UNIXProcess.java:135) > at java.lang.ProcessImpl.start(ProcessImpl.java:130) > at java.lang.ProcessBuilder.start(ProcessBuilder.java:1021) > ... 6 more > Therefore, I think the worker should not use appDesc.sparkHome when handling > LaunchExecutor; instead, the worker could use its own sparkHome directly. -- This message was sent by Atlassian JIRA (v6.2#6252)
[jira] [Created] (SPARK-2290) Worker should directly use its own sparkHome instead of appDesc.sparkHome when LaunchExecutor
YanTang Zhai created SPARK-2290: --- Summary: Worker should directly use its own sparkHome instead of appDesc.sparkHome when LaunchExecutor Key: SPARK-2290 URL: https://issues.apache.org/jira/browse/SPARK-2290 Project: Spark Issue Type: Bug Components: Spark Core Reporter: YanTang Zhai Priority: Minor The client path is /data/home/spark/test/spark-1.0.0 while the worker deploy path is /data/home/spark/spark-1.0.0, which is different from the client path. Then an application is launched using ./bin/spark-submit --class JobTaskJoin --master spark://172.25.38.244:7077 --executor-memory 128M ../jobtaskjoin_2.10-1.0.0.jar. However, the application fails because an exception occurs: java.io.IOException: Cannot run program "/data/home/spark/test/spark-1.0.0-bin-0.20.2-cdh3u3/bin/compute-classpath.sh" (in directory "."): error=2, No such file or directory at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029) at org.apache.spark.util.Utils$.executeAndGetOutput(Utils.scala:759) at org.apache.spark.deploy.worker.CommandUtils$.buildJavaOpts(CommandUtils.scala:72) at org.apache.spark.deploy.worker.CommandUtils$.buildCommandSeq(CommandUtils.scala:37) at org.apache.spark.deploy.worker.ExecutorRunner.getCommandSeq(ExecutorRunner.scala:109) at org.apache.spark.deploy.worker.ExecutorRunner.fetchAndRunExecutor(ExecutorRunner.scala:124) at org.apache.spark.deploy.worker.ExecutorRunner$$anon$1.run(ExecutorRunner.scala:58) Caused by: java.io.IOException: error=2, No such file or directory at java.lang.UNIXProcess.forkAndExec(Native Method) at java.lang.UNIXProcess.(UNIXProcess.java:135) at java.lang.ProcessImpl.start(ProcessImpl.java:130) at java.lang.ProcessBuilder.start(ProcessBuilder.java:1021) ... 6 more Therefore, I think the worker should not use appDesc.sparkHome when handling LaunchExecutor; instead, the worker could use its own sparkHome directly. -- This message was sent by Atlassian JIRA (v6.2#6252)
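The proposed direction reduces to choosing one path over the other at executor launch; a sketch with simplified types, not the worker's real code:
{code:java}
case class AppDescriptionSketch(sparkHome: Option[String])

def executorSparkHome(workerSparkHome: String,
                      app: AppDescriptionSketch): String =
  // Ignore app.sparkHome: the client's install path may not exist on this
  // worker. The worker's own path is known to be valid locally.
  workerSparkHome
{code}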
[jira] [Created] (SPARK-1525) TaskSchedulerImpl should decrease availableCpus by spark.task.cpus not 1
YanTang Zhai created SPARK-1525: --- Summary: TaskSchedulerImpl should decrease availableCpus by spark.task.cpus not 1 Key: SPARK-1525 URL: https://issues.apache.org/jira/browse/SPARK-1525 Project: Spark Issue Type: Bug Components: Spark Core Reporter: YanTang Zhai Priority: Minor TaskSchedulerImpl always decreases availableCpus by 1 in the resourceOffers process, even when spark.task.cpus is more than 1, which schedules too many tasks onto a node in that case. -- This message was sent by Atlassian JIRA (v6.2#6252)
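The fix reduces to one line of accounting in resourceOffers; a sketch with simplified state, where CPUS_PER_TASK stands for spark.task.cpus:
{code:java}
val CPUS_PER_TASK = 2            // i.e. spark.task.cpus
val availableCpus = Array(8, 8)  // free cores per resource offer

def tryLaunchTask(i: Int): Boolean =
  if (availableCpus(i) >= CPUS_PER_TASK) {
    availableCpus(i) -= CPUS_PER_TASK // not "-= 1": reserve every core the task uses
    true
  } else false
{code}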
[jira] [Created] (SPARK-1524) TaskSetManager'd better not schedule tasks which has no preferred executorId using PROCESS_LOCAL in the first search process
YanTang Zhai created SPARK-1524: --- Summary: TaskSetManager'd better not schedule tasks which has no preferred executorId using PROCESS_LOCAL in the first search process Key: SPARK-1524 URL: https://issues.apache.org/jira/browse/SPARK-1524 Project: Spark Issue Type: Improvement Components: Spark Core Reporter: YanTang Zhai Priority: Minor ShuffleMapTask is constructed in DAGScheduler with a TaskLocation that has only a host, not a (host, executorID) pair. When TaskSetManager schedules a ShuffleMapTask that has no preferred executorId using a specific execId host and the PROCESS_LOCAL locality level, no tasks match the given locality constraint in the first search pass. We also find that the host used by the scheduler is a hostname while the host used by TaskLocation is an IP address in our cluster. The two hosts do not match, which leaves the pendingTasksForHost HashMap empty and makes the task search behave against our expectation. -- This message was sent by Atlassian JIRA (v6.2#6252)
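A sketch of the intended matching rules, with simplified types: a task should only count as PROCESS_LOCAL for an executor it actually names, and host comparison must use one consistent form (hostname vs. IP) on both sides:
{code:java}
case class TaskLocationSketch(host: String, executorId: Option[String])

// No preferred executorId means the task can never be PROCESS_LOCAL.
def matchesProcessLocal(loc: TaskLocationSketch, execId: String): Boolean =
  loc.executorId.exists(_ == execId)

// Hosts must be normalized to the same form before comparison; otherwise a
// hostname on one side and an IP on the other leaves pendingTasksForHost empty.
def matchesNodeLocal(loc: TaskLocationSketch, executorHost: String): Boolean =
  loc.host == executorHost
{code}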