[jira] [Commented] (SPARK-23443) Spark with Glue as external catalog
[ https://issues.apache.org/jira/browse/SPARK-23443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16604441#comment-16604441 ]

Ameen Tayyebi commented on SPARK-23443:
---------------------------------------
I've been sidetracked by lots of other projects, so at this time I unfortunately don't have the bandwidth to work on this :( :(

> Spark with Glue as external catalog
> -----------------------------------
>
>                 Key: SPARK-23443
>                 URL: https://issues.apache.org/jira/browse/SPARK-23443
>             Project: Spark
>          Issue Type: New Feature
>          Components: Spark Core
>    Affects Versions: 2.4.0
>            Reporter: Ameen Tayyebi
>            Priority: Major
>
> AWS Glue Catalog is an external Hive metastore backed by a web service. It allows permanent storage of catalog data for big data use cases.
>
> To find out more about AWS Glue, please consult:
> * AWS Glue - [https://aws.amazon.com/glue/]
> * Using Glue as a metastore catalog for Spark - [https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-glue.html]
>
> Today, the integration of Glue and Spark is through the Hive layer. Glue implements the IMetaStore interface of Hive, and for installations of Spark that contain Hive, Glue can be used as the metastore.
>
> The feature set that Glue supports does not align 1-1 with the set of features that the latest version of Spark supports. For example, the Glue interface supports more advanced partition pruning than the latest version of Hive embedded in Spark.
>
> To enable a more natural integration with Spark, and to allow leveraging the latest features of Glue without being coupled to Hive, a direct integration through Spark's own Catalog API is proposed. This Jira tracks that work.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
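For reference, the existing Hive-layer integration that the issue describes is wired up through configuration; a sketch of the settings documented in the linked EMR release guide (property names and the factory class are taken from that guide, and exact values may vary by EMR/Spark version):

{code}
# Spark resolves tables through its Hive-backed catalog implementation
spark.sql.catalogImplementation      hive

# EMR swaps the Hive metastore client for a Glue-backed one via this factory,
# so catalog calls go to the Glue web service instead of a Thrift metastore
hive.metastore.client.factory.class  com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory
{code}

The proposal in this issue would bypass this Hive client layer entirely by implementing Spark's own catalog interfaces directly against Glue.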
[jira] [Commented] (SPARK-23443) Spark with Glue as external catalog
[ https://issues.apache.org/jira/browse/SPARK-23443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16382117#comment-16382117 ]

Ameen Tayyebi commented on SPARK-23443:
---------------------------------------
Great, thank you so much. I've been stuck with a bunch of other tasks at work, unfortunately. I think I'll be able to pick this up again in 2-3 weeks. I'll try to get a small change out so that we can start iterating on it.
[jira] [Commented] (SPARK-23443) Spark with Glue as external catalog
[ https://issues.apache.org/jira/browse/SPARK-23443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370015#comment-16370015 ]

Ameen Tayyebi commented on SPARK-23443:
---------------------------------------
Hello, I have this working locally (just limited read cases) and plan to submit my first iteration this week. It would be great to work on this together! Any help would be appreciated :)

Ameen
[jira] [Resolved] (SPARK-22913) Hive Partition Pruning, Fractional and Timestamp types
[ https://issues.apache.org/jira/browse/SPARK-22913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ameen Tayyebi resolved SPARK-22913.
-----------------------------------
    Resolution: Won't Fix

Resolving in favor of the native Glue integration. These advanced predicates can't be supported because the version of Hive embedded in Spark does not support them.

> Hive Partition Pruning, Fractional and Timestamp types
> ------------------------------------------------------
>
>                 Key: SPARK-22913
>                 URL: https://issues.apache.org/jira/browse/SPARK-22913
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Ameen Tayyebi
>            Priority: Major
>
> Spark currently pushes the predicates it has in a SQL query down to the Hive metastore. This only applies to predicates placed on partitioning columns. As more and more Hive metastore implementations come around, this is an important optimization, since it allows the data to be pre-filtered to only the relevant partitions. Consider the following example:
>
> Table:
> create external table data (key string, quantity long)
> partitioned by (processing-date timestamp)
>
> Query:
> select * from data where processing-date = '2017-10-23 00:00:00'
>
> Currently, no filters will be pushed to the Hive metastore for the above query. The reason is that the code that computes the predicates to be sent to the Hive metastore only deals with integral and string column types; it doesn't know how to handle fractional and timestamp columns.
>
> I have tables in my metastore (AWS Glue) with millions of partitions of type timestamp and double. In my specific case, it takes Spark's master node about 6.5 minutes to download all partitions for the table and then filter the partitions client-side, while the actual processing time of my query is only 6 seconds. In other words, without partition pruning I'm looking at 6.5 minutes of processing, and with partition pruning, only 6 seconds.
>
> I have a fix for this developed locally that I'll provide shortly as a pull request.
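The gap described above comes down to rendering catalyst predicates as the filter-string expression a metastore can evaluate server-side; only integral and string partition columns were being rendered. A minimal, self-contained sketch of that type dispatch (class, method, and enum names are hypothetical illustrations, not Spark's actual implementation, and the partition column is renamed to a valid identifier):

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: turn "column op value" partition predicates into the
// string filter expression sent to the metastore, quoting by column type.
public class PartitionFilterSketch {
    enum ColType { INTEGRAL, STRING, FRACTIONAL, TIMESTAMP }

    // Returns a filter clause, or null when the type cannot be pushed down.
    static String toFilterClause(String column, String op, String value, ColType type) {
        switch (type) {
            case INTEGRAL:
            case FRACTIONAL:   // numeric literals travel unquoted (the proposed extension)
                return column + " " + op + " " + value;
            case STRING:
            case TIMESTAMP:    // timestamps travel as quoted literals (the proposed extension)
                return column + " " + op + " \"" + value + "\"";
            default:
                return null;   // not pushable: fall back to client-side filtering
        }
    }

    static String conjunction(List<String> clauses) {
        return String.join(" and ", clauses);
    }

    public static void main(String[] args) {
        // Prints the combined filter expression the metastore would receive.
        System.out.println(conjunction(Arrays.asList(
            toFilterClause("processing_date", "=", "2017-10-23 00:00:00", ColType.TIMESTAMP),
            toFilterClause("quantity", ">", "10", ColType.INTEGRAL))));
    }
}
```

Note that a real Hive metastore accepts only a restricted filter grammar, so what can actually be pushed depends on the metastore implementation; Glue's broader grammar is what motivates SPARK-23443.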
[jira] [Created] (SPARK-23443) Spark with Glue as external catalog
Ameen Tayyebi created SPARK-23443:
-------------------------------------

             Summary: Spark with Glue as external catalog
                 Key: SPARK-23443
                 URL: https://issues.apache.org/jira/browse/SPARK-23443
             Project: Spark
          Issue Type: New Feature
          Components: Spark Core
    Affects Versions: 2.4.0
            Reporter: Ameen Tayyebi
[jira] [Comment Edited] (SPARK-22913) Hive Partition Pruning, Fractional and Timestamp types
[ https://issues.apache.org/jira/browse/SPARK-22913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16305018#comment-16305018 ]

Ameen Tayyebi edited comment on SPARK-22913 at 12/28/17 3:31 AM:
-----------------------------------------------------------------
Note: I'll be away from December 30th until January 18th so I'll be checking up on this issue and the pull request at that time.

was (Author: ameen.tayy...@gmail.com):
Note: I'll be away since December 30th until January 18th so I'll be checking up on this issue and the pull request at that time.
[jira] [Commented] (SPARK-22913) Hive Partition Pruning, Fractional and Timestamp types
[ https://issues.apache.org/jira/browse/SPARK-22913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16305018#comment-16305018 ]

Ameen Tayyebi commented on SPARK-22913:
---------------------------------------
Note: I'll be away since December 30th until January 18th so I'll be checking up on this issue and the pull request at that time.
[jira] [Created] (SPARK-22913) Hive Partition Pruning, Fractional and Timestamp types
Ameen Tayyebi created SPARK-22913:
-------------------------------------

             Summary: Hive Partition Pruning, Fractional and Timestamp types
                 Key: SPARK-22913
                 URL: https://issues.apache.org/jira/browse/SPARK-22913
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 2.3.0
            Reporter: Ameen Tayyebi
             Fix For: 2.3.0
[jira] [Commented] (SPARK-14651) CREATE TEMPORARY TABLE is not supported yet
[ https://issues.apache.org/jira/browse/SPARK-14651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265545#comment-16265545 ]

Ameen Tayyebi commented on SPARK-14651:
---------------------------------------
This blocks some of our production use cases :( We only expose the Spark SQL interface to our customers, so using registerTempTable is not an option for them. Any word on the priority of this? Is the fix a matter of "plumbing", given that registerTempTable is already implemented?

> CREATE TEMPORARY TABLE is not supported yet
> -------------------------------------------
>
>                 Key: SPARK-14651
>                 URL: https://issues.apache.org/jira/browse/SPARK-14651
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.0.0, 2.2.0
>            Reporter: Jacek Laskowski
>            Priority: Minor
>
> With today's master it seems that {{CREATE TEMPORARY TABLE}} may or may not work depending on how complete the DDL is (?)
> {code}
> scala> sql("CREATE temporary table t2")
> 16/04/14 23:29:26 INFO HiveSqlParser: Parsing command: CREATE temporary table t2
> org.apache.spark.sql.catalyst.parser.ParseException:
> CREATE TEMPORARY TABLE is not supported yet. Please use registerTempTable as an alternative.(line 1, pos 0)
>
> == SQL ==
> CREATE temporary table t2
> ^^^
>
>   at org.apache.spark.sql.hive.execution.HiveSqlAstBuilder$$anonfun$visitCreateTable$1.apply(HiveSqlParser.scala:169)
>   at org.apache.spark.sql.hive.execution.HiveSqlAstBuilder$$anonfun$visitCreateTable$1.apply(HiveSqlParser.scala:165)
>   at org.apache.spark.sql.catalyst.parser.ParserUtils$.withOrigin(ParserUtils.scala:85)
>   at org.apache.spark.sql.hive.execution.HiveSqlAstBuilder.visitCreateTable(HiveSqlParser.scala:165)
>   at org.apache.spark.sql.hive.execution.HiveSqlAstBuilder.visitCreateTable(HiveSqlParser.scala:53)
>   at org.apache.spark.sql.catalyst.parser.SqlBaseParser$CreateTableContext.accept(SqlBaseParser.java:1049)
>   at org.antlr.v4.runtime.tree.AbstractParseTreeVisitor.visit(AbstractParseTreeVisitor.java:42)
>   at org.apache.spark.sql.catalyst.parser.AstBuilder$$anonfun$visitSingleStatement$1.apply(AstBuilder.scala:63)
>   at org.apache.spark.sql.catalyst.parser.AstBuilder$$anonfun$visitSingleStatement$1.apply(AstBuilder.scala:63)
>   at org.apache.spark.sql.catalyst.parser.ParserUtils$.withOrigin(ParserUtils.scala:85)
>   at org.apache.spark.sql.catalyst.parser.AstBuilder.visitSingleStatement(AstBuilder.scala:62)
>   at org.apache.spark.sql.catalyst.parser.AbstractSqlParser$$anonfun$parsePlan$1.apply(ParseDriver.scala:54)
>   at org.apache.spark.sql.catalyst.parser.AbstractSqlParser$$anonfun$parsePlan$1.apply(ParseDriver.scala:53)
>   at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:86)
>   at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:53)
>   at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:198)
>   at org.apache.spark.sql.hive.HiveContext.org$apache$spark$sql$hive$HiveContext$$super$parseSql(HiveContext.scala:201)
>   at org.apache.spark.sql.hive.HiveContext$$anonfun$parseSql$1.apply(HiveContext.scala:201)
>   at org.apache.spark.sql.hive.HiveContext$$anonfun$parseSql$1.apply(HiveContext.scala:201)
>   at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:228)
>   at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:175)
>   at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:174)
>   at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:217)
>   at org.apache.spark.sql.hive.HiveContext.parseSql(HiveContext.scala:200)
>   at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:765)
>   ... 48 elided
>
> scala> sql("CREATE temporary table t2 USING PARQUET OPTIONS (PATH 'hello') AS SELECT * FROM t1")
> 16/04/14 23:30:21 INFO HiveSqlParser: Parsing command: CREATE temporary table t2 USING PARQUET OPTIONS (PATH 'hello') AS SELECT * FROM t1
> org.apache.spark.sql.AnalysisException: Table or View not found: t1; line 1 pos 80
>   at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$getTable(Analyzer.scala:412)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:421)
>   at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:416)
>   at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:58)
>   at
> {code}
[jira] [Commented] (SPARK-20588) from_utc_timestamp causes bottleneck
[ https://issues.apache.org/jira/browse/SPARK-20588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996794#comment-15996794 ]

Ameen Tayyebi commented on SPARK-20588:
---------------------------------------
You could consider caching per thread as well, which would simplify the implementation, since you would no longer need a concurrent data structure to store the cached values.

> from_utc_timestamp causes bottleneck
> ------------------------------------
>
>                 Key: SPARK-20588
>                 URL: https://issues.apache.org/jira/browse/SPARK-20588
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.0.2
>         Environment: AWS EMR AMI 5.2.1
>            Reporter: Ameen Tayyebi
>
> We have a SQL query that makes use of the from_utc_timestamp function, like so:
> from_utc_timestamp(itemSigningTime, 'America/Los_Angeles')
>
> This causes a major bottleneck. Our exact call is:
> date_add(from_utc_timestamp(itemSigningTime, 'America/Los_Angeles'), 1)
>
> Switching from the above to date_add(itemSigningTime, 1) reduces the job running time from 40 minutes to 9.
>
> When the from_utc_timestamp function is used, several threads in the executors are in the BLOCKED state, on this call stack:
>
> "Executor task launch worker-63" #261 daemon prio=5 os_prio=0 tid=0x7f848472e000 nid=0x4294 waiting for monitor entry [0x7f501981c000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>         at java.util.TimeZone.getTimeZone(TimeZone.java:516)
>         - waiting to lock <0x7f5216c2aa58> (a java.lang.Class for java.util.TimeZone)
>         at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTimestamp(DateTimeUtils.scala:356)
>         at org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToTimestamp(DateTimeUtils.scala)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
>         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
>         at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>         at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>         at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>         at org.apache.spark.scheduler.Task.run(Task.scala:86)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
> Can we cache the time zones once per JVM so that we don't do this for every record?
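The "cache once per JVM" idea from the issue, and the per-thread variant from the comment, can both be sketched with plain JDK types. This is an illustration of the two caching strategies, not Spark's eventual fix (class and method names are invented for the example):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.ConcurrentHashMap;

// Older JDKs synchronize inside TimeZone.getTimeZone, so per-record calls
// from many executor threads serialize on the TimeZone class lock.
public class TimeZoneCache {
    // Option 1: one JVM-wide cache shared by all threads.
    private static final ConcurrentHashMap<String, TimeZone> SHARED = new ConcurrentHashMap<>();

    public static TimeZone jvmCached(String id) {
        // computeIfAbsent pays the synchronized lookup once per zone id per JVM.
        return SHARED.computeIfAbsent(id, TimeZone::getTimeZone);
    }

    // Option 2 (the per-thread suggestion): a plain HashMap per thread,
    // so no concurrent data structure is needed at all.
    private static final ThreadLocal<Map<String, TimeZone>> PER_THREAD =
        ThreadLocal.withInitial(HashMap::new);

    public static TimeZone threadCached(String id) {
        return PER_THREAD.get().computeIfAbsent(id, TimeZone::getTimeZone);
    }

    public static void main(String[] args) {
        TimeZone a = jvmCached("America/Los_Angeles");
        TimeZone b = jvmCached("America/Los_Angeles");
        System.out.println(a == b);     // true: second lookup returns the cached instance
        System.out.println(a.getID());  // America/Los_Angeles
    }
}
```

One caveat with the shared cache: TimeZone instances are mutable (e.g. setRawOffset), so a JVM-wide cache assumes callers never mutate the returned object; the per-thread variant sidesteps that concern as well.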
[jira] [Comment Edited] (SPARK-20588) from_utc_timestamp causes bottleneck
[ https://issues.apache.org/jira/browse/SPARK-20588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15995499#comment-15995499 ]

Ameen Tayyebi edited comment on SPARK-20588 at 5/3/17 7:40 PM:
---------------------------------------------------------------
Hopefully a more readable version of the call stack:
{code:java}
"Executor task launch worker-63" #261 daemon prio=5 os_prio=0 tid=0x7f848472e000 nid=0x4294 waiting for monitor entry [0x7f501981c000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at java.util.TimeZone.getTimeZone(TimeZone.java:516)
        - waiting to lock <0x7f5216c2aa58> (a java.lang.Class for java.util.TimeZone)
        at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTimestamp(DateTimeUtils.scala:356)
        at org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToTimestamp(DateTimeUtils.scala)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
{code}

was (Author: ameen.tayy...@gmail.com):
Hopefully more readable version of the call stack: (the same trace, posted without {code} formatting)
[jira] [Commented] (SPARK-20588) from_utc_timestamp causes bottleneck
[ https://issues.apache.org/jira/browse/SPARK-20588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15995499#comment-15995499 ] Ameen Tayyebi commented on SPARK-20588: --- Hopefully more readable version of the call stack: "Executor task launch worker-63" #261 daemon prio=5 os_prio=0 tid=0x7f848472e000 nid=0x4294 waiting for monitor entry [0x7f501981c000] java.lang.Thread.State: BLOCKED (on object monitor) at java.util.TimeZone.getTimeZone(TimeZone.java:516) - waiting to lock <0x7f5216c2aa58> (a java.lang.Class for java.util.TimeZone) at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTimestamp(DateTimeUtils.scala:356) at org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToTimestamp(DateTimeUtils.scala) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) at org.apache.spark.scheduler.Task.run(Task.scala:86) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) > from_utc_timestamp causes bottleneck > > > Key: SPARK-20588 > URL: https://issues.apache.org/jira/browse/SPARK-20588 > Project: Spark > Issue Type: Bug > Components: SQL 
>Affects Versions: 2.0.2 > Environment: AWS EMR AMI 5.2.1 >Reporter: Ameen Tayyebi > > We have a SQL query that makes use of the from_utc_timestamp function like > so: from_utc_timestamp(itemSigningTime,'America/Los_Angeles') > This causes a major bottleneck. Our exact call is: > date_add(from_utc_timestamp(itemSigningTime,'America/Los_Angeles'), 1) > Switching from the above to date_add(itemSigningTime, 1) reduces the job > running time from 40 minutes to 9. > When from_utc_timestamp function is used, several threads in the executors > are in the BLOCKED state, on this call stack: > "Executor task launch worker-63" #261 daemon prio=5 os_prio=0 > tid=0x7f848472e000 nid=0x4294 waiting for monitor entry > [0x7f501981c000] >java.lang.Thread.State: BLOCKED (on object monitor) > at java.util.TimeZone.getTimeZone(TimeZone.java:516) > - waiting to lock <0x7f5216c2aa58> (a java.lang.Class for > java.util.TimeZone) > at > org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTimestamp(DateTimeUtils.scala:356) > at > org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToTimestamp(DateTimeUtils.scala) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown > Source) > at > org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown > Source) > at > org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) > at > org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at > org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) > at > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) > at org.apache.spark.scheduler.Task.run(Task.scala:86) > at > 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Can we cache the locales once per JVM so that we don't do this for every > record? -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail:
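The per-JVM caching the comment asks about could look roughly like the following sketch. This is illustrative only: `TimeZoneCache` is a hypothetical helper, not Spark code, and Spark's actual fix may differ. The point is that `TimeZone.getTimeZone` synchronizes on the `TimeZone` class, so a hot path should hit it at most once per zone id.

```scala
import java.util.TimeZone
import java.util.concurrent.ConcurrentHashMap

// Hypothetical sketch of per-JVM caching: TimeZone.getTimeZone enters a
// class-level monitor, so we look each zone id up at most once and share
// the instance afterwards.
object TimeZoneCache {
  private val cache = new ConcurrentHashMap[String, TimeZone]()

  // computeIfAbsent runs the lookup only on a cache miss; subsequent calls
  // for the same id return the cached instance without contending on the lock.
  def get(id: String): TimeZone =
    cache.computeIfAbsent(id, zoneId => TimeZone.getTimeZone(zoneId))
}
```

One caveat with this sketch: `TimeZone` instances are mutable (`setID`, `setRawOffset`), so callers must treat the shared cached instance as read-only.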
[jira] [Created] (SPARK-20588) from_utc_timestamp causes bottleneck
Ameen Tayyebi created SPARK-20588: - Summary: from_utc_timestamp causes bottleneck Key: SPARK-20588 URL: https://issues.apache.org/jira/browse/SPARK-20588 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.0.2 Environment: AWS EMR AMI 5.2.1 Reporter: Ameen Tayyebi We have a SQL query that makes use of the from_utc_timestamp function like so: from_utc_timestamp(itemSigningTime,'America/Los_Angeles') This causes a major bottleneck. Our exact call is: date_add(from_utc_timestamp(itemSigningTime,'America/Los_Angeles'), 1) Switching from the above to date_add(itemSigningTime, 1) reduces the job running time from 40 minutes to 9. When from_utc_timestamp function is used, several threads in the executors are in the BLOCKED state, on this call stack: "Executor task launch worker-63" #261 daemon prio=5 os_prio=0 tid=0x7f848472e000 nid=0x4294 waiting for monitor entry [0x7f501981c000] java.lang.Thread.State: BLOCKED (on object monitor) at java.util.TimeZone.getTimeZone(TimeZone.java:516) - waiting to lock <0x7f5216c2aa58> (a java.lang.Class for java.util.TimeZone) at org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTimestamp(DateTimeUtils.scala:356) at org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToTimestamp(DateTimeUtils.scala) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79) at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47) at org.apache.spark.scheduler.Task.run(Task.scala:86) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Can we cache the locales once per JVM so that we don't do this for every record? -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-17777) Spark Scheduler Hangs Indefinitely
[ https://issues.apache.org/jira/browse/SPARK-17777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ameen Tayyebi updated SPARK-17777: -- Attachment: jstack-dump.txt > Spark Scheduler Hangs Indefinitely > -- > > Key: SPARK-17777 > URL: https://issues.apache.org/jira/browse/SPARK-17777 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.6.0 > Environment: AWS EMR 4.3, can also be reproduced locally >Reporter: Ameen Tayyebi > Attachments: jstack-dump.txt, repro.scala > > > We've identified a problem with Spark scheduling. The issue manifests itself > when an RDD calls SparkContext.parallelize within its getPartitions method. > This seemingly "recursive" call causes the problem. We have a repro case that > can easily be run. > Please advise on what the issue might be and how we can work around it in the > mean time. > I've attached repro.scala which can simply be pasted in spark-shell to > reproduce the problem. > Why are we calling sc.parallelize in production within getPartitions? Well, > we have an RDD that is composed of several thousands of Parquet files. To > compute the partitioning strategy for this RDD, we create an RDD to read all > file sizes from S3 in parallel, so that we can quickly determine the proper > partitions. We do this to avoid executing this serially from the master node > which can result in significant slowness in the execution. Pseudo-code: > val splitInfo = sc.parallelize(filePaths).map(f => (f, > s3.getObjectSummary)).collect() > A similar logic is used in DataFrame by Spark itself: > https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L902 > > Thanks, > -Ameen -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-17777) Spark Scheduler Hangs Indefinitely
[ https://issues.apache.org/jira/browse/SPARK-17777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ameen Tayyebi updated SPARK-17777: -- Attachment: (was: jstack-dump.txt) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-17777) Spark Scheduler Hangs Indefinitely
[ https://issues.apache.org/jira/browse/SPARK-17777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ameen Tayyebi updated SPARK-17777: -- Attachment: jstack-dump.txt jstack dump of the driver while it's hung. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17777) Spark Scheduler Hangs Indefinitely
[ https://issues.apache.org/jira/browse/SPARK-17777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548901#comment-15548901 ] Ameen Tayyebi commented on SPARK-17777: --- I'll get you the call stack of the driver shortly. The reason I'm not doing this from the driver with multiple threads is that it takes a long time. That's also why the other Spark code I showed you does it in parallel (using the cluster). It was taking about 40 minutes to list 6,000 files using 10 threads from the driver. Keep in mind the files are on S3. We can of course bump up the number of threads used, but that's not the proper solution. When our customers are reading that many files, they tend to use large clusters. By large, I mean 100 machines with 8 cores each. It seems wasteful not to use the executors in those cases and to push all the burden onto the master. Using the RDD approach to getting the file sizes, I can compute the same 6,000 in about a minute, including scheduling overhead. I don't have exact data in front of me right now, but I can dig it up if you really want it. I'd argue doing this from the master doesn't scale. I'm putting a workaround in right now, where I perform the computation outside of the getPartitions method. This is okay for our application's use case, where people are scheduling jobs. The overall time it takes their job to process is not impacted by this, so it's fine. However, for the experimentation use case, where scientists are doing things interactively, they now have to wait when they construct the RDD. Our scientists usually run experiments on small clusters, hence my pushing to truly postpone the computation to when it's needed. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
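For contrast, the driver-side listing the comment measured at ~40 minutes is essentially the fan-out below. This is a sketch under assumptions: `lookupSize` is a made-up stand-in for the per-key S3 summary call, which in reality is a network round trip, which is why a driver-local pool of 10 threads is slow against ~6,000 keys.

```scala
import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}

// Hypothetical stand-in for the per-file S3 size lookup; the real call is a
// network round trip per key, which is what makes this approach slow at scale.
def lookupSize(path: String): Long = path.length.toLong

val pool = Executors.newFixedThreadPool(10) // the "10 threads" from the comment
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

val filePaths = Seq("s3://bucket/part-00000.parquet", "s3://bucket/part-00001.parquet")

// One future per file, gathered on the driver. With thousands of keys this
// serializes into batches of 10 concurrent lookups on one machine, instead of
// fanning out across the cluster's executors.
val splitInfo: Seq[(String, Long)] = Await.result(
  Future.traverse(filePaths)(p => Future((p, lookupSize(p)))),
  30.seconds)

pool.shutdown()
```

The cluster-based alternative in the issue description replaces the thread pool with `sc.parallelize(filePaths)`, turning every executor core into a worker for the same fan-out.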
[jira] [Commented] (SPARK-17777) Spark Scheduler Hangs Indefinitely
[ https://issues.apache.org/jira/browse/SPARK-17777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548784#comment-15548784 ] Ameen Tayyebi commented on SPARK-17777: --- Thanks for the comments, Sean. Your point that the behavior of things that are not supposed to work can vary is fair. I usually run into situations where, if something works, it's been intentional. Let me share some more data points with you: If you change the very last line of the repro steps to y.map(r => r).count() then the computation will correctly finish. In other words, this only halts if a shuffle occurs. If you do *not* specify spark.default.parallelism, the same exact repro code will function correctly and finish. Let's forget about the code I shared regarding DataFrame; I think that's distracting us from the issue. I'm not claiming that that piece of code executes in the same "spot" in the scheduler as the RDD code I'm providing. My intention was to demonstrate that acquiring splitting knowledge in parallel, using the cluster itself, has been done before. Before we can dismiss this as "by design", I would appreciate it if you'd explain to me why this is not supposed to work. I haven't found any documentation, or any references in code, that would hint this is not supposed to work. It feels odd that not specifying spark.default.parallelism would make this work, or that if there's no shuffle, things work fine. My use case is this: I'd like to acquire split information for my RDD, in line with the Spark philosophy of doing things only if necessary (lazy evaluation). Doing this in getPartitions allows me to accomplish this. If this is not supported, is there an alternative that you can suggest? 
Ideally the following code executes "instantly": val r = new MyCustomRDD(); r.map(a => a); If I'm forced to pull the computation of partitions out of getPartitions, then this line will have to do work: val r = new MyCustomRDD(); // has to compute partitions even though it's not necessary at this point, because no action has been performed against the RDD Thanks for your time! -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
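The laziness being argued for can be kept even when the split computation moves out of getPartitions, by memoizing it behind a `lazy val`. This is a plain-Scala sketch with made-up names (`LazySplits` is not a Spark API), showing only the deferral pattern:

```scala
// Hypothetical sketch: construction stays instant, the expensive split
// computation runs on first access only, and the result is memoized.
class LazySplits(computeSplits: () => Array[Int]) {
  // `lazy val` defers the call until first access and caches the result.
  lazy val partitions: Array[Int] = computeSplits()
}

var lookups = 0
val r = new LazySplits(() => { lookups += 1; Array(0, 1, 2, 3) })
// At this point lookups == 0: no work has run yet, mirroring
// "val r = new MyCustomRDD()" doing nothing until an action needs partitions.
```

This only restructures when the work happens; it does not make a Spark job launched from inside getPartitions safe.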
[jira] [Commented] (SPARK-17777) Spark Scheduler Hangs Indefinitely
[ https://issues.apache.org/jira/browse/SPARK-17777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15546799#comment-15546799 ] Ameen Tayyebi commented on SPARK-17777: --- Thanks for the comment. Can you please clarify why you think this should not be possible? Interestingly, this works in local mode but hangs in YARN mode. I'd expect the same API calls to behave identically (semantically) irrespective of which mode you run Spark in. If this is not supported, shouldn't we expect a hang in all cases? On Oct 4, 2016 2:50 PM, "Sean Owen (JIRA)" wrote: [ https://issues.apache.org/jira/browse/SPARK-17777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15546284#comment-15546284 ] Sean Owen commented on SPARK-17777: --- I think you've demonstrated that this doesn't work, and I wouldn't say that's supposed to work. The reference you give isn't the same thing in that it doesn't lead to this hang / infinite loop. So I think the answer is, you can't do that. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-17777) Spark Scheduler Hangs Indefinitely
[ https://issues.apache.org/jira/browse/SPARK-17777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ameen Tayyebi updated SPARK-17777: -- Description: We've identified a problem with Spark scheduling. The issue manifests itself when an RDD calls SparkContext.parallelize within its getPartitions method. This seemingly "recursive" call causes the problem. We have a repro case that can easily be run. Please advise on what the issue might be and how we can work around it in the mean time. I've attached repro.scala which can simply be pasted in spark-shell to reproduce the problem. Why are we calling sc.parallelize in production within getPartitions? Well, we have an RDD that is composed of several thousands of Parquet files. To compute the partitioning strategy for this RDD, we create an RDD to read all file sizes from S3 in parallel, so that we can quickly determine the proper partitions. We do this to avoid executing this serially from the master node which can result in significant slowness in the execution. Pseudo-code: val splitInfo = sc.parallelize(filePaths).map(f => (f, s3.getObjectSummary)).collect() A similar logic is used in DataFrame by Spark itself: https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L902 Thanks, -Ameen was: We've identified a problem with Spark scheduling. The issue manifests itself when an RDD calls SparkContext.parallelize within its getPartitions method. This seemingly "recursive" call causes the problem. We have a repro case that can easily be run. Please advise on what the issue might be and how we can work around it in the mean time. 
Thanks, -Ameen -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-17777) Spark Scheduler Hangs Indefinitely
[ https://issues.apache.org/jira/browse/SPARK-17777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ameen Tayyebi updated SPARK-17777: -- Attachment: repro.scala > Spark Scheduler Hangs Indefinitely > -- > > Key: SPARK-17777 > URL: https://issues.apache.org/jira/browse/SPARK-17777 > Project: Spark > Issue Type: Bug > Components: Spark Core >Affects Versions: 1.6.0 > Environment: AWS EMR 4.3, can also be reproduced locally >Reporter: Ameen Tayyebi > Attachments: repro.scala > > > We've identified a problem with Spark scheduling. The issue manifests itself > when an RDD calls SparkContext.parallelize within its getPartitions method. > This seemingly "recursive" call causes the problem. We have a repro case that > can easily be run. > Please advise on what the issue might be and how we can work around it in the > mean time. > Thanks, > -Ameen -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Issue Comment Deleted] (SPARK-17777) Spark Scheduler Hangs Indefinitely
[ https://issues.apache.org/jira/browse/SPARK-17777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ameen Tayyebi updated SPARK-17777: -- Comment: was deleted (was: Here's repro code: import org.apache.spark.{Partition, SparkContext, TaskContext} import org.apache.spark.rdd.RDD class testRDD(@transient sc: SparkContext) extends RDD[(String, Int)](sc, Nil) with Serializable{ override def getPartitions: Array[Partition] = { sc.parallelize(Seq(("a",1),("b",2))).reduceByKey(_+_).collect() val result = new Array[Partition](4) for (i <- 0 until 4) { result(i) = new Partition { override def index: Int = 0 } } result } override def compute(split: Partition, context: TaskContext): Iterator[(String,Int)] = Seq(("a",3),("b",4)).iterator } val y = new testRDD(sc) y.map(r => r).reduceByKey(_+_).count() This can be simply pasted in spark-shell.) -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-17777) Spark Scheduler Hangs Indefinitely
[ https://issues.apache.org/jira/browse/SPARK-17777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15546204#comment-15546204 ] Ameen Tayyebi commented on SPARK-17777: --- Here's repro code:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

class testRDD(@transient sc: SparkContext) extends RDD[(String, Int)](sc, Nil) with Serializable {

  override def getPartitions: Array[Partition] = {
    // Issuing a Spark job from inside getPartitions is what triggers the hang.
    sc.parallelize(Seq(("a", 1), ("b", 2))).reduceByKey(_ + _).collect()
    val result = new Array[Partition](4)
    for (i <- 0 until 4) {
      result(i) = new Partition { override def index: Int = 0 }
    }
    result
  }

  override def compute(split: Partition, context: TaskContext): Iterator[(String, Int)] =
    Seq(("a", 3), ("b", 4)).iterator
}

val y = new testRDD(sc)
y.map(r => r).reduceByKey(_ + _).count()

This can simply be pasted into spark-shell. -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-17777) Spark Scheduler Hangs Indefinitely
Ameen Tayyebi created SPARK-17777: - Summary: Spark Scheduler Hangs Indefinitely Key: SPARK-17777 URL: https://issues.apache.org/jira/browse/SPARK-17777 Project: Spark Issue Type: Bug Components: Spark Core Affects Versions: 1.6.0 Environment: AWS EMR 4.3, can also be reproduced locally Reporter: Ameen Tayyebi We've identified a problem with Spark scheduling. The issue manifests itself when an RDD calls SparkContext.parallelize within its getPartitions method. This seemingly "recursive" call causes the problem. We have a repro case that can easily be run. Please advise on what the issue might be and how we can work around it in the mean time. Thanks, -Ameen -- This message was sent by Atlassian JIRA (v6.3.4#6332) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org