[jira] [Commented] (SPARK-19067) mapGroupsWithState - arbitrary stateful operations with Structured Streaming (similar to DStream.mapWithState)
[ https://issues.apache.org/jira/browse/SPARK-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15958403#comment-15958403 ] Amit Sela commented on SPARK-19067: --- [~tdas] should I open a ticket, as a future improvement, about allowing timeouts to execute even if there's no data in the pipeline? As I mentioned in the PR comments: using the {{EventTime}} timeout in the future, I assume the "clock" would be watermark-based instead of wall-time, and I see at least two use cases where this would matter: # Testing - being able to move the clock forward to end-of-time to force firing everything that still awaits the closing of windows. # A pipeline where there is a filter before the stateful operator, such that there is data and the watermark advances, but some of the events are dropped and never reach the stateful operator, so it holds off firing until "proper" data (that passes the filter) comes along - this again could cause an unknown delay in emitting results from the stateful operator. > mapGroupsWithState - arbitrary stateful operations with Structured Streaming > (similar to DStream.mapWithState) > -- > > Key: SPARK-19067 > URL: https://issues.apache.org/jira/browse/SPARK-19067 > Project: Spark > Issue Type: New Feature > Components: Structured Streaming >Reporter: Michael Armbrust >Assignee: Tathagata Das >Priority: Critical > Fix For: 2.2.0 > > > Right now the only way to do stateful operations is with Aggregator or > UDAF. However, this does not give users control over emission or expiration of > state, making it hard to implement things like sessionization. We should add > a more general construct (probably similar to {{DStream.mapWithState}}) to > structured streaming. Here is the design. > *Requirements* > - Users should be able to specify a function that can do the following > - Access the input rows corresponding to a key > - Access the previous state corresponding to a key > - Optionally, update or remove the state > - Output any number of new rows (or none at all) > *Proposed API* > {code} > // New methods on KeyValueGroupedDataset > class KeyValueGroupedDataset[K, V] { > // Scala friendly > def mapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], > GroupState[S]) => U) > def flatMapGroupsWithState[S: Encoder, U: Encoder](func: (K, > Iterator[V], GroupState[S]) => Iterator[U]) > // Java friendly >def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, > U], stateEncoder: Encoder[S], resultEncoder: Encoder[U]) >def flatMapGroupsWithState[S, U](func: > FlatMapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], > resultEncoder: Encoder[U]) > } > // --- New Java-friendly function classes --- > public interface MapGroupsWithStateFunction<K, V, S, U> extends Serializable { > U call(K key, Iterator<V> values, GroupState<S> state) throws Exception; > } > public interface FlatMapGroupsWithStateFunction<K, V, S, U> extends > Serializable { > Iterator<U> call(K key, Iterator<V> values, GroupState<S> state) throws > Exception; > } > // -- Wrapper class for state data -- > trait GroupState[S] { > def exists(): Boolean > def get(): S // throws an exception if the state does not > exist > def getOption(): Option[S] > def update(newState: S): Unit > def remove(): Unit // exists() will be false after this > } > {code} > Key Semantics of the State class > - The state can be null. > - If state.remove() is called, then state.exists() will return false, and > getOption will return None. 
> - After state.update(newState) is called, state.exists() will > return true, and getOption will return Some(...). > - None of the operations are thread-safe. This is to avoid memory barriers. > *Usage* > {code} > val stateFunc = (word: String, words: Iterator[String], runningCount: > GroupState[Long]) => { > val newCount = words.size + runningCount.getOption.getOrElse(0L) > runningCount.update(newCount) >(word, newCount) > } > dataset // type > is Dataset[String] > .groupByKey[String](w => w) // generates > KeyValueGroupedDataset[String, String] > .mapGroupsWithState[Long, (String, Long)](stateFunc)// returns > Dataset[(String, Long)] > {code} > *Future Directions* > - Timeout-based state expiration (for keys that have not received data for a while) - > Done > - General expression-based expiration - TODO. Any real use cases that cannot > be done with timeouts? -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
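A minimal sketch of the sessionization-with-timeout pattern discussed above, written against the 2.2-era {{GroupState}}/{{GroupStateTimeout}} API; the {{Event}} class, the "ts" column, the 30-second gap, and the streaming input {{events}} are illustrative assumptions, not part of the design doc:
{code}
// Assumes a SparkSession `spark` and a streaming Dataset[Event] named `events`.
import java.sql.Timestamp
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
import spark.implicits._

case class Event(sessionId: String, ts: Timestamp)

val counts = events
  .withWatermark("ts", "10 minutes")   // EventTimeTimeout is driven by the watermark
  .groupByKey(_.sessionId)
  .flatMapGroupsWithState[Long, (String, Long)](
      OutputMode.Update, GroupStateTimeout.EventTimeTimeout) {
    (id: String, rows: Iterator[Event], state: GroupState[Long]) =>
      if (state.hasTimedOut) {         // the watermark has passed the timeout timestamp
        val total = state.get
        state.remove()
        Iterator((id, total))          // emit and close the session
      } else {
        state.update(state.getOption.getOrElse(0L) + rows.size)
        // close the session 30s of event time past the current watermark
        state.setTimeoutTimestamp(state.getCurrentWatermarkMs + 30 * 1000)
        Iterator.empty
      }
  }
{code}
As the comment above points out, the timeout only fires when new data advances the watermark, which is exactly the limitation under discussion.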
[jira] [Created] (SPARK-20237) Spark-1.6 current and later versions of memory management issues
zhangwei72 created SPARK-20237: -- Summary: Spark-1.6 current and later versions of memory management issues Key: SPARK-20237 URL: https://issues.apache.org/jira/browse/SPARK-20237 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.1.0, 2.0.2, 2.0.1, 2.0.0, 1.6.3, 1.6.2, 1.6.1, 1.6.0 Environment: java 1.7.0 scala-2.10.5 maven-3.3.9 hadoop-2.2.0 spark-1.6.2 Reporter: zhangwei72 Priority: Critical Fix For: 1.6.2 In Spark 1.6 and later versions there is a problem in the memory manager, UnifiedMemoryManager: the spark.memory.storageFraction configuration is supposed to guarantee a minimum amount of storage memory. UnifiedMemoryManager computes how much memory execution can borrow back from storage as val memoryReclaimableFromStorage = math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize). When storageMemoryPool.memoryFree > storageMemoryPool.poolSize - storageRegionSize, the first operand is chosen, that is, the storage pool is shrunk by the full storageMemoryPool.memoryFree. Because storageMemoryPool.memoryFree > storageMemoryPool.poolSize - storageRegionSize, we have storageMemoryPool.poolSize - storageMemoryPool.memoryFree < storageRegionSize, so after reclaiming, storageMemoryPool.poolSize < storageRegionSize - yet storageRegionSize is defined by the framework as the minimum storage share, so there is a problem. To solve this, I propose defining it as val memoryReclaimableFromStorage = storageMemoryPool.poolSize - storageRegionSize. Experimental evidence: I added some log statements to the UnifiedMemoryManager file as follows: logInfo("storageMemoryPool.memoryFree %f".format(storageMemoryPool.memoryFree/1024.0/1024.0)) logInfo("onHeapExecutionMemoryPool.memoryFree %f".format(onHeapExecutionMemoryPool.memoryFree/1024.0/1024.0)) logInfo("storageMemoryPool.memoryUsed %f".format( storageMemoryPool.memoryUsed/1024.0/1024.0)) logInfo("onHeapExecutionMemoryPool.memoryUsed %f".format(onHeapExecutionMemoryPool.memoryUsed/1024.0/1024.0)) logInfo("storageMemoryPool.poolSize %f".format( storageMemoryPool.poolSize/1024.0/1024.0)) logInfo("onHeapExecutionMemoryPool.poolSize %f".format(onHeapExecutionMemoryPool.poolSize/1024.0/1024.0)) I ran the PageRank program; its input file is generated by BigDataBench (Chinese Academy of Sciences), which is used to evaluate big-data analysis tools, and is 676 MB in size. 
The submit command is as follows: ./bin/spark-submit --class org.apache.spark.examples.SparkPageRank \ --master yarn \ --deploy-mode cluster \ --num-executors 1 \ --driver-memory 4g \ --executor-memory 7g \ --executor-cores 6 \ --queue thequeue \ ./examples/target/scala-2.10/spark-examples-1.6.2-hadoop2.2.0.jar \ /test/Google_genGraph_23.txt 6 The configuration is as follows: spark.memory.useLegacyMode=false spark.memory.fraction=0.75 spark.memory.storageFraction=0.2 Log information is as follows: 17/02/28 11:07:34 INFO memory.UnifiedMemoryManager: storageMemoryPool.memoryFree 0.00 17/02/28 11:07:34 INFO memory.UnifiedMemoryManager: onHeapExecutionMemoryPool.memoryFree 5663.325877 17/02/28 11:07:34 INFO memory.UnifiedMemoryManager: storageMemoryPool.memoryUsed 0.299123 M 17/02/28 11:07:34 INFO memory.UnifiedMemoryManager: onHeapExecutionMemoryPool.memoryUsed 0.00 17/02/28 11:07:34 INFO memory.UnifiedMemoryManager: storageMemoryPool.poolSize 0.299123 17/02/28 11:07:34 INFO memory.UnifiedMemoryManager: onHeapExecutionMemoryPool.poolSize 5663.325877 According to the configuration, storageMemoryPool.poolSize should be at least 1 GB, but the log shows only 0.299123 MB, so there is an error. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
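To make the arithmetic concrete, here is a self-contained sketch of the computation in question; the names mirror {{UnifiedMemoryManager}}, the numbers are illustrative (a 7g executor with the reporter's configuration), and the "proposal" comment is the reporter's suggestion, not Spark's shipped code:
{code}
object ReclaimSketch extends App {
  val unifiedRegionMB   = 7 * 1024 * 0.75        // spark.memory.fraction = 0.75
  val storageRegionSize = unifiedRegionMB * 0.2  // spark.memory.storageFraction = 0.2 (~1075 MB)
  var poolSize          = storageRegionSize      // storage pool currently at the region size
  val memoryFree        = poolSize - 300.0       // storage only uses 300 MB

  // Current rule: execution reclaims max(free, borrowed-beyond-region); here free wins.
  val reclaimable = math.max(memoryFree, poolSize - storageRegionSize)
  poolSize -= reclaimable                        // pool drops to ~300 MB < storageRegionSize
  println(f"poolSize after reclaim: $poolSize%.1f MB, guaranteed region: $storageRegionSize%.1f MB")

  // Reporter's proposal: reclaim only (poolSize - storageRegionSize), which
  // would never shrink the pool below storageRegionSize.
}
{code}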
[jira] [Created] (SPARK-20236) Overwrite a partitioned table should only overwrite related partitions
Wenchen Fan created SPARK-20236: --- Summary: Overwrite a partitioned table should only overwrite related partitions Key: SPARK-20236 URL: https://issues.apache.org/jira/browse/SPARK-20236 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.2.0 Reporter: Wenchen Fan When we overwrite a partitioned table, Spark currently truncates the entire table before writing the new data, or truncates a set of partitions according to the given static partitions. For example, {{INSERT OVERWRITE tbl ...}} will truncate the entire table, and {{INSERT OVERWRITE tbl PARTITION (a=1, b)}} will truncate all the partitions that start with {{a=1}}. This behavior is kind of reasonable, as we can know before runtime which partitions will be overwritten. However, Hive behaves differently: it only overwrites related partitions, e.g. {{INSERT OVERWRITE tbl SELECT 1,2,3}} will only overwrite partition {{a=2, b=3}}, assuming {{tbl}} has only one data column and is partitioned by {{a}} and {{b}}. It seems better if we can follow Hive's behavior. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
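A hedged illustration of the two behaviors side by side, assuming a table {{tbl}} with one data column {{i}} and partitioned by {{(a, b)}} as the description says:
{code}
// Sketch only; table layout assumed as described above.
spark.sql("INSERT OVERWRITE TABLE tbl SELECT 1, 2, 3")
// Spark today: truncates the whole table before writing.
// Hive: only overwrites the single dynamic partition a=2/b=3.

spark.sql("INSERT OVERWRITE TABLE tbl PARTITION (a=1, b) SELECT 1, 2")
// Spark today: truncates every partition starting with a=1 before writing.
{code}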
[jira] [Commented] (SPARK-20236) Overwrite a partitioned table should only overwrite related partitions
[ https://issues.apache.org/jira/browse/SPARK-20236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15958398#comment-15958398 ] Wenchen Fan commented on SPARK-20236: - cc [~jiangxb1987] > Overwrite a partitioned table should only overwrite related partitions > -- > > Key: SPARK-20236 > URL: https://issues.apache.org/jira/browse/SPARK-20236 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Wenchen Fan > > When we overwrite a partitioned table, Spark currently truncates the > entire table before writing the new data, or truncates a set of partitions according > to the given static partitions. > For example, {{INSERT OVERWRITE tbl ...}} will truncate the entire table, and > {{INSERT OVERWRITE tbl PARTITION (a=1, b)}} will truncate all the partitions > that start with {{a=1}}. > This behavior is kind of reasonable, as we can know before runtime which partitions will be > overwritten. However, Hive behaves differently: it > only overwrites related partitions, e.g. {{INSERT OVERWRITE tbl SELECT > 1,2,3}} will only overwrite partition {{a=2, b=3}}, assuming {{tbl}} has only > one data column and is partitioned by {{a}} and {{b}}. > It seems better if we can follow Hive's behavior. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-20187) Replace loadTable with moveFile to speed up load table for many output files
[ https://issues.apache.org/jira/browse/SPARK-20187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuming Wang resolved SPARK-20187. - Resolution: Duplicate > Replace loadTable with moveFile to speed up load table for many output files > > > Key: SPARK-20187 > URL: https://issues.apache.org/jira/browse/SPARK-20187 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.1.0 >Reporter: Yuming Wang > Attachments: spark.loadTable.log.tar.gz, spark.moveFile.log.tar.gz > > > [HiveClientImpl.loadTable|https://github.com/apache/spark/blob/v2.1.0/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala#L667] > loads files one by one, so this step will take a long time if a job generates > many files. There is a [Hive.moveFile > api|https://github.com/apache/hive/blob/release-1.2.1/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java#L2567] > that can speed up this step for {{create table tableName as select ...}} and > {{insert overwrite table tableName select ...}} > Here is a comparison of the two APIs: > {noformat:align=left|title=loadTable api: It took about 26 minutes (10:50:14 - > 11:16:18) to load table} > 17/04/01 10:50:04 INFO TaskSetManager: Finished task 207165.0 in stage 0.0 > (TID 216796) in 5952 ms on jqhadoop-test28-8.int.yihaodian.com (executor 54) > (216869/216869) > 17/04/01 10:50:04 INFO YarnScheduler: Removed TaskSet 0.0, whose tasks have > all completed, from pool > 17/04/01 10:50:04 INFO DAGScheduler: ResultStage 0 (processCmd at > CliDriver.java:376) finished in 541.797 s > 17/04/01 10:50:04 INFO DAGScheduler: Job 0 finished: processCmd at > CliDriver.java:376, took 551.208919 s > 17/04/01 10:50:04 INFO FileFormatWriter: Job null committed. > 17/04/01 10:50:14 INFO Hive: Replacing > src:viewfs://cluster4/user/hive/warehouse/staging/.hive-staging_hive_2017-04-01_10-40-02_349_8047899863313770218-1/-ext-1/part-0-9335c5f3-60fa-418b-a466-2d76a5e84537-c000, > dest: > viewfs://cluster4/user/hive/warehouse/tmp.db/spark_load_slow/part-0-9335c5f3-60fa-418b-a466-2d76a5e84537-c000, > Status:true > 17/04/01 10:50:14 INFO Hive: Replacing > src:viewfs://cluster4/user/hive/warehouse/staging/.hive-staging_hive_2017-04-01_10-40-02_349_8047899863313770218-1/-ext-1/part-1-9335c5f3-60fa-418b-a466-2d76a5e84537-c000, > dest: > viewfs://cluster4/user/hive/warehouse/tmp.db/spark_load_slow/part-1-9335c5f3-60fa-418b-a466-2d76a5e84537-c000, > Status:true > ... 
> 17/04/01 11:16:11 INFO Hive: Replacing > src:viewfs://cluster4/user/hive/warehouse/staging/.hive-staging_hive_2017-04-01_10-40-02_349_8047899863313770218-1/-ext-1/part-9-9335c5f3-60fa-418b-a466-2d76a5e84537-c000, > dest: > viewfs://cluster4/user/hive/warehouse/tmp.db/spark_load_slow/part-9-9335c5f3-60fa-418b-a466-2d76a5e84537-c000, > Status:true > 17/04/01 11:16:18 INFO SparkSqlParser: Parsing command: > `tmp`.`spark_load_slow` > 17/04/01 11:16:18 INFO CatalystSqlParser: Parsing command: string > 17/04/01 11:16:18 INFO CatalystSqlParser: Parsing command: string > 17/04/01 11:16:18 INFO CatalystSqlParser: Parsing command: string > 17/04/01 11:16:18 INFO CatalystSqlParser: Parsing command: string > 17/04/01 11:16:18 INFO CatalystSqlParser: Parsing command: string > Time taken: 2178.736 seconds > 17/04/01 11:16:18 INFO CliDriver: Time taken: 2178.736 seconds > {noformat} > {noformat:align=left|title=moveFile api: It took about 9 minutes (13:24:39 - > 13:33:46) to load table} > 17/04/01 13:24:38 INFO TaskSetManager: Finished task 210610.0 in stage 0.0 > (TID 216829) in 5888 ms on jqhadoop-test28-28.int.yihaodian.com (executor 59) > (216869/216869) > 17/04/01 13:24:38 INFO YarnScheduler: Removed TaskSet 0.0, whose tasks have > all completed, from pool > 17/04/01 13:24:38 INFO DAGScheduler: ResultStage 0 (processCmd at > CliDriver.java:376) finished in 532.409 s > 17/04/01 13:24:38 INFO DAGScheduler: Job 0 finished: processCmd at > CliDriver.java:376, took 539.337610 s > 17/04/01 13:24:39 INFO FileFormatWriter: Job null committed. > 17/04/01 13:24:39 INFO Hive: Replacing > src:viewfs://cluster4/user/hive/warehouse/staging/.hive-staging_hive_2017-04-01_13-14-46_099_8962745596360417817-1/-ext-1, > dest: viewfs://cluster4/user/hive/warehouse/tmp.db/spark_load_slow_movefile, > Status:true > 17/04/01 13:33:46 INFO SparkSqlParser: Parsing command: > `tmp`.`spark_load_slow_movefile` > 17/04/01 13:33:46 INFO CatalystSqlParser: Parsing command: string > 17/04/01 13:33:46 INFO CatalystSqlParser: Parsing command: string > 17/04/01 13:33:46 INFO CatalystSqlParser: Parsing command: string > 17/04/01 13:33:46 INFO CatalystSqlParser: Parsing command: string > 17/04/01 13:33:46 INFO CatalystSqlParser: Parsing command: string > Time taken
[jira] [Created] (SPARK-20235) Hive on S3 s3:sse and non S3:sse buckets
Franck Tago created SPARK-20235: --- Summary: Hive on S3 s3:sse and non S3:sse buckets Key: SPARK-20235 URL: https://issues.apache.org/jira/browse/SPARK-20235 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.1.0 Reporter: Franck Tago Priority: Minor My Spark application writes into two Hive tables. Both tables are external, with data residing on S3. I want to encrypt the data when writing into Hive table 1, but I do not want to encrypt the data when writing into Hive table 2. Given that the parameter fs.s3a.server-side-encryption-algorithm is set globally, I do not see how these use cases are supported in Spark. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
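The crux is that the S3A encryption setting is a filesystem-level Hadoop configuration rather than a per-table one. A sketch of where it is set ({{AES256}} is just one valid value for the key named in the report):
{code}
// Global for every s3a:// write issued through this SparkContext --
// there is no per-table override, which is the limitation described above.
spark.sparkContext.hadoopConfiguration
  .set("fs.s3a.server-side-encryption-algorithm", "AES256")
{code}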
[jira] [Commented] (SPARK-12717) pyspark broadcast fails when using multiple threads
[ https://issues.apache.org/jira/browse/SPARK-12717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15958336#comment-15958336 ] Kang Liu commented on SPARK-12717: -- I met the same problem with pyspark 2.1.0. Any progress? > pyspark broadcast fails when using multiple threads > --- > > Key: SPARK-12717 > URL: https://issues.apache.org/jira/browse/SPARK-12717 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 1.6.0 > Environment: Linux, python 2.6 or python 2.7. >Reporter: Edward Walker >Priority: Critical > > The following multi-threaded program that uses broadcast variables > consistently throws exceptions like: *Exception("Broadcast variable '18' not > loaded!",)* --- even when run with "--master local[10]". > {code:title=bug_spark.py|borderStyle=solid} > try: > > import pyspark > > except: > > pass > > from optparse import OptionParser > > > > def my_option_parser(): > > op = OptionParser() > > op.add_option("--parallelism", dest="parallelism", type="int", > default=20) > return op > > > > def do_process(x, w): > > return x * w.value > > > > def func(name, rdd, conf): > > new_rdd = rdd.map(lambda x : do_process(x, conf)) > > total = new_rdd.reduce(lambda x, y : x + y) > > count = rdd.count() > > print name, 1.0 * total / count > > > > if __name__ == "__main__": > > import threading > > op = my_option_parser() > > options, args = op.parse_args() > > sc = pyspark.SparkContext(appName="Buggy") > > data_rdd = sc.parallelize(range(0,1000), 1) > > confs = [ sc.broadcast(i) for i in xrange(options.parallelism) ] > > threads = [ threading.Thread(target=func, args=["thread_" + str(i), > data_rdd, confs[i]]) for i in xrange(options.parallelism) ] > > for t in threads: > > t.start() > > for t in threads: > > t.join() > {code} > Abridged run output: > {code:title=abridge_run.txt|borderStyle=solid} > % spark-submit --master local[10] bug_spark.py --parallelism 20 > [snip] > 16/01/08 17:10:20 ERROR Executor: Exception in task 0.0 in
[jira] [Closed] (SPARK-19722) Clean up the usage of EliminateSubqueryAliases
[ https://issues.apache.org/jira/browse/SPARK-19722?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li closed SPARK-19722. --- Resolution: Won't Fix > Clean up the usage of EliminateSubqueryAliases > -- > > Key: SPARK-19722 > URL: https://issues.apache.org/jira/browse/SPARK-19722 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Xiao Li >Assignee: Xiao Li >Priority: Minor > > In the PR https://github.com/apache/spark/pull/11403, we introduced the > function `canonicalized` for eliminating useless subqueries. We can simply > replace the call to the rule `EliminateSubqueryAliases` with the function > `canonicalized`. > After we changed view resolution and management, the current reason why > we keep `EliminateSubqueryAliases` in the optimizer has become outdated. Thus, > we should also update the reason to `eager analysis of Dataset`. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20233) Apply star-join filter heuristics to dynamic programming join enumeration
[ https://issues.apache.org/jira/browse/SPARK-20233?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15958265#comment-15958265 ] Apache Spark commented on SPARK-20233: -- User 'ioana-delaney' has created a pull request for this issue: https://github.com/apache/spark/pull/17546 > Apply star-join filter heuristics to dynamic programming join enumeration > - > > Key: SPARK-20233 > URL: https://issues.apache.org/jira/browse/SPARK-20233 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.2.0 >Reporter: Ioana Delaney >Priority: Critical > > This JIRA integrates star-join detection with the cost-based optimizer. > The join enumeration using dynamic programming generates a set of feasible > joins. The sub-optimal plans can be eliminated by a sequence of independent, > optional filters. The optional filters include heuristics for reducing the > search space. For example, > # Star-join: Tables in a star schema relationship are planned together since > they are assumed to have an optimal execution. > # Cartesian products: Cartesian products are deferred as late as possible to > avoid large intermediate results (expanding joins, in general). > # Composite inners: “Bushy tree” plans are not generated to avoid > materializing intermediate results. > For reference, see “Measuring the Complexity of Join Enumeration in Query > Optimization” by Ono et al. > This JIRA implements the star join filter. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-20233) Apply star-join filter heuristics to dynamic programming join enumeration
[ https://issues.apache.org/jira/browse/SPARK-20233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-20233: Assignee: (was: Apache Spark) > Apply star-join filter heuristics to dynamic programming join enumeration > - > > Key: SPARK-20233 > URL: https://issues.apache.org/jira/browse/SPARK-20233 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.2.0 >Reporter: Ioana Delaney >Priority: Critical > > This JIRA integrates star-join detection with the cost-based optimizer. > The join enumeration using dynamic programming generates a set of feasible > joins. The sub-optimal plans can be eliminated by a sequence of independent, > optional filters. The optional filters include heuristics for reducing the > search space. For example, > # Star-join: Tables in a star schema relationship are planned together since > they are assumed to have an optimal execution. > # Cartesian products: Cartesian products are deferred as late as possible to > avoid large intermediate results (expanding joins, in general). > # Composite inners: “Bushy tree” plans are not generated to avoid > materializing intermediate results. > For reference, see “Measuring the Complexity of Join Enumeration in Query > Optimization” by Ono et al. > This JIRA implements the star join filter. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-20233) Apply star-join filter heuristics to dynamic programming join enumeration
[ https://issues.apache.org/jira/browse/SPARK-20233?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-20233: Assignee: Apache Spark > Apply star-join filter heuristics to dynamic programming join enumeration > - > > Key: SPARK-20233 > URL: https://issues.apache.org/jira/browse/SPARK-20233 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.2.0 >Reporter: Ioana Delaney >Assignee: Apache Spark >Priority: Critical > > This JIRA integrates star-join detection with the cost-based optimizer. > The join enumeration using dynamic programming generates a set of feasible > joins. The sub-optimal plans can be eliminated by a sequence of independent, > optional filters. The optional filters include heuristics for reducing the > search space. For example, > # Star-join: Tables in a star schema relationship are planned together since > they are assumed to have an optimal execution. > # Cartesian products: Cartesian products are deferred as late as possible to > avoid large intermediate results (expanding joins, in general). > # Composite inners: “Bushy tree” plans are not generated to avoid > materializing intermediate results. > For reference, see “Measuring the Complexity of Join Enumeration in Query > Optimization” by Ono et al. > This JIRA implements the star join filter. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20226) Call to sqlContext.cacheTable takes an incredibly long time in some cases
[ https://issues.apache.org/jira/browse/SPARK-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15958259#comment-15958259 ] Liang-Chi Hsieh commented on SPARK-20226: - [~barrybecker4] Can you try to disable this config flag {{spark.sql.constraintPropagation.enabled}} and try it again? Thanks. > Call to sqlContext.cacheTable takes an incredibly long time in some cases > - > > Key: SPARK-20226 > URL: https://issues.apache.org/jira/browse/SPARK-20226 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 > Environment: linux or windows >Reporter: Barry Becker > Labels: cache > Attachments: profile_indexer2.PNG, xyzzy.csv > > > I have a case where the call to sqlContext.cacheTable can take an arbitrarily > long time depending on the number of columns that are referenced in a > withColumn expression applied to a dataframe. > The dataset is small (20 columns 7861 rows). The sequence to reproduce is the > following: > 1) add a new column that references 8 - 14 of the columns in the dataset. >- If I add 8 columns, then the call to cacheTable is fast - like *5 > seconds* >- If I add 11 columns, then it is slow - like *60 seconds* >- and if I add 14 columns, then it basically *takes forever* - I gave up > after 10 minutes or so. > The Column expression that is added, is basically just concatenating > the columns together in a single string. If a number is concatenated on a > string (or vice versa) the number is first converted to a string. > The expression looks something like this: > {code} > `Plate` + `State` + `License Type` + `Summons Number` + `Issue Date` + > `Violation Time` + `Violation` + `Judgment Entry Date` + `Fine Amount` + > `Penalty Amount` + `Interest Amount` > {code} > which we then convert to a Column expression that looks like this: > {code} > UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF('Plate, 'State), 'License Type), > UDF('Summons Number)), UDF('Issue Date)), 'Violation Time), 'Violation), > UDF('Judgment Entry Date)), UDF('Fine Amount)), UDF('Penalty Amount)), > UDF('Interest Amount)) > {code} >where the UDFs are very simple functions that basically call toString > and + as needed. > 2) apply a pipeline that includes some transformers that was saved earlier. 
> Here are the steps of the pipeline (extracted from parquet) > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333200603,"sparkVersion":"2.1.0","uid":"strIdx_aeb04d2777cc","paramMap":{"handleInvalid":"skip","outputCol":"State_IDX__","inputCol":"State_CLEANED__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333200837,"sparkVersion":"2.1.0","uid":"strIdx_0164c4c13979","paramMap":{"inputCol":"License > Type_CLEANED__","handleInvalid":"skip","outputCol":"License > Type_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201068,"sparkVersion":"2.1.0","uid":"strIdx_25b6cbd02751","paramMap":{"inputCol":"Violation_CLEANED__","handleInvalid":"skip","outputCol":"Violation_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201282,"sparkVersion":"2.1.0","uid":"strIdx_aa12df0354d9","paramMap":{"handleInvalid":"skip","inputCol":"County_CLEANED__","outputCol":"County_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201552,"sparkVersion":"2.1.0","uid":"strIdx_babb120f3cc1","paramMap":{"handleInvalid":"skip","outputCol":"Issuing > Agency_IDX__","inputCol":"Issuing Agency_CLEANED__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201759,"sparkVersion":"2.1.0","uid":"strIdx_5f2de9d9542d","paramMap":{"handleInvalid":"skip","outputCol":"Violation > Status_IDX__","inputCol":"Violation Status_CLEANED__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1491333201987,"sparkVersion":"2.1.0", > "uid":"bucketizer_6f65ca9fa813", > "paramMap":{ > "outputCol":"Summons > Number_BINNED__","handleInvalid":"keep","splits":["-Inf",1.386630656E9,3.696078592E9,4.005258752E9,6.045063168E9,8.136507392E9,"Inf"],"inputCol":"Summons > Number_CLEANED__" >} >}{code} > - > {code}{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1491333202079,"sparkVersion":"2.1.0", > "uid":"bucketizer_f5db4fb8120e", > "paramMap":{ > > "splits":["-Inf",1.435215616E9,1.443855616E9,1.447271936E9,1.448222464E9,1.448395264E9,1.448481536E9,1.448827136E9,1.449259264E9,1.449432064E9,1.449518336E9,"Inf"], > "handleInvalid":"keep","outputCol":"Issue
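For reference, the experiment suggested in the comment above would be applied like this; the flag name comes from the comment itself, and the table name is a stand-in:
{code}
// Disable constraint propagation, then retry the slow cache step.
spark.conf.set("spark.sql.constraintPropagation.enabled", "false")
spark.sqlContext.cacheTable("someTable")   // "someTable" stands in for the real table
{code}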
[jira] [Assigned] (SPARK-20217) Executor should not fail stage if killed task throws non-interrupted exception
[ https://issues.apache.org/jira/browse/SPARK-20217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yin Huai reassigned SPARK-20217: Assignee: Eric Liang > Executor should not fail stage if killed task throws non-interrupted exception > -- > > Key: SPARK-20217 > URL: https://issues.apache.org/jira/browse/SPARK-20217 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Eric Liang >Assignee: Eric Liang > Fix For: 2.2.0 > > > This is reproducible as follows. Run the following, and then use > SparkContext.killTaskAttempt to kill one of the tasks. The entire stage will > fail since we threw a RuntimeException instead of InterruptedException. > We should probably unconditionally return TaskKilled instead of TaskFailed if > the task was killed by the driver, regardless of the actual exception thrown. > {code} > spark.range(100).repartition(100).foreach { i => > try { > Thread.sleep(1000) > } catch { > case t: InterruptedException => > throw new RuntimeException(t) > } > } > {code} > Based on the code in TaskSetManager, I think this also affects kills of > speculative tasks. However, since speculated tasks are few in number, and > usually a task needs to fail a few times before the stage is cancelled, > probably no one noticed this in production. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-20217) Executor should not fail stage if killed task throws non-interrupted exception
[ https://issues.apache.org/jira/browse/SPARK-20217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yin Huai resolved SPARK-20217. -- Resolution: Fixed Fix Version/s: 2.2.0 Issue resolved by pull request 17531 [https://github.com/apache/spark/pull/17531] > Executor should not fail stage if killed task throws non-interrupted exception > -- > > Key: SPARK-20217 > URL: https://issues.apache.org/jira/browse/SPARK-20217 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.2.0 >Reporter: Eric Liang > Fix For: 2.2.0 > > > This is reproducible as follows. Run the following, and then use > SparkContext.killTaskAttempt to kill one of the tasks. The entire stage will > fail since we threw a RuntimeException instead of InterruptedException. > We should probably unconditionally return TaskKilled instead of TaskFailed if > the task was killed by the driver, regardless of the actual exception thrown. > {code} > spark.range(100).repartition(100).foreach { i => > try { > Thread.sleep(1000) > } catch { > case t: InterruptedException => > throw new RuntimeException(t) > } > } > {code} > Based on the code in TaskSetManager, I think this also affects kills of > speculative tasks. However, since speculated tasks are few in number, and > usually a task needs to fail a few times before the stage is cancelled, > probably no one noticed this in production. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
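A sketch of the kill step from the repro description, using the 2.2-era {{SparkContext.killTaskAttempt}} API as best understood; the task id is a placeholder that in practice would come from the web UI or a SparkListener:
{code}
// 42L is illustrative -- look up a live task attempt id first.
sc.killTaskAttempt(42L, interruptThread = true,
  reason = "verify the task is reported as TaskKilled, not TaskFailed")
{code}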
[jira] [Resolved] (SPARK-20231) Refactor star schema code for the subsequent star join detection in CBO
[ https://issues.apache.org/jira/browse/SPARK-20231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li resolved SPARK-20231. - Resolution: Fixed Assignee: Ioana Delaney Fix Version/s: 2.2.0 > Refactor star schema code for the subsequent star join detection in CBO > --- > > Key: SPARK-20231 > URL: https://issues.apache.org/jira/browse/SPARK-20231 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.2.0 >Reporter: Ioana Delaney >Assignee: Ioana Delaney >Priority: Minor > Fix For: 2.2.0 > > > Decouples star schema code from the default join planning. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20214) pyspark linalg _convert_to_vector should check for sorted indices
[ https://issues.apache.org/jira/browse/SPARK-20214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joseph K. Bradley updated SPARK-20214: -- Affects Version/s: 1.5.2 1.6.3 > pyspark linalg _convert_to_vector should check for sorted indices > - > > Key: SPARK-20214 > URL: https://issues.apache.org/jira/browse/SPARK-20214 > Project: Spark > Issue Type: Bug > Components: ML, MLlib, PySpark, Tests >Affects Versions: 1.5.2, 1.6.3, 2.0.2, 2.1.1, 2.2.0 >Reporter: Joseph K. Bradley >Assignee: Liang-Chi Hsieh > Fix For: 2.0.3, 2.1.1, 2.2.0 > > > I've seen a few failures of this line: > https://github.com/apache/spark/blame/402bf2a50ddd4039ff9f376b641bd18fffa54171/python/pyspark/mllib/tests.py#L847 > It converts a scipy.sparse.lil_matrix to a dok_matrix and then to a > pyspark.mllib.linalg.Vector. The failure happens in the conversion to a > vector and indicates that the dok_matrix is not returning its values in > sorted order. (Actually, the failure is in _convert_to_vector, which converts > the dok_matrix to a csc_matrix and then passes the CSC data to the MLlib > Vector constructor.) Here's the stack trace: > {code} > Traceback (most recent call last): > File "/home/jenkins/workspace/python/pyspark/mllib/tests.py", line 847, in > test_serialize > self.assertEqual(sv, _convert_to_vector(lil.todok())) > File "/home/jenkins/workspace/python/pyspark/mllib/linalg/__init__.py", > line 78, in _convert_to_vector > return SparseVector(l.shape[0], csc.indices, csc.data) > File "/home/jenkins/workspace/python/pyspark/mllib/linalg/__init__.py", > line 556, in __init__ > % (self.indices[i], self.indices[i + 1])) > TypeError: Indices 3 and 1 are not strictly increasing > {code} > This seems like a bug in _convert_to_vector, where we really should check > {{csc_matrix.has_sorted_indices}} first. > I haven't seen this bug in pyspark.ml.linalg, but it probably exists there > too. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-20214) pyspark linalg _convert_to_vector should check for sorted indices
[ https://issues.apache.org/jira/browse/SPARK-20214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joseph K. Bradley resolved SPARK-20214. --- Resolution: Fixed Fix Version/s: 2.2.0 2.0.3 2.1.1 Issue resolved by pull request 17532 [https://github.com/apache/spark/pull/17532] > pyspark linalg _convert_to_vector should check for sorted indices > - > > Key: SPARK-20214 > URL: https://issues.apache.org/jira/browse/SPARK-20214 > Project: Spark > Issue Type: Bug > Components: ML, MLlib, PySpark, Tests >Affects Versions: 2.0.2, 2.1.1, 2.2.0 >Reporter: Joseph K. Bradley >Assignee: Liang-Chi Hsieh > Fix For: 2.1.1, 2.0.3, 2.2.0 > > > I've seen a few failures of this line: > https://github.com/apache/spark/blame/402bf2a50ddd4039ff9f376b641bd18fffa54171/python/pyspark/mllib/tests.py#L847 > It converts a scipy.sparse.lil_matrix to a dok_matrix and then to a > pyspark.mllib.linalg.Vector. The failure happens in the conversion to a > vector and indicates that the dok_matrix is not returning its values in > sorted order. (Actually, the failure is in _convert_to_vector, which converts > the dok_matrix to a csc_matrix and then passes the CSC data to the MLlib > Vector constructor.) Here's the stack trace: > {code} > Traceback (most recent call last): > File "/home/jenkins/workspace/python/pyspark/mllib/tests.py", line 847, in > test_serialize > self.assertEqual(sv, _convert_to_vector(lil.todok())) > File "/home/jenkins/workspace/python/pyspark/mllib/linalg/__init__.py", > line 78, in _convert_to_vector > return SparseVector(l.shape[0], csc.indices, csc.data) > File "/home/jenkins/workspace/python/pyspark/mllib/linalg/__init__.py", > line 556, in __init__ > % (self.indices[i], self.indices[i + 1])) > TypeError: Indices 3 and 1 are not strictly increasing > {code} > This seems like a bug in _convert_to_vector, where we really should check > {{csc_matrix.has_sorted_indices}} first. > I haven't seen this bug in pyspark.ml.linalg, but it probably exists there > too. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-20234) Improve Framework for Basic ML Tests
Bryan Cutler created SPARK-20234: Summary: Improve Framework for Basic ML Tests Key: SPARK-20234 URL: https://issues.apache.org/jira/browse/SPARK-20234 Project: Spark Issue Type: Improvement Components: ML, Tests Affects Versions: 2.1.0 Reporter: Bryan Cutler Recently, I came across a few simple ML bugs that should have been caught by basic checks, but there was a lack of coverage because the existing checks are just tacked on to other tests and are easy to forget to add to new test suites. I think it's worth spending some time making these checks more standardized and easy to bring into new test suites so they are not overlooked - possibly by using a trait. The basic checks I'm referring to are: * parameter checks * uid checks * model/estimator copies * model/estimator persistence -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
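A rough sketch of what such a shared trait could look like; the trait and method names are invented here, and a real version would hook into Spark's ScalaTest-based ML suites:
{code}
import org.apache.spark.ml.PipelineStage
import org.apache.spark.ml.param.ParamMap

// Hypothetical names -- illustrative only, not an existing Spark trait.
trait BasicMLChecks {
  /** copy() should return a distinct instance that keeps the uid. */
  def checkCopy(stage: PipelineStage): Unit = {
    val copied = stage.copy(ParamMap.empty)
    assert(copied ne stage, "copy() must return a new instance")
    assert(copied.uid == stage.uid, "copy() must preserve the uid")
  }

  /** Every declared param should be documented and reachable by name. */
  def checkParams(stage: PipelineStage): Unit = {
    stage.params.foreach { p =>
      assert(p.doc.nonEmpty, s"param ${p.name} is missing documentation")
      assert(stage.getParam(p.name) eq p)
    }
  }
}
{code}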
[jira] [Commented] (SPARK-20234) Improve Framework for Basic ML Tests
[ https://issues.apache.org/jira/browse/SPARK-20234?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15958077#comment-15958077 ] Bryan Cutler commented on SPARK-20234: -- I can work on this > Improve Framework for Basic ML Tests > > > Key: SPARK-20234 > URL: https://issues.apache.org/jira/browse/SPARK-20234 > Project: Spark > Issue Type: Improvement > Components: ML, Tests >Affects Versions: 2.1.0 >Reporter: Bryan Cutler > > Recently, I came across a few simple ML bugs that should have been caught by > basic checks, but there was a lack of coverage because the existing checks > are just tacked on to other tests and are easy to forget to add to new test > suites. > I think it's worth spending some time making these checks more > standardized and easy to bring into new test suites so they are not > overlooked - possibly by using a trait. > The basic checks I'm referring to are: > * parameter checks > * uid checks > * model/estimator copies > * model/estimator persistence -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-20224) Update apache docs
[ https://issues.apache.org/jira/browse/SPARK-20224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tathagata Das resolved SPARK-20224. --- Resolution: Fixed Issue resolved by pull request 17539 [https://github.com/apache/spark/pull/17539] > Update apache docs > -- > > Key: SPARK-20224 > URL: https://issues.apache.org/jira/browse/SPARK-20224 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Tathagata Das >Assignee: Tathagata Das > Fix For: 2.2.0 > > -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-20233) Apply star-join filter heuristics to dynamic programming join enumeration
Ioana Delaney created SPARK-20233: - Summary: Apply star-join filter heuristics to dynamic programming join enumeration Key: SPARK-20233 URL: https://issues.apache.org/jira/browse/SPARK-20233 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.2.0 Reporter: Ioana Delaney Priority: Critical This JIRA integrates star-join detection with the cost-based optimizer. The join enumeration using dynamic programming generates a set of feasible joins. The sub-optimal plans can be eliminated by a sequence of independent, optional filters. The optional filters include heuristics for reducing the search space. For example, # Star-join: Tables in a star schema relationship are planned together since they are assumed to have an optimal execution. # Cartesian products: Cartesian products are deferred as late as possible to avoid large intermediate results (expanding joins, in general). # Composite inners: “Bushy tree” plans are not generated to avoid materializing intermediate results. For reference, see “Measuring the Complexity of Join Enumeration in Query Optimization” by Ono et al. This JIRA implements the star join filter. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
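A toy illustration of the filter idea, in plain Scala over sets of table names rather than Spark's actual optimizer types: while the star (fact table plus its dimensions) is incomplete, a candidate join of two plan sets is pruned unless it stays inside the star.
{code}
// Toy model only -- illustrates how an optional filter prunes DP candidates.
object StarFilterToy extends App {
  type PlanSet = Set[String]

  def starJoinFilter(star: PlanSet)(left: PlanSet, right: PlanSet): Boolean = {
    val starComplete = star.subsetOf(left) || star.subsetOf(right)
    starComplete || (left ++ right).subsetOf(star)
  }

  val star = Set("fact", "dim1", "dim2")
  assert(starJoinFilter(star)(Set("fact"), Set("dim1")))   // kept: builds the star
  assert(!starJoinFilter(star)(Set("fact"), Set("other"))) // pruned: leaves the star early
  assert(starJoinFilter(star)(star, Set("other")))         // kept: star already complete
}
{code}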
[jira] [Commented] (SPARK-20202) Remove references to org.spark-project.hive
[ https://issues.apache.org/jira/browse/SPARK-20202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957843#comment-15957843 ] Reynold Xin commented on SPARK-20202: - I've created a ticket on the Hive side to publish 1.2.x: https://issues.apache.org/jira/browse/HIVE-16391 Until that is resolved, I also wonder if there are other things we should do. For example, vote on the current fork to rectify it? > Remove references to org.spark-project.hive > --- > > Key: SPARK-20202 > URL: https://issues.apache.org/jira/browse/SPARK-20202 > Project: Spark > Issue Type: Bug > Components: Build, SQL >Affects Versions: 1.6.4, 2.0.3, 2.1.1 >Reporter: Owen O'Malley > > Spark can't continue to depend on their fork of Hive and must move to > standard Hive versions. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20202) Remove references to org.spark-project.hive
[ https://issues.apache.org/jira/browse/SPARK-20202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957811#comment-15957811 ] Reynold Xin commented on SPARK-20202: - Yes this is really important. The proper way to do this is to publish a proper version of Hive with the right dependency declared (rather than including all the dependencies in an uber jar). Looks like there is broad support to do this. I'm going to create a JIRA ticket on Hive and add a dependency on this. This ticket will depend on that. > Remove references to org.spark-project.hive > --- > > Key: SPARK-20202 > URL: https://issues.apache.org/jira/browse/SPARK-20202 > Project: Spark > Issue Type: Bug > Components: Build, SQL >Affects Versions: 1.6.4, 2.0.3, 2.1.1 >Reporter: Owen O'Malley > > Spark can't continue to depend on their fork of Hive and must move to > standard Hive versions. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20202) Remove references to org.spark-project.hive
[ https://issues.apache.org/jira/browse/SPARK-20202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Reynold Xin updated SPARK-20202: Priority: Major (was: Blocker) > Remove references to org.spark-project.hive > --- > > Key: SPARK-20202 > URL: https://issues.apache.org/jira/browse/SPARK-20202 > Project: Spark > Issue Type: Bug > Components: Build, SQL >Affects Versions: 1.6.4, 2.0.3, 2.1.1 >Reporter: Owen O'Malley > > Spark can't continue to depend on their fork of Hive and must move to > standard Hive versions. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20226) Call to sqlContext.cacheTable takes an incredibly long time in some cases
[ https://issues.apache.org/jira/browse/SPARK-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Barry Becker updated SPARK-20226: - Attachment: profile_indexer2.PNG A snapshot of the hotspot sampler from JVisualVM while cacheTable was running. > Call to sqlContext.cacheTable takes an incredibly long time in some cases > - > > Key: SPARK-20226 > URL: https://issues.apache.org/jira/browse/SPARK-20226 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 > Environment: linux or windows >Reporter: Barry Becker > Labels: cache > Attachments: profile_indexer2.PNG, xyzzy.csv > > > I have a case where the call to sqlContext.cacheTable can take an arbitrarily > long time depending on the number of columns that are referenced in a > withColumn expression applied to a dataframe. > The dataset is small (20 columns 7861 rows). The sequence to reproduce is the > following: > 1) add a new column that references 8 - 14 of the columns in the dataset. >- If I add 8 columns, then the call to cacheTable is fast - like *5 > seconds* >- If I add 11 columns, then it is slow - like *60 seconds* >- and if I add 14 columns, then it basically *takes forever* - I gave up > after 10 minutes or so. > The Column expression that is added, is basically just concatenating > the columns together in a single string. If a number is concatenated on a > string (or vice versa) the number is first converted to a string. > The expression looks something like this: > {code} > `Plate` + `State` + `License Type` + `Summons Number` + `Issue Date` + > `Violation Time` + `Violation` + `Judgment Entry Date` + `Fine Amount` + > `Penalty Amount` + `Interest Amount` > {code} > which we then convert to a Column expression that looks like this: > {code} > UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF('Plate, 'State), 'License Type), > UDF('Summons Number)), UDF('Issue Date)), 'Violation Time), 'Violation), > UDF('Judgment Entry Date)), UDF('Fine Amount)), UDF('Penalty Amount)), > UDF('Interest Amount)) > {code} >where the UDFs are very simple functions that basically call toString > and + as needed. > 2) apply a pipeline that includes some transformers that was saved earlier. 
> Here are the steps of the pipeline (extracted from parquet) > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333200603,"sparkVersion":"2.1.0","uid":"strIdx_aeb04d2777cc","paramMap":{"handleInvalid":"skip","outputCol":"State_IDX__","inputCol":"State_CLEANED__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333200837,"sparkVersion":"2.1.0","uid":"strIdx_0164c4c13979","paramMap":{"inputCol":"License > Type_CLEANED__","handleInvalid":"skip","outputCol":"License > Type_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201068,"sparkVersion":"2.1.0","uid":"strIdx_25b6cbd02751","paramMap":{"inputCol":"Violation_CLEANED__","handleInvalid":"skip","outputCol":"Violation_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201282,"sparkVersion":"2.1.0","uid":"strIdx_aa12df0354d9","paramMap":{"handleInvalid":"skip","inputCol":"County_CLEANED__","outputCol":"County_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201552,"sparkVersion":"2.1.0","uid":"strIdx_babb120f3cc1","paramMap":{"handleInvalid":"skip","outputCol":"Issuing > Agency_IDX__","inputCol":"Issuing Agency_CLEANED__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201759,"sparkVersion":"2.1.0","uid":"strIdx_5f2de9d9542d","paramMap":{"handleInvalid":"skip","outputCol":"Violation > Status_IDX__","inputCol":"Violation Status_CLEANED__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1491333201987,"sparkVersion":"2.1.0", > "uid":"bucketizer_6f65ca9fa813", > "paramMap":{ > "outputCol":"Summons > Number_BINNED__","handleInvalid":"keep","splits":["-Inf",1.386630656E9,3.696078592E9,4.005258752E9,6.045063168E9,8.136507392E9,"Inf"],"inputCol":"Summons > Number_CLEANED__" >} >}{code} > - > {code}{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1491333202079,"sparkVersion":"2.1.0", > "uid":"bucketizer_f5db4fb8120e", > "paramMap":{ > > "splits":["-Inf",1.435215616E9,1.443855616E9,1.447271936E9,1.448222464E9,1.448395264E9,1.448481536E9,1.448827136E9,1.449259264E9,1.449432064E9,1.449518336E9,"Inf"], > "handleInvalid":"keep","outputCol":"Issue > Date_BINNED__","inputCol":"Issue Date_CLEANED__" >} >}{code}
[jira] [Commented] (SPARK-20226) Call to sqlContext.cacheTable takes an incredibly long time in some cases
[ https://issues.apache.org/jira/browse/SPARK-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957732#comment-15957732 ] Barry Becker commented on SPARK-20226: -- I did some profiling using the sampler in JVisualVM and took some thread dumps at different points while it was running. The main hotspot indicated by the sampler is scala.collection.GenSeqLike$class.equals at the lowest level. The thread dumps show that this is what is being called at the bottom of the stack. Below are two representative thread dumps taken while it's processing: {code} "main-ScalaTest-running-ApplyModelSuite@1" prio=5 tid=0x1 nid=NA runnable java.lang.Thread.State: RUNNABLE at scala.collection.GenSeqLike$class.equals(GenSeqLike.scala:474) at scala.collection.AbstractSeq.equals(Seq.scala:41) at org.apache.spark.sql.catalyst.expressions.ScalaUDF.equals(ScalaUDF.scala:39) at scala.collection.IndexedSeqOptimized$class.sameElements(IndexedSeqOptimized.scala:167) at scala.collection.mutable.ArrayBuffer.sameElements(ArrayBuffer.scala:48) at scala.collection.GenSeqLike$class.equals(GenSeqLike.scala:474) at scala.collection.AbstractSeq.equals(Seq.scala:41) at org.apache.spark.sql.catalyst.expressions.ScalaUDF.equals(ScalaUDF.scala:39) at scala.collection.IndexedSeqOptimized$class.sameElements(IndexedSeqOptimized.scala:167) at scala.collection.mutable.ArrayBuffer.sameElements(ArrayBuffer.scala:48) at scala.collection.GenSeqLike$class.equals(GenSeqLike.scala:474) at scala.collection.AbstractSeq.equals(Seq.scala:41) at org.apache.spark.sql.catalyst.expressions.ScalaUDF.equals(ScalaUDF.scala:39) at scala.collection.IndexedSeqOptimized$class.sameElements(IndexedSeqOptimized.scala:167) at scala.collection.mutable.ArrayBuffer.sameElements(ArrayBuffer.scala:48) at scala.collection.GenSeqLike$class.equals(GenSeqLike.scala:474) at scala.collection.AbstractSeq.equals(Seq.scala:41) at org.apache.spark.sql.catalyst.expressions.ScalaUDF.equals(ScalaUDF.scala:39) at org.apache.spark.sql.catalyst.expressions.Cast.equals(Cast.scala:123) at org.apache.spark.sql.catalyst.expressions.EqualNullSafe.equals(predicates.scala:446) at scala.collection.mutable.FlatHashTable$class.addEntry(FlatHashTable.scala:151) at scala.collection.mutable.HashSet.addEntry(HashSet.scala:40) at scala.collection.mutable.FlatHashTable$class.growTable(FlatHashTable.scala:225) at scala.collection.mutable.FlatHashTable$class.addEntry(FlatHashTable.scala:159) at scala.collection.mutable.HashSet.addEntry(HashSet.scala:40) at scala.collection.mutable.FlatHashTable$class.addElem(FlatHashTable.scala:139) at scala.collection.mutable.HashSet.addElem(HashSet.scala:40) at scala.collection.mutable.HashSet.$plus$eq(HashSet.scala:59) at scala.collection.mutable.HashSet.$plus$eq(HashSet.scala:40) at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59) at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59) at scala.collection.mutable.HashSet.foreach(HashSet.scala:78) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) at scala.collection.mutable.AbstractSet.$plus$plus$eq(Set.scala:46) at scala.collection.mutable.HashSet.clone(HashSet.scala:83) at scala.collection.mutable.HashSet.clone(HashSet.scala:40) at org.apache.spark.sql.catalyst.expressions.ExpressionSet.$plus(ExpressionSet.scala:65) at org.apache.spark.sql.catalyst.expressions.ExpressionSet.$plus(ExpressionSet.scala:50) at scala.collection.SetLike$$anonfun$$plus$plus$1.apply(SetLike.scala:141) 
at scala.collection.SetLike$$anonfun$$plus$plus$1.apply(SetLike.scala:141) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:142) at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:142) at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:322) at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:978) at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:978) at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:142) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at scala.collection.TraversableOnce$class.$div$colon(TraversableOnce.scala:136) at scala.collection.AbstractTraversable.$div$colon(Traversable.scala:104) at scala.collection.SetLike$class.$plus$plus(SetLike.scala:141) at
[jira] [Commented] (SPARK-20232) Better combineByKey documentation: clarify memory allocation, better example
[ https://issues.apache.org/jira/browse/SPARK-20232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957537#comment-15957537 ] Apache Spark commented on SPARK-20232: -- User 'dgingrich' has created a pull request for this issue: https://github.com/apache/spark/pull/17545 > Better combineByKey documentation: clarify memory allocation, better example > > > Key: SPARK-20232 > URL: https://issues.apache.org/jira/browse/SPARK-20232 > Project: Spark > Issue Type: Improvement > Components: Documentation >Affects Versions: 2.1.0 > Environment: macOS Sierra 10.12.4 > Spark 2.1.0 installed via Homebrew >Reporter: David Gingrich >Priority: Trivial > > The combineByKey docs have a few flaws: > - They don't include the note about memory allocation (which aggregateByKey has) > - The example doesn't show the difference between mergeValue and mergeCombiners (both > are add) > I have a trivial patch, will attach momentarily. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
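An example of the kind of doc improvement being proposed: computing a per-key average makes {{mergeValue}} and {{mergeCombiners}} visibly different, unlike an example where both are addition. Sketch, assuming a live SparkContext {{sc}}:
{code}
val pairs = sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 5)))
val avgByKey = pairs.combineByKey(
  (v: Int) => (v, 1),                                            // createCombiner
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),         // mergeValue: fold in one value
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)   // mergeCombiners: merge partials
).mapValues { case (sum, count) => sum.toDouble / count }
println(avgByKey.collect().toMap)   // Map(a -> 2.0, b -> 5.0)
{code}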
[jira] [Assigned] (SPARK-20232) Better combineByKey documentation: clarify memory allocation, better example
[ https://issues.apache.org/jira/browse/SPARK-20232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-20232: Assignee: Apache Spark > Better combineByKey documentation: clarify memory allocation, better example > > > Key: SPARK-20232 > URL: https://issues.apache.org/jira/browse/SPARK-20232 > Project: Spark > Issue Type: Improvement > Components: Documentation >Affects Versions: 2.1.0 > Environment: macOS Sierra 10.12.4 > Spark 2.1.0 installed via Homebrew >Reporter: David Gingrich >Assignee: Apache Spark >Priority: Trivial > > The combineByKey docs have a few flaws: > - They don't include a note about memory allocation (which the aggregateByKey docs have) > - The example doesn't show the difference between mergeValue and mergeCombiners (both > are add) > I have a trivial patch, will attach momentarily. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-20232) Better combineByKey documentation: clarify memory allocation, better example
[ https://issues.apache.org/jira/browse/SPARK-20232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-20232: Assignee: (was: Apache Spark) > Better combineByKey documentation: clarify memory allocation, better example > > > Key: SPARK-20232 > URL: https://issues.apache.org/jira/browse/SPARK-20232 > Project: Spark > Issue Type: Improvement > Components: Documentation >Affects Versions: 2.1.0 > Environment: macOS Sierra 10.12.4 > Spark 2.1.0 installed via Homebrew >Reporter: David Gingrich >Priority: Trivial > > The combineByKey docs have a few flaws: > - They don't include a note about memory allocation (which the aggregateByKey docs have) > - The example doesn't show the difference between mergeValue and mergeCombiners (both > are add) > I have a trivial patch, will attach momentarily. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20228) Random Forest unstable results depending on spark.executor.memory
[ https://issues.apache.org/jira/browse/SPARK-20228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957530#comment-15957530 ] Sean Owen commented on SPARK-20228: --- This alone wouldn't matter directly, but could certainly matter indirectly. More resources might mean you can run faster and build better trees in the same time. More memory might mean you run fewer executors, and the way the trees are built might actually benefit from that. There is a maxMemoryInMB parameter you might be increasing, which allows more splits to be evaluated. This isn't enough info and isn't evidence of a problem. > Random Forest unstable results depending on spark.executor.memory > - > > Key: SPARK-20228 > URL: https://issues.apache.org/jira/browse/SPARK-20228 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.1.0 >Reporter: Ansgar Schulze > > If I deploy a random forest model with, for example, > spark.executor.memory=20480M > I get a different result than if I deploy the model with > spark.executor.memory=6000M > I expected the same results but different runtimes. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
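A minimal Scala sketch of the two knobs mentioned above (values are illustrative; whether they explain the reporter's gap is exactly what is unknown here):
{code}
import org.apache.spark.ml.classification.RandomForestClassifier

val rf = new RandomForestClassifier()
  .setSeed(42L)          // pin the RNG so sampling is repeatable across runs
  .setMaxMemoryInMB(256) // the default; raising it allows more splits to be evaluated per pass
{code}
Note that pinning the seed only removes one source of run-to-run variation; differences in partitioning or resources can still change which trees get built.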
[jira] [Assigned] (SPARK-20231) Refactor star schema code for the subsequent star join detection in CBO
[ https://issues.apache.org/jira/browse/SPARK-20231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-20231: Assignee: Apache Spark > Refactor star schema code for the subsequent star join detection in CBO > --- > > Key: SPARK-20231 > URL: https://issues.apache.org/jira/browse/SPARK-20231 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.2.0 >Reporter: Ioana Delaney >Assignee: Apache Spark >Priority: Minor > > Decouples star schema code from the default join planning. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-20231) Refactor star schema code for the subsequent star join detection in CBO
[ https://issues.apache.org/jira/browse/SPARK-20231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-20231: Assignee: (was: Apache Spark) > Refactor star schema code for the subsequent star join detection in CBO > --- > > Key: SPARK-20231 > URL: https://issues.apache.org/jira/browse/SPARK-20231 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.2.0 >Reporter: Ioana Delaney >Priority: Minor > > Decouples star schema code from the default join planning. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20231) Refactor star schema code for the subsequent star join detection in CBO
[ https://issues.apache.org/jira/browse/SPARK-20231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957523#comment-15957523 ] Apache Spark commented on SPARK-20231: -- User 'ioana-delaney' has created a pull request for this issue: https://github.com/apache/spark/pull/17544 > Refactor star schema code for the subsequent star join detection in CBO > --- > > Key: SPARK-20231 > URL: https://issues.apache.org/jira/browse/SPARK-20231 > Project: Spark > Issue Type: Sub-task > Components: SQL >Affects Versions: 2.2.0 >Reporter: Ioana Delaney >Priority: Minor > > Decouples star schema code from the default join planning. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-20232) Better combineByKey documentation: clarify memory allocation, better example
David Gingrich created SPARK-20232: -- Summary: Better combineByKey documentation: clarify memory allocation, better example Key: SPARK-20232 URL: https://issues.apache.org/jira/browse/SPARK-20232 Project: Spark Issue Type: Improvement Components: Documentation Affects Versions: 2.1.0 Environment: macOS Sierra 10.12.4 Spark 2.1.0 installed via Homebrew Reporter: David Gingrich Priority: Trivial The combineByKey docs have a few flaws: - They don't include a note about memory allocation (which the aggregateByKey docs have) - The example doesn't show the difference between mergeValue and mergeCombiners (both are add) I have a trivial patch, will attach momentarily. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20226) Call to sqlContext.cacheTable takes an incredibly long time in some cases
[ https://issues.apache.org/jira/browse/SPARK-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957496#comment-15957496 ] Sean Owen commented on SPARK-20226: --- It could just look that way because caching means evaluating, and evaluating takes the time. It seems like you've shown it's all due to time in the StringIndexerModels, no? I think you'd be best off finding out where it's spending time. Calling {{kill -QUIT (pid)}} a couple times on an executor is a good poor man's way to see where the threads are spending time. (Real profiling isn't hard either if you have something like JProfiler) > Call to sqlContext.cacheTable takes an incredibly long time in some cases > - > > Key: SPARK-20226 > URL: https://issues.apache.org/jira/browse/SPARK-20226 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 > Environment: linux or windows >Reporter: Barry Becker > Labels: cache > Attachments: xyzzy.csv > > > I have a case where the call to sqlContext.cacheTable can take an arbitrarily > long time depending on the number of columns that are referenced in a > withColumn expression applied to a dataframe. > The dataset is small (20 columns 7861 rows). The sequence to reproduce is the > following: > 1) add a new column that references 8 - 14 of the columns in the dataset. >- If I add 8 columns, then the call to cacheTable is fast - like *5 > seconds* >- If I add 11 columns, then it is slow - like *60 seconds* >- and if I add 14 columns, then it basically *takes forever* - I gave up > after 10 minutes or so. > The Column expression that is added, is basically just concatenating > the columns together in a single string. If a number is concatenated on a > string (or vice versa) the number is first converted to a string. > The expression looks something like this: > {code} > `Plate` + `State` + `License Type` + `Summons Number` + `Issue Date` + > `Violation Time` + `Violation` + `Judgment Entry Date` + `Fine Amount` + > `Penalty Amount` + `Interest Amount` > {code} > which we then convert to a Column expression that looks like this: > {code} > UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF('Plate, 'State), 'License Type), > UDF('Summons Number)), UDF('Issue Date)), 'Violation Time), 'Violation), > UDF('Judgment Entry Date)), UDF('Fine Amount)), UDF('Penalty Amount)), > UDF('Interest Amount)) > {code} >where the UDFs are very simple functions that basically call toString > and + as needed. > 2) apply a pipeline that includes some transformers that was saved earlier. 
> Here are the steps of the pipeline (extracted from parquet) > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333200603,"sparkVersion":"2.1.0","uid":"strIdx_aeb04d2777cc","paramMap":{"handleInvalid":"skip","outputCol":"State_IDX__","inputCol":"State_CLEANED__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333200837,"sparkVersion":"2.1.0","uid":"strIdx_0164c4c13979","paramMap":{"inputCol":"License > Type_CLEANED__","handleInvalid":"skip","outputCol":"License > Type_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201068,"sparkVersion":"2.1.0","uid":"strIdx_25b6cbd02751","paramMap":{"inputCol":"Violation_CLEANED__","handleInvalid":"skip","outputCol":"Violation_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201282,"sparkVersion":"2.1.0","uid":"strIdx_aa12df0354d9","paramMap":{"handleInvalid":"skip","inputCol":"County_CLEANED__","outputCol":"County_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201552,"sparkVersion":"2.1.0","uid":"strIdx_babb120f3cc1","paramMap":{"handleInvalid":"skip","outputCol":"Issuing > Agency_IDX__","inputCol":"Issuing Agency_CLEANED__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201759,"sparkVersion":"2.1.0","uid":"strIdx_5f2de9d9542d","paramMap":{"handleInvalid":"skip","outputCol":"Violation > Status_IDX__","inputCol":"Violation Status_CLEANED__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1491333201987,"sparkVersion":"2.1.0", > "uid":"bucketizer_6f65ca9fa813", > "paramMap":{ > "outputCol":"Summons > Number_BINNED__","handleInvalid":"keep","splits":["-Inf",1.386630656E9,3.696078592E9,4.005258752E9,6.045063168E9,8.136507392E9,"Inf"],"inputCol":"Summons > Number_CLEANED__" >} >}{code} > - > {code}{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1491333202079,"sparkVersion":"2.1.0", > "uid":
[jira] [Commented] (SPARK-20226) Call to sqlContext.cacheTable takes an incredibly long time in some cases
[ https://issues.apache.org/jira/browse/SPARK-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957489#comment-15957489 ] Barry Becker commented on SPARK-20226: -- I thought the problem was in the cacheTable call because that is where all the time was spent. Do you think the problem is with the generated query plan or something else? Any hints or tips you could provide would be appreciated. > Call to sqlContext.cacheTable takes an incredibly long time in some cases > - > > Key: SPARK-20226 > URL: https://issues.apache.org/jira/browse/SPARK-20226 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 > Environment: linux or windows >Reporter: Barry Becker > Labels: cache > Attachments: xyzzy.csv > > > I have a case where the call to sqlContext.cacheTable can take an arbitrarily > long time depending on the number of columns that are referenced in a > withColumn expression applied to a dataframe. > The dataset is small (20 columns 7861 rows). The sequence to reproduce is the > following: > 1) add a new column that references 8 - 14 of the columns in the dataset. >- If I add 8 columns, then the call to cacheTable is fast - like *5 > seconds* >- If I add 11 columns, then it is slow - like *60 seconds* >- and if I add 14 columns, then it basically *takes forever* - I gave up > after 10 minutes or so. > The Column expression that is added, is basically just concatenating > the columns together in a single string. If a number is concatenated on a > string (or vice versa) the number is first converted to a string. > The expression looks something like this: > {code} > `Plate` + `State` + `License Type` + `Summons Number` + `Issue Date` + > `Violation Time` + `Violation` + `Judgment Entry Date` + `Fine Amount` + > `Penalty Amount` + `Interest Amount` > {code} > which we then convert to a Column expression that looks like this: > {code} > UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF('Plate, 'State), 'License Type), > UDF('Summons Number)), UDF('Issue Date)), 'Violation Time), 'Violation), > UDF('Judgment Entry Date)), UDF('Fine Amount)), UDF('Penalty Amount)), > UDF('Interest Amount)) > {code} >where the UDFs are very simple functions that basically call toString > and + as needed. > 2) apply a pipeline that includes some transformers that was saved earlier. 
> Here are the steps of the pipeline (extracted from parquet) > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333200603,"sparkVersion":"2.1.0","uid":"strIdx_aeb04d2777cc","paramMap":{"handleInvalid":"skip","outputCol":"State_IDX__","inputCol":"State_CLEANED__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333200837,"sparkVersion":"2.1.0","uid":"strIdx_0164c4c13979","paramMap":{"inputCol":"License > Type_CLEANED__","handleInvalid":"skip","outputCol":"License > Type_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201068,"sparkVersion":"2.1.0","uid":"strIdx_25b6cbd02751","paramMap":{"inputCol":"Violation_CLEANED__","handleInvalid":"skip","outputCol":"Violation_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201282,"sparkVersion":"2.1.0","uid":"strIdx_aa12df0354d9","paramMap":{"handleInvalid":"skip","inputCol":"County_CLEANED__","outputCol":"County_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201552,"sparkVersion":"2.1.0","uid":"strIdx_babb120f3cc1","paramMap":{"handleInvalid":"skip","outputCol":"Issuing > Agency_IDX__","inputCol":"Issuing Agency_CLEANED__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201759,"sparkVersion":"2.1.0","uid":"strIdx_5f2de9d9542d","paramMap":{"handleInvalid":"skip","outputCol":"Violation > Status_IDX__","inputCol":"Violation Status_CLEANED__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1491333201987,"sparkVersion":"2.1.0", > "uid":"bucketizer_6f65ca9fa813", > "paramMap":{ > "outputCol":"Summons > Number_BINNED__","handleInvalid":"keep","splits":["-Inf",1.386630656E9,3.696078592E9,4.005258752E9,6.045063168E9,8.136507392E9,"Inf"],"inputCol":"Summons > Number_CLEANED__" >} >}{code} > - > {code}{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1491333202079,"sparkVersion":"2.1.0", > "uid":"bucketizer_f5db4fb8120e", > "paramMap":{ > > "splits":["-Inf",1.435215616E9,1.443855616E9,1.447271936E9,1.448222464E9,1.448395264E9,1.448481536E9,1.448827136E9,1.449259264E9,1.449432064E9,1
[jira] [Created] (SPARK-20231) Refactor star schema code for the subsequent star join detection in CBO
Ioana Delaney created SPARK-20231: - Summary: Refactor star schema code for the subsequent star join detection in CBO Key: SPARK-20231 URL: https://issues.apache.org/jira/browse/SPARK-20231 Project: Spark Issue Type: Sub-task Components: SQL Affects Versions: 2.2.0 Reporter: Ioana Delaney Priority: Minor Decouples star schema code from the default join planning. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-20230) FetchFailedExceptions should invalidate file caches in MapOutputTracker even if newer stages are launched
[ https://issues.apache.org/jira/browse/SPARK-20230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-20230: Assignee: (was: Apache Spark) > FetchFailedExceptions should invalidate file caches in MapOutputTracker even > if newer stages are launched > - > > Key: SPARK-20230 > URL: https://issues.apache.org/jira/browse/SPARK-20230 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Burak Yavuz > > If you lose instances that have shuffle outputs, you will start observing > messages like: > {code} > 17/03/24 11:49:23 WARN TaskSetManager: Lost task 0.0 in stage 64.1 (TID 3849, > 172.128.196.240, executor 0): FetchFailed(BlockManagerId(4, 172.128.200.157, > 4048, None), shuffleId=16, mapId=2, reduceId=3, message= > {code} > Generally, these messages are followed by: > {code} > 17/03/24 11:49:23 INFO DAGScheduler: Executor lost: 4 (epoch 20) > 17/03/24 11:49:23 INFO BlockManagerMasterEndpoint: Trying to remove executor > 4 from BlockManagerMaster. > 17/03/24 11:49:23 INFO BlockManagerMaster: Removed 4 successfully in > removeExecutor > 17/03/24 11:49:23 INFO DAGScheduler: Shuffle files lost for executor: 4 > (epoch 20) > 17/03/24 11:49:23 INFO ShuffleMapStage: ShuffleMapStage 63 is now unavailable > on executor 4 (73/89, false) > {code} > which is great. Spark resubmits tasks for data that has been lost. However, > if you have cascading instance failures, then you may come across: > {code} > 17/03/24 11:48:39 INFO DAGScheduler: Ignoring fetch failure from > ResultTask(64, 46) as it's from ResultStage 64 attempt 0 and there is a more > recent attempt for that stage (attempt ID 1) running > {code} > which don't invalidate file outputs. In later retries of the stage, Spark > will attempt to access files on machines that don't exist anymore, and then > after 4 tries, Spark will give up. If it had not ignored the fetch failure, > and invalidated the cache, then most of the lost files could have been > computed during one of the previous retries. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-20230) FetchFailedExceptions should invalidate file caches in MapOutputTracker even if newer stages are launched
[ https://issues.apache.org/jira/browse/SPARK-20230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-20230: Assignee: Apache Spark > FetchFailedExceptions should invalidate file caches in MapOutputTracker even > if newer stages are launched > - > > Key: SPARK-20230 > URL: https://issues.apache.org/jira/browse/SPARK-20230 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Burak Yavuz >Assignee: Apache Spark > > If you lose instances that have shuffle outputs, you will start observing > messages like: > {code} > 17/03/24 11:49:23 WARN TaskSetManager: Lost task 0.0 in stage 64.1 (TID 3849, > 172.128.196.240, executor 0): FetchFailed(BlockManagerId(4, 172.128.200.157, > 4048, None), shuffleId=16, mapId=2, reduceId=3, message= > {code} > Generally, these messages are followed by: > {code} > 17/03/24 11:49:23 INFO DAGScheduler: Executor lost: 4 (epoch 20) > 17/03/24 11:49:23 INFO BlockManagerMasterEndpoint: Trying to remove executor > 4 from BlockManagerMaster. > 17/03/24 11:49:23 INFO BlockManagerMaster: Removed 4 successfully in > removeExecutor > 17/03/24 11:49:23 INFO DAGScheduler: Shuffle files lost for executor: 4 > (epoch 20) > 17/03/24 11:49:23 INFO ShuffleMapStage: ShuffleMapStage 63 is now unavailable > on executor 4 (73/89, false) > {code} > which is great. Spark resubmits tasks for data that has been lost. However, > if you have cascading instance failures, then you may come across: > {code} > 17/03/24 11:48:39 INFO DAGScheduler: Ignoring fetch failure from > ResultTask(64, 46) as it's from ResultStage 64 attempt 0 and there is a more > recent attempt for that stage (attempt ID 1) running > {code} > which don't invalidate file outputs. In later retries of the stage, Spark > will attempt to access files on machines that don't exist anymore, and then > after 4 tries, Spark will give up. If it had not ignored the fetch failure, > and invalidated the cache, then most of the lost files could have been > computed during one of the previous retries. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20230) FetchFailedExceptions should invalidate file caches in MapOutputTracker even if newer stages are launched
[ https://issues.apache.org/jira/browse/SPARK-20230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957477#comment-15957477 ] Apache Spark commented on SPARK-20230: -- User 'brkyvz' has created a pull request for this issue: https://github.com/apache/spark/pull/17543 > FetchFailedExceptions should invalidate file caches in MapOutputTracker even > if newer stages are launched > - > > Key: SPARK-20230 > URL: https://issues.apache.org/jira/browse/SPARK-20230 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: Burak Yavuz > > If you lose instances that have shuffle outputs, you will start observing > messages like: > {code} > 17/03/24 11:49:23 WARN TaskSetManager: Lost task 0.0 in stage 64.1 (TID 3849, > 172.128.196.240, executor 0): FetchFailed(BlockManagerId(4, 172.128.200.157, > 4048, None), shuffleId=16, mapId=2, reduceId=3, message= > {code} > Generally, these messages are followed by: > {code} > 17/03/24 11:49:23 INFO DAGScheduler: Executor lost: 4 (epoch 20) > 17/03/24 11:49:23 INFO BlockManagerMasterEndpoint: Trying to remove executor > 4 from BlockManagerMaster. > 17/03/24 11:49:23 INFO BlockManagerMaster: Removed 4 successfully in > removeExecutor > 17/03/24 11:49:23 INFO DAGScheduler: Shuffle files lost for executor: 4 > (epoch 20) > 17/03/24 11:49:23 INFO ShuffleMapStage: ShuffleMapStage 63 is now unavailable > on executor 4 (73/89, false) > {code} > which is great. Spark resubmits tasks for data that has been lost. However, > if you have cascading instance failures, then you may come across: > {code} > 17/03/24 11:48:39 INFO DAGScheduler: Ignoring fetch failure from > ResultTask(64, 46) as it's from ResultStage 64 attempt 0 and there is a more > recent attempt for that stage (attempt ID 1) running > {code} > which don't invalidate file outputs. In later retries of the stage, Spark > will attempt to access files on machines that don't exist anymore, and then > after 4 tries, Spark will give up. If it had not ignored the fetch failure, > and invalidated the cache, then most of the lost files could have been > computed during one of the previous retries. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20226) Call to sqlContext.cacheTable takes an incredibly long time in some cases
[ https://issues.apache.org/jira/browse/SPARK-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957468#comment-15957468 ] Sean Owen commented on SPARK-20226: --- OK, this doesn't sound like it's anything to do with SQLContext or caching. > Call to sqlContext.cacheTable takes an incredibly long time in some cases > - > > Key: SPARK-20226 > URL: https://issues.apache.org/jira/browse/SPARK-20226 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 > Environment: linux or windows >Reporter: Barry Becker > Labels: cache > Attachments: xyzzy.csv > > > I have a case where the call to sqlContext.cacheTable can take an arbitrarily > long time depending on the number of columns that are referenced in a > withColumn expression applied to a dataframe. > The dataset is small (20 columns 7861 rows). The sequence to reproduce is the > following: > 1) add a new column that references 8 - 14 of the columns in the dataset. >- If I add 8 columns, then the call to cacheTable is fast - like *5 > seconds* >- If I add 11 columns, then it is slow - like *60 seconds* >- and if I add 14 columns, then it basically *takes forever* - I gave up > after 10 minutes or so. > The Column expression that is added, is basically just concatenating > the columns together in a single string. If a number is concatenated on a > string (or vice versa) the number is first converted to a string. > The expression looks something like this: > {code} > `Plate` + `State` + `License Type` + `Summons Number` + `Issue Date` + > `Violation Time` + `Violation` + `Judgment Entry Date` + `Fine Amount` + > `Penalty Amount` + `Interest Amount` > {code} > which we then convert to a Column expression that looks like this: > {code} > UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF('Plate, 'State), 'License Type), > UDF('Summons Number)), UDF('Issue Date)), 'Violation Time), 'Violation), > UDF('Judgment Entry Date)), UDF('Fine Amount)), UDF('Penalty Amount)), > UDF('Interest Amount)) > {code} >where the UDFs are very simple functions that basically call toString > and + as needed. > 2) apply a pipeline that includes some transformers that was saved earlier. 
> Here are the steps of the pipeline (extracted from parquet) > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333200603,"sparkVersion":"2.1.0","uid":"strIdx_aeb04d2777cc","paramMap":{"handleInvalid":"skip","outputCol":"State_IDX__","inputCol":"State_CLEANED__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333200837,"sparkVersion":"2.1.0","uid":"strIdx_0164c4c13979","paramMap":{"inputCol":"License > Type_CLEANED__","handleInvalid":"skip","outputCol":"License > Type_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201068,"sparkVersion":"2.1.0","uid":"strIdx_25b6cbd02751","paramMap":{"inputCol":"Violation_CLEANED__","handleInvalid":"skip","outputCol":"Violation_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201282,"sparkVersion":"2.1.0","uid":"strIdx_aa12df0354d9","paramMap":{"handleInvalid":"skip","inputCol":"County_CLEANED__","outputCol":"County_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201552,"sparkVersion":"2.1.0","uid":"strIdx_babb120f3cc1","paramMap":{"handleInvalid":"skip","outputCol":"Issuing > Agency_IDX__","inputCol":"Issuing Agency_CLEANED__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201759,"sparkVersion":"2.1.0","uid":"strIdx_5f2de9d9542d","paramMap":{"handleInvalid":"skip","outputCol":"Violation > Status_IDX__","inputCol":"Violation Status_CLEANED__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1491333201987,"sparkVersion":"2.1.0", > "uid":"bucketizer_6f65ca9fa813", > "paramMap":{ > "outputCol":"Summons > Number_BINNED__","handleInvalid":"keep","splits":["-Inf",1.386630656E9,3.696078592E9,4.005258752E9,6.045063168E9,8.136507392E9,"Inf"],"inputCol":"Summons > Number_CLEANED__" >} >}{code} > - > {code}{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1491333202079,"sparkVersion":"2.1.0", > "uid":"bucketizer_f5db4fb8120e", > "paramMap":{ > > "splits":["-Inf",1.435215616E9,1.443855616E9,1.447271936E9,1.448222464E9,1.448395264E9,1.448481536E9,1.448827136E9,1.449259264E9,1.449432064E9,1.449518336E9,"Inf"], > "handleInvalid":"keep","outputCol":"Issue > Date_BINNED__","inputCol":"Issue Date_CLEANED__" >} >}{code} > - > {c
[jira] [Created] (SPARK-20230) FetchFailedExceptions should invalidate file caches in MapOutputTracker even if newer stages are launched
Burak Yavuz created SPARK-20230: --- Summary: FetchFailedExceptions should invalidate file caches in MapOutputTracker even if newer stages are launched Key: SPARK-20230 URL: https://issues.apache.org/jira/browse/SPARK-20230 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.1.0 Reporter: Burak Yavuz If you lose instances that have shuffle outputs, you will start observing messages like: {code} 17/03/24 11:49:23 WARN TaskSetManager: Lost task 0.0 in stage 64.1 (TID 3849, 172.128.196.240, executor 0): FetchFailed(BlockManagerId(4, 172.128.200.157, 4048, None), shuffleId=16, mapId=2, reduceId=3, message= {code} Generally, these messages are followed by: {code} 17/03/24 11:49:23 INFO DAGScheduler: Executor lost: 4 (epoch 20) 17/03/24 11:49:23 INFO BlockManagerMasterEndpoint: Trying to remove executor 4 from BlockManagerMaster. 17/03/24 11:49:23 INFO BlockManagerMaster: Removed 4 successfully in removeExecutor 17/03/24 11:49:23 INFO DAGScheduler: Shuffle files lost for executor: 4 (epoch 20) 17/03/24 11:49:23 INFO ShuffleMapStage: ShuffleMapStage 63 is now unavailable on executor 4 (73/89, false) {code} which is great. Spark resubmits tasks for data that has been lost. However, if you have cascading instance failures, then you may come across: {code} 17/03/24 11:48:39 INFO DAGScheduler: Ignoring fetch failure from ResultTask(64, 46) as it's from ResultStage 64 attempt 0 and there is a more recent attempt for that stage (attempt ID 1) running {code} which don't invalidate file outputs. In later retries of the stage, Spark will attempt to access files on machines that don't exist anymore, and then after 4 tries, Spark will give up. If it had not ignored the fetch failure, and invalidated the cache, then most of the lost files could have been computed during one of the previous retries. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20226) Call to sqlContext.cacheTable takes an incredibly long time in some cases
[ https://issues.apache.org/jira/browse/SPARK-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957457#comment-15957457 ] Barry Becker commented on SPARK-20226: -- It seems like it has to do with the interaction between the StringIndexerModels in the pipeline and the column that was added in the first step. None of my StringIndexers are applied to columns with more than 100 values. Removing the bucketizers from the pipeline had much less of an impact than removing the StringIndexerModels. There were 6 StringIndexerModels and 8 Bucketizers in the original pipeline that took 80s to cache. When I remove all the StringIndexers from the pipeline, the time goes way down to 0.3s. When I remove all the Bucketizers, the time only goes down to 66s when calling cacheTable. > Call to sqlContext.cacheTable takes an incredibly long time in some cases > - > > Key: SPARK-20226 > URL: https://issues.apache.org/jira/browse/SPARK-20226 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 > Environment: linux or windows >Reporter: Barry Becker > Labels: cache > Attachments: xyzzy.csv > > > I have a case where the call to sqlContext.cacheTable can take an arbitrarily > long time depending on the number of columns that are referenced in a > withColumn expression applied to a dataframe. > The dataset is small (20 columns 7861 rows). The sequence to reproduce is the > following: > 1) add a new column that references 8 - 14 of the columns in the dataset. >- If I add 8 columns, then the call to cacheTable is fast - like *5 > seconds* >- If I add 11 columns, then it is slow - like *60 seconds* >- and if I add 14 columns, then it basically *takes forever* - I gave up > after 10 minutes or so. > The Column expression that is added, is basically just concatenating > the columns together in a single string. If a number is concatenated on a > string (or vice versa) the number is first converted to a string. > The expression looks something like this: > {code} > `Plate` + `State` + `License Type` + `Summons Number` + `Issue Date` + > `Violation Time` + `Violation` + `Judgment Entry Date` + `Fine Amount` + > `Penalty Amount` + `Interest Amount` > {code} > which we then convert to a Column expression that looks like this: > {code} > UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF('Plate, 'State), 'License Type), > UDF('Summons Number)), UDF('Issue Date)), 'Violation Time), 'Violation), > UDF('Judgment Entry Date)), UDF('Fine Amount)), UDF('Penalty Amount)), > UDF('Interest Amount)) > {code} >where the UDFs are very simple functions that basically call toString > and + as needed. > 2) apply a pipeline that includes some transformers that was saved earlier. 
> Here are the steps of the pipeline (extracted from parquet) > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333200603,"sparkVersion":"2.1.0","uid":"strIdx_aeb04d2777cc","paramMap":{"handleInvalid":"skip","outputCol":"State_IDX__","inputCol":"State_CLEANED__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333200837,"sparkVersion":"2.1.0","uid":"strIdx_0164c4c13979","paramMap":{"inputCol":"License > Type_CLEANED__","handleInvalid":"skip","outputCol":"License > Type_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201068,"sparkVersion":"2.1.0","uid":"strIdx_25b6cbd02751","paramMap":{"inputCol":"Violation_CLEANED__","handleInvalid":"skip","outputCol":"Violation_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201282,"sparkVersion":"2.1.0","uid":"strIdx_aa12df0354d9","paramMap":{"handleInvalid":"skip","inputCol":"County_CLEANED__","outputCol":"County_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201552,"sparkVersion":"2.1.0","uid":"strIdx_babb120f3cc1","paramMap":{"handleInvalid":"skip","outputCol":"Issuing > Agency_IDX__","inputCol":"Issuing Agency_CLEANED__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201759,"sparkVersion":"2.1.0","uid":"strIdx_5f2de9d9542d","paramMap":{"handleInvalid":"skip","outputCol":"Violation > Status_IDX__","inputCol":"Violation Status_CLEANED__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1491333201987,"sparkVersion":"2.1.0", > "uid":"bucketizer_6f65ca9fa813", > "paramMap":{ > "outputCol":"Summons > Number_BINNED__","handleInvalid":"keep","splits":["-Inf",1.386630656E9,3.696078592E9,4.005258752E9,6.045063168E9,8.136507392E9,"Inf"],"input
[jira] [Resolved] (SPARK-19454) Improve DataFrame.replace API
[ https://issues.apache.org/jira/browse/SPARK-19454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] holdenk resolved SPARK-19454. - Resolution: Fixed Fix Version/s: 2.2.0 Issue resolved by pull request 16793 [https://github.com/apache/spark/pull/16793] > Improve DataFrame.replace API > - > > Key: SPARK-19454 > URL: https://issues.apache.org/jira/browse/SPARK-19454 > Project: Spark > Issue Type: Improvement > Components: PySpark, SQL >Affects Versions: 1.5.0, 1.6.0, 2.0.0, 2.1.0, 2.2.0 >Reporter: Maciej Szymkiewicz >Assignee: Maciej Szymkiewicz > Fix For: 2.2.0 > > > The current implementation suffers from the following issues: > - It is possible to use {{dict}} as {{to_replace}}, but we cannot skip the > {{value}} argument or pass {{None}} for it (although it is ignored). This requires > passing "magic" values: > {code} > df = sc.parallelize([("Alice", 1, 3.0)]).toDF() > df.replace({"Alice": "Bob"}, 1) > {code} > - The code doesn't check whether the provided types are correct. This can lead to an > exception in Py4j (harder to diagnose): > {code} > df.replace({"Alice": 1}, 1) > {code} > or silent failures (with bundled Py4j version): > {code} > df.replace({1: 2, 3.0: 4.1, "a": "b"}, 1) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
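The ticket is about the PySpark API, but for reference, the Scala-side analogue (DataFrameNaFunctions.replace) already takes just a map, with no separate value argument; a sketch with made-up data:
{code}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq(("Alice", 1, 3.0)).toDF("name", "n", "x")

// Replacements expressed purely as a map: no "magic" placeholder value needed.
val replaced = df.na.replace("name", Map("Alice" -> "Bob"))
{code}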
[jira] [Assigned] (SPARK-19454) Improve DataFrame.replace API
[ https://issues.apache.org/jira/browse/SPARK-19454?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] holdenk reassigned SPARK-19454: --- Assignee: Maciej Szymkiewicz > Improve DataFrame.replace API > - > > Key: SPARK-19454 > URL: https://issues.apache.org/jira/browse/SPARK-19454 > Project: Spark > Issue Type: Improvement > Components: PySpark, SQL >Affects Versions: 1.5.0, 1.6.0, 2.0.0, 2.1.0, 2.2.0 >Reporter: Maciej Szymkiewicz >Assignee: Maciej Szymkiewicz > Fix For: 2.2.0 > > > The current implementation suffers from the following issues: > - It is possible to use {{dict}} as {{to_replace}}, but we cannot skip the > {{value}} argument or pass {{None}} for it (although it is ignored). This requires > passing "magic" values: > {code} > df = sc.parallelize([("Alice", 1, 3.0)]).toDF() > df.replace({"Alice": "Bob"}, 1) > {code} > - The code doesn't check whether the provided types are correct. This can lead to an > exception in Py4j (harder to diagnose): > {code} > df.replace({"Alice": 1}, 1) > {code} > or silent failures (with bundled Py4j version): > {code} > df.replace({1: 2, 3.0: 4.1, "a": "b"}, 1) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20228) Random Forest unstable results depending on spark.executor.memory
[ https://issues.apache.org/jira/browse/SPARK-20228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957411#comment-15957411 ] Ansgar Schulze commented on SPARK-20228: Hi Sean, thanks for your comment! It's not a problem that the results are not the same with the second configuration, but they are always much worse (about 20% more wrong predictions for the test data set). > Random Forest unstable results depending on spark.executor.memory > - > > Key: SPARK-20228 > URL: https://issues.apache.org/jira/browse/SPARK-20228 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.1.0 >Reporter: Ansgar Schulze > > If I deploy a random forest model with, for example, > spark.executor.memory=20480M > I get a different result than if I deploy the model with > spark.executor.memory=6000M > I expected the same results but different runtimes. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20202) Remove references to org.spark-project.hive
[ https://issues.apache.org/jira/browse/SPARK-20202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957377#comment-15957377 ] Ryan Blue commented on SPARK-20202: --- +1 for a release of the Spark fork from the Hive community. While the reasons for the fork appear to be fixed in the latest version, there's a lot of work to do to get Spark on a newer Hive version. And for patch releases like 2.0.3 and 2.1.1, I don't think updating Hive is an option. I'm also all for getting master on a real Hive release. A release of alternate Hive binaries was inappropriate. I think that if a third-party organization had done the same, it would be entirely reasonable to treat it as a trademark violation and ask them to stop. http://www.apache.org/foundation/marks/faq/#products > Remove references to org.spark-project.hive > --- > > Key: SPARK-20202 > URL: https://issues.apache.org/jira/browse/SPARK-20202 > Project: Spark > Issue Type: Bug > Components: Build, SQL >Affects Versions: 1.6.4, 2.0.3, 2.1.1 >Reporter: Owen O'Malley >Priority: Blocker > > Spark can't continue to depend on their fork of Hive and must move to > standard Hive versions. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20216) Install pandoc on machine(s) used for packaging
[ https://issues.apache.org/jira/browse/SPARK-20216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957346#comment-15957346 ] holdenk commented on SPARK-20216: - Thanks [~marmbrus] :) So looking at that host, it seems like pandoc is installed but pypandoc is installed for the wrong version of Python (namely, it's installed for Python 2.6, but the packaging is running in conda's Python 2.7). > Install pandoc on machine(s) used for packaging > --- > > Key: SPARK-20216 > URL: https://issues.apache.org/jira/browse/SPARK-20216 > Project: Spark > Issue Type: Bug > Components: Project Infra, PySpark >Affects Versions: 2.1.1, 2.2.0 >Reporter: holdenk >Priority: Blocker > > For Python packaging, pandoc is required to produce a reasonable package > doc string. Whichever machine(s) are used for packaging should have both > pandoc and pypandoc installed on them. > cc [~joshrosen] who I know was doing something related to this -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-20216) Install pandoc on machine(s) used for packaging
[ https://issues.apache.org/jira/browse/SPARK-20216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] holdenk resolved SPARK-20216. - Resolution: Fixed Assignee: holdenk This has been fixed by installing pypandoc into the conda env and verified by building the Python package sdist. > Install pandoc on machine(s) used for packaging > --- > > Key: SPARK-20216 > URL: https://issues.apache.org/jira/browse/SPARK-20216 > Project: Spark > Issue Type: Bug > Components: Project Infra, PySpark >Affects Versions: 2.1.1, 2.2.0 >Reporter: holdenk >Assignee: holdenk >Priority: Blocker > > For Python packaging, pandoc is required to produce a reasonable package > doc string. Whichever machine(s) are used for packaging should have both > pandoc and pypandoc installed on them. > cc [~joshrosen] who I know was doing something related to this -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20178) Improve Scheduler fetch failures
[ https://issues.apache.org/jira/browse/SPARK-20178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957341#comment-15957341 ] Sital Kedia commented on SPARK-20178: - [~tgraves] Thanks for creating the JIRA and driving the discussion. [~tgraves], [~imranr], [~markhamstra], [~kayousterhout] - as discussed over the PR, I have created a doc consolidating the issues related to fetch failures and the high-level design goals for the scheduler: https://docs.google.com/document/d/1D3b_ishMfm5sXmRS494JrOJmL9V_TRVZUB4TgK1l1fY/edit?usp=sharing. Please take a look and let me know what you think. Once we reach a consensus about the desired behavior and identify the changes that need to be done, I can work on a more detailed design doc and also split the change into multiple small logical PRs. > Improve Scheduler fetch failures > > > Key: SPARK-20178 > URL: https://issues.apache.org/jira/browse/SPARK-20178 > Project: Spark > Issue Type: Epic > Components: Scheduler >Affects Versions: 2.1.0 >Reporter: Thomas Graves > > We have been having a lot of discussions around improving the handling of > fetch failures. There are 4 JIRAs currently related to this. > We should try to get a list of things we want to improve and come up with one > cohesive design. > SPARK-20163, SPARK-20091, SPARK-14649, and SPARK-19753 > I will put my initial thoughts in a follow-on comment. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-20229) add semanticHash to QueryPlan
[ https://issues.apache.org/jira/browse/SPARK-20229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-20229: Assignee: Wenchen Fan (was: Apache Spark) > add semanticHash to QueryPlan > - > > Key: SPARK-20229 > URL: https://issues.apache.org/jira/browse/SPARK-20229 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Wenchen Fan >Assignee: Wenchen Fan > -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20229) add semanticHash to QueryPlan
[ https://issues.apache.org/jira/browse/SPARK-20229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957324#comment-15957324 ] Apache Spark commented on SPARK-20229: -- User 'cloud-fan' has created a pull request for this issue: https://github.com/apache/spark/pull/17541 > add semanticHash to QueryPlan > - > > Key: SPARK-20229 > URL: https://issues.apache.org/jira/browse/SPARK-20229 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Wenchen Fan >Assignee: Wenchen Fan > -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-20229) add semanticHash to QueryPlan
[ https://issues.apache.org/jira/browse/SPARK-20229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-20229: Assignee: Apache Spark (was: Wenchen Fan) > add semanticHash to QueryPlan > - > > Key: SPARK-20229 > URL: https://issues.apache.org/jira/browse/SPARK-20229 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.2.0 >Reporter: Wenchen Fan >Assignee: Apache Spark > -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20219) Schedule tasks based on size of input from ShuffledRDD
[ https://issues.apache.org/jira/browse/SPARK-20219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957309#comment-15957309 ] Imran Rashid commented on SPARK-20219: -- I agree with Kay -- the idea here is a good one, I just worry about the complexity. In my mind, this is pretty low-priority. > Schedule tasks based on size of input from ShuffledRDD > --- > > Key: SPARK-20219 > URL: https://issues.apache.org/jira/browse/SPARK-20219 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: jin xing > > When data is highly skewed on a ShuffledRDD, it makes sense to launch those > tasks which process much more input as soon as possible. The current > scheduling mechanism in *TaskSetManager* is quite simple: > {code} > for (i <- (0 until numTasks).reverse) { > addPendingTask(i) > } > {code} > In the scenario where "large tasks" are located in the bottom half of the tasks array, if tasks > with much more input are launched early, we can significantly reduce the time > cost and save resources when *"dynamic allocation"* is disabled. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
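A hypothetical Scala sketch of the proposal (not actual TaskSetManager code; {{inputSize}} is an assumed lookup of each task's input size):
{code}
// Enqueue task indices largest-input-first so that skewed partitions start
// as early as possible, instead of in reverse index order as quoted above.
def addPendingTasksBySize(numTasks: Int, inputSize: Int => Long)
                         (addPendingTask: Int => Unit): Unit = {
  for (i <- (0 until numTasks).sortBy(i => -inputSize(i))) {
    addPendingTask(i)
  }
}
{code}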
[jira] [Updated] (SPARK-20226) Call to sqlContext.cacheTable takes an incredibly long time in some cases
[ https://issues.apache.org/jira/browse/SPARK-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Barry Becker updated SPARK-20226: - Labels: cache (was: ) > Call to sqlContext.cacheTable takes an incredibly long time in some cases > - > > Key: SPARK-20226 > URL: https://issues.apache.org/jira/browse/SPARK-20226 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 > Environment: linux or windows >Reporter: Barry Becker > Labels: cache > Attachments: xyzzy.csv > > > I have a case where the call to sqlContext.cacheTable can take an arbitrarily > long time depending on the number of columns that are referenced in a > withColumn expression applied to a dataframe. > The dataset is small (20 columns 7861 rows). The sequence to reproduce is the > following: > 1) add a new column that references 8 - 14 of the columns in the dataset. >- If I add 8 columns, then the call to cacheTable is fast - like *5 > seconds* >- If I add 11 columns, then it is slow - like *60 seconds* >- and if I add 14 columns, then it basically *takes forever* - I gave up > after 10 minutes or so. > The Column expression that is added, is basically just concatenating > the columns together in a single string. If a number is concatenated on a > string (or vice versa) the number is first converted to a string. > The expression looks something like this: > {code} > `Plate` + `State` + `License Type` + `Summons Number` + `Issue Date` + > `Violation Time` + `Violation` + `Judgment Entry Date` + `Fine Amount` + > `Penalty Amount` + `Interest Amount` > {code} > which we then convert to a Column expression that looks like this: > {code} > UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF('Plate, 'State), 'License Type), > UDF('Summons Number)), UDF('Issue Date)), 'Violation Time), 'Violation), > UDF('Judgment Entry Date)), UDF('Fine Amount)), UDF('Penalty Amount)), > UDF('Interest Amount)) > {code} >where the UDFs are very simple functions that basically call toString > and + as needed. > 2) apply a pipeline that includes some transformers that was saved earlier. 
> Here are the steps of the pipeline (extracted from parquet) > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333200603,"sparkVersion":"2.1.0","uid":"strIdx_aeb04d2777cc","paramMap":{"handleInvalid":"skip","outputCol":"State_IDX__","inputCol":"State_CLEANED__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333200837,"sparkVersion":"2.1.0","uid":"strIdx_0164c4c13979","paramMap":{"inputCol":"License > Type_CLEANED__","handleInvalid":"skip","outputCol":"License > Type_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201068,"sparkVersion":"2.1.0","uid":"strIdx_25b6cbd02751","paramMap":{"inputCol":"Violation_CLEANED__","handleInvalid":"skip","outputCol":"Violation_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201282,"sparkVersion":"2.1.0","uid":"strIdx_aa12df0354d9","paramMap":{"handleInvalid":"skip","inputCol":"County_CLEANED__","outputCol":"County_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201552,"sparkVersion":"2.1.0","uid":"strIdx_babb120f3cc1","paramMap":{"handleInvalid":"skip","outputCol":"Issuing > Agency_IDX__","inputCol":"Issuing Agency_CLEANED__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201759,"sparkVersion":"2.1.0","uid":"strIdx_5f2de9d9542d","paramMap":{"handleInvalid":"skip","outputCol":"Violation > Status_IDX__","inputCol":"Violation Status_CLEANED__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1491333201987,"sparkVersion":"2.1.0", > "uid":"bucketizer_6f65ca9fa813", > "paramMap":{ > "outputCol":"Summons > Number_BINNED__","handleInvalid":"keep","splits":["-Inf",1.386630656E9,3.696078592E9,4.005258752E9,6.045063168E9,8.136507392E9,"Inf"],"inputCol":"Summons > Number_CLEANED__" >} >}{code} > - > {code}{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1491333202079,"sparkVersion":"2.1.0", > "uid":"bucketizer_f5db4fb8120e", > "paramMap":{ > > "splits":["-Inf",1.435215616E9,1.443855616E9,1.447271936E9,1.448222464E9,1.448395264E9,1.448481536E9,1.448827136E9,1.449259264E9,1.449432064E9,1.449518336E9,"Inf"], > "handleInvalid":"keep","outputCol":"Issue > Date_BINNED__","inputCol":"Issue Date_CLEANED__" >} >}{code} > - > {code}{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1491333202172,"sparkVersion":"2.1.0"
[jira] [Comment Edited] (SPARK-20226) Call to sqlContext.cacheTable takes an incredibly long time in some cases
[ https://issues.apache.org/jira/browse/SPARK-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957296#comment-15957296 ] Barry Becker edited comment on SPARK-20226 at 4/5/17 5:36 PM: -- We noticed that this is reproducible just by adding a new column that is based on a very simple UDF like {code} def newColumn = udf((values: Row) => { "foo" }) {code} The time taken to cache the query plan is highly dependent on the number of columns that are passed to this udf - even though they are not used by it. If 8 or fewer columns are in the Row passed in, then everything is reasonably fast, but as soon as you get to 11 or so, things slow dramatically. By the time you get to 14 columns, it takes forever. I will try using a simpler pipeline to see if I can eliminate either the StringIndexers or Bucketizers as contributing factors. was (Author: barrybecker4): We noticed that this is reproducible just by adding a new column that is based on a very simple UDF like {code} def newColumn = udf((values: Row) => { "foo" }) {code} The time taken to cache the query plan is highly dependent on the nunber of columns columns that are pass to this udf - even though they are not used by it. If 8 or fewer columns are in the Row based in, then everything is reasonably fast, but as soon as you get to 11 or so, things slow dramatically. By the time you get to 14 columns, it takes forever. I will try using a simpler pipeline to see if I can eliminate either the StringIndexers or Bucketizers as contributing factors. > Call to sqlContext.cacheTable takes an incredibly long time in some cases > - > > Key: SPARK-20226 > URL: https://issues.apache.org/jira/browse/SPARK-20226 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 > Environment: linux or windows >Reporter: Barry Becker > Attachments: xyzzy.csv > > > I have a case where the call to sqlContext.cacheTable can take an arbitrarily > long time depending on the number of columns that are referenced in a > withColumn expression applied to a dataframe. > The dataset is small (20 columns 7861 rows). The sequence to reproduce is the > following: > 1) add a new column that references 8 - 14 of the columns in the dataset. >- If I add 8 columns, then the call to cacheTable is fast - like *5 > seconds* >- If I add 11 columns, then it is slow - like *60 seconds* >- and if I add 14 columns, then it basically *takes forever* - I gave up > after 10 minutes or so. > The Column expression that is added, is basically just concatenating > the columns together in a single string. If a number is concatenated on a > string (or vice versa) the number is first converted to a string. > The expression looks something like this: > {code} > `Plate` + `State` + `License Type` + `Summons Number` + `Issue Date` + > `Violation Time` + `Violation` + `Judgment Entry Date` + `Fine Amount` + > `Penalty Amount` + `Interest Amount` > {code} > which we then convert to a Column expression that looks like this: > {code} > UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF('Plate, 'State), 'License Type), > UDF('Summons Number)), UDF('Issue Date)), 'Violation Time), 'Violation), > UDF('Judgment Entry Date)), UDF('Fine Amount)), UDF('Penalty Amount)), > UDF('Interest Amount)) > {code} >where the UDFs are very simple functions that basically call toString > and + as needed. > 2) apply a pipeline that includes some transformers that was saved earlier. 
> Here are the steps of the pipeline (extracted from parquet) > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333200603,"sparkVersion":"2.1.0","uid":"strIdx_aeb04d2777cc","paramMap":{"handleInvalid":"skip","outputCol":"State_IDX__","inputCol":"State_CLEANED__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333200837,"sparkVersion":"2.1.0","uid":"strIdx_0164c4c13979","paramMap":{"inputCol":"License > Type_CLEANED__","handleInvalid":"skip","outputCol":"License > Type_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201068,"sparkVersion":"2.1.0","uid":"strIdx_25b6cbd02751","paramMap":{"inputCol":"Violation_CLEANED__","handleInvalid":"skip","outputCol":"Violation_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201282,"sparkVersion":"2.1.0","uid":"strIdx_aa12df0354d9","paramMap":{"handleInvalid":"skip","inputCol":"County_CLEANED__","outputCol":"County_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201552,"sparkVersion":"2.1.0","uid":"strIdx_babb12
[jira] [Commented] (SPARK-20226) Call to sqlContext.cacheTable takes an incredibly long time in some cases
[ https://issues.apache.org/jira/browse/SPARK-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957296#comment-15957296 ] Barry Becker commented on SPARK-20226: -- We noticed that this is reproducible just by adding a new column that is based on a very simple UDF like {code} def newColumn = udf((values: Row) => { "foo" }) {code} The time taken to cache the query plan is highly dependent on the number of columns that are passed to this udf - even though they are not used by it. If 8 or fewer columns are in the Row passed in, then everything is reasonably fast, but as soon as you get to 11 or so, things slow dramatically. By the time you get to 14 columns, it takes forever. I will try using a simpler pipeline to see if I can eliminate either the StringIndexers or Bucketizers as contributing factors. > Call to sqlContext.cacheTable takes an incredibly long time in some cases > - > > Key: SPARK-20226 > URL: https://issues.apache.org/jira/browse/SPARK-20226 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 > Environment: linux or windows >Reporter: Barry Becker > Attachments: xyzzy.csv > > > I have a case where the call to sqlContext.cacheTable can take an arbitrarily > long time depending on the number of columns that are referenced in a > withColumn expression applied to a dataframe. > The dataset is small (20 columns, 7861 rows). The sequence to reproduce is the > following: > 1) add a new column that references 8 - 14 of the columns in the dataset. >- If I add 8 columns, then the call to cacheTable is fast - like *5 > seconds* >- If I add 11 columns, then it is slow - like *60 seconds* >- and if I add 14 columns, then it basically *takes forever* - I gave up > after 10 minutes or so. > The Column expression that is added is basically just concatenating > the columns together in a single string. If a number is concatenated on a > string (or vice versa), the number is first converted to a string. > The expression looks something like this: > {code} > `Plate` + `State` + `License Type` + `Summons Number` + `Issue Date` + > `Violation Time` + `Violation` + `Judgment Entry Date` + `Fine Amount` + > `Penalty Amount` + `Interest Amount` > {code} > which we then convert to a Column expression that looks like this: > {code} > UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF('Plate, 'State), 'License Type), > UDF('Summons Number)), UDF('Issue Date)), 'Violation Time), 'Violation), > UDF('Judgment Entry Date)), UDF('Fine Amount)), UDF('Penalty Amount)), > UDF('Interest Amount)) > {code} >where the UDFs are very simple functions that basically call toString > and + as needed. > 2) apply a pipeline, saved earlier, that includes some transformers.
> Here are the steps of the pipeline (extracted from parquet) > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333200603,"sparkVersion":"2.1.0","uid":"strIdx_aeb04d2777cc","paramMap":{"handleInvalid":"skip","outputCol":"State_IDX__","inputCol":"State_CLEANED__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333200837,"sparkVersion":"2.1.0","uid":"strIdx_0164c4c13979","paramMap":{"inputCol":"License > Type_CLEANED__","handleInvalid":"skip","outputCol":"License > Type_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201068,"sparkVersion":"2.1.0","uid":"strIdx_25b6cbd02751","paramMap":{"inputCol":"Violation_CLEANED__","handleInvalid":"skip","outputCol":"Violation_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201282,"sparkVersion":"2.1.0","uid":"strIdx_aa12df0354d9","paramMap":{"handleInvalid":"skip","inputCol":"County_CLEANED__","outputCol":"County_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201552,"sparkVersion":"2.1.0","uid":"strIdx_babb120f3cc1","paramMap":{"handleInvalid":"skip","outputCol":"Issuing > Agency_IDX__","inputCol":"Issuing Agency_CLEANED__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201759,"sparkVersion":"2.1.0","uid":"strIdx_5f2de9d9542d","paramMap":{"handleInvalid":"skip","outputCol":"Violation > Status_IDX__","inputCol":"Violation Status_CLEANED__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1491333201987,"sparkVersion":"2.1.0", > "uid":"bucketizer_6f65ca9fa813", > "paramMap":{ > "outputCol":"Summons > Number_BINNED__","handleInvalid":"keep","splits":["-Inf",1.386630656E9,3.696078592E9,4.005258752E9,6.045063168E9,8.136507392E9,"In
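A minimal Scala timing sketch of the experiment described above, not the reporter's exact pipeline: it derives one column from the first N columns (using concat_ws in place of the reporter's UDF chain, which is an assumption) and times cacheTable. "xyzzy.csv" is the attachment on the ticket.

{code}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object CacheTimingRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("SPARK-20226-repro").getOrCreate()
    val df = spark.read.option("header", "true").csv("xyzzy.csv")

    for (n <- Seq(8, 11, 14)) {
      // Derive a single string column from the first n columns.
      val derived = df.withColumn("combined", concat_ws("", df.columns.take(n).map(col): _*))
      derived.createOrReplaceTempView(s"t$n")

      val start = System.nanoTime()
      spark.sqlContext.cacheTable(s"t$n") // the call reported to slow down
      derived.count()                     // force materialization
      println(s"$n columns: ${(System.nanoTime() - start) / 1e9} s")
    }
  }
}
{code}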
[jira] [Assigned] (SPARK-20213) DataFrameWriter operations do not show up in SQL tab
[ https://issues.apache.org/jira/browse/SPARK-20213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-20213: Assignee: Apache Spark > DataFrameWriter operations do not show up in SQL tab > > > Key: SPARK-20213 > URL: https://issues.apache.org/jira/browse/SPARK-20213 > Project: Spark > Issue Type: Bug > Components: SQL, Web UI >Affects Versions: 2.0.2, 2.1.0 >Reporter: Ryan Blue >Assignee: Apache Spark > > In 1.6.1, {{DataFrame}} writes started by {{DataFrameWriter}} actions like > {{insertInto}} would show up in the SQL tab. In 2.0.0 and later, they no > longer do. The problem is that 2.0.0 and later no longer wrap execution with > {{SQLExecution.withNewExecutionId}}, which emits > {{SparkListenerSQLExecutionStart}}. > Here are the relevant parts of the stack traces: > {code:title=Spark 1.6.1} > org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) > org.apache.spark.sql.execution.QueryExecution$$anonfun$toRdd$1.apply(QueryExecution.scala:56) > org.apache.spark.sql.execution.QueryExecution$$anonfun$toRdd$1.apply(QueryExecution.scala:56) > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53) > org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:56) > => holding > Monitor(org.apache.spark.sql.hive.HiveContext$QueryExecution@424773807}) > org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55) > org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:196) > {code} > {code:title=Spark 2.0.0} > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) > org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114) > org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86) > => holding Monitor(org.apache.spark.sql.execution.QueryExecution@490977924}) > org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86) > org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:301) > {code} > I think this was introduced by > [54d23599|https://github.com/apache/spark/commit/54d23599]. The fix should be > to add withNewExecutionId to > https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L610 -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-20213) DataFrameWriter operations do not show up in SQL tab
[ https://issues.apache.org/jira/browse/SPARK-20213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-20213: Assignee: (was: Apache Spark) > DataFrameWriter operations do not show up in SQL tab > > > Key: SPARK-20213 > URL: https://issues.apache.org/jira/browse/SPARK-20213 > Project: Spark > Issue Type: Bug > Components: SQL, Web UI >Affects Versions: 2.0.2, 2.1.0 >Reporter: Ryan Blue > > In 1.6.1, {{DataFrame}} writes started by {{DataFrameWriter}} actions like > {{insertInto}} would show up in the SQL tab. In 2.0.0 and later, they no > longer do. The problem is that 2.0.0 and later no longer wrap execution with > {{SQLExecution.withNewExecutionId}}, which emits > {{SparkListenerSQLExecutionStart}}. > Here are the relevant parts of the stack traces: > {code:title=Spark 1.6.1} > org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) > org.apache.spark.sql.execution.QueryExecution$$anonfun$toRdd$1.apply(QueryExecution.scala:56) > org.apache.spark.sql.execution.QueryExecution$$anonfun$toRdd$1.apply(QueryExecution.scala:56) > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53) > org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:56) > => holding > Monitor(org.apache.spark.sql.hive.HiveContext$QueryExecution@424773807}) > org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55) > org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:196) > {code} > {code:title=Spark 2.0.0} > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) > org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114) > org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86) > => holding Monitor(org.apache.spark.sql.execution.QueryExecution@490977924}) > org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86) > org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:301) > {code} > I think this was introduced by > [54d23599|https://github.com/apache/spark/commit/54d23599]. The fix should be > to add withNewExecutionId to > https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L610 -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20213) DataFrameWriter operations do not show up in SQL tab
[ https://issues.apache.org/jira/browse/SPARK-20213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957292#comment-15957292 ] Apache Spark commented on SPARK-20213: -- User 'rdblue' has created a pull request for this issue: https://github.com/apache/spark/pull/17540 > DataFrameWriter operations do not show up in SQL tab > > > Key: SPARK-20213 > URL: https://issues.apache.org/jira/browse/SPARK-20213 > Project: Spark > Issue Type: Bug > Components: SQL, Web UI >Affects Versions: 2.0.2, 2.1.0 >Reporter: Ryan Blue > > In 1.6.1, {{DataFrame}} writes started by {{DataFrameWriter}} actions like > {{insertInto}} would show up in the SQL tab. In 2.0.0 and later, they no > longer do. The problem is that 2.0.0 and later no longer wrap execution with > {{SQLExecution.withNewExecutionId}}, which emits > {{SparkListenerSQLExecutionStart}}. > Here are the relevant parts of the stack traces: > {code:title=Spark 1.6.1} > org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) > org.apache.spark.sql.execution.QueryExecution$$anonfun$toRdd$1.apply(QueryExecution.scala:56) > org.apache.spark.sql.execution.QueryExecution$$anonfun$toRdd$1.apply(QueryExecution.scala:56) > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53) > org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:56) > => holding > Monitor(org.apache.spark.sql.hive.HiveContext$QueryExecution@424773807}) > org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55) > org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:196) > {code} > {code:title=Spark 2.0.0} > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:133) > org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:114) > org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:86) > => holding Monitor(org.apache.spark.sql.execution.QueryExecution@490977924}) > org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:86) > org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:301) > {code} > I think this was introduced by > [54d23599|https://github.com/apache/spark/commit/54d23599]. The fix should be > to add withNewExecutionId to > https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala#L610 -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
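A sketch of the suggested fix, assuming Spark 2.x internals: wrap the writer's physical execution in SQLExecution.withNewExecutionId so that SparkListenerSQLExecutionStart is emitted and the write shows up in the SQL tab. The helper name below is hypothetical, not necessarily the name used in the eventual patch.

{code}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.{QueryExecution, SQLExecution}

object WriterExecution {
  // Run a write's QueryExecution under a fresh execution id so the
  // listener bus (and therefore the SQL tab) sees it.
  def runWithExecutionId(session: SparkSession, qe: QueryExecution): Unit = {
    SQLExecution.withNewExecutionId(session, qe) {
      qe.toRdd // triggers the physical plan with an execution id attached
    }
  }
}
{code}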
[jira] [Assigned] (SPARK-20223) Typo in tpcds q77.sql
[ https://issues.apache.org/jira/browse/SPARK-20223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li reassigned SPARK-20223: --- Assignee: Zhenhua Wang > Typo in tpcds q77.sql > - > > Key: SPARK-20223 > URL: https://issues.apache.org/jira/browse/SPARK-20223 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Zhenhua Wang >Assignee: Zhenhua Wang > Fix For: 2.0.3, 2.1.2, 2.2.0 > > -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-20223) Typo in tpcds q77.sql
[ https://issues.apache.org/jira/browse/SPARK-20223?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xiao Li resolved SPARK-20223. - Resolution: Fixed Fix Version/s: 2.2.0 2.1.2 2.0.3 > Typo in tpcds q77.sql > - > > Key: SPARK-20223 > URL: https://issues.apache.org/jira/browse/SPARK-20223 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.2.0 >Reporter: Zhenhua Wang >Assignee: Zhenhua Wang > Fix For: 2.0.3, 2.1.2, 2.2.0 > > -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-20214) pyspark linalg _convert_to_vector should check for sorted indices
[ https://issues.apache.org/jira/browse/SPARK-20214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joseph K. Bradley reassigned SPARK-20214: - Assignee: Liang-Chi Hsieh > pyspark linalg _convert_to_vector should check for sorted indices > - > > Key: SPARK-20214 > URL: https://issues.apache.org/jira/browse/SPARK-20214 > Project: Spark > Issue Type: Bug > Components: ML, MLlib, PySpark, Tests >Affects Versions: 2.0.2, 2.1.1, 2.2.0 >Reporter: Joseph K. Bradley >Assignee: Liang-Chi Hsieh > > I've seen a few failures of this line: > https://github.com/apache/spark/blame/402bf2a50ddd4039ff9f376b641bd18fffa54171/python/pyspark/mllib/tests.py#L847 > It converts a scipy.sparse.lil_matrix to a dok_matrix and then to a > pyspark.mllib.linalg.Vector. The failure happens in the conversion to a > vector and indicates that the dok_matrix is not returning its values in > sorted order. (Actually, the failure is in _convert_to_vector, which converts > the dok_matrix to a csc_matrix and then passes the CSC data to the MLlib > Vector constructor.) Here's the stack trace: > {code} > Traceback (most recent call last): > File "/home/jenkins/workspace/python/pyspark/mllib/tests.py", line 847, in > test_serialize > self.assertEqual(sv, _convert_to_vector(lil.todok())) > File "/home/jenkins/workspace/python/pyspark/mllib/linalg/__init__.py", > line 78, in _convert_to_vector > return SparseVector(l.shape[0], csc.indices, csc.data) > File "/home/jenkins/workspace/python/pyspark/mllib/linalg/__init__.py", > line 556, in __init__ > % (self.indices[i], self.indices[i + 1])) > TypeError: Indices 3 and 1 are not strictly increasing > {code} > This seems like a bug in _convert_to_vector, where we really should check > {{csc_matrix.has_sorted_indices}} first. > I haven't seen this bug in pyspark.ml.linalg, but it probably exists there > too. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
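The ticket targets the Python-side _convert_to_vector, but the guard it proposes is easy to state in Scala (used here for consistency with the other snippets in this digest): make sure the (index, value) pairs are sorted by index before constructing a SparseVector, mirroring a csc_matrix.has_sorted_indices check on the scipy side. The helper name is made up.

{code}
import org.apache.spark.mllib.linalg.{Vector, Vectors}

object SortedSparse {
  def toSortedSparse(size: Int, indices: Array[Int], values: Array[Double]): Vector = {
    // Sort pairs by index so the SparseVector constructor's
    // strictly-increasing-indices requirement is always met.
    val (sortedIdx, sortedVal) = indices.zip(values).sortBy(_._1).unzip
    Vectors.sparse(size, sortedIdx, sortedVal)
  }
}
{code}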
[jira] [Updated] (SPARK-20214) pyspark linalg _convert_to_vector should check for sorted indices
[ https://issues.apache.org/jira/browse/SPARK-20214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joseph K. Bradley updated SPARK-20214: -- Summary: pyspark linalg _convert_to_vector should check for sorted indices (was: pyspark.mllib SciPyTests test_serialize) > pyspark linalg _convert_to_vector should check for sorted indices > - > > Key: SPARK-20214 > URL: https://issues.apache.org/jira/browse/SPARK-20214 > Project: Spark > Issue Type: Bug > Components: ML, MLlib, PySpark, Tests >Affects Versions: 2.0.2, 2.1.1, 2.2.0 >Reporter: Joseph K. Bradley > > I've seen a few failures of this line: > https://github.com/apache/spark/blame/402bf2a50ddd4039ff9f376b641bd18fffa54171/python/pyspark/mllib/tests.py#L847 > It converts a scipy.sparse.lil_matrix to a dok_matrix and then to a > pyspark.mllib.linalg.Vector. The failure happens in the conversion to a > vector and indicates that the dok_matrix is not returning its values in > sorted order. (Actually, the failure is in _convert_to_vector, which converts > the dok_matrix to a csc_matrix and then passes the CSC data to the MLlib > Vector constructor.) Here's the stack trace: > {code} > Traceback (most recent call last): > File "/home/jenkins/workspace/python/pyspark/mllib/tests.py", line 847, in > test_serialize > self.assertEqual(sv, _convert_to_vector(lil.todok())) > File "/home/jenkins/workspace/python/pyspark/mllib/linalg/__init__.py", > line 78, in _convert_to_vector > return SparseVector(l.shape[0], csc.indices, csc.data) > File "/home/jenkins/workspace/python/pyspark/mllib/linalg/__init__.py", > line 556, in __init__ > % (self.indices[i], self.indices[i + 1])) > TypeError: Indices 3 and 1 are not strictly increasing > {code} > This seems like a bug in _convert_to_vector, where we really should check > {{csc_matrix.has_sorted_indices}} first. > I haven't seen this bug in pyspark.ml.linalg, but it probably exists there > too. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20214) pyspark.mllib SciPyTests test_serialize
[ https://issues.apache.org/jira/browse/SPARK-20214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joseph K. Bradley updated SPARK-20214: -- Target Version/s: 2.0.3, 2.1.2, 2.2.0 > pyspark.mllib SciPyTests test_serialize > --- > > Key: SPARK-20214 > URL: https://issues.apache.org/jira/browse/SPARK-20214 > Project: Spark > Issue Type: Bug > Components: ML, MLlib, PySpark, Tests >Affects Versions: 2.0.2, 2.1.1, 2.2.0 >Reporter: Joseph K. Bradley > > I've seen a few failures of this line: > https://github.com/apache/spark/blame/402bf2a50ddd4039ff9f376b641bd18fffa54171/python/pyspark/mllib/tests.py#L847 > It converts a scipy.sparse.lil_matrix to a dok_matrix and then to a > pyspark.mllib.linalg.Vector. The failure happens in the conversion to a > vector and indicates that the dok_matrix is not returning its values in > sorted order. (Actually, the failure is in _convert_to_vector, which converts > the dok_matrix to a csc_matrix and then passes the CSC data to the MLlib > Vector constructor.) Here's the stack trace: > {code} > Traceback (most recent call last): > File "/home/jenkins/workspace/python/pyspark/mllib/tests.py", line 847, in > test_serialize > self.assertEqual(sv, _convert_to_vector(lil.todok())) > File "/home/jenkins/workspace/python/pyspark/mllib/linalg/__init__.py", > line 78, in _convert_to_vector > return SparseVector(l.shape[0], csc.indices, csc.data) > File "/home/jenkins/workspace/python/pyspark/mllib/linalg/__init__.py", > line 556, in __init__ > % (self.indices[i], self.indices[i + 1])) > TypeError: Indices 3 and 1 are not strictly increasing > {code} > This seems like a bug in _convert_to_vector, where we really should check > {{csc_matrix.has_sorted_indices}} first. > I haven't seen this bug in pyspark.ml.linalg, but it probably exists there > too. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20214) pyspark.mllib SciPyTests test_serialize
[ https://issues.apache.org/jira/browse/SPARK-20214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joseph K. Bradley updated SPARK-20214: -- Shepherd: Joseph K. Bradley > pyspark.mllib SciPyTests test_serialize > --- > > Key: SPARK-20214 > URL: https://issues.apache.org/jira/browse/SPARK-20214 > Project: Spark > Issue Type: Bug > Components: ML, MLlib, PySpark, Tests >Affects Versions: 2.0.2, 2.1.1, 2.2.0 >Reporter: Joseph K. Bradley > > I've seen a few failures of this line: > https://github.com/apache/spark/blame/402bf2a50ddd4039ff9f376b641bd18fffa54171/python/pyspark/mllib/tests.py#L847 > It converts a scipy.sparse.lil_matrix to a dok_matrix and then to a > pyspark.mllib.linalg.Vector. The failure happens in the conversion to a > vector and indicates that the dok_matrix is not returning its values in > sorted order. (Actually, the failure is in _convert_to_vector, which converts > the dok_matrix to a csc_matrix and then passes the CSC data to the MLlib > Vector constructor.) Here's the stack trace: > {code} > Traceback (most recent call last): > File "/home/jenkins/workspace/python/pyspark/mllib/tests.py", line 847, in > test_serialize > self.assertEqual(sv, _convert_to_vector(lil.todok())) > File "/home/jenkins/workspace/python/pyspark/mllib/linalg/__init__.py", > line 78, in _convert_to_vector > return SparseVector(l.shape[0], csc.indices, csc.data) > File "/home/jenkins/workspace/python/pyspark/mllib/linalg/__init__.py", > line 556, in __init__ > % (self.indices[i], self.indices[i + 1])) > TypeError: Indices 3 and 1 are not strictly increasing > {code} > This seems like a bug in _convert_to_vector, where we really should check > {{csc_matrix.has_sorted_indices}} first. > I haven't seen this bug in pyspark.ml.linalg, but it probably exists there > too. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-19068) Large number of executors causing a ton of ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerExecutorMetricsUpdate(41,Wr
[ https://issues.apache.org/jira/browse/SPARK-19068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Artur Sukhenko updated SPARK-19068: --- Affects Version/s: 2.0.1 > Large number of executors causing a ton of ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerExecutorMetricsUpdate(41,WrappedArray()) > -- > > Key: SPARK-19068 > URL: https://issues.apache.org/jira/browse/SPARK-19068 > Project: Spark > Issue Type: Bug >Affects Versions: 2.0.1, 2.1.0 > Environment: RHEL 7.2 >Reporter: JESSE CHEN > Attachments: sparklog.tar.gz > > > On a large cluster with 45TB RAM and 1,000 cores, we used 1008 executors in order to use all RAM and cores for a 100TB Spark SQL workload. Long-running queries tend to report the following ERRORs: > {noformat} > 16/12/27 12:44:28 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerExecutorMetricsUpdate(136,WrappedArray()) > 16/12/27 12:44:28 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerExecutorMetricsUpdate(853,WrappedArray()) > 16/12/27 12:44:28 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerExecutorMetricsUpdate(395,WrappedArray()) > 16/12/27 12:44:28 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerExecutorMetricsUpdate(736,WrappedArray()) > 16/12/27 12:44:28 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerExecutorMetricsUpdate(439,WrappedArray()) > 16/12/27 12:44:28 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerExecutorMetricsUpdate(16,WrappedArray()) > 16/12/27 12:44:28 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerExecutorMetricsUpdate(307,WrappedArray()) > 16/12/27 12:44:28 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerExecutorMetricsUpdate(51,WrappedArray()) > 16/12/27 12:44:29 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerExecutorMetricsUpdate(535,WrappedArray()) > 16/12/27 12:44:29 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerExecutorMetricsUpdate(63,WrappedArray()) > 16/12/27 12:44:29 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerExecutorMetricsUpdate(333,WrappedArray()) > 16/12/27 12:44:29 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerExecutorMetricsUpdate(484,WrappedArray()) > (omitted) > {noformat} > The message itself may be a reasonable response to an already stopped SparkListenerBus (so subsequent events are thrown away with that ERROR message). The issue is that SparkContext does NOT exit until all these ERROR/events are reported, which is a huge number in our setup -- and this can take, in some cases, hours!!! > We tried increasing the event queue size (Adding default property: spark.scheduler.listenerbus.eventqueue.size=13) from the 10K default, but this still occurs. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19068) Large number of executors causing a ton of ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerExecutorMetricsUpdate(41,
[ https://issues.apache.org/jira/browse/SPARK-19068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957258#comment-15957258 ] Artur Sukhenko commented on SPARK-19068: Having similar problem. Reproduce: {panel}[spark-2.1.0-bin-without-hadoop]$ ./bin/run-example --master yarn --deploy-mode client --num-executors 4 SparkPi 100{panel} Made jstack of driver process and found this thread: {code}"SparkListenerBus" #10 daemon prio=5 os_prio=0 tid=0x7fc8fdc8c800 nid=0x37e7 runnable [0x7fc838764000] java.lang.Thread.State: RUNNABLE at scala.collection.mutable.HashTable$class.resize(HashTable.scala:262) at scala.collection.mutable.HashTable$class.scala$collection$mutable$HashTable$$addEntry0(HashTable.scala:154) at scala.collection.mutable.HashTable$class.findOrAddEntry(HashTable.scala:166) at scala.collection.mutable.LinkedHashMap.findOrAddEntry(LinkedHashMap.scala:49) at scala.collection.mutable.LinkedHashMap.put(LinkedHashMap.scala:71) at scala.collection.mutable.LinkedHashMap.$plus$eq(LinkedHashMap.scala:89) at scala.collection.mutable.LinkedHashMap.$plus$eq(LinkedHashMap.scala:49) at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59) at scala.collection.generic.Growable$$anonfun$$plus$plus$eq$1.apply(Growable.scala:59) at scala.collection.Iterator$class.foreach(Iterator.scala:893) at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59) at scala.collection.mutable.AbstractMap.$plus$plus$eq(Map.scala:80) at scala.collection.IterableLike$class.drop(IterableLike.scala:152) at scala.collection.AbstractIterable.drop(Iterable.scala:54) at org.apache.spark.ui.jobs.JobProgressListener.onTaskEnd(JobProgressListener.scala:412) - locked <0x800b9db8> (a org.apache.spark.ui.jobs.JobProgressListener) at org.apache.spark.scheduler.SparkListenerBus$class.doPostEvent(SparkListenerBus.scala:45) at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36) at org.apache.spark.scheduler.LiveListenerBus.doPostEvent(LiveListenerBus.scala:36) at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:63) at org.apache.spark.scheduler.LiveListenerBus.postToAll(LiveListenerBus.scala:36) at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(LiveListenerBus.scala:94) at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79) at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(LiveListenerBus.scala:79) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58) at org.apache.spark.scheduler.LiveListenerBus$$anon$1$$anonfun$run$1.apply$mcV$sp(LiveListenerBus.scala:78) at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1245) at org.apache.spark.scheduler.LiveListenerBus$$anon$1.run(LiveListenerBus.scala:77) {code} At 100k+ tasks job starts to be very slow and I am getting following errors: {code} [Stage 0:=> (108000 + 4) / 100]17/04/06 01:58:49 WARN LiveListenerBus: Dropped 182143 SparkListenerEvents since Thu Apr 06 01:57:49 JST 2017 [Stage 0:=> (109588 + 4) / 100]17/04/06 01:59:49 WARN LiveListenerBus: Dropped 196647 SparkListenerEvents since Thu Apr 06 01:58:49 JST 2017 [Stage 0:=> (111241 + 5) / 100] {code} After some time we get this: {code} rBus: SparkListenerBus has already stopped! 
Dropping event SparkListenerExecutorMetricsUpdate(2,WrappedArray()) [Stage 0:==> (126782 + 4) / 100]17/04/06 02:12:28 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerExecutorMetricsUpdate(3,WrappedArray()) [Stage 0:==> (126919 + 5) / 100]17/04/06 02:12:31 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerExecutorMetricsUpdate(4,WrappedArray()) [Stage 0:==> (126982 + 4) / 100]17/04/06 02:12:32 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerExecutorMetricsUpdate(1,WrappedArray()) [Stage 0:==> (127030 + 5) / 100]17/04/06 02:12:33 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerExecutorMetricsUpdate(2,WrappedArray()) [Stage 0:==>
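What the reporters tried, as a sketch: enlarging the listener bus event queue via spark.scheduler.listenerbus.eventqueue.size (default 10000 in Spark 2.x). The value below is an arbitrary example, not a recommendation; as both comments note, raising it alone did not make the dropped-event errors go away.

{code}
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("listener-bus-queue-example")
  // Allow more queued listener events before the bus starts dropping them.
  .set("spark.scheduler.listenerbus.eventqueue.size", "100000")
{code}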
[jira] [Commented] (SPARK-20228) Random Forest instable results depending on spark.executor.memory
[ https://issues.apache.org/jira/browse/SPARK-20228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957257#comment-15957257 ] Sean Owen commented on SPARK-20228: --- I don't think I would expect the exact same results even with identical settings. These are stochastic implementations. More resources can change partitioning, etc. > Random Forest instable results depending on spark.executor.memory > - > > Key: SPARK-20228 > URL: https://issues.apache.org/jira/browse/SPARK-20228 > Project: Spark > Issue Type: Bug > Components: PySpark >Affects Versions: 2.1.0 >Reporter: Ansgar Schulze > > If I deploy a random forest model with, for example, > spark.executor.memory=20480M > I get a different result than if I deploy the model with > spark.executor.memory=6000M > I expected the same results but different runtimes. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
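A sketch of how to narrow (not eliminate) run-to-run variation: pin the RNG seed on the estimator. Shown in Scala rather than PySpark for consistency with the other snippets in this digest; as the comment above says, resource changes can still alter partitioning and therefore results, so identical output is not guaranteed.

{code}
import org.apache.spark.ml.classification.RandomForestClassifier

val rf = new RandomForestClassifier()
  .setNumTrees(100)
  .setSeed(42L) // fixed seed; results may still vary with partitioning
{code}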
[jira] [Commented] (SPARK-20219) Schedule tasks based on size of input from ScheduledRDD
[ https://issues.apache.org/jira/browse/SPARK-20219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15957253#comment-15957253 ] Kay Ousterhout commented on SPARK-20219: Like [~mridulm80] (as mentioned on the PR) I'm hesitant about this idea because of the added complexity and information "leakage" from the TaskScheduler back to the DAGScheduler (in general, we should be making the interface between these components smaller, to make the code easier to reason about -- not larger). [~jinxing6...@126.com] you mentioned some use cases where this is helpful; can you post some concrete performance numbers about the difference in runtimes? cc [~imranr] -- thoughts here about whether the performance improvement is worth the added complexity? > Schedule tasks based on size of input from ScheduledRDD > --- > > Key: SPARK-20219 > URL: https://issues.apache.org/jira/browse/SPARK-20219 > Project: Spark > Issue Type: Improvement > Components: Spark Core >Affects Versions: 2.1.0 >Reporter: jin xing > > When data is highly skewed on a ShuffledRDD, it makes sense to launch those > tasks which process much more input as soon as possible. The current > scheduling mechanism in *TaskSetManager* is quite simple: > {code} > for (i <- (0 until numTasks).reverse) { > addPendingTask(i) > } > {code} > In a scenario where the "large tasks" are located in the bottom half of the tasks array, if tasks > with much more input are launched early, we can significantly reduce the time > cost and save resources when *"dynamic allocation"* is disabled. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
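A sketch of the proposal under discussion, not the actual TaskSetManager change: enqueue pending tasks so that the ones with the largest input launch first. inputSizeOf is a hypothetical per-task input-size lookup; the real change would need the scheduler layers to pass this information down, which is exactly the interface growth questioned above.

{code}
object SizeOrderedScheduling {
  def addPendingTasksBySize(numTasks: Int,
                            inputSizeOf: Int => Long,
                            addPendingTask: Int => Unit): Unit = {
    (0 until numTasks)
      .sortBy(i => -inputSizeOf(i)) // largest shuffle input first
      .foreach(addPendingTask)
  }
}
{code}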
[jira] [Created] (SPARK-20229) add semanticHash to QueryPlan
Wenchen Fan created SPARK-20229: --- Summary: add semanticHash to QueryPlan Key: SPARK-20229 URL: https://issues.apache.org/jira/browse/SPARK-20229 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 2.2.0 Reporter: Wenchen Fan Assignee: Wenchen Fan -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-20228) Random Forest instable results depending on spark.executor.memory
Ansgar Schulze created SPARK-20228: -- Summary: Random Forest instable results depending on spark.executor.memory Key: SPARK-20228 URL: https://issues.apache.org/jira/browse/SPARK-20228 Project: Spark Issue Type: Bug Components: PySpark Affects Versions: 2.1.0 Reporter: Ansgar Schulze If I deploy a random forest model with, for example, spark.executor.memory=20480M I get a different result than if I deploy the model with spark.executor.memory=6000M. I expected the same results but different runtimes. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20227) Job hangs when joining a lot of aggregated columns
[ https://issues.apache.org/jira/browse/SPARK-20227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15956985#comment-15956985 ] Sean Owen commented on SPARK-20227: --- Are you sure it's not just "a longer time" and not "forever"? 22 is a magic number given how some of the Scala internals work. Try 25 cols. > Job hangs when joining a lot of aggregated columns > -- > > Key: SPARK-20227 > URL: https://issues.apache.org/jira/browse/SPARK-20227 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 > Environment: AWS emr-5.4.0 m4.xlarge >Reporter: Quentin Auge > > I'm trying to replace a lot of different columns in a dataframe with > aggregates of themselves, and then join the resulting dataframe. > {code} > # Create a dataframe with 1 row and 50 columns > n = 50 > df = sc.parallelize([Row(*range(n))]).toDF() > cols = df.columns > # Replace each column values with aggregated values > window = Window.partitionBy(cols[0]) > for col in cols[1:]: > df = df.withColumn(col, sum(col).over(window)) > # Join > other_df = sc.parallelize([Row(0)]).toDF() > result = other_df.join(df, on = cols[0]) > result.show() > {code} > Spark hangs forever when executing the last line. The strange thing is, it > depends on the number of columns. Spark does not hang for n = 5, 10, or 20 > columns. For n = 50 and beyond, it does. > {code} > 17/04/05 14:39:28 INFO ExecutorAllocationManager: Removing executor 1 because > it has been idle for 60 seconds (new desired total will be 0) > 17/04/05 14:39:29 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling > executor 1. > 17/04/05 14:39:29 INFO DAGScheduler: Executor lost: 1 (epoch 0) > 17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Trying to remove executor > 1 from BlockManagerMaster. > 17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Removing block manager > BlockManagerId(1, ip-172-30-0-149.ec2.internal, 35666, None) > 17/04/05 14:39:29 INFO BlockManagerMaster: Removed 1 successfully in > removeExecutor > 17/04/05 14:39:29 INFO YarnScheduler: Executor 1 on > ip-172-30-0-149.ec2.internal killed by driver. > 17/04/05 14:39:29 INFO ExecutorAllocationManager: Existing executor 1 has > been removed (new total is 0) > {code} > All executors are inactive and thus killed after 60 seconds, the master > spends some CPU on a process that hangs indefinitely, and the workers are > idle. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20227) Job hangs when joining a lot of aggregated columns
[ https://issues.apache.org/jira/browse/SPARK-20227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Quentin Auge updated SPARK-20227: - Description: I'm trying to replace a lot of different columns in a dataframe with aggregates of themselves, and then join the resulting dataframe. {code} # Create a dataframe with 1 row and 50 columns n = 50 df = sc.parallelize([Row(*range(n))]).toDF() cols = df.columns # Replace each column values with aggregated values window = Window.partitionBy(cols[0]) for col in cols[1:]: df = df.withColumn(col, sum(col).over(window)) # Join other_df = sc.parallelize([Row(0)]).toDF() result = other_df.join(df, on = cols[0]) result.show() {code} Spark hangs forever when executing the last line. The strange thing is, it depends on the number of columns. Spark does not hang for n = 5, 10, or 20 columns. For n = 50 and beyond, it does. {code} 17/04/05 14:39:28 INFO ExecutorAllocationManager: Removing executor 1 because it has been idle for 60 seconds (new desired total will be 0) 17/04/05 14:39:29 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 1. 17/04/05 14:39:29 INFO DAGScheduler: Executor lost: 1 (epoch 0) 17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Trying to remove executor 1 from BlockManagerMaster. 17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(1, ip-172-30-0-149.ec2.internal, 35666, None) 17/04/05 14:39:29 INFO BlockManagerMaster: Removed 1 successfully in removeExecutor 17/04/05 14:39:29 INFO YarnScheduler: Executor 1 on ip-172-30-0-149.ec2.internal killed by driver. 17/04/05 14:39:29 INFO ExecutorAllocationManager: Existing executor 1 has been removed (new total is 0) {code} All executors are inactive and thus killed after 60 seconds, the master spends some CPU on a process that hangs indefinitely, and the workers are idle. was: I'm trying to replace a lot of different columns in a dataframe with aggregates of themselves, and then join the resulting dataframe. {code} # Create a dataframe with 1 row and 50 columns n = 50 df = sc.parallelize([Row(*range(n))]).toDF() cols = df.columns # Replace each column values with aggregated values window = Window.partitionBy(cols[0]) for col in cols[1:]: df = df.withColumn(col, sum(col).over(window)) # Join other_df = sc.parallelize([Row(0)]).toDF() result = other_df.join(df, on = cols[0]) result.show() {code} Spark hangs forever when executing the last line. The strange thing is, it depends on the number of columns. Spark does not hang for n = 5, 10, or 20 columns. For n = 50 and above, it does. {code} 17/04/05 14:39:28 INFO ExecutorAllocationManager: Removing executor 1 because it has been idle for 60 seconds (new desired total will be 0) 17/04/05 14:39:29 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 1. 17/04/05 14:39:29 INFO DAGScheduler: Executor lost: 1 (epoch 0) 17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Trying to remove executor 1 from BlockManagerMaster. 17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(1, ip-172-30-0-149.ec2.internal, 35666, None) 17/04/05 14:39:29 INFO BlockManagerMaster: Removed 1 successfully in removeExecutor 17/04/05 14:39:29 INFO YarnScheduler: Executor 1 on ip-172-30-0-149.ec2.internal killed by driver. 
17/04/05 14:39:29 INFO ExecutorAllocationManager: Existing executor 1 has been removed (new total is 0) {code} All executors are inactive and thus killed after 60 seconds, the master spends some CPU on a process that hangs indefinitely, and the workers are idle. > Job hangs when joining a lot of aggregated columns > -- > > Key: SPARK-20227 > URL: https://issues.apache.org/jira/browse/SPARK-20227 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 > Environment: AWS emr-5.4.0 m4.xlarge >Reporter: Quentin Auge > > I'm trying to replace a lot of different columns in a dataframe with > aggregates of themselves, and then join the resulting dataframe. > {code} > # Create a dataframe with 1 row and 50 columns > n = 50 > df = sc.parallelize([Row(*range(n))]).toDF() > cols = df.columns > # Replace each column values with aggregated values > window = Window.partitionBy(cols[0]) > for col in cols[1:]: > df = df.withColumn(col, sum(col).over(window)) > # Join > other_df = sc.parallelize([Row(0)]).toDF() > result = other_df.join(df, on = cols[0]) > result.show() > {code} > Spark hangs forever when executing the last line. The strange thing is, it > depends on the number of columns. Spark does not hang for n = 5, 10, or 20 > columns. For n = 50 and beyond, it does. > {code} > 17/04/05 14:39:28 INFO ExecutorAllocationManager: Removing executor 1 because > it has been idle for 60 seconds (new desired total will be 0) >
[jira] [Comment Edited] (SPARK-20227) Job hangs when joining a lot of aggregated columns
[ https://issues.apache.org/jira/browse/SPARK-20227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15956973#comment-15956973 ] Quentin Auge edited comment on SPARK-20227 at 4/5/17 2:55 PM: -- I managed to get it working using either of the following workarounds: * Quite unsatisfactory: persist df after the for loop * Better: replace the for loop with the following: {code} df = df.select([cols[0]] + [sum(col).over(window).alias(col) for col in cols[1:]]) {code} was (Author: quentin): I managed to get it working using either of the following workarounds: * Quite unsatisfactory: persist df after the for loop * Better: eplacing the for loop with the following: {code} df = df.select([cols[0]] + [sum(col).over(window).alias(col) for col in cols[1:]]) {code} > Job hangs when joining a lot of aggregated columns > -- > > Key: SPARK-20227 > URL: https://issues.apache.org/jira/browse/SPARK-20227 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 > Environment: AWS emr-5.4.0 m4.xlarge >Reporter: Quentin Auge > > I'm trying to replace a lot of different columns in a dataframe with > aggregates of themselves, and then join the resulting dataframe. > {code} > # Create a dataframe with 1 row and 50 columns > n = 50 > df = sc.parallelize([Row(*range(n))]).toDF() > cols = df.columns > # Replace each column values with aggregated values > window = Window.partitionBy(cols[0]) > for col in cols[1:]: > df = df.withColumn(col, sum(col).over(window)) > # Join > other_df = sc.parallelize([Row(0)]).toDF() > result = other_df.join(df, on = cols[0]) > result.show() > {code} > Spark hangs forever when executing the last line. The strange thing is, it > depends on the number of columns. Spark does not hang for n = 5, 10, or 20 > columns. For n = 50 and above, it does. > {code} > 17/04/05 14:39:28 INFO ExecutorAllocationManager: Removing executor 1 because > it has been idle for 60 seconds (new desired total will be 0) > 17/04/05 14:39:29 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling > executor 1. > 17/04/05 14:39:29 INFO DAGScheduler: Executor lost: 1 (epoch 0) > 17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Trying to remove executor > 1 from BlockManagerMaster. > 17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Removing block manager > BlockManagerId(1, ip-172-30-0-149.ec2.internal, 35666, None) > 17/04/05 14:39:29 INFO BlockManagerMaster: Removed 1 successfully in > removeExecutor > 17/04/05 14:39:29 INFO YarnScheduler: Executor 1 on > ip-172-30-0-149.ec2.internal killed by driver. > 17/04/05 14:39:29 INFO ExecutorAllocationManager: Existing executor 1 has > been removed (new total is 0) > {code} > All executors are inactive and thus killed after 60 seconds, the master > spends some CPU on a process that hangs indefinitely, and the workers are > idle. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
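The reporter's preferred workaround, transcribed to Scala for consistency with the other snippets in this digest: build all the windowed aggregates in a single select instead of chaining withColumn, which keeps the analyzed plan shallow. Assumes df and cols as in the Python snippets above.

{code}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

def aggregateInOneSelect(df: DataFrame, cols: Seq[String]): DataFrame = {
  val window = Window.partitionBy(col(cols.head))
  // One select with every aggregate, instead of one withColumn per column.
  df.select(col(cols.head) +: cols.tail.map(c => sum(col(c)).over(window).as(c)): _*)
}
{code}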
[jira] [Comment Edited] (SPARK-20227) Job hangs when joining a lot of aggregated columns
[ https://issues.apache.org/jira/browse/SPARK-20227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15956973#comment-15956973 ] Quentin Auge edited comment on SPARK-20227 at 4/5/17 2:51 PM: -- I managed to get it working using either of the following workarounds: * Quite unsatisfactory: persist df after the for loop * Better: eplacing the for loop with the following: {code} df = df.select([cols[0]] + [sum(col).over(window).alias(col) for col in cols[1:]]) {code} was (Author: quentin): I managed to get is working using either of the following workarounds: * Quite unsatisfactory: persist df after the for loop * Better: eplacing the for loop with the following: {code} df = df.select([cols[0]] + [sum(col).over(window).alias(col) for col in cols[1:]]) {code} > Job hangs when joining a lot of aggregated columns > -- > > Key: SPARK-20227 > URL: https://issues.apache.org/jira/browse/SPARK-20227 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 > Environment: AWS emr-5.4.0 m4.xlarge >Reporter: Quentin Auge > > I'm trying to replace a lot of different columns in a dataframe with > aggregates of themselves, and then join the resulting dataframe. > {code} > # Create a dataframe with 1 row and 50 columns > n = 50 > df = sc.parallelize([Row(*range(n))]).toDF() > cols = df.columns > # Replace each column values with aggregated values > window = Window.partitionBy(cols[0]) > for col in cols[1:]: > df = df.withColumn(col, sum(col).over(window)) > # Join > other_df = sc.parallelize([Row(0)]).toDF() > result = other_df.join(df, on = cols[0]) > result.show() > {code} > The issue is, Spark hangs forever when executing the last line. > {code} > 17/04/05 14:39:28 INFO ExecutorAllocationManager: Removing executor 1 because > it has been idle for 60 seconds (new desired total will be 0) > 17/04/05 14:39:29 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling > executor 1. > 17/04/05 14:39:29 INFO DAGScheduler: Executor lost: 1 (epoch 0) > 17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Trying to remove executor > 1 from BlockManagerMaster. > 17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Removing block manager > BlockManagerId(1, ip-172-30-0-149.ec2.internal, 35666, None) > 17/04/05 14:39:29 INFO BlockManagerMaster: Removed 1 successfully in > removeExecutor > 17/04/05 14:39:29 INFO YarnScheduler: Executor 1 on > ip-172-30-0-149.ec2.internal killed by driver. > 17/04/05 14:39:29 INFO ExecutorAllocationManager: Existing executor 1 has > been removed (new total is 0) > {code} > All executors are inactive and thus killed after 60 seconds, the master > spends some CPU on a process that hangs indefinitely, and the workers are > idle. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-20227) Job hangs when joining a lot of aggregated columns
[ https://issues.apache.org/jira/browse/SPARK-20227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15956973#comment-15956973 ] Quentin Auge edited comment on SPARK-20227 at 4/5/17 2:51 PM: -- I manages to get is working using either of the following workarounds: * Quite unsatisfactory: persist df after the for loop * Better: eplacing the for loop with the following: {code} df = df.select([cols[0]] + [sum(col).over(window).alias(col) for col in cols[1:]]) {code} was (Author: quentin): I found two distinct workarounds that prevent the job for hanging: * Quite unsatisfactory: persist df after the for loop * Better: eplacing the for loop with the following: {code} df = df.select([cols[0]] + [sum(col).over(window).alias(col) for col in cols[1:]]) {code} > Job hangs when joining a lot of aggregated columns > -- > > Key: SPARK-20227 > URL: https://issues.apache.org/jira/browse/SPARK-20227 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 > Environment: AWS emr-5.4.0 m4.xlarge >Reporter: Quentin Auge > > I'm trying to replace a lot of different columns in a dataframe with > aggregates of themselves, and then join the resulting dataframe. > {code} > # Create a dataframe with 1 row and 50 columns > n = 50 > df = sc.parallelize([Row(*range(n))]).toDF() > cols = df.columns > # Replace each column values with aggregated values > window = Window.partitionBy(cols[0]) > for col in cols[1:]: > df = df.withColumn(col, sum(col).over(window)) > # Join > other_df = sc.parallelize([Row(0)]).toDF() > result = other_df.join(df, on = cols[0]) > result.show() > {code} > The issue is, Spark hangs forever when executing the last line. > {code} > 17/04/05 14:39:28 INFO ExecutorAllocationManager: Removing executor 1 because > it has been idle for 60 seconds (new desired total will be 0) > 17/04/05 14:39:29 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling > executor 1. > 17/04/05 14:39:29 INFO DAGScheduler: Executor lost: 1 (epoch 0) > 17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Trying to remove executor > 1 from BlockManagerMaster. > 17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Removing block manager > BlockManagerId(1, ip-172-30-0-149.ec2.internal, 35666, None) > 17/04/05 14:39:29 INFO BlockManagerMaster: Removed 1 successfully in > removeExecutor > 17/04/05 14:39:29 INFO YarnScheduler: Executor 1 on > ip-172-30-0-149.ec2.internal killed by driver. > 17/04/05 14:39:29 INFO ExecutorAllocationManager: Existing executor 1 has > been removed (new total is 0) > {code} > All executors are inactive and thus killed after 60 seconds, the master > spends some CPU on a process that hangs indefinitely, and the workers are > idle. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-20227) Job hangs when joining a lot of aggregated columns
[ https://issues.apache.org/jira/browse/SPARK-20227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15956973#comment-15956973 ] Quentin Auge edited comment on SPARK-20227 at 4/5/17 2:51 PM: -- I managed to get it working using either of the following workarounds: * Quite unsatisfactory: persist df after the for loop * Better: replacing the for loop with the following: {code} df = df.select([cols[0]] + [sum(col).over(window).alias(col) for col in cols[1:]]) {code} was (Author: quentin): I managed to get it working using either of the following workarounds: * Quite unsatisfactory: persist df after the for loop * Better: replacing the for loop with the following: {code} df = df.select([cols[0]] + [sum(col).over(window).alias(col) for col in cols[1:]]) {code} > Job hangs when joining a lot of aggregated columns > -- > > Key: SPARK-20227 > URL: https://issues.apache.org/jira/browse/SPARK-20227 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 > Environment: AWS emr-5.4.0 m4.xlarge >Reporter: Quentin Auge > > I'm trying to replace a lot of different columns in a dataframe with > aggregates of themselves, and then join the resulting dataframe. > {code} > # Create a dataframe with 1 row and 50 columns > n = 50 > df = sc.parallelize([Row(*range(n))]).toDF() > cols = df.columns > # Replace each column values with aggregated values > window = Window.partitionBy(cols[0]) > for col in cols[1:]: > df = df.withColumn(col, sum(col).over(window)) > # Join > other_df = sc.parallelize([Row(0)]).toDF() > result = other_df.join(df, on = cols[0]) > result.show() > {code} > The issue is, Spark hangs forever when executing the last line. > {code} > 17/04/05 14:39:28 INFO ExecutorAllocationManager: Removing executor 1 because > it has been idle for 60 seconds (new desired total will be 0) > 17/04/05 14:39:29 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling > executor 1. > 17/04/05 14:39:29 INFO DAGScheduler: Executor lost: 1 (epoch 0) > 17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Trying to remove executor > 1 from BlockManagerMaster. > 17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Removing block manager > BlockManagerId(1, ip-172-30-0-149.ec2.internal, 35666, None) > 17/04/05 14:39:29 INFO BlockManagerMaster: Removed 1 successfully in > removeExecutor > 17/04/05 14:39:29 INFO YarnScheduler: Executor 1 on > ip-172-30-0-149.ec2.internal killed by driver. > 17/04/05 14:39:29 INFO ExecutorAllocationManager: Existing executor 1 has > been removed (new total is 0) > {code} > All executors are inactive and thus killed after 60 seconds, the master > spends some CPU on a process that hangs indefinitely, and the workers are > idle. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20227) Job hangs when joining a lot of aggregated columns
[ https://issues.apache.org/jira/browse/SPARK-20227?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15956973#comment-15956973 ] Quentin Auge commented on SPARK-20227: -- I found two distinct workarounds that prevent the job from hanging: * Quite unsatisfactory: persist df after the for loop * Better: replacing the for loop with the following: {code} df = df.select([cols[0]] + [sum(col).over(window).alias(col) for col in cols[1:]]) {code} > Job hangs when joining a lot of aggregated columns > -- > > Key: SPARK-20227 > URL: https://issues.apache.org/jira/browse/SPARK-20227 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 > Environment: AWS emr-5.4.0 m4.xlarge >Reporter: Quentin Auge > > I'm trying to replace a lot of different columns in a dataframe with > aggregates of themselves, and then join the resulting dataframe. > {code} > # Create a dataframe with 1 row and 50 columns > n = 50 > df = sc.parallelize([Row(*range(n))]).toDF() > cols = df.columns > # Replace each column values with aggregated values > window = Window.partitionBy(cols[0]) > for col in cols[1:]: > df = df.withColumn(col, sum(col).over(window)) > # Join > other_df = sc.parallelize([Row(0)]).toDF() > result = other_df.join(df, on = cols[0]) > result.show() > {code} > The issue is, Spark hangs forever when executing the last line. > {code} > 17/04/05 14:39:28 INFO ExecutorAllocationManager: Removing executor 1 because > it has been idle for 60 seconds (new desired total will be 0) > 17/04/05 14:39:29 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling > executor 1. > 17/04/05 14:39:29 INFO DAGScheduler: Executor lost: 1 (epoch 0) > 17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Trying to remove executor > 1 from BlockManagerMaster. > 17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Removing block manager > BlockManagerId(1, ip-172-30-0-149.ec2.internal, 35666, None) > 17/04/05 14:39:29 INFO BlockManagerMaster: Removed 1 successfully in > removeExecutor > 17/04/05 14:39:29 INFO YarnScheduler: Executor 1 on > ip-172-30-0-149.ec2.internal killed by driver. > 17/04/05 14:39:29 INFO ExecutorAllocationManager: Existing executor 1 has > been removed (new total is 0) > {code} > All executors are inactive and thus killed after 60 seconds, the master > spends some CPU on a process that hangs indefinitely, and the workers are > idle. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
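For reference, the second workaround can be run end to end. Below is a minimal, self-contained PySpark sketch (assuming a local SparkSession; the names mirror the repro in the issue description). The single {{select}} replaces the 49 chained {{withColumn}} calls, each of which stacks one more projection onto the plan, which is one plausible reason the join planning stalls:
{code}
from pyspark.sql import Row, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local[2]").getOrCreate()
sc = spark.sparkContext

# Same 1-row, 50-column dataframe as in the issue description
n = 50
df = sc.parallelize([Row(*range(n))]).toDF()
cols = df.columns

# One select with 49 window aggregates instead of a withColumn loop
window = Window.partitionBy(cols[0])
df = df.select([F.col(cols[0])] +
               [F.sum(c).over(window).alias(c) for c in cols[1:]])

other_df = sc.parallelize([Row(0)]).toDF()
result = other_df.join(df, on=cols[0])
result.show()
{code}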
[jira] [Updated] (SPARK-20227) Job hangs when joining a lot of aggregated columns
[ https://issues.apache.org/jira/browse/SPARK-20227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Quentin Auge updated SPARK-20227: - Description: I'm trying to replace a lot of different columns in a dataframe with aggregates of themselves, and then join the resulting dataframe. {code} # Create a dataframe with 1 row and 50 columns n = 50 df = sc.parallelize([Row(*range(n))]).toDF() cols = df.columns # Replace each column values with aggregated values window = Window.partitionBy(cols[0]) for col in cols[1:]: df = df.withColumn(col, sum(col).over(window)) # Join other_df = sc.parallelize([Row(0)]).toDF() result = other_df.join(df, on = cols[0]) result.show() {code} The issue is, Spark hangs forever when executing the last line. {code} 17/04/05 14:39:28 INFO ExecutorAllocationManager: Removing executor 1 because it has been idle for 60 seconds (new desired total will be 0) 17/04/05 14:39:29 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 1. 17/04/05 14:39:29 INFO DAGScheduler: Executor lost: 1 (epoch 0) 17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Trying to remove executor 1 from BlockManagerMaster. 17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(1, ip-172-30-0-149.ec2.internal, 35666, None) 17/04/05 14:39:29 INFO BlockManagerMaster: Removed 1 successfully in removeExecutor 17/04/05 14:39:29 INFO YarnScheduler: Executor 1 on ip-172-30-0-149.ec2.internal killed by driver. 17/04/05 14:39:29 INFO ExecutorAllocationManager: Existing executor 1 has been removed (new total is 0) {code} All executors are inactive and thus killed after 60 seconds, the master spends some CPU on a process that hangs indefinitely, and the workers are idle. was: I'm trying to replace a lot of different columns in a dataframe with aggregates of themselves, and then join the resulting dataframe. {code:python} # Create a dataframe with 1 row and 50 columns n = 50 df = sc.parallelize([Row(*range(n))]).toDF() cols = df.columns # Replace each column values with aggregated values window = Window.partitionBy(cols[0]) for col in cols[1:]: df = df.withColumn(col, sum(col).over(window)) # Join other_df = sc.parallelize([Row(0)]).toDF() result = other_df.join(df, on = cols[0]) result.show() {code} The issue is, Spark hangs forever when executing the last line. {code} 17/04/05 14:39:28 INFO ExecutorAllocationManager: Removing executor 1 because it has been idle for 60 seconds (new desired total will be 0) 17/04/05 14:39:29 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 1. 17/04/05 14:39:29 INFO DAGScheduler: Executor lost: 1 (epoch 0) 17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Trying to remove executor 1 from BlockManagerMaster. 17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(1, ip-172-30-0-149.ec2.internal, 35666, None) 17/04/05 14:39:29 INFO BlockManagerMaster: Removed 1 successfully in removeExecutor 17/04/05 14:39:29 INFO YarnScheduler: Executor 1 on ip-172-30-0-149.ec2.internal killed by driver. 17/04/05 14:39:29 INFO ExecutorAllocationManager: Existing executor 1 has been removed (new total is 0) {code} All executors are inactive and thus killed after 60 seconds, the master spends some CPU on a process that hangs indefinitely, and the workers are idle. 
> Job hangs when joining a lot of aggregated columns > -- > > Key: SPARK-20227 > URL: https://issues.apache.org/jira/browse/SPARK-20227 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.1.0 > Environment: AWS emr-5.4.0 m4.xlarge >Reporter: Quentin Auge > > I'm trying to replace a lot of different columns in a dataframe with > aggregates of themselves, and then join the resulting dataframe. > {code} > # Create a dataframe with 1 row and 50 columns > n = 50 > df = sc.parallelize([Row(*range(n))]).toDF() > cols = df.columns > # Replace each column values with aggregated values > window = Window.partitionBy(cols[0]) > for col in cols[1:]: > df = df.withColumn(col, sum(col).over(window)) > # Join > other_df = sc.parallelize([Row(0)]).toDF() > result = other_df.join(df, on = cols[0]) > result.show() > {code} > The issue is, Spark hangs forever when executing the last line. > {code} > 17/04/05 14:39:28 INFO ExecutorAllocationManager: Removing executor 1 because > it has been idle for 60 seconds (new desired total will be 0) > 17/04/05 14:39:29 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling > executor 1. > 17/04/05 14:39:29 INFO DAGScheduler: Executor lost: 1 (epoch 0) > 17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Trying to remove executor > 1 from BlockManagerMaster. > 17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Removing block manager > BlockManagerId(1, ip-172-30-0-149.ec2.i
[jira] [Created] (SPARK-20227) Job hangs when joining a lot of aggregated columns
Quentin Auge created SPARK-20227: Summary: Job hangs when joining a lot of aggregated columns Key: SPARK-20227 URL: https://issues.apache.org/jira/browse/SPARK-20227 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.1.0 Environment: AWS emr-5.4.0 m4.xlarge Reporter: Quentin Auge I'm trying to replace a lot of different columns in a dataframe with aggregates of themselves, and then join the resulting dataframe. {code:python} # Create a dataframe with 1 row and 50 columns n = 50 df = sc.parallelize([Row(*range(n))]).toDF() cols = df.columns # Replace each column values with aggregated values window = Window.partitionBy(cols[0]) for col in cols[1:]: df = df.withColumn(col, sum(col).over(window)) # Join other_df = sc.parallelize([Row(0)]).toDF() result = other_df.join(df, on = cols[0]) result.show() {code} The issue is, Spark hangs forever when executing the last line. {code} 17/04/05 14:39:28 INFO ExecutorAllocationManager: Removing executor 1 because it has been idle for 60 seconds (new desired total will be 0) 17/04/05 14:39:29 INFO YarnSchedulerBackend$YarnDriverEndpoint: Disabling executor 1. 17/04/05 14:39:29 INFO DAGScheduler: Executor lost: 1 (epoch 0) 17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Trying to remove executor 1 from BlockManagerMaster. 17/04/05 14:39:29 INFO BlockManagerMasterEndpoint: Removing block manager BlockManagerId(1, ip-172-30-0-149.ec2.internal, 35666, None) 17/04/05 14:39:29 INFO BlockManagerMaster: Removed 1 successfully in removeExecutor 17/04/05 14:39:29 INFO YarnScheduler: Executor 1 on ip-172-30-0-149.ec2.internal killed by driver. 17/04/05 14:39:29 INFO ExecutorAllocationManager: Existing executor 1 has been removed (new total is 0) {code} All executors are inactive and thus killed after 60 seconds, the master spends some CPU on a process that hangs indefinitely, and the workers are idle. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-20226) Call to sqlContext.cacheTable takes an incredibly long time in some cases
[ https://issues.apache.org/jira/browse/SPARK-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Barry Becker updated SPARK-20226: - Attachment: xyzzy.csv Attaching the datafile, but I don't think it is significant. This problem can be reproduced on other small datasets. > Call to sqlContext.cacheTable takes an incredibly long time in some cases > - > > Key: SPARK-20226 > URL: https://issues.apache.org/jira/browse/SPARK-20226 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 2.1.0 > Environment: linux or windows >Reporter: Barry Becker > Attachments: xyzzy.csv > > > I have a case where the call to sqlContext.cacheTable can take an arbitrarily > long time depending on the number of columns that are referenced in a > withColumn expression applied to a dataframe. > The dataset is small (20 columns 7861 rows). The sequence to reproduce is the > following: > 1) add a new column that references 8 - 14 of the columns in the dataset. >- If I add 8 columns, then the call to cacheTable is fast - like *5 > seconds* >- If I add 11 columns, then it is slow - like *60 seconds* >- and if I add 14 columns, then it basically *takes forever* - I gave up > after 10 minutes or so. > The Column expression that is added, is basically just concatenating > the columns together in a single string. If a number is concatenated on a > string (or vice versa) the number is first converted to a string. > The expression looks something like this: > {code} > `Plate` + `State` + `License Type` + `Summons Number` + `Issue Date` + > `Violation Time` + `Violation` + `Judgment Entry Date` + `Fine Amount` + > `Penalty Amount` + `Interest Amount` > {code} > which we then convert to a Column expression that looks like this: > {code} > UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF('Plate, 'State), 'License Type), > UDF('Summons Number)), UDF('Issue Date)), 'Violation Time), 'Violation), > UDF('Judgment Entry Date)), UDF('Fine Amount)), UDF('Penalty Amount)), > UDF('Interest Amount)) > {code} >where the UDFs are very simple functions that basically call toString > and + as needed. > 2) apply a pipeline that includes some transformers that was saved earlier. 
> Here are the steps of the pipeline (extracted from parquet) > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333200603,"sparkVersion":"2.1.0","uid":"strIdx_aeb04d2777cc","paramMap":{"handleInvalid":"skip","outputCol":"State_IDX__","inputCol":"State_CLEANED__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333200837,"sparkVersion":"2.1.0","uid":"strIdx_0164c4c13979","paramMap":{"inputCol":"License > Type_CLEANED__","handleInvalid":"skip","outputCol":"License > Type_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201068,"sparkVersion":"2.1.0","uid":"strIdx_25b6cbd02751","paramMap":{"inputCol":"Violation_CLEANED__","handleInvalid":"skip","outputCol":"Violation_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201282,"sparkVersion":"2.1.0","uid":"strIdx_aa12df0354d9","paramMap":{"handleInvalid":"skip","inputCol":"County_CLEANED__","outputCol":"County_IDX__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201552,"sparkVersion":"2.1.0","uid":"strIdx_babb120f3cc1","paramMap":{"handleInvalid":"skip","outputCol":"Issuing > Agency_IDX__","inputCol":"Issuing Agency_CLEANED__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201759,"sparkVersion":"2.1.0","uid":"strIdx_5f2de9d9542d","paramMap":{"handleInvalid":"skip","outputCol":"Violation > Status_IDX__","inputCol":"Violation Status_CLEANED__"}}{code} > - > {code}{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1491333201987,"sparkVersion":"2.1.0", > "uid":"bucketizer_6f65ca9fa813", > "paramMap":{ > "outputCol":"Summons > Number_BINNED__","handleInvalid":"keep","splits":["-Inf",1.386630656E9,3.696078592E9,4.005258752E9,6.045063168E9,8.136507392E9,"Inf"],"inputCol":"Summons > Number_CLEANED__" >} >}{code} > - > {code}{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1491333202079,"sparkVersion":"2.1.0", > "uid":"bucketizer_f5db4fb8120e", > "paramMap":{ > > "splits":["-Inf",1.435215616E9,1.443855616E9,1.447271936E9,1.448222464E9,1.448395264E9,1.448481536E9,1.448827136E9,1.449259264E9,1.449432064E9,1.449518336E9,"Inf"], > "handleInvalid":"keep","outputCol":"Issue > Date_BINNED__","inputCol":"Issue Date_CLEANED__" >} >}{code} > - > {code}{"class":
[jira] [Updated] (SPARK-20226) Call to sqlContext.cacheTable takes an incredibly long time in some cases
[ https://issues.apache.org/jira/browse/SPARK-20226?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Barry Becker updated SPARK-20226: - Description: I have a case where the call to sqlContext.cacheTable can take an arbitrarily long time depending on the number of columns that are referenced in a withColumn expression applied to a dataframe. The dataset is small (20 columns 7861 rows). The sequence to reproduce is the following: 1) add a new column that references 8 - 14 of the columns in the dataset. - If I add 8 columns, then the call to cacheTable is fast - like *5 seconds* - If I add 11 columns, then it is slow - like *60 seconds* - and if I add 14 columns, then it basically *takes forever* - I gave up after 10 minutes or so. The Column expression that is added, is basically just concatenating the columns together in a single string. If a number is concatenated on a string (or vice versa) the number is first converted to a string. The expression looks something like this: {code} `Plate` + `State` + `License Type` + `Summons Number` + `Issue Date` + `Violation Time` + `Violation` + `Judgment Entry Date` + `Fine Amount` + `Penalty Amount` + `Interest Amount` {code} which we then convert to a Column expression that looks like this: {code} UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF('Plate, 'State), 'License Type), UDF('Summons Number)), UDF('Issue Date)), 'Violation Time), 'Violation), UDF('Judgment Entry Date)), UDF('Fine Amount)), UDF('Penalty Amount)), UDF('Interest Amount)) {code} where the UDFs are very simple functions that basically call toString and + as needed. 2) apply a pipeline that includes some transformers that was saved earlier. Here are the steps of the pipeline (extracted from parquet) - {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333200603,"sparkVersion":"2.1.0","uid":"strIdx_aeb04d2777cc","paramMap":{"handleInvalid":"skip","outputCol":"State_IDX__","inputCol":"State_CLEANED__"}}{code} - {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333200837,"sparkVersion":"2.1.0","uid":"strIdx_0164c4c13979","paramMap":{"inputCol":"License Type_CLEANED__","handleInvalid":"skip","outputCol":"License Type_IDX__"}}{code} - {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201068,"sparkVersion":"2.1.0","uid":"strIdx_25b6cbd02751","paramMap":{"inputCol":"Violation_CLEANED__","handleInvalid":"skip","outputCol":"Violation_IDX__"}}{code} - {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201282,"sparkVersion":"2.1.0","uid":"strIdx_aa12df0354d9","paramMap":{"handleInvalid":"skip","inputCol":"County_CLEANED__","outputCol":"County_IDX__"}}{code} - {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201552,"sparkVersion":"2.1.0","uid":"strIdx_babb120f3cc1","paramMap":{"handleInvalid":"skip","outputCol":"Issuing Agency_IDX__","inputCol":"Issuing Agency_CLEANED__"}}{code} - {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201759,"sparkVersion":"2.1.0","uid":"strIdx_5f2de9d9542d","paramMap":{"handleInvalid":"skip","outputCol":"Violation Status_IDX__","inputCol":"Violation Status_CLEANED__"}}{code} - {code}{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1491333201987,"sparkVersion":"2.1.0", "uid":"bucketizer_6f65ca9fa813", "paramMap":{ "outputCol":"Summons 
Number_BINNED__","handleInvalid":"keep","splits":["-Inf",1.386630656E9,3.696078592E9,4.005258752E9,6.045063168E9,8.136507392E9,"Inf"],"inputCol":"Summons Number_CLEANED__" } }{code} - {code}{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1491333202079,"sparkVersion":"2.1.0", "uid":"bucketizer_f5db4fb8120e", "paramMap":{ "splits":["-Inf",1.435215616E9,1.443855616E9,1.447271936E9,1.448222464E9,1.448395264E9,1.448481536E9,1.448827136E9,1.449259264E9,1.449432064E9,1.449518336E9,"Inf"], "handleInvalid":"keep","outputCol":"Issue Date_BINNED__","inputCol":"Issue Date_CLEANED__" } }{code} - {code}{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1491333202172,"sparkVersion":"2.1.0", "uid":"bucketizer_74568a2a5cfd", "paramMap":{ "handleInvalid":"keep","outputCol":"Fine Amount_BINNED__","inputCol":"Fine Amount_CLEANED__","splits":["-Inf",47.5,57.5,62.5,105.0,"Inf"] } }{code} - {code}{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1491333202269,"sparkVersion":"2.1.0", "uid":"bucketizer_109705dfdbcd", "paramMap":{"splits":["-Inf",0.00499888241291,"Inf"],"outputCol":"Interest Amount_BINNED__","handleInvalid":"keep","inputCol":"Interest Amount_CLEANED__"} }{code} - {code}{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1491333202362,"
[jira] [Commented] (SPARK-20225) Spark Job hangs while writing parquet files to HDFS
[ https://issues.apache.org/jira/browse/SPARK-20225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15956877#comment-15956877 ] Darshan Mehta commented on SPARK-20225: --- Attached are the Spark UI screenshots with Failed Tasks and stacktrace. > Spark Job hangs while writing parquet files to HDFS > --- > > Key: SPARK-20225 > URL: https://issues.apache.org/jira/browse/SPARK-20225 > Project: Spark > Issue Type: Bug > Components: Input/Output >Affects Versions: 2.1.0 >Reporter: Darshan Mehta > Attachments: Failed_Tasks_2.JPG, Failed_Tasks.JPG > > > While saving a dataframe to parquet using {{baseDataset.write.parquet("some > location")}} 3 stages failed, however, it did not notify the (parent) tasks > which got stuck on 80%. > Here's the sample (pseudo) code: > {code:title=Test.scala|borderStyle=solid} > val values = Map(...) > val range = Seq(..) > val collection = //get collection > collection.foreach(range=>{ > values.foreach(e=> { > val baseDataset = //get dataset > baseDataset.write.parquet("some location") > }) > }) > {code} > This is used as a paragraph in zeppelin and it gets stuck at 80% after > completing 5-6 iterations (I have tried debugging it by writing {{println}} > statements). > Here's what I can see in Thread dump : > {code} > "pool-2-thread-10" #174 prio=5 os_prio=0 tid=0x7f9f1807e800 nid=0x3945 > waiting on condition [0x7f9eddec1000] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x000552b4bef0> (a > scala.concurrent.impl.Promise$CompletionLatch) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at > scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202) > at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218) > at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153) > at > org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:619) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:127) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:121) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58) > - locked <0x000552b92c08> (a > org.apache.spark.sql.execution.command.ExecutedCommandExec) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56) > at > 
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132) > at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113) > at > org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87) > - locked <0x000552b92c80> (a > org.apache.spark.sql.execution.QueryExecution) > at > org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87) > at > org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492) > at org.apache.spar
[jira] [Commented] (SPARK-20202) Remove references to org.spark-project.hive
[ https://issues.apache.org/jira/browse/SPARK-20202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15956879#comment-15956879 ] Steve Loughran commented on SPARK-20202: # the ugliness needed to insert the Spark thrift stuff under the Hive thrift stuff is obsolete and can be cut entirely. # with the shading of Kryo no longer needed, an unshaded Hive *may* work. I forget which trouble spots there were last time; probably the usual suspects: Jackson, Guava, etc. # Hive 1.2.x refuses to work with Hadoop 3; it considers that an unsupported version. For basic client-side testing, you can build Hadoop 3 with a fake version (e.g. {{mvn install -DskipShade -Ddeclared.hadoop.version=2.11}}), but as the Hadoop version is something the NNs/DNs care about, that's not really going to work in real systems. Presumably later Hive versions will address that. If Hive takes over ownership of the 1.2.1-spark branch, this could be done first simply by pulling the Spark branch into the Hive repo as a branch, defining the artifact naming properly, and releasing it. Before any release of that 1.2.x branch, there are a couple of outstanding PRs to pull in (Groovy version for security reasons, ... ). A quick import & re-release would be the fastest way to get this out as an ASF-approved binary > Remove references to org.spark-project.hive > --- > > Key: SPARK-20202 > URL: https://issues.apache.org/jira/browse/SPARK-20202 > Project: Spark > Issue Type: Bug > Components: Build, SQL >Affects Versions: 1.6.4, 2.0.3, 2.1.1 >Reporter: Owen O'Malley >Priority: Blocker > > Spark can't continue to depend on their fork of Hive and must move to > standard Hive versions. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-20226) Call to sqlContext.cacheTable takes an incredibly long time in some cases
Barry Becker created SPARK-20226: Summary: Call to sqlContext.cacheTable takes an incredibly long time in some cases Key: SPARK-20226 URL: https://issues.apache.org/jira/browse/SPARK-20226 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 2.1.0 Environment: linux or windows Reporter: Barry Becker I have a case where the call to sqlContext.cacheTable can take an arbitrarily long time depending on the number of columns that are referenced in a withColumn expression applied to a dataframe. The dataset is small (20 columns 7861 rows). The sequence to reproduce is the following: 1) add a new column based on a function of 8 - 14 other columns. -- If I add 8 columns, then the call to cacheTable is fast - like *5 seconds* -- If I add 11 columns, then it is slow - like *60 seconds* -- and if I add 14 columns, then it basically *takes forever* - I gave up after 10 minutes or so. The Column expression that is added, is basically just concatenating the columns together in a single string. If a number is concatenated on a string (or vice versa) the number is first converted to a string. The expression looks something like this: {code} `Plate` + `State` + `License Type` + `Summons Number` + `Issue Date` + `Violation Time` + `Violation` + `Judgment Entry Date` + `Fine Amount` + `Penalty Amount` + `Interest Amount` {code} which we then convert to a Column expression that looks like this: {code} UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF(UDF('Plate, 'State), 'License Type), UDF('Summons Number)), UDF('Issue Date)), 'Violation Time), 'Violation), UDF('Judgment Entry Date)), UDF('Fine Amount)), UDF('Penalty Amount)), UDF('Interest Amount)) {code} where the UDFs are very simple functions that basically call toString and + as needed. 2) apply a pipeline that includes some transformers that was saved earlier. 
Here are the steps of the pipeline (extracted from parquet) - {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333200603,"sparkVersion":"2.1.0","uid":"strIdx_aeb04d2777cc","paramMap":{"handleInvalid":"skip","outputCol":"State_IDX__","inputCol":"State_CLEANED__"}}{code} - {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333200837,"sparkVersion":"2.1.0","uid":"strIdx_0164c4c13979","paramMap":{"inputCol":"License Type_CLEANED__","handleInvalid":"skip","outputCol":"License Type_IDX__"}}{code} - {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201068,"sparkVersion":"2.1.0","uid":"strIdx_25b6cbd02751","paramMap":{"inputCol":"Violation_CLEANED__","handleInvalid":"skip","outputCol":"Violation_IDX__"}}{code} - {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201282,"sparkVersion":"2.1.0","uid":"strIdx_aa12df0354d9","paramMap":{"handleInvalid":"skip","inputCol":"County_CLEANED__","outputCol":"County_IDX__"}}{code} - {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201552,"sparkVersion":"2.1.0","uid":"strIdx_babb120f3cc1","paramMap":{"handleInvalid":"skip","outputCol":"Issuing Agency_IDX__","inputCol":"Issuing Agency_CLEANED__"}}{code} - {code}{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1491333201759,"sparkVersion":"2.1.0","uid":"strIdx_5f2de9d9542d","paramMap":{"handleInvalid":"skip","outputCol":"Violation Status_IDX__","inputCol":"Violation Status_CLEANED__"}}{code} - {code}{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1491333201987,"sparkVersion":"2.1.0", "uid":"bucketizer_6f65ca9fa813", "paramMap":{ "outputCol":"Summons Number_BINNED__","handleInvalid":"keep","splits":["-Inf",1.386630656E9,3.696078592E9,4.005258752E9,6.045063168E9,8.136507392E9,"Inf"],"inputCol":"Summons Number_CLEANED__" } }{code} - {code}{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1491333202079,"sparkVersion":"2.1.0", "uid":"bucketizer_f5db4fb8120e", "paramMap":{ "splits":["-Inf",1.435215616E9,1.443855616E9,1.447271936E9,1.448222464E9,1.448395264E9,1.448481536E9,1.448827136E9,1.449259264E9,1.449432064E9,1.449518336E9,"Inf"], "handleInvalid":"keep","outputCol":"Issue Date_BINNED__","inputCol":"Issue Date_CLEANED__" } }{code} - {code}{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1491333202172,"sparkVersion":"2.1.0", "uid":"bucketizer_74568a2a5cfd", "paramMap":{ "handleInvalid":"keep","outputCol":"Fine Amount_BINNED__","inputCol":"Fine Amount_CLEANED__","splits":["-Inf",47.5,57.5,62.5,105.0,"Inf"] } }{code} - {code}{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1491333202269,"sparkVersion":"2.1.0", "uid":"bucketizer_1097
[jira] [Updated] (SPARK-20225) Spark Job hangs while writing parquet files to HDFS
[ https://issues.apache.org/jira/browse/SPARK-20225?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Darshan Mehta updated SPARK-20225: -- Attachment: Failed_Tasks_2.JPG Failed_Tasks.JPG > Spark Job hangs while writing parquet files to HDFS > --- > > Key: SPARK-20225 > URL: https://issues.apache.org/jira/browse/SPARK-20225 > Project: Spark > Issue Type: Bug > Components: Input/Output >Affects Versions: 2.1.0 >Reporter: Darshan Mehta > Attachments: Failed_Tasks_2.JPG, Failed_Tasks.JPG > > > While saving a dataframe to parquet using {{baseDataset.write.parquet("some > location")}} 3 stages failed, however, it did not notify the (parent) tasks > which got stuck on 80%. > Here's the sample (pseudo) code: > {code:title=Test.scala|borderStyle=solid} > val values = Map(...) > val range = Seq(..) > val collection = //get collection > collection.foreach(range=>{ > values.foreach(e=> { > val baseDataset = //get dataset > baseDataset.write.parquet("some location") > }) > }) > {code} > This is used as a paragraph in zeppelin and it gets stuck at 80% after > completing 5-6 iterations (I have tried debugging it by writing {{println}} > statements). > Here's what I can see in Thread dump : > {code} > "pool-2-thread-10" #174 prio=5 os_prio=0 tid=0x7f9f1807e800 nid=0x3945 > waiting on condition [0x7f9eddec1000] >java.lang.Thread.State: WAITING (parking) > at sun.misc.Unsafe.park(Native Method) > - parking to wait for <0x000552b4bef0> (a > scala.concurrent.impl.Promise$CompletionLatch) > at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) > at > java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) > at > scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202) > at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218) > at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153) > at > org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:619) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:127) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121) > at > org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) > at > org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:121) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58) > - locked <0x000552b92c08> (a > org.apache.spark.sql.execution.command.ExecutedCommandExec) > at > org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56) > at > 
org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) > at > org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135) > at > org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) > at > org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132) > at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113) > at > org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87) > - locked <0x000552b92c80> (a > org.apache.spark.sql.execution.QueryExecution) > at > org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87) > at > org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492) > at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215) >
[jira] [Created] (SPARK-20225) Spark Job hangs while writing parquet files to HDFS
Darshan Mehta created SPARK-20225: - Summary: Spark Job hangs while writing parquet files to HDFS Key: SPARK-20225 URL: https://issues.apache.org/jira/browse/SPARK-20225 Project: Spark Issue Type: Bug Components: Input/Output Affects Versions: 2.1.0 Reporter: Darshan Mehta While saving a dataframe to parquet using {{baseDataset.write.parquet("some location")}} 3 stages failed; however, it did not notify the (parent) tasks, which got stuck at 80%. Here's the sample (pseudo) code: {code:title=Test.scala|borderStyle=solid} val values = Map(...) val range = Seq(..) val collection = //get collection collection.foreach(range=>{ values.foreach(e=> { val baseDataset = //get dataset baseDataset.write.parquet("some location") }) }) {code} This is used as a paragraph in Zeppelin and it gets stuck at 80% after completing 5-6 iterations (I have tried debugging it by writing {{println}} statements). Here's what I can see in the thread dump: {code} "pool-2-thread-10" #174 prio=5 os_prio=0 tid=0x7f9f1807e800 nid=0x3945 waiting on condition [0x7f9eddec1000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for <0x000552b4bef0> (a scala.concurrent.impl.Promise$CompletionLatch) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:997) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1304) at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202) at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218) at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:153) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:619) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:127) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121) at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:121) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58) - locked <0x000552b92c08> (a org.apache.spark.sql.execution.command.ExecutedCommandExec) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56) at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87) - locked <0x000552b92c80> (a org.apache.spark.sql.execution.QueryExecution) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87) at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198) at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:494) at $line167049345059.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1$$anonfun$apply$1.apply(:82) at $line167049345059.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1$$anonfun$apply$1.apply(:63) at scala.col
[jira] [Updated] (SPARK-20155) CSV-files with quoted quotes can't be parsed, if delimiter follows quoted quote
[ https://issues.apache.org/jira/browse/SPARK-20155?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rick Moritz updated SPARK-20155: Component/s: SQL > CSV-files with quoted quotes can't be parsed, if delimiter follows quoted > quote > --- > > Key: SPARK-20155 > URL: https://issues.apache.org/jira/browse/SPARK-20155 > Project: Spark > Issue Type: Bug > Components: Input/Output, SQL >Affects Versions: 2.0.0 >Reporter: Rick Moritz > > According to: > https://tools.ietf.org/html/rfc4180#section-2 > 7. If double-quotes are used to enclose fields, then a double-quote >appearing inside a field must be escaped by preceding it with >another double quote. For example: >"aaa","b""bb","ccc" > This currently works as is, but the following does not: > "aaa","b""b,b","ccc" > while "aaa","b\"b,b","ccc" does get parsed. > I assume this happens because quotes are currently being parsed in pairs, > and that somehow ends up unquoting the delimiter. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
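A workaround worth noting here, as a hedged sketch: Spark's CSV reader defaults to backslash as the escape character, so pointing the {{escape}} option at the quote character itself makes the RFC 4180 doubled-quote form parse, including when a delimiter follows the escaped quote (PySpark, local file path assumed):
{code}
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()

# The failing line from the report: an escaped quote followed by a
# delimiter inside a quoted field
path = "/tmp/rfc4180.csv"
with open(path, "w") as f:
    f.write('aaa,"b""b,b",ccc\n')

df = (spark.read
      .option("quote", '"')
      .option("escape", '"')  # default escape is backslash
      .csv(path))
df.show()  # expected cells: aaa | b"b,b | ccc
{code}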
[jira] [Assigned] (SPARK-20224) Update apache docs
[ https://issues.apache.org/jira/browse/SPARK-20224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-20224: Assignee: Tathagata Das (was: Apache Spark) > Update apache docs > -- > > Key: SPARK-20224 > URL: https://issues.apache.org/jira/browse/SPARK-20224 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Tathagata Das >Assignee: Tathagata Das > Fix For: 2.2.0 > > -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-20224) Update apache docs
[ https://issues.apache.org/jira/browse/SPARK-20224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-20224: Assignee: Apache Spark (was: Tathagata Das) > Update apache docs > -- > > Key: SPARK-20224 > URL: https://issues.apache.org/jira/browse/SPARK-20224 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Tathagata Das >Assignee: Apache Spark > Fix For: 2.2.0 > > -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-20224) Update apache docs
[ https://issues.apache.org/jira/browse/SPARK-20224?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15956818#comment-15956818 ] Apache Spark commented on SPARK-20224: -- User 'tdas' has created a pull request for this issue: https://github.com/apache/spark/pull/17539 > Update apache docs > -- > > Key: SPARK-20224 > URL: https://issues.apache.org/jira/browse/SPARK-20224 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming >Affects Versions: 2.2.0 >Reporter: Tathagata Das >Assignee: Tathagata Das > Fix For: 2.2.0 > > -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-20224) Update apache docs
Tathagata Das created SPARK-20224: - Summary: Update apache docs Key: SPARK-20224 URL: https://issues.apache.org/jira/browse/SPARK-20224 Project: Spark Issue Type: Sub-task Components: Structured Streaming Affects Versions: 2.2.0 Reporter: Tathagata Das Assignee: Tathagata Das -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-19067) mapGroupsWithState - arbitrary stateful operations with Structured Streaming (similar to DStream.mapWithState)
[ https://issues.apache.org/jira/browse/SPARK-19067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tathagata Das updated SPARK-19067: -- Description: Right now the only way to do stateful operations is with Aggregator or UDAF. However, this does not give users control of emission or expiration of state, making it hard to implement things like sessionization. We should add a more general construct (probably similar to {{DStream.mapWithState}}) to structured streaming. Here is the design. *Requirements* - Users should be able to specify a function that can do the following - Access the input row corresponding to a key - Access the previous state corresponding to a key - Optionally, update or remove the state - Output any number of new rows (or none at all) *Proposed API* {code} // New methods on KeyValueGroupedDataset class KeyValueGroupedDataset[K, V] { // Scala friendly def mapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], State[S]) => U) def flatMapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], State[S]) => Iterator[U]) // Java friendly def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], resultEncoder: Encoder[U]) def flatMapGroupsWithState[S, U](func: FlatMapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], resultEncoder: Encoder[U]) } // --- New Java-friendly function classes --- public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable { R call(K key, Iterator<V> values, GroupState<S> state) throws Exception; } public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable { Iterator<R> call(K key, Iterator<V> values, GroupState<S> state) throws Exception; } // -- Wrapper class for state data -- trait GroupState[S] { def exists(): Boolean def get(): S // throws Exception if state does not exist def getOption(): Option[S] def update(newState: S): Unit def remove(): Unit // exists() will be false after this } {code} Key Semantics of the State class - The state can be null. - If state.remove() is called, then state.exists() will return false, and getOption will return None. - After state.update(newState) is called, state.exists() will return true, and getOption will return Some(...). - None of the operations are thread-safe. This is to avoid memory barriers. *Usage* {code} val stateFunc = (word: String, words: Iterator[String], runningCount: GroupState[Long]) => { val newCount = words.size + runningCount.getOption.getOrElse(0L) runningCount.update(newCount) (word, newCount) } dataset // type is Dataset[String] .groupByKey[String](w => w) // generates KeyValueGroupedDataset[String, String] .mapGroupsWithState[Long, (String, Long)](stateFunc) // returns Dataset[(String, Long)] {code} *Future Directions* - Timeout-based state expiration (for keys that have not received data for a while) - Done - General expression-based expiration - TODO. Any real use cases that cannot be done with timeouts? was: Right now the only way to do stateful operations is with Aggregator or UDAF. However, this does not give users control of emission or expiration of state, making it hard to implement things like sessionization. We should add a more general construct (probably similar to {{DStream.mapWithState}}) to structured streaming. Here is the design. 
*Requirements* - Users should be able to specify a function that can do the following - Access the input row corresponding to a key - Access the previous state corresponding to a key - Optionally, update or remove the state - Output any number of new rows (or none at all) *Proposed API* {code} // New methods on KeyValueGroupedDataset class KeyValueGroupedDataset[K, V] { // Scala friendly def mapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], State[S]) => U) def flatMapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], State[S]) => Iterator[U]) // Java friendly def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], resultEncoder: Encoder[U]) def flatMapGroupsWithState[S, U](func: FlatMapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], resultEncoder: Encoder[U]) } // --- New Java-friendly function classes --- public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable { R call(K key, Iterator<V> values, GroupState<S> state) throws Exception; } public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable { Iterator<R> call(K key, Iterator<V> values, state:
[jira] [Assigned] (SPARK-19807) Add reason for cancellation when a stage is killed using web UI
[ https://issues.apache.org/jira/browse/SPARK-19807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen reassigned SPARK-19807: - Assignee: shaolinliu > Add reason for cancellation when a stage is killed using web UI > --- > > Key: SPARK-19807 > URL: https://issues.apache.org/jira/browse/SPARK-19807 > Project: Spark > Issue Type: Improvement > Components: Web UI >Affects Versions: 2.1.0 >Reporter: Jacek Laskowski >Assignee: shaolinliu >Priority: Trivial > Fix For: 2.2.0 > > > When a user kills a stage using web UI (in Stages page), > {{StagesTab.handleKillRequest}} requests {{SparkContext}} to cancel the stage > without giving a reason. {{SparkContext}} has {{cancelStage(stageId: Int, > reason: String)}} that Spark could use to pass the information for > monitoring/debugging purposes. > {code} > scala> sc.range(0, 5, 1, 1).mapPartitions { nums => { Thread.sleep(60 * > 1000); nums } }.count > {code} > Use http://localhost:4040/stages/ and click Kill. > {code} > org.apache.spark.SparkException: Job 0 cancelled because Stage 0 was cancelled > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1486) > at > org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1426) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply$mcVI$sp(DAGScheduler.scala:1415) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply(DAGScheduler.scala:1408) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply(DAGScheduler.scala:1408) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234) > at > org.apache.spark.scheduler.DAGScheduler.handleStageCancellation(DAGScheduler.scala:1408) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1670) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1656) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1645) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2019) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2040) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2059) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2084) > at org.apache.spark.rdd.RDD.count(RDD.scala:1158) > ... 48 elided > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-19807) Add reason for cancellation when a stage is killed using web UI
[ https://issues.apache.org/jira/browse/SPARK-19807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sean Owen resolved SPARK-19807. --- Resolution: Fixed Fix Version/s: 2.2.0 Issue resolved by pull request 17258 [https://github.com/apache/spark/pull/17258] > Add reason for cancellation when a stage is killed using web UI > --- > > Key: SPARK-19807 > URL: https://issues.apache.org/jira/browse/SPARK-19807 > Project: Spark > Issue Type: Improvement > Components: Web UI >Affects Versions: 2.1.0 >Reporter: Jacek Laskowski >Priority: Trivial > Fix For: 2.2.0 > > > When a user kills a stage using web UI (in Stages page), > {{StagesTab.handleKillRequest}} requests {{SparkContext}} to cancel the stage > without giving a reason. {{SparkContext}} has {{cancelStage(stageId: Int, > reason: String)}} that Spark could use to pass the information for > monitoring/debugging purposes. > {code} > scala> sc.range(0, 5, 1, 1).mapPartitions { nums => { Thread.sleep(60 * > 1000); nums } }.count > {code} > Use http://localhost:4040/stages/ and click Kill. > {code} > org.apache.spark.SparkException: Job 0 cancelled because Stage 0 was cancelled > at > org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1486) > at > org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1426) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply$mcVI$sp(DAGScheduler.scala:1415) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply(DAGScheduler.scala:1408) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply(DAGScheduler.scala:1408) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:234) > at > org.apache.spark.scheduler.DAGScheduler.handleStageCancellation(DAGScheduler.scala:1408) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1670) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1656) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1645) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2019) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2040) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2059) > at org.apache.spark.SparkContext.runJob(SparkContext.scala:2084) > at org.apache.spark.rdd.RDD.count(RDD.scala:1158) > ... 48 elided > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-14681) Provide label/impurity stats for spark.ml decision tree nodes
[ https://issues.apache.org/jira/browse/SPARK-14681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-14681: Assignee: Apache Spark > Provide label/impurity stats for spark.ml decision tree nodes > - > > Key: SPARK-14681 > URL: https://issues.apache.org/jira/browse/SPARK-14681 > Project: Spark > Issue Type: Improvement > Components: ML >Reporter: Joseph K. Bradley >Assignee: Apache Spark > > Currently, spark.ml decision trees provide all node info except for the > aggregated stats about labels and impurities. This task is to provide those > publicly. We need to choose a good API for it, so we should discuss the > design on this issue before implementing it. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-14681) Provide label/impurity stats for spark.ml decision tree nodes
[ https://issues.apache.org/jira/browse/SPARK-14681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15956646#comment-15956646 ] Apache Spark commented on SPARK-14681: -- User 'shaynativ' has created a pull request for this issue: https://github.com/apache/spark/pull/17466 > Provide label/impurity stats for spark.ml decision tree nodes > - > > Key: SPARK-14681 > URL: https://issues.apache.org/jira/browse/SPARK-14681 > Project: Spark > Issue Type: Improvement > Components: ML >Reporter: Joseph K. Bradley > > Currently, spark.ml decision trees provide all node info except for the > aggregated stats about labels and impurities. This task is to provide those > publicly. We need to choose a good API for it, so we should discuss the > design on this issue before implementing it. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Assigned] (SPARK-14681) Provide label/impurity stats for spark.ml decision tree nodes
[ https://issues.apache.org/jira/browse/SPARK-14681?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Apache Spark reassigned SPARK-14681: Assignee: (was: Apache Spark) > Provide label/impurity stats for spark.ml decision tree nodes > - > > Key: SPARK-14681 > URL: https://issues.apache.org/jira/browse/SPARK-14681 > Project: Spark > Issue Type: Improvement > Components: ML >Reporter: Joseph K. Bradley > > Currently, spark.ml decision trees provide all node info except for the > aggregated stats about labels and impurities. This task is to provide those > publicly. We need to choose a good API for it, so we should discuss the > design on this issue before implementing it. -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org