[jira] [Commented] (SPARK-23443) Spark with Glue as external catalog

2018-09-05 Thread Ameen Tayyebi (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-23443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16604441#comment-16604441
 ] 

Ameen Tayyebi commented on SPARK-23443:
---

I've been sidetracked by a lot of other projects, so at this time I
unfortunately don't have the bandwidth to work on this. :(




> Spark with Glue as external catalog
> ---
>
> Key: SPARK-23443
> URL: https://issues.apache.org/jira/browse/SPARK-23443
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Ameen Tayyebi
>Priority: Major
>
> AWS Glue Catalog is an external Hive metastore backed by a web service. It 
> allows permanent storage of catalog data for BigData use cases.
> To find out more information about AWS Glue, please consult:
>  * AWS Glue - [https://aws.amazon.com/glue/]
>  * Using Glue as a Metastore catalog for Spark - 
> [https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-glue.html]
> Today, the integration of Glue and Spark is through the Hive layer. Glue 
> implements the IMetaStore interface of Hive and for installations of Spark 
> that contain Hive, Glue can be used as the metastore.
> The feature set that Glue supports does not align 1-1 with the set of 
> features that the latest version of Spark supports. For example, the Glue 
> interface supports more advanced partition pruning than the latest version of 
> Hive embedded in Spark.
> To enable a more natural integration with Spark and to allow leveraging 
> latest features of Glue, without being coupled to Hive, a direct integration 
> through Spark's own Catalog API is proposed. This Jira tracks this work.
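For context, the Hive-layer wiring described above can be expressed as plain
Spark configuration. The following is a minimal spark-shell style sketch only;
it assumes the EMR-provided AWSGlueDataCatalogHiveClientFactory (the class named
in the EMR guide linked above) is on the classpath, and whether the property is
picked up this way depends on how Hive is configured in the cluster.

import org.apache.spark.sql.SparkSession

// Sketch: route Spark's Hive metastore client to the Glue Data Catalog.
// Outside EMR this factory class may not be available.
val spark = SparkSession.builder()
  .appName("glue-via-hive-layer")
  .config("spark.hadoop.hive.metastore.client.factory.class",
    "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory")
  .enableHiveSupport()
  .getOrCreate()

spark.sql("SHOW DATABASES").show()   // served by Glue through the Hive layer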






[jira] [Commented] (SPARK-23443) Spark with Glue as external catalog

2018-03-01 Thread Ameen Tayyebi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16382117#comment-16382117
 ] 

Ameen Tayyebi commented on SPARK-23443:
---

Great, thank you so much. I've been stuck with a bunch of other tasks at work,
unfortunately. I think I'll be able to pick this up again in 2-3 weeks.
I'll try to get a small change out so that we can start iterating on it.




> Spark with Glue as external catalog
> ---
>
> Key: SPARK-23443
> URL: https://issues.apache.org/jira/browse/SPARK-23443
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Ameen Tayyebi
>Priority: Major
>
> AWS Glue Catalog is an external Hive metastore backed by a web service. It 
> allows permanent storage of catalog data for BigData use cases.
> To find out more information about AWS Glue, please consult:
>  * AWS Glue - [https://aws.amazon.com/glue/]
>  * Using Glue as a Metastore catalog for Spark - 
> [https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-glue.html]
> Today, the integration of Glue and Spark is through the Hive layer. Glue 
> implements the IMetaStore interface of Hive and for installations of Spark 
> that contain Hive, Glue can be used as the metastore.
> The feature set that Glue supports does not align 1-1 with the set of 
> features that the latest version of Spark supports. For example, the Glue 
> interface supports more advanced partition pruning than the latest version of 
> Hive embedded in Spark.
> To enable a more natural integration with Spark and to allow leveraging 
> latest features of Glue, without being coupled to Hive, a direct integration 
> through Spark's own Catalog API is proposed. This Jira tracks this work.






[jira] [Commented] (SPARK-23443) Spark with Glue as external catalog

2018-02-20 Thread Ameen Tayyebi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-23443?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16370015#comment-16370015
 ] 

Ameen Tayyebi commented on SPARK-23443:
---

Hello,

I have this locally working (just limited read cases) and plan to submit my
first iteration this week.

It would be great to work on this together! Any help would be appreciated :)

Ameen




> Spark with Glue as external catalog
> ---
>
> Key: SPARK-23443
> URL: https://issues.apache.org/jira/browse/SPARK-23443
> Project: Spark
>  Issue Type: New Feature
>  Components: Spark Core
>Affects Versions: 2.4.0
>Reporter: Ameen Tayyebi
>Priority: Major
>
> AWS Glue Catalog is an external Hive metastore backed by a web service. It 
> allows permanent storage of catalog data for BigData use cases.
> To find out more information about AWS Glue, please consult:
>  * AWS Glue - [https://aws.amazon.com/glue/]
>  * Using Glue as a Metastore catalog for Spark - 
> [https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-glue.html]
> Today, the integration of Glue and Spark is through the Hive layer. Glue 
> implements the IMetaStore interface of Hive and for installations of Spark 
> that contain Hive, Glue can be used as the metastore.
> The feature set that Glue supports does not align 1-1 with the set of 
> features that the latest version of Spark supports. For example, the Glue 
> interface supports more advanced partition pruning than the latest version of 
> Hive embedded in Spark.
> To enable a more natural integration with Spark and to allow leveraging 
> latest features of Glue, without being coupled to Hive, a direct integration 
> through Spark's own Catalog API is proposed. This Jira tracks this work.






[jira] [Resolved] (SPARK-22913) Hive Partition Pruning, Fractional and Timestamp types

2018-02-15 Thread Ameen Tayyebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-22913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ameen Tayyebi resolved SPARK-22913.
---
Resolution: Won't Fix

Resolving in favor of native Glue integration. These advanced predicates can't 
be supported because the version of Hive embedded in Spark does not support 
them.

> Hive Partition Pruning, Fractional and Timestamp types
> --
>
> Key: SPARK-22913
> URL: https://issues.apache.org/jira/browse/SPARK-22913
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ameen Tayyebi
>Priority: Major
>
> Spark currently pushes the predicates it has in the SQL query to Hive 
> Metastore. This only applies to predicates that are placed on top of 
> partitioning columns. As more and more Hive metastore implementations come 
> around, this is an important optimization to allow data to be prefiltered to 
> only relevant partitions. Consider the following example:
> Table:
> create external table data (key string, quantity long)
> partitioned by (processing-date timestamp)
> Query:
> select * from data where processing-date = '2017-10-23 00:00:00'
> Currently, no filters will be pushed to the Hive metastore for the above 
> query. The reason is that the code that tries to compute predicates to be 
> sent to the Hive metastore only deals with integral and string column types. It 
> doesn't know how to handle fractional and timestamp columns.
> I have tables in my metastore (AWS Glue) with millions of partitions of type 
> timestamp and double. In my specific case, it takes Spark's master node about 
> 6.5 minutes to download all partitions for the table, and then filter the 
> partitions client-side. The actual processing time of my query is only 6 
> seconds. In other words, without partition pruning, I'm looking at 6.5 
> minutes of processing and with partition pruning, I'm looking at 6 seconds 
> only.
> I have a fix for this developed locally that I'll provide shortly as a pull 
> request.






[jira] [Created] (SPARK-23443) Spark with Glue as external catalog

2018-02-15 Thread Ameen Tayyebi (JIRA)
Ameen Tayyebi created SPARK-23443:
-

 Summary: Spark with Glue as external catalog
 Key: SPARK-23443
 URL: https://issues.apache.org/jira/browse/SPARK-23443
 Project: Spark
  Issue Type: New Feature
  Components: Spark Core
Affects Versions: 2.4.0
Reporter: Ameen Tayyebi


AWS Glue Catalog is an external Hive metastore backed by a web service. It 
allows permanent storage of catalog data for BigData use cases.

To find out more information about AWS Glue, please consult:
 * AWS Glue - [https://aws.amazon.com/glue/]
 * Using Glue as a Metastore catalog for Spark - 
[https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark-glue.html]

Today, the integration of Glue and Spark is through the Hive layer. Glue 
implements the IMetaStore interface of Hive and for installations of Spark that 
contain Hive, Glue can be used as the metastore.

The feature set that Glue supports does not align 1-1 with the set of features 
that the latest version of Spark supports. For example, the Glue interface 
supports more advanced partition pruning than the latest version of Hive 
embedded in Spark.

To enable a more natural integration with Spark and to allow leveraging latest 
features of Glue, without being coupled to Hive, a direct integration through 
Spark's own Catalog API is proposed. This Jira tracks this work.
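To make the proposed direct integration concrete, here is a minimal sketch of a
Hive-free, Glue-backed catalog adapter. The GlueCatalogClient trait and every
name below are hypothetical placeholders invented for illustration; they are not
Spark's Catalog API and not the AWS SDK. The point is only the shape of the
integration: lookups and partition pruning delegate straight to Glue, with no
Hive metastore in between.

// Hypothetical sketch only: all names below are invented for illustration.
case class TableRef(database: String, name: String, location: String)

// Stand-in for whatever client would wrap the Glue web service calls.
trait GlueCatalogClient {
  def getTable(database: String, table: String): TableRef
  def listPartitions(database: String, table: String,
                     filter: Option[String]): Seq[String]
}

// Spark-facing lookups delegate directly to Glue, including server-side
// partition pruning via a pushed-down filter expression.
class GlueDirectCatalog(client: GlueCatalogClient) {
  def loadTable(db: String, table: String): TableRef =
    client.getTable(db, table)

  def prunedPartitions(db: String, table: String, predicate: String): Seq[String] =
    client.listPartitions(db, table, Some(predicate))
}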






[jira] [Comment Edited] (SPARK-22913) Hive Partition Pruning, Fractional and Timestamp types

2017-12-27 Thread Ameen Tayyebi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16305018#comment-16305018
 ] 

Ameen Tayyebi edited comment on SPARK-22913 at 12/28/17 3:31 AM:
-

Note: I'll be away from December 30th until January 18th so I'll be checking up 
on this issue and the pull request at that time.


was (Author: ameen.tayy...@gmail.com):
Note: I'll be away since December 30th until January 18th so I'll be checking 
up on this issue and the pull request at that time.

> Hive Partition Pruning, Fractional and Timestamp types
> --
>
> Key: SPARK-22913
> URL: https://issues.apache.org/jira/browse/SPARK-22913
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ameen Tayyebi
> Fix For: 2.3.0
>
>
> Spark currently pushes the predicates it has in the SQL query to Hive 
> Metastore. This only applies to predicates that are placed on top of 
> partitioning columns. As more and more Hive metastore implementations come 
> around, this is an important optimization to allow data to be prefiltered to 
> only relevant partitions. Consider the following example:
> Table:
> create external table data (key string, quantity long)
> partitioned by (processing-date timestamp)
> Query:
> select * from data where processing-date = '2017-10-23 00:00:00'
> Currently, no filters will be pushed to the Hive metastore for the above 
> query. The reason is that the code that tries to compute predicates to be 
> sent to the Hive metastore only deals with integral and string column types. It 
> doesn't know how to handle fractional and timestamp columns.
> I have tables in my metastore (AWS Glue) with millions of partitions of type 
> timestamp and double. In my specific case, it takes Spark's master node about 
> 6.5 minutes to download all partitions for the table, and then filter the 
> partitions client-side. The actual processing time of my query is only 6 
> seconds. In other words, without partition pruning, I'm looking at 6.5 
> minutes of processing and with partition pruning, I'm looking at 6 seconds 
> only.
> I have a fix for this developed locally that I'll provide shortly as a pull 
> request.






[jira] [Commented] (SPARK-22913) Hive Partition Pruning, Fractional and Timestamp types

2017-12-27 Thread Ameen Tayyebi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-22913?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16305018#comment-16305018
 ] 

Ameen Tayyebi commented on SPARK-22913:
---

Note: I'll be away since December 30th until January 18th so I'll be checking 
up on this issue and the pull request at that time.

> Hive Partition Pruning, Fractional and Timestamp types
> --
>
> Key: SPARK-22913
> URL: https://issues.apache.org/jira/browse/SPARK-22913
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Ameen Tayyebi
> Fix For: 2.3.0
>
>
> Spark currently pushes the predicates it has in the SQL query to Hive 
> Metastore. This only applies to predicates that are placed on top of 
> partitioning columns. As more and more Hive metastore implementations come 
> around, this is an important optimization to allow data to be prefiltered to 
> only relevant partitions. Consider the following example:
> Table:
> create external table data (key string, quantity long)
> partitioned by (processing-date timestamp)
> Query:
> select * from data where processing-date = '2017-10-23 00:00:00'
> Currently, no filters will be pushed to the Hive metastore for the above 
> query. The reason is that the code that tries to compute predicates to be 
> sent to the Hive metastore only deals with integral and string column types. It 
> doesn't know how to handle fractional and timestamp columns.
> I have tables in my metastore (AWS Glue) with millions of partitions of type 
> timestamp and double. In my specific case, it takes Spark's master node about 
> 6.5 minutes to download all partitions for the table, and then filter the 
> partitions client-side. The actual processing time of my query is only 6 
> seconds. In other words, without partition pruning, I'm looking at 6.5 
> minutes of processing and with partition pruning, I'm looking at 6 seconds 
> only.
> I have a fix for this developed locally that I'll provide shortly as a pull 
> request.






[jira] [Created] (SPARK-22913) Hive Partition Pruning, Fractional and Timestamp types

2017-12-27 Thread Ameen Tayyebi (JIRA)
Ameen Tayyebi created SPARK-22913:
-

 Summary: Hive Partition Pruning, Fractional and Timestamp types
 Key: SPARK-22913
 URL: https://issues.apache.org/jira/browse/SPARK-22913
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 2.3.0
Reporter: Ameen Tayyebi
 Fix For: 2.3.0


Spark currently pushes the predicates it has in the SQL query to Hive 
Metastore. This only applies to predicates that are placed on top of 
partitioning columns. As more and more Hive metastore implementations come 
around, this is an important optimization to allow data to be prefiltered to 
only relevant partitions. Consider the following example:

Table:
create external table data (key string, quantity long)
partitioned by (processing-date timestamp)

Query:
select * from data where processing-date = '2017-10-23 00:00:00'

Currently, no filters will be pushed to the Hive metastore for the above query. 
The reason is that the code that tries to compute predicates to be sent to the 
Hive metastore only deals with integral and string column types. It doesn't know 
how to handle fractional and timestamp columns.

I have tables in my metastore (AWS Glue) with millions of partitions of type 
timestamp and double. In my specific case, it takes Spark's master node about 
6.5 minutes to download all partitions for the table, and then filter the 
partitions client-side. The actual processing time of my query is only 6 
seconds. In other words, without partition pruning, I'm looking at 6.5 minutes 
of processing and with partition pruning, I'm looking at 6 seconds only.

I have a fix for this developed locally that I'll provide shortly as a pull 
request.
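As a sketch of the extension the description asks for, the snippet below shows
the general idea of rendering partition predicates into a metastore-style filter
string, with fractional and timestamp literals handled alongside the existing
integral and string cases. The types and functions are invented for
illustration; this is not Spark's actual HiveShim/filter-conversion code.

import java.sql.Timestamp

sealed trait PartitionPredicate
case class EqualTo(column: String, value: Any) extends PartitionPredicate
case class GreaterThan(column: String, value: Any) extends PartitionPredicate

object MetastoreFilterSketch {
  // Render a literal for the filter string; None means "cannot push down".
  private def render(value: Any): Option[String] = value match {
    case i: Int       => Some(i.toString)
    case l: Long      => Some(l.toString)
    case s: String    => Some("\"" + s + "\"")
    case d: Double    => Some(d.toString)                // fractional support
    case t: Timestamp => Some("\"" + t.toString + "\"")  // timestamp support
    case _            => None
  }

  // Combine predicates into "col = lit and col > lit ..."; give up on pushdown
  // entirely if any predicate cannot be rendered.
  def toFilterString(preds: Seq[PartitionPredicate]): Option[String] = {
    val rendered = preds.map {
      case EqualTo(c, v)     => render(v).map(lit => s"$c = $lit")
      case GreaterThan(c, v) => render(v).map(lit => s"$c > $lit")
    }
    if (rendered.forall(_.isDefined)) Some(rendered.flatten.mkString(" and "))
    else None
  }
}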






[jira] [Commented] (SPARK-14651) CREATE TEMPORARY TABLE is not supported yet

2017-11-24 Thread Ameen Tayyebi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-14651?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16265545#comment-16265545
 ] 

Ameen Tayyebi commented on SPARK-14651:
---

This blocks some of our production use cases. :( We only expose the Spark SQL 
interface to our customers, so using registerTempTable is not an option for 
them.

Any word on the priority of this? Is the fix a matter of "plumbing", given that 
registerTempTable is already implemented?
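For reference, a minimal spark-shell sketch of the programmatic alternative
mentioned above (createOrReplaceTempView is the Spark 2.x successor of
registerTempTable); the Parquet path is a placeholder:

// Register a temporary view from code, which a SQL-only interface cannot do.
val df = spark.read.parquet("s3://my-bucket/some/path")   // placeholder path
df.createOrReplaceTempView("t1")
spark.sql("SELECT * FROM t1 LIMIT 10").show()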

> CREATE TEMPORARY TABLE is not supported yet
> ---
>
> Key: SPARK-14651
> URL: https://issues.apache.org/jira/browse/SPARK-14651
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.0, 2.2.0
>Reporter: Jacek Laskowski
>Priority: Minor
>
> With today's master it seems that {{CREATE TEMPORARY TABLE}} may or may not 
> work depending on how complete the DDL is (?)
> {code}
> scala> sql("CREATE temporary table t2")
> 16/04/14 23:29:26 INFO HiveSqlParser: Parsing command: CREATE temporary table 
> t2
> org.apache.spark.sql.catalyst.parser.ParseException:
> CREATE TEMPORARY TABLE is not supported yet. Please use registerTempTable as 
> an alternative.(line 1, pos 0)
> == SQL ==
> CREATE temporary table t2
> ^^^
>   at 
> org.apache.spark.sql.hive.execution.HiveSqlAstBuilder$$anonfun$visitCreateTable$1.apply(HiveSqlParser.scala:169)
>   at 
> org.apache.spark.sql.hive.execution.HiveSqlAstBuilder$$anonfun$visitCreateTable$1.apply(HiveSqlParser.scala:165)
>   at 
> org.apache.spark.sql.catalyst.parser.ParserUtils$.withOrigin(ParserUtils.scala:85)
>   at 
> org.apache.spark.sql.hive.execution.HiveSqlAstBuilder.visitCreateTable(HiveSqlParser.scala:165)
>   at 
> org.apache.spark.sql.hive.execution.HiveSqlAstBuilder.visitCreateTable(HiveSqlParser.scala:53)
>   at 
> org.apache.spark.sql.catalyst.parser.SqlBaseParser$CreateTableContext.accept(SqlBaseParser.java:1049)
>   at 
> org.antlr.v4.runtime.tree.AbstractParseTreeVisitor.visit(AbstractParseTreeVisitor.java:42)
>   at 
> org.apache.spark.sql.catalyst.parser.AstBuilder$$anonfun$visitSingleStatement$1.apply(AstBuilder.scala:63)
>   at 
> org.apache.spark.sql.catalyst.parser.AstBuilder$$anonfun$visitSingleStatement$1.apply(AstBuilder.scala:63)
>   at 
> org.apache.spark.sql.catalyst.parser.ParserUtils$.withOrigin(ParserUtils.scala:85)
>   at 
> org.apache.spark.sql.catalyst.parser.AstBuilder.visitSingleStatement(AstBuilder.scala:62)
>   at 
> org.apache.spark.sql.catalyst.parser.AbstractSqlParser$$anonfun$parsePlan$1.apply(ParseDriver.scala:54)
>   at 
> org.apache.spark.sql.catalyst.parser.AbstractSqlParser$$anonfun$parsePlan$1.apply(ParseDriver.scala:53)
>   at 
> org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:86)
>   at 
> org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:53)
>   at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:198)
>   at 
> org.apache.spark.sql.hive.HiveContext.org$apache$spark$sql$hive$HiveContext$$super$parseSql(HiveContext.scala:201)
>   at 
> org.apache.spark.sql.hive.HiveContext$$anonfun$parseSql$1.apply(HiveContext.scala:201)
>   at 
> org.apache.spark.sql.hive.HiveContext$$anonfun$parseSql$1.apply(HiveContext.scala:201)
>   at 
> org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:228)
>   at 
> org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:175)
>   at 
> org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:174)
>   at 
> org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:217)
>   at org.apache.spark.sql.hive.HiveContext.parseSql(HiveContext.scala:200)
>   at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:765)
>   ... 48 elided
> scala> sql("CREATE temporary table t2 USING PARQUET OPTIONS (PATH 'hello') AS 
> SELECT * FROM t1")
> 16/04/14 23:30:21 INFO HiveSqlParser: Parsing command: CREATE temporary table 
> t2 USING PARQUET OPTIONS (PATH 'hello') AS SELECT * FROM t1
> org.apache.spark.sql.AnalysisException: Table or View not found: t1; line 1 
> pos 80
>   at 
> org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$getTable(Analyzer.scala:412)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:421)
>   at 
> org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$8.applyOrElse(Analyzer.scala:416)
>   at 
> org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:58)
>   at 
> 

[jira] [Commented] (SPARK-20588) from_utc_timestamp causes bottleneck

2017-05-04 Thread Ameen Tayyebi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15996794#comment-15996794
 ] 

Ameen Tayyebi commented on SPARK-20588:
---

You could consider caching per thread as well, which would simplify the 
implementation since you would no longer need a concurrent data structure to 
store the cached values.
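A minimal sketch of that per-thread idea, using plain java.util.TimeZone
lookups (illustrative only, not Spark's DateTimeUtils code): each thread
memoizes its own lookups, so the synchronized TimeZone.getTimeZone call is hit
at most once per zone id per thread and no shared data structure is needed.

import java.util.TimeZone
import scala.collection.mutable

object ThreadLocalTimeZones {
  // One small map per executor thread; no synchronization on the hot path.
  private val cache = new ThreadLocal[mutable.HashMap[String, TimeZone]] {
    override def initialValue(): mutable.HashMap[String, TimeZone] =
      mutable.HashMap.empty
  }

  def get(id: String): TimeZone =
    cache.get().getOrElseUpdate(id, TimeZone.getTimeZone(id))
}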

> from_utc_timestamp causes bottleneck
> 
>
> Key: SPARK-20588
> URL: https://issues.apache.org/jira/browse/SPARK-20588
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 2.0.2
> Environment: AWS EMR AMI 5.2.1
>Reporter: Ameen Tayyebi
>
> We have a SQL query that makes use of the from_utc_timestamp function like 
> so: from_utc_timestamp(itemSigningTime,'America/Los_Angeles')
> This causes a major bottleneck. Our exact call is:
> date_add(from_utc_timestamp(itemSigningTime,'America/Los_Angeles'), 1)
> Switching from the above to date_add(itemSigningTime, 1) reduces the job 
> running time from 40 minutes to 9.
> When from_utc_timestamp function is used, several threads in the executors 
> are in the BLOCKED state, on this call stack:
> "Executor task launch worker-63" #261 daemon prio=5 os_prio=0 
> tid=0x7f848472e000 nid=0x4294 waiting for monitor entry 
> [0x7f501981c000]
>java.lang.Thread.State: BLOCKED (on object monitor)
> at java.util.TimeZone.getTimeZone(TimeZone.java:516)
> - waiting to lock <0x7f5216c2aa58> (a java.lang.Class for 
> java.util.TimeZone)
> at 
> org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTimestamp(DateTimeUtils.scala:356)
> at 
> org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToTimestamp(DateTimeUtils.scala)
> at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
> at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
> at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Can we cache the locales once per JVM so that we don't do this for every 
> record?






[jira] [Comment Edited] (SPARK-20588) from_utc_timestamp causes bottleneck

2017-05-03 Thread Ameen Tayyebi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15995499#comment-15995499
 ] 

Ameen Tayyebi edited comment on SPARK-20588 at 5/3/17 7:40 PM:
---

Hopefully a more readable version of the call stack:

{code:java}
"Executor task launch worker-63" #261 daemon prio=5 os_prio=0 
tid=0x7f848472e000 nid=0x4294 waiting for monitor entry [0x7f501981c000]
   java.lang.Thread.State: BLOCKED (on object monitor)
at java.util.TimeZone.getTimeZone(TimeZone.java:516)
- waiting to lock <0x7f5216c2aa58> (a java.lang.Class for 
java.util.TimeZone)
at 
org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTimestamp(DateTimeUtils.scala:356)
at 
org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToTimestamp(DateTimeUtils.scala)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{code}


was (Author: ameen.tayy...@gmail.com):
Hopefully a more readable version of the call stack:

"Executor task launch worker-63" #261 daemon prio=5 os_prio=0 
tid=0x7f848472e000 nid=0x4294 waiting for monitor entry [0x7f501981c000]
   java.lang.Thread.State: BLOCKED (on object monitor)
at java.util.TimeZone.getTimeZone(TimeZone.java:516)
- waiting to lock <0x7f5216c2aa58> (a java.lang.Class for 
java.util.TimeZone)
at 
org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTimestamp(DateTimeUtils.scala:356)
at 
org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToTimestamp(DateTimeUtils.scala)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

> from_utc_timestamp causes bottleneck
> 
>
> Key: SPARK-20588
> URL: https://issues.apache.org/jira/browse/SPARK-20588
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2
> Environment: AWS EMR AMI 5.2.1
>Reporter: Ameen Tayyebi
>
> We have a SQL query that makes use of the from_utc_timestamp function like 
> so: from_utc_timestamp(itemSigningTime,'America/Los_Angeles')
> This causes a major bottleneck. Our exact call is:
> date_add(from_utc_timestamp(itemSigningTime,'America/Los_Angeles'), 1)
> Switching from the above to date_add(itemSigningTime, 1) reduces the job 
> running time from 40 minutes to 9.
> When from_utc_timestamp function is used, several threads in the executors 
> are in the BLOCKED state, on this call stack:
> "Executor task launch worker-63" #261 daemon prio=5 os_prio=0 
> tid=0x7f848472e000 nid=0x4294 waiting for monitor entry 
> [0x7f501981c000]
>

[jira] [Commented] (SPARK-20588) from_utc_timestamp causes bottleneck

2017-05-03 Thread Ameen Tayyebi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-20588?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15995499#comment-15995499
 ] 

Ameen Tayyebi commented on SPARK-20588:
---

Hopefully a more readable version of the call stack:

"Executor task launch worker-63" #261 daemon prio=5 os_prio=0 
tid=0x7f848472e000 nid=0x4294 waiting for monitor entry [0x7f501981c000]
   java.lang.Thread.State: BLOCKED (on object monitor)
at java.util.TimeZone.getTimeZone(TimeZone.java:516)
- waiting to lock <0x7f5216c2aa58> (a java.lang.Class for 
java.util.TimeZone)
at 
org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTimestamp(DateTimeUtils.scala:356)
at 
org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToTimestamp(DateTimeUtils.scala)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

> from_utc_timestamp causes bottleneck
> 
>
> Key: SPARK-20588
> URL: https://issues.apache.org/jira/browse/SPARK-20588
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.0.2
> Environment: AWS EMR AMI 5.2.1
>Reporter: Ameen Tayyebi
>
> We have a SQL query that makes use of the from_utc_timestamp function like 
> so: from_utc_timestamp(itemSigningTime,'America/Los_Angeles')
> This causes a major bottleneck. Our exact call is:
> date_add(from_utc_timestamp(itemSigningTime,'America/Los_Angeles'), 1)
> Switching from the above to date_add(itemSigningTime, 1) reduces the job 
> running time from 40 minutes to 9.
> When from_utc_timestamp function is used, several threads in the executors 
> are in the BLOCKED state, on this call stack:
> "Executor task launch worker-63" #261 daemon prio=5 os_prio=0 
> tid=0x7f848472e000 nid=0x4294 waiting for monitor entry 
> [0x7f501981c000]
>java.lang.Thread.State: BLOCKED (on object monitor)
> at java.util.TimeZone.getTimeZone(TimeZone.java:516)
> - waiting to lock <0x7f5216c2aa58> (a java.lang.Class for 
> java.util.TimeZone)
> at 
> org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTimestamp(DateTimeUtils.scala:356)
> at 
> org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToTimestamp(DateTimeUtils.scala)
> at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
>  Source)
> at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
>  Source)
> at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> at org.apache.spark.scheduler.Task.run(Task.scala:86)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Can we cache the locales once per JVM so that we don't do this for every 
> record?




[jira] [Created] (SPARK-20588) from_utc_timestamp causes bottleneck

2017-05-03 Thread Ameen Tayyebi (JIRA)
Ameen Tayyebi created SPARK-20588:
-

 Summary: from_utc_timestamp causes bottleneck
 Key: SPARK-20588
 URL: https://issues.apache.org/jira/browse/SPARK-20588
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 2.0.2
 Environment: AWS EMR AMI 5.2.1
Reporter: Ameen Tayyebi


We have a SQL query that makes use of the from_utc_timestamp function like so: 
from_utc_timestamp(itemSigningTime,'America/Los_Angeles')

This causes a major bottleneck. Our exact call is:
date_add(from_utc_timestamp(itemSigningTime,'America/Los_Angeles'), 1)

Switching from the above to date_add(itemSigningTime, 1) reduces the job 
running time from 40 minutes to 9.

When from_utc_timestamp function is used, several threads in the executors are 
in the BLOCKED state, on this call stack:

"Executor task launch worker-63" #261 daemon prio=5 os_prio=0 
tid=0x7f848472e000 nid=0x4294 waiting for monitor entry [0x7f501981c000]
   java.lang.Thread.State: BLOCKED (on object monitor)
at java.util.TimeZone.getTimeZone(TimeZone.java:516)
- waiting to lock <0x7f5216c2aa58> (a java.lang.Class for 
java.util.TimeZone)
at 
org.apache.spark.sql.catalyst.util.DateTimeUtils$.stringToTimestamp(DateTimeUtils.scala:356)
at 
org.apache.spark.sql.catalyst.util.DateTimeUtils.stringToTimestamp(DateTimeUtils.scala)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithKeys$(Unknown
 Source)
at 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown
 Source)
at 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at 
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at 
org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:161)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
 at java.lang.Thread.run(Thread.java:745)

Can we cache the locales once per JVM so that we don't do this for every 
record?
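A minimal sketch of that JVM-wide caching idea (illustrative only; not the
actual Spark fix): zone ids are resolved through a ConcurrentHashMap, so the
global lock inside TimeZone.getTimeZone is only taken on the first lookup of
each id.

import java.util.TimeZone
import java.util.concurrent.ConcurrentHashMap

object JvmTimeZoneCache {
  private val cache = new ConcurrentHashMap[String, TimeZone]()

  def get(id: String): TimeZone = {
    val cached = cache.get(id)
    if (cached != null) cached
    else {
      val tz = TimeZone.getTimeZone(id)   // synchronized lookup, first time only
      cache.putIfAbsent(id, tz)
      tz
    }
  }
}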






[jira] [Updated] (SPARK-17777) Spark Scheduler Hangs Indefinitely

2016-10-05 Thread Ameen Tayyebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ameen Tayyebi updated SPARK-17777:
--
Attachment: jstack-dump.txt

> Spark Scheduler Hangs Indefinitely
> --
>
> Key: SPARK-17777
> URL: https://issues.apache.org/jira/browse/SPARK-17777
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
> Environment: AWS EMR 4.3, can also be reproduced locally
>Reporter: Ameen Tayyebi
> Attachments: jstack-dump.txt, repro.scala
>
>
> We've identified a problem with Spark scheduling. The issue manifests itself 
> when an RDD calls SparkContext.parallelize within its getPartitions method. 
> This seemingly "recursive" call causes the problem. We have a repro case that 
> can easily be run.
> Please advise on what the issue might be and how we can work around it in the 
> mean time.
> I've attached repro.scala which can simply be pasted in spark-shell to 
> reproduce the problem.
> Why are we calling sc.parallelize in production within getPartitions? Well, 
> we have an RDD that is composed of several thousands of Parquet files. To 
> compute the partitioning strategy for this RDD, we create an RDD to read all 
> file sizes from S3 in parallel, so that we can quickly determine the proper 
> partitions. We do this to avoid executing this serially from the master node 
> which can result in significant slowness in the execution. Pseudo-code:
> val splitInfo = sc.parallelize(filePaths).map(f => (f, 
> s3.getObjectSummary)).collect()
> A similar logic is used in DataFrame by Spark itself:
> https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L902
>  
> Thanks,
> -Ameen






[jira] [Updated] (SPARK-17777) Spark Scheduler Hangs Indefinitely

2016-10-05 Thread Ameen Tayyebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ameen Tayyebi updated SPARK-17777:
--
Attachment: (was: jstack-dump.txt)

> Spark Scheduler Hangs Indefinitely
> --
>
> Key: SPARK-17777
> URL: https://issues.apache.org/jira/browse/SPARK-17777
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
> Environment: AWS EMR 4.3, can also be reproduced locally
>Reporter: Ameen Tayyebi
> Attachments: repro.scala
>
>
> We've identified a problem with Spark scheduling. The issue manifests itself 
> when an RDD calls SparkContext.parallelize within its getPartitions method. 
> This seemingly "recursive" call causes the problem. We have a repro case that 
> can easily be run.
> Please advise on what the issue might be and how we can work around it in the 
> mean time.
> I've attached repro.scala which can simply be pasted in spark-shell to 
> reproduce the problem.
> Why are we calling sc.parallelize in production within getPartitions? Well, 
> we have an RDD that is composed of several thousands of Parquet files. To 
> compute the partitioning strategy for this RDD, we create an RDD to read all 
> file sizes from S3 in parallel, so that we can quickly determine the proper 
> partitions. We do this to avoid executing this serially from the master node 
> which can result in significant slowness in the execution. Pseudo-code:
> val splitInfo = sc.parallelize(filePaths).map(f => (f, 
> s3.getObjectSummary)).collect()
> A similar logic is used in DataFrame by Spark itself:
> https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L902
>  
> Thanks,
> -Ameen






[jira] [Updated] (SPARK-17777) Spark Scheduler Hangs Indefinitely

2016-10-05 Thread Ameen Tayyebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ameen Tayyebi updated SPARK-17777:
--
Attachment: jstack-dump.txt

Jstack dump of the driver when it's hung.

> Spark Scheduler Hangs Indefinitely
> --
>
> Key: SPARK-17777
> URL: https://issues.apache.org/jira/browse/SPARK-17777
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
> Environment: AWS EMR 4.3, can also be reproduced locally
>Reporter: Ameen Tayyebi
> Attachments: jstack-dump.txt, repro.scala
>
>
> We've identified a problem with Spark scheduling. The issue manifests itself 
> when an RDD calls SparkContext.parallelize within its getPartitions method. 
> This seemingly "recursive" call causes the problem. We have a repro case that 
> can easily be run.
> Please advise on what the issue might be and how we can work around it in the 
> mean time.
> I've attached repro.scala which can simply be pasted in spark-shell to 
> reproduce the problem.
> Why are we calling sc.parallelize in production within getPartitions? Well, 
> we have an RDD that is composed of several thousands of Parquet files. To 
> compute the partitioning strategy for this RDD, we create an RDD to read all 
> file sizes from S3 in parallel, so that we can quickly determine the proper 
> partitions. We do this to avoid executing this serially from the master node 
> which can result in significant slowness in the execution. Pseudo-code:
> val splitInfo = sc.parallelize(filePaths).map(f => (f, 
> s3.getObjectSummary)).collect()
> A similar logic is used in DataFrame by Spark itself:
> https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L902
>  
> Thanks,
> -Ameen






[jira] [Commented] (SPARK-17777) Spark Scheduler Hangs Indefinitely

2016-10-05 Thread Ameen Tayyebi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548901#comment-15548901
 ] 

Ameen Tayyebi commented on SPARK-17777:
---

I'll get you the call stack of the driver shortly.

The reason I'm not doing this from the driver with multiple threads is that
it takes a long time. That's also why, in the other Spark code I showed you,
it's done in parallel (using the cluster).

It was taking about 40 minutes to list 6,000 files using 10 threads from the
driver. Keep in mind the files are on S3. We can of course bump up the number
of threads used, but that's not the proper solution. When our customers are
reading that many files, they tend to use large clusters. By large, I mean
100 machines with 8 cores each. It seems wasteful not to use the executors in
those cases and push all the burden onto the master. Using the RDD approach
of getting the file sizes, I can compute the same 6,000 in about a minute,
including scheduling overhead. I don't have exact data in front of me right
now, but I can dig it up if you really want it. I'd argue that doing this
from the master doesn't scale.

I'm putting a workaround in right now, where I perform the computation
outside of the getPartitions method. This is okay for our application's use
case, where people are scheduling jobs; the overall time it takes their job
to run is not impacted. However, for the experimentation use case, where
scientists are doing things interactively, they now have to wait when they
construct the RDD. Our scientists usually run experiments on small clusters,
which is why I'm pushing to truly postpone the computation until it's needed.
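A minimal sketch of that workaround, i.e. computing the split sizes with a
small cluster job before constructing the custom RDD rather than inside
getPartitions. The objectSizeOf function stands in for the S3 object-summary
call and is a hypothetical name, not a real client API.

import org.apache.spark.SparkContext

def precomputeSplitSizes(sc: SparkContext,
                         filePaths: Seq[String],
                         objectSizeOf: String => Long): Map[String, Long] = {
  // One task per file (capped), so a large cluster lists S3 metadata in
  // parallel instead of the driver doing it serially.
  val parallelism = math.max(1, math.min(filePaths.size, 1000))
  sc.parallelize(filePaths, parallelism)
    .map(path => path -> objectSizeOf(path))
    .collect()
    .toMap
}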




> Spark Scheduler Hangs Indefinitely
> --
>
> Key: SPARK-17777
> URL: https://issues.apache.org/jira/browse/SPARK-17777
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
> Environment: AWS EMR 4.3, can also be reproduced locally
>Reporter: Ameen Tayyebi
> Attachments: repro.scala
>
>
> We've identified a problem with Spark scheduling. The issue manifests itself 
> when an RDD calls SparkContext.parallelize within its getPartitions method. 
> This seemingly "recursive" call causes the problem. We have a repro case that 
> can easily be run.
> Please advise on what the issue might be and how we can work around it in the 
> mean time.
> I've attached repro.scala which can simply be pasted in spark-shell to 
> reproduce the problem.
> Why are we calling sc.parallelize in production within getPartitions? Well, 
> we have an RDD that is composed of several thousands of Parquet files. To 
> compute the partitioning strategy for this RDD, we create an RDD to read all 
> file sizes from S3 in parallel, so that we can quickly determine the proper 
> partitions. We do this to avoid executing this serially from the master node 
> which can result in significant slowness in the execution. Pseudo-code:
> val splitInfo = sc.parallelize(filePaths).map(f => (f, 
> s3.getObjectSummary)).collect()
> A similar logic is used in DataFrame by Spark itself:
> https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L902
>  
> Thanks,
> -Ameen






[jira] [Commented] (SPARK-17777) Spark Scheduler Hangs Indefinitely

2016-10-05 Thread Ameen Tayyebi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548784#comment-15548784
 ] 

Ameen Tayyebi commented on SPARK-17777:
---

Thanks for the comments, Sean.

Your point that the behavior of things that are not supposed to work can vary 
is fair. I usually run into situations where, if something works, it was 
intentional. Let me share some more data points with you:

 * If you change the very last line of the repro to y.map(r => r).count(), 
the computation finishes correctly. In other words, this only hangs if a 
shuffle occurs.
 * If you do *not* specify spark.default.parallelism, the exact same repro 
code runs correctly and finishes.

Let's set aside the code I shared regarding DataFrame; I think it's 
distracting us from the issue. I'm not claiming that that piece of code 
executes in the same "spot" in the scheduler as the RDD code I'm providing. My 
intention was to demonstrate that acquiring split knowledge in parallel 
using the cluster itself has been done before.

Before we dismiss this as "by design", I would appreciate it if you'd 
explain why this is not supposed to work. I haven't found any documentation, 
or any references in the code, that hint this is not supposed to work. It 
feels odd that not specifying spark.default.parallelism makes this work, or 
that things work fine if there's no shuffle.

My use case is this:
I'd like to acquire split information for my RDD, in line with the Spark 
philosophy of doing work only when necessary (lazy evaluation). Doing this in 
getPartitions lets me accomplish that. If this is not supported, is there 
an alternative you can suggest?

Ideally the following code executes "instantly":

val r = new MyCustomRDD();
r.map(a => a);

If I'm forced to pull the computation of partitions out of getPartitions, then 
this line will have to do work:

val r = new MyCustomRDD(); // has to compute partitions even though it's not 
necessary at this point because no action has been performed against the RDD

Thanks for your time!

> Spark Scheduler Hangs Indefinitely
> --
>
> Key: SPARK-17777
> URL: https://issues.apache.org/jira/browse/SPARK-17777
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
> Environment: AWS EMR 4.3, can also be reproduced locally
>Reporter: Ameen Tayyebi
> Attachments: repro.scala
>
>
> We've identified a problem with Spark scheduling. The issue manifests itself 
> when an RDD calls SparkContext.parallelize within its getPartitions method. 
> This seemingly "recursive" call causes the problem. We have a repro case that 
> can easily be run.
> Please advise on what the issue might be and how we can work around it in the 
> mean time.
> I've attached repro.scala which can simply be pasted in spark-shell to 
> reproduce the problem.
> Why are we calling sc.parallelize in production within getPartitions? Well, 
> we have an RDD that is composed of several thousands of Parquet files. To 
> compute the partitioning strategy for this RDD, we create an RDD to read all 
> file sizes from S3 in parallel, so that we can quickly determine the proper 
> partitions. We do this to avoid executing this serially from the master node 
> which can result in significant slowness in the execution. Pseudo-code:
> val splitInfo = sc.parallelize(filePaths).map(f => (f, 
> s3.getObjectSummary)).collect()
> A similar logic is used in DataFrame by Spark itself:
> https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L902
>  
> Thanks,
> -Ameen






[jira] [Commented] (SPARK-17777) Spark Scheduler Hangs Indefinitely

2016-10-04 Thread Ameen Tayyebi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15546799#comment-15546799
 ] 

Ameen Tayyebi commented on SPARK-17777:
---

Thanks for the comment. Can you please clarify why you think this should
not be possible?

Interestingly, this works in local mode but hangs in YARN mode. I'd
expect the same API calls to behave identically (semantically) irrespective
of which mode you run Spark on. If this is not supported, shouldn't we
expect a hang in all cases?

On Oct 4, 2016 2:50 PM, "Sean Owen (JIRA)"  wrote:


[ https://issues.apache.org/jira/browse/SPARK-1?page=
com.atlassian.jira.plugin.system.issuetabpanels:comment-
tabpanel=15546284#comment-15546284 ]

Sean Owen commented on SPARK-1:
---

I think you've demonstrated that this doesn't work, and I wouldn't say
that's supposed to work. The reference you give isn't the same thing in
that it doesn't lead to this hang / infinite loop. So I think the answer
is, you can't do that.

itself when an RDD calls SparkContext.parallelize within its getPartitions
method. This seemingly "recursive" call causes the problem. We have a repro
case that can easily be run.
the mean time.
reproduce the problem.
Well, we have an RDD that is composed of several thousands of Parquet
files. To compute the partitioning strategy for this RDD, we create an RDD
to read all file sizes from S3 in parallel, so that we can quickly
determine the proper partitions. We do this to avoid executing this
serially from the master node which can result in significant slowness in
the execution. Pseudo-code:
s3.getObjectSummary)).collect()
core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L902



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


> Spark Scheduler Hangs Indefinitely
> --
>
> Key: SPARK-17777
> URL: https://issues.apache.org/jira/browse/SPARK-17777
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
> Environment: AWS EMR 4.3, can also be reproduced locally
>Reporter: Ameen Tayyebi
> Attachments: repro.scala
>
>
> We've identified a problem with Spark scheduling. The issue manifests itself 
> when an RDD calls SparkContext.parallelize within its getPartitions method. 
> This seemingly "recursive" call causes the problem. We have a repro case that 
> can easily be run.
> Please advise on what the issue might be and how we can work around it in the 
> mean time.
> I've attached repro.scala which can simply be pasted in spark-shell to 
> reproduce the problem.
> Why are we calling sc.parallelize in production within getPartitions? Well, 
> we have an RDD that is composed of several thousands of Parquet files. To 
> compute the partitioning strategy for this RDD, we create an RDD to read all 
> file sizes from S3 in parallel, so that we can quickly determine the proper 
> partitions. We do this to avoid executing this serially from the master node 
> which can result in significant slowness in the execution. Pseudo-code:
> val splitInfo = sc.parallelize(filePaths).map(f => (f, 
> s3.getObjectSummary)).collect()
> A similar logic is used in DataFrame by Spark itself:
> https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L902
>  
> Thanks,
> -Ameen






[jira] [Updated] (SPARK-17777) Spark Scheduler Hangs Indefinitely

2016-10-04 Thread Ameen Tayyebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ameen Tayyebi updated SPARK-17777:
--
Description: 
We've identified a problem with Spark scheduling. The issue manifests itself 
when an RDD calls SparkContext.parallelize within its getPartitions method. 
This seemingly "recursive" call causes the problem. We have a repro case that 
can easily be run.

Please advise on what the issue might be and how we can work around it in the 
mean time.

I've attached repro.scala which can simply be pasted in spark-shell to 
reproduce the problem.

Why are we calling sc.parallelize in production within getPartitions? Well, we 
have an RDD that is composed of several thousands of Parquet files. To compute 
the partitioning strategy for this RDD, we create an RDD to read all file sizes 
from S3 in parallel, so that we can quickly determine the proper partitions. We 
do this to avoid executing this serially from the master node which can result 
in significant slowness in the execution. Pseudo-code:

val splitInfo = sc.parallelize(filePaths).map(f => (f, 
s3.getObjectSummary)).collect()

A similar logic is used in DataFrame by Spark itself:
https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L902
 

Thanks,
-Ameen

  was:
We've identified a problem with Spark scheduling. The issue manifests itself 
when an RDD calls SparkContext.parallelize within its getPartitions method. 
This seemingly "recursive" call causes the problem. We have a repro case that 
can easily be run.

Please advise on what the issue might be and how we can work around it in the 
mean time.

Thanks,
-Ameen


> Spark Scheduler Hangs Indefinitely
> --
>
> Key: SPARK-17777
> URL: https://issues.apache.org/jira/browse/SPARK-17777
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
> Environment: AWS EMR 4.3, can also be reproduced locally
>Reporter: Ameen Tayyebi
> Attachments: repro.scala
>
>
> We've identified a problem with Spark scheduling. The issue manifests itself 
> when an RDD calls SparkContext.parallelize within its getPartitions method. 
> This seemingly "recursive" call causes the problem. We have a repro case that 
> can easily be run.
> Please advise on what the issue might be and how we can work around it in the 
> mean time.
> I've attached repro.scala which can simply be pasted in spark-shell to 
> reproduce the problem.
> Why are we calling sc.parallelize in production within getPartitions? Well, 
> we have an RDD that is composed of several thousands of Parquet files. To 
> compute the partitioning strategy for this RDD, we create an RDD to read all 
> file sizes from S3 in parallel, so that we can quickly determine the proper 
> partitions. We do this to avoid executing this serially from the master node 
> which can result in significant slowness in the execution. Pseudo-code:
> val splitInfo = sc.parallelize(filePaths).map(f => (f, 
> s3.getObjectSummary)).collect()
> A similar logic is used in DataFrame by Spark itself:
> https://github.com/apache/spark/blob/branch-1.6/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala#L902
>  
> Thanks,
> -Ameen






[jira] [Updated] (SPARK-17777) Spark Scheduler Hangs Indefinitely

2016-10-04 Thread Ameen Tayyebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ameen Tayyebi updated SPARK-17777:
--
Attachment: repro.scala

> Spark Scheduler Hangs Indefinitely
> --
>
> Key: SPARK-17777
> URL: https://issues.apache.org/jira/browse/SPARK-17777
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
> Environment: AWS EMR 4.3, can also be reproduced locally
>Reporter: Ameen Tayyebi
> Attachments: repro.scala
>
>
> We've identified a problem with Spark scheduling. The issue manifests itself 
> when an RDD calls SparkContext.parallelize within its getPartitions method. 
> This seemingly "recursive" call causes the problem. We have a repro case that 
> can easily be run.
> Please advise on what the issue might be and how we can work around it in the 
> mean time.
> Thanks,
> -Ameen






[jira] [Issue Comment Deleted] (SPARK-17777) Spark Scheduler Hangs Indefinitely

2016-10-04 Thread Ameen Tayyebi (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-17777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ameen Tayyebi updated SPARK-17777:
--
Comment: was deleted

(was: Here's repro code:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
 
class testRDD(@transient sc: SparkContext)
  extends RDD[(String, Int)](sc, Nil)
with Serializable{
 
  override def getPartitions: Array[Partition] = {
sc.parallelize(Seq(("a",1),("b",2))).reduceByKey(_+_).collect()
 
val result = new Array[Partition](4)
for (i <- 0 until 4) {
  result(i) = new Partition {
override def index: Int = 0
  }
}
result
  }
 
  override def compute(split: Partition, context: TaskContext):
Iterator[(String,Int)] = Seq(("a",3),("b",4)).iterator
}
 
val y = new testRDD(sc)
y.map(r => r).reduceByKey(_+_).count()

This can be simply pasted in spark-shell.)

> Spark Scheduler Hangs Indefinitely
> --
>
> Key: SPARK-17777
> URL: https://issues.apache.org/jira/browse/SPARK-17777
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
> Environment: AWS EMR 4.3, can also be reproduced locally
>Reporter: Ameen Tayyebi
>
> We've identified a problem with Spark scheduling. The issue manifests itself 
> when an RDD calls SparkContext.parallelize within its getPartitions method. 
> This seemingly "recursive" call causes the problem. We have a repro case that 
> can easily be run.
> Please advise on what the issue might be and how we can work around it in the 
> mean time.
> Thanks,
> -Ameen






[jira] [Commented] (SPARK-17777) Spark Scheduler Hangs Indefinitely

2016-10-04 Thread Ameen Tayyebi (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-17777?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15546204#comment-15546204
 ] 

Ameen Tayyebi commented on SPARK-17777:
---

Here's repro code:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
 
class testRDD(@transient sc: SparkContext)
  extends RDD[(String, Int)](sc, Nil)
with Serializable{
 
  override def getPartitions: Array[Partition] = {
sc.parallelize(Seq(("a",1),("b",2))).reduceByKey(_+_).collect()
 
val result = new Array[Partition](4)
for (i <- 0 until 4) {
  result(i) = new Partition {
override def index: Int = 0
  }
}
result
  }
 
  override def compute(split: Partition, context: TaskContext):
Iterator[(String,Int)] = Seq(("a",3),("b",4)).iterator
}
 
val y = new testRDD(sc)
y.map(r => r).reduceByKey(_+_).count()

This can be simply pasted in spark-shell.

> Spark Scheduler Hangs Indefinitely
> --
>
> Key: SPARK-17777
> URL: https://issues.apache.org/jira/browse/SPARK-17777
> Project: Spark
>  Issue Type: Bug
>  Components: Spark Core
>Affects Versions: 1.6.0
> Environment: AWS EMR 4.3, can also be reproduced locally
>Reporter: Ameen Tayyebi
>
> We've identified a problem with Spark scheduling. The issue manifests itself 
> when an RDD calls SparkContext.parallelize within its getPartitions method. 
> This seemingly "recursive" call causes the problem. We have a repro case that 
> can easily be run.
> Please advise on what the issue might be and how we can work around it in the 
> mean time.
> Thanks,
> -Ameen






[jira] [Created] (SPARK-17777) Spark Scheduler Hangs Indefinitely

2016-10-04 Thread Ameen Tayyebi (JIRA)
Ameen Tayyebi created SPARK-17777:
-

 Summary: Spark Scheduler Hangs Indefinitely
 Key: SPARK-17777
 URL: https://issues.apache.org/jira/browse/SPARK-17777
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 1.6.0
 Environment: AWS EMR 4.3, can also be reproduced locally
Reporter: Ameen Tayyebi


We've identified a problem with Spark scheduling. The issue manifests itself 
when an RDD calls SparkContext.parallelize within its getPartitions method. 
This seemingly "recursive" call causes the problem. We have a repro case that 
can easily be run.

Please advise on what the issue might be and how we can work around it in the 
mean time.

Thanks,
-Ameen


