[jira] [Commented] (SPARK-12449) Pushing down arbitrary logical plans to data sources

2016-02-18 Thread Max Seiden (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15151904#comment-15151904
 ] 

Max Seiden commented on SPARK-12449:


[~stephank85] Dropping the partitioned bit makes sense for a first shot. As per 
the discussion above, it seems pretty useful to keep the ability to ask the 
source about pushdown support for plans and exprs.

Also, one thing I liked in your CatalystSource model was the extra work you did 
to inject subqueries into the plan; getting a good design for a select over a 
single table / subquery seems like a reasonable starting point IMO. It would 
also be nice to strive for the property of rendering a sources.* plan into SQL, 
but that may be a bit of a stretch. :-)
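
For what it's worth, a rough sketch of that "render to SQL" property for the 
simplest case (a select with an optional limit over a single table). Everything 
here is hypothetical scaffolding, not an existing Spark API:

{noformat}
// Hypothetical helper, sketched only to illustrate the "render to SQL" idea.
def renderSelect(table: String, columns: Seq[String], limit: Option[Int] = None): String = {
  // Back-tick quoting so tricky identifiers survive the round trip.
  val cols = if (columns.isEmpty) "*" else columns.map(c => s"`$c`").mkString(", ")
  val base = s"SELECT $cols FROM `$table`"
  limit.fold(base)(n => s"$base LIMIT $n")
}

// renderSelect("events", Seq("user_id", "ts"), Some(10))
//   => SELECT `user_id`, `ts` FROM `events` LIMIT 10
{noformat}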

> Pushing down arbitrary logical plans to data sources
> 
>
> Key: SPARK-12449
> URL: https://issues.apache.org/jira/browse/SPARK-12449
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Reporter: Stephan Kessler
> Attachments: pushingDownLogicalPlans.pdf
>
>
> With the help of the DataSource API we can pull data from external sources 
> for processing. Implementing interfaces such as {{PrunedFilteredScan}} allows 
> pushing down filters and projections, pruning unnecessary fields and rows 
> directly in the data source.
> However, data sources such as SQL engines are capable of doing even more 
> preprocessing, e.g., evaluating aggregates. This is beneficial because it 
> would reduce the amount of data transferred from the source to Spark. The 
> existing interfaces do not allow this kind of processing in the source.
> We propose adding a new interface {{CatalystSource}} that allows deferring 
> the processing of arbitrary logical plans to the data source. We presented 
> the details at Spark Summit Europe 2015 
> [https://spark-summit.org/eu-2015/events/the-pushdown-of-everything/]
> I will add a design document explaining the details.






[jira] [Commented] (SPARK-12449) Pushing down arbitrary logical plans to data sources

2016-02-17 Thread Max Seiden (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15151880#comment-15151880
 ] 

Max Seiden commented on SPARK-12449:


Yea, that seems to be the case. There's code in the DataSourceStrategy that 
specifically resolves aliases, but the filtered scan case is pretty narrow 
relative to an expression tree.  

+1 for a generic way to avoid double execution of operations. On the flip side, 
a boolean check would drop a neat property of "unhandledFilters", which is that 
it can accept a subset of what the planner tries to push down.
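
For reference, a minimal sketch of that subset property, assuming the 1.6-era 
BaseRelation.unhandledFilters API; the relation below is made up for 
illustration:

{noformat}
import org.apache.spark.sql.sources.{BaseRelation, EqualTo, Filter}

// Hypothetical relation: it claims only equality predicates on "id" and hands
// every other filter back to Spark for re-evaluation.
abstract class IdOnlyRelation extends BaseRelation {
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    filters.filterNot {
      case EqualTo("id", _) => true   // handled natively by the source
      case _                => false  // everything else stays unhandled
    }
}
{noformat}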







[jira] [Commented] (SPARK-12449) Pushing down arbitrary logical plans to data sources

2016-02-17 Thread Max Seiden (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15151368#comment-15151368
 ] 

Max Seiden commented on SPARK-12449:


Very interested in checking out that PR! It would be prudent to have a holistic 
high-level design for any work here too, mostly to answer a few major 
questions. A random sample of such questions (a rough sketch of the two trait 
shapes follows the list):

+ Should there be a new trait for each new `sources.*` type, or a single trait 
that communicates capabilities to the planner (i.e. the CatalystSource design)?
  a) a new trait for each new type could get unwieldy given the potential # 
of permutations
  b) a single, generic trait is powerful, but it puts a lot of burden on 
the implementer to cover more cases than they may want
 
+ Depending on the above, should source plans be a tree of operators or a list 
of operators to be applied in-order?
  a) the first option is more natural, but it smells a lot like Catalyst -- 
not a bad thing if it's a separate, stable API though

+ the more that's pushed down via sources.Expressions, the more complex things 
may get for implementers 
  a) for example, if Aliases are pushed down, there's a lot more 
opportunity for resolution bugs in the source impl
  b) a definitive stance would be needed for exprs like UDFs or those 
dealing with complex types
  c) without a way to signal capabilities (implicitly or explicitly) to the 
planner, there'd likely need to be a way to "bail out"
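
As promised above, a rough sketch of the two trait shapes; every name is 
hypothetical and nothing here is an existing API:

{noformat}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// (a) one narrow trait per pushed-down operation (hypothetical)
trait AggregatedScan {
  def buildScan(groupingColumns: Seq[String], aggregateExprs: Seq[String]): RDD[Row]
}

// (b) a single generic trait in the spirit of CatalystSource (hypothetical):
// the planner offers a plan and the source says whether it can execute it
trait PlanPushdownSource {
  def supportsPlan(plan: LogicalPlan): Boolean
  def execute(plan: LogicalPlan): RDD[Row]
}
{noformat}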







[jira] [Commented] (SPARK-12449) Pushing down arbitrary logical plans to data sources

2016-02-17 Thread Max Seiden (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15151116#comment-15151116
 ] 

Max Seiden commented on SPARK-12449:


[~rxin] Given that predicate pushdown via `sources.Filter` is (afaik) a stable 
API, conceivably that model could be extended to support ever richer operations 
(e.g., sources.Expression, sources.Limit, sources.Join, sources.Aggregation). In 
that case, the stable APIs remain a derivative of the Catalyst plans, and all 
that needs to change between releases is the compilation from Catalyst => 
Sources.

cc [~marmbrus] since we talked briefly about this idea in person at Spark Summit
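
To make that concrete, a hedged sketch of what richer, stable sources.* types 
might look like; every name below is hypothetical:

{noformat}
object SourcesSketch {
  // Version-stable mirrors of a few Catalyst concepts, kept deliberately simple.
  sealed trait Expression
  case class ColumnRef(name: String) extends Expression
  case class Literal(value: Any) extends Expression

  sealed trait AggregateCall
  case class Sum(child: Expression) extends AggregateCall
  case class CountDistinct(child: Expression) extends AggregateCall

  case class Limit(n: Int)
  case class Aggregation(groupBy: Seq[Expression], aggregates: Seq[AggregateCall])
  case class Join(leftTable: String, rightTable: String, condition: Expression)
}
{noformat}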







[jira] [Created] (SPARK-7629) Unroll AggregateFunction update in the case of a single input expression

2015-05-14 Thread Max Seiden (JIRA)
Max Seiden created SPARK-7629:
-

 Summary: Unroll AggregateFunction update in the case of a single 
input expression
 Key: SPARK-7629
 URL: https://issues.apache.org/jira/browse/SPARK-7629
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 1.2.1
Reporter: Max Seiden
Priority: Minor


h3. Summary
This was observed on 1.2.1. Many of the AggregateFunctions take a list of 
Expressions as input, wrap them up in an InterpretedProjection, and evaluate 
the InterpretedProjection to get the arguments for the actual aggregate 
expression. In the case where there is only a single input expression, however, 
this leads to gross inefficiencies.

Take CountDistinctFunction as an example, and assume it has a single input 
expression of type String. In this case Spark uses an OpenHashSet[Any] to 
collect a distinct set of GenericRow(Array[Any](string)). This is hugely 
inefficient, since every String object must be wrapped first in an Array and 
then in a GenericRow, and then inserted into the OpenHashSet, where all hashing 
and equality comparisons happen on the GenericRow.

This means that any single OpenHashSet entry carries unnecessary overhead from 
the GenericRow and Array[Any], and all hashCode and equality operations must go 
through the Row. In the case of hashCode, this means that every invocation 
requires a while loop and pattern match. In the case of equality, 
Seq[Any].equals is used, which requires (for both the candidate and existing 
objects) a pattern match, a call to canEqual, and a call to sameElements - the 
last of these (in IterableLike, as far as I can tell) constructs a Scala 
Iterator over the Array[Any] and does element-by-element comparisons.

Needless to say, this requires far too many cycles in the case of a single 
input Expression, has a high and unnecessary memory overhead, and generates a 
ton of garbage. To give a concrete example, I am unable to compute a grand 
distinct on an input of 15M unique IP addresses in a 2GB JVM. After a few 
seconds of running, I start to get substantial GCs, and eventually get into a 
state where the JVM is stuck doing full GCs. Additionally, a profile of the 
executor thread shows that 85% of the time is spent rehashing (perhaps 
explainable at that scale), but that 65% of the time is spent in 
GenericRow.hashCode.

My proposed improvement is to unroll those aggregate functions in the case of a 
single input expression so that the inefficiencies described above can be 
avoided altogether. The only tricky bits here are dealing with NULLs (Row does 
that for us) and efficiently handling both the single input and multi-input 
cases within the same aggregate function impl.
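
To make the overhead concrete, a stand-alone illustration of the wrapping 
described above; plain Scala collections stand in for OpenHashSet, and the 
names and numbers are made up:

{noformat}
import scala.collection.mutable

val wrapped = mutable.HashSet.empty[Seq[Any]]  // row-wrapped values
val direct  = mutable.HashSet.empty[String]    // raw values

val ips = (0 until 1000000).map(i => s"10.0.${(i / 256) % 256}.${i % 256}")

// Roughly what the report describes: each value is boxed in a row-like Seq,
// so every insert allocates a wrapper and hashing/equality walk its elements.
ips.foreach(ip => wrapped += Seq[Any](ip))

// The proposed single-expression unrolling: hash the value itself, no wrapper.
ips.foreach(ip => direct += ip)
{noformat}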






[jira] [Created] (SPARK-6652) SQLContext and HiveContext do not handle tricky names well

2015-04-01 Thread Max Seiden (JIRA)
Max Seiden created SPARK-6652:
-

 Summary: SQLContext and HiveContext do not handle tricky names 
well
 Key: SPARK-6652
 URL: https://issues.apache.org/jira/browse/SPARK-6652
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.2.1
Reporter: Max Seiden


h3. Summary
There are cases where both the SQLContext and HiveContext fail to handle 
tricky names (containing UTF-8, tabs, newlines, etc.) well. For example, the 
following string:

{noformat}
val tricky = "Tricky-\u4E2D[x.][\,/\\n * ? é\n$(x)\t(':;#!^-Name"
{noformat}

causes the following exceptions during parsing and resolution (respectively).

h5. SQLContext parse failure
{noformat}
// pseudocode
val data = 0 until 100
val rdd = sc.parallelize(data)
val schema = StructType(StructField(tricky, IntegerType, false) :: Nil)
val schemaRDD = sqlContext.applySchema(rdd.map(i => Row(i)), schema)
schemaRDD.registerAsTable(tricky)
sqlContext.sql(s"select `$tricky` from `$tricky`")

java.lang.RuntimeException: [1.33] failure: ``UNION'' expected but 
ErrorToken(``' expected but 
 found) found

select `Tricky-中[x.][,/\n * ? é

^
at scala.sys.package$.error(package.scala:27)
at 
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33)
at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
at 
org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174)
at 
org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
at 
scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
at 
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at 
scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at 
scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
at 
scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
at 
org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:31)
at 
org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:83)
at 
org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:83)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:83)
at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303)
{noformat}

h5. HiveContext resolution failure
{noformat}
// pseudocode
val data = 0 until 100
val rdd = sc.parallelize(data)
val schema = StructType(StructField(tricky, IntegerType, false) :: Nil)
val schemaRDD = sqlContext.applySchema(rdd.map(i => Row(i)), schema)
schemaRDD.registerAsTable(tricky)
sqlContext.sql(s"select `$tricky` from `$tricky`").collect()

// the parse is ok in this case...
15/04/01 10:41:48 WARN HiveConf: DEPRECATED: hive.metastore.ds.retry.* no 
longer has any effect.  Use hive.hmshandler.retry.* instead
15/04/01 10:41:48 INFO ParseDriver: Parsing command: select `Tricky-中[x.][,/\n 
* ? é
$(x)   (':;#!^-Name` from `Tricky-中[x.][,/\n * ? é
$(x)   (':;#!^-Name`
15/04/01 10:41:48 INFO ParseDriver: Parse Completed

// but resolution fails
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved 
attributes: 'Tricky-中[x.][,/\n * ? é
$(x)   (':;#!^-Name, tree:
'Project ['Tricky-中[x.][,/\n * ? é
$(x)   (':;#!^-Name]
 Subquery tricky-中[x.][,/\n * ? é
$(x)   (':;#!^-name
  LogicalRDD [Tricky-中[x.][,/\n * ? é
$(x)   (':;#!^-Name#2], MappedRDD[16] at map at console:30

at 

[jira] [Updated] (SPARK-5277) SparkSqlSerializer does not register user specified KryoRegistrators

2015-03-28 Thread Max Seiden (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-5277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Max Seiden updated SPARK-5277:
--
Affects Version/s: (was: 1.2.0)
   1.2.1
   1.3.0

 SparkSqlSerializer does not register user specified KryoRegistrators 
 -

 Key: SPARK-5277
 URL: https://issues.apache.org/jira/browse/SPARK-5277
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.2.1, 1.3.0
Reporter: Max Seiden

 Although the SparkSqlSerializer class extends the KryoSerializer in core, 
 its overridden newKryo() does not call super.newKryo(). This results in 
 inconsistent serializer behaviors depending on whether a KryoSerializer 
 instance or a SparkSqlSerializer instance is used. This may also be related 
 to the TODO in KryoResourcePool, which uses KryoSerializer instead of 
 SparkSqlSerializer due to yet-to-be-investigated test failures.
 An example of the divergence in behavior: The Exchange operator creates a new 
 SparkSqlSerializer instance (with an empty conf; another issue) when it is 
 constructed, whereas the GENERIC ColumnType pulls a KryoSerializer out of the 
 resource pool (see above). The result is that the serialized in-memory 
 columns are created using the user provided serializers / registrators, while 
 serialization during exchange does not.






[jira] [Updated] (SPARK-6012) Deadlock when asking for partitions from CoalescedRDD on top of a TakeOrdered operator

2015-02-25 Thread Max Seiden (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Max Seiden updated SPARK-6012:
--
Summary: Deadlock when asking for partitions from CoalescedRDD on top of a 
TakeOrdered operator  (was: Deadlock when asking for SchemaRDD partitions with 
TakeOrdered operator)

 Deadlock when asking for partitions from CoalescedRDD on top of a TakeOrdered 
 operator
 --

 Key: SPARK-6012
 URL: https://issues.apache.org/jira/browse/SPARK-6012
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.2.1
Reporter: Max Seiden
Priority: Critical

 h3. Summary
 I've found that a deadlock occurs when asking for the partitions from a 
 SchemaRDD that has a TakeOrdered as its terminal operator. The problem occurs 
 when a child RDD asks the DAGScheduler for preferred partition locations 
 (which locks the scheduler) and eventually hits the #execute() of the 
 TakeOrdered operator, which submits tasks but is blocked when it also tries 
 to get preferred locations (in a separate thread). It seems like the 
 TakeOrdered op's #execute() method should not actually submit a task (it is 
 calling #executeCollect() and creating a new RDD) and should instead stay 
 more true to the comment ("logically apply a Limit on top of a Sort"). 
 In my particular case, I am forcing a repartition of a SchemaRDD with a 
 terminal Limit(..., Sort(...)), which is where the CoalescedRDD comes into 
 play.
 h3. Stack Traces
 h4. Task Submission
 {noformat}
 main prio=5 tid=0x7f8e7280 nid=0x1303 in Object.wait() 
 [0x00010ed5e000]
java.lang.Thread.State: WAITING (on object monitor)
 at java.lang.Object.wait(Native Method)
 - waiting on 0x0007c4c239b8 (a 
 org.apache.spark.scheduler.JobWaiter)
 at java.lang.Object.wait(Object.java:503)
 at 
 org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
 - locked 0x0007c4c239b8 (a org.apache.spark.scheduler.JobWaiter)
 at 
 org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:514)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:1321)
 at org.apache.spark.SparkContext.runJob(SparkContext.scala:1390)
 at org.apache.spark.rdd.RDD.reduce(RDD.scala:884)
 at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1161)
 at 
 org.apache.spark.sql.execution.TakeOrdered.executeCollect(basicOperators.scala:183)
 at 
 org.apache.spark.sql.execution.TakeOrdered.execute(basicOperators.scala:188)
 at 
 org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
 - locked 0x0007c36ce038 (a 
 org.apache.spark.sql.hive.HiveContext$$anon$7)
 at 
 org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
 at org.apache.spark.sql.SchemaRDD.getDependencies(SchemaRDD.scala:127)
 at 
 org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:209)
 at 
 org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:207)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.dependencies(RDD.scala:207)
 at org.apache.spark.rdd.RDD.firstParent(RDD.scala:1278)
 at org.apache.spark.sql.SchemaRDD.getPartitions(SchemaRDD.scala:122)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
 at 
 org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
 at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
 at org.apache.spark.ShuffleDependency.init(Dependency.scala:79)
 at 
 org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:80)
 at 
 org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:209)
 at 
 org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:207)
 at scala.Option.getOrElse(Option.scala:120)
 at org.apache.spark.rdd.RDD.dependencies(RDD.scala:207)
 at 
 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1333)
 at 
 org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1304)
 - locked 0x0007f55c2238 (a 
 org.apache.spark.scheduler.DAGScheduler)
 at 
 

[jira] [Updated] (SPARK-6012) Deadlock when asking for SchemaRDD partitions with TakeOrdered operator

2015-02-25 Thread Max Seiden (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-6012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Max Seiden updated SPARK-6012:
--
Description: 
h3. Summary
I've found that a deadlock occurs when asking for the partitions from a 
SchemaRDD that has a TakeOrdered as its terminal operator. The problem occurs 
when a child RDD asks the DAGScheduler for preferred partition locations 
(which locks the scheduler) and eventually hits the #execute() of the 
TakeOrdered operator, which submits tasks but is blocked when it also tries to 
get preferred locations (in a separate thread). It seems like the TakeOrdered 
op's #execute() method should not actually submit a task (it is calling 
#executeCollect() and creating a new RDD) and should instead stay more true to 
the comment ("logically apply a Limit on top of a Sort"). 

In my particular case, I am forcing a repartition of a SchemaRDD with a 
terminal Limit(..., Sort(...)), which is where the CoalescedRDD comes into play.
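
The locking pattern, reduced to a language-level sketch (this is not Spark 
code; the names are made up):

{noformat}
import java.util.concurrent.Executors

object DeadlockShape {
  private val schedulerLock = new Object

  def preferredLocations(): Unit = schedulerLock.synchronized {
    // Thread A holds the "scheduler" lock and, while holding it, ends up
    // submitting work (execute() -> executeCollect() -> runJob)...
    val pool = Executors.newSingleThreadExecutor()
    val job = pool.submit(new Runnable {
      // ...and the submitted work also needs the scheduler lock.
      def run(): Unit = schedulerLock.synchronized { /* getPreferredLocs */ }
    })
    job.get() // blocks forever: the lock is never released
  }
}
{noformat}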

h3. Stack Traces
h4. Task Submission
{noformat}
main prio=5 tid=0x7f8e7280 nid=0x1303 in Object.wait() 
[0x00010ed5e000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on 0x0007c4c239b8 (a 
org.apache.spark.scheduler.JobWaiter)
at java.lang.Object.wait(Object.java:503)
at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
- locked 0x0007c4c239b8 (a org.apache.spark.scheduler.JobWaiter)
at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:514)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1321)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1390)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:884)
at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1161)
at 
org.apache.spark.sql.execution.TakeOrdered.executeCollect(basicOperators.scala:183)
at 
org.apache.spark.sql.execution.TakeOrdered.execute(basicOperators.scala:188)
at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
- locked 0x0007c36ce038 (a 
org.apache.spark.sql.hive.HiveContext$$anon$7)
at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
at org.apache.spark.sql.SchemaRDD.getDependencies(SchemaRDD.scala:127)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:209)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:207)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.dependencies(RDD.scala:207)
at org.apache.spark.rdd.RDD.firstParent(RDD.scala:1278)
at org.apache.spark.sql.SchemaRDD.getPartitions(SchemaRDD.scala:122)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
at org.apache.spark.ShuffleDependency.init(Dependency.scala:79)
at 
org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:80)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:209)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:207)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.dependencies(RDD.scala:207)
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1333)
at 
org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1304)
- locked 0x0007f55c2238 (a 
org.apache.spark.scheduler.DAGScheduler)
at 
org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1148)
at 
org.apache.spark.rdd.PartitionCoalescer.currPrefLocs(CoalescedRDD.scala:175)
at 
org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$5$$anonfun$apply$2.apply(CoalescedRDD.scala:192)
at 
org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$5$$anonfun$apply$2.apply(CoalescedRDD.scala:191)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
at 

[jira] [Created] (SPARK-6012) Deadlock when asking for SchemaRDD partitions with TakeOrdered operator

2015-02-25 Thread Max Seiden (JIRA)
Max Seiden created SPARK-6012:
-

 Summary: Deadlock when asking for SchemaRDD partitions with 
TakeOrdered operator
 Key: SPARK-6012
 URL: https://issues.apache.org/jira/browse/SPARK-6012
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.2.1
Reporter: Max Seiden
Priority: Critical


h3. Summary
I've found that a deadlock occurs when asking for the partitions from a 
SchemaRDD that has a TakeOrdered as its terminal operator. The problem occurs 
when a child RDD asks the DAGScheduler for preferred partition locations 
(which locks the scheduler) and eventually hits the #execute() of the 
TakeOrdered operator, which submits tasks but is blocked when it also tries to 
get preferred locations (in a separate thread). It seems like the TakeOrdered 
op's #execute() method should not actually submit a task (it is calling 
#executeCollect() and creating a new RDD) and should instead stay more true to 
the comment ("logically apply a Limit on top of a Sort"). 

h3. Stack Traces
h4. Task Submission
{noformat}
main prio=5 tid=0x7f8e7280 nid=0x1303 in Object.wait() 
[0x00010ed5e000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on 0x0007c4c239b8 (a 
org.apache.spark.scheduler.JobWaiter)
at java.lang.Object.wait(Object.java:503)
at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
- locked 0x0007c4c239b8 (a org.apache.spark.scheduler.JobWaiter)
at 
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:514)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1321)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1390)
at org.apache.spark.rdd.RDD.reduce(RDD.scala:884)
at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1161)
at 
org.apache.spark.sql.execution.TakeOrdered.executeCollect(basicOperators.scala:183)
at 
org.apache.spark.sql.execution.TakeOrdered.execute(basicOperators.scala:188)
at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
- locked 0x0007c36ce038 (a 
org.apache.spark.sql.hive.HiveContext$$anon$7)
at 
org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
at org.apache.spark.sql.SchemaRDD.getDependencies(SchemaRDD.scala:127)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:209)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:207)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.dependencies(RDD.scala:207)
at org.apache.spark.rdd.RDD.firstParent(RDD.scala:1278)
at org.apache.spark.sql.SchemaRDD.getPartitions(SchemaRDD.scala:122)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
at 
org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
at org.apache.spark.ShuffleDependency.init(Dependency.scala:79)
at 
org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:80)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:209)
at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:207)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.dependencies(RDD.scala:207)
at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1333)
at 
org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1304)
- locked 0x0007f55c2238 (a 
org.apache.spark.scheduler.DAGScheduler)
at 
org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1148)
at 
org.apache.spark.rdd.PartitionCoalescer.currPrefLocs(CoalescedRDD.scala:175)
at 
org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$5$$anonfun$apply$2.apply(CoalescedRDD.scala:192)
at 
org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$5$$anonfun$apply$2.apply(CoalescedRDD.scala:191)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
at 

[jira] [Updated] (SPARK-5277) SparkSqlSerializer does not register user specified KryoRegistrators

2015-01-15 Thread Max Seiden (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-5277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Max Seiden updated SPARK-5277:
--
Remaining Estimate: (was: 24h)
 Original Estimate: (was: 24h)

 SparkSqlSerializer does not register user specified KryoRegistrators 
 -

 Key: SPARK-5277
 URL: https://issues.apache.org/jira/browse/SPARK-5277
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.2.0
Reporter: Max Seiden

 Although the SparkSqlSerializer class extends the KryoSerializer in core, 
 its overridden newKryo() does not call super.newKryo(). This results in 
 inconsistent serializer behaviors depending on whether a KryoSerializer 
 instance or a SparkSqlSerializer instance is used. This may also be related 
 to the TODO in KryoResourcePool, which uses KryoSerializer instead of 
 SparkSqlSerializer due to yet-to-be-investigated test failures.
 An example of the divergence in behavior: The Exchange operator creates a new 
 SparkSqlSerializer instance (with an empty conf; another issue) when it is 
 constructed, whereas the GENERIC ColumnType pulls a KryoSerializer out of the 
 resource pool (see above). The result is that the serialized in-memory 
 columns are created using the user provided serializers / registrators, while 
 serialization during exchange does not.






[jira] [Created] (SPARK-5277) SparkSqlSerializer does not register user specified KryoRegistrators

2015-01-15 Thread Max Seiden (JIRA)
Max Seiden created SPARK-5277:
-

 Summary: SparkSqlSerializer does not register user specified 
KryoRegistrators 
 Key: SPARK-5277
 URL: https://issues.apache.org/jira/browse/SPARK-5277
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 1.2.0
Reporter: Max Seiden


Although the SparkSqlSerializer class extends the KryoSerializer in core, its 
overridden newKryo() does not call super.newKryo(). This results in 
inconsistent serializer behaviors depending on whether a KryoSerializer 
instance or a SparkSqlSerializer instance is used. This may also be related to 
the TODO in KryoResourcePool, which uses KryoSerializer instead of 
SparkSqlSerializer due to yet-to-be-investigated test failures.

An example of the divergence in behavior: The Exchange operator creates a new 
SparkSqlSerializer instance (with an empty conf; another issue) when it is 
constructed, whereas the GENERIC ColumnType pulls a KryoSerializer out of the 
resource pool (see above). The result is that the serialized in-memory columns 
are created using the user provided serializers / registrators, while 
serialization during exchange does not.
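
A minimal sketch of the kind of fix this implies, assuming the 1.2-era 
KryoSerializer API; the class name and registrations below are placeholders, 
not the actual Spark SQL code:

{noformat}
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

class SparkSqlSerializerSketch(conf: SparkConf) extends KryoSerializer(conf) {
  override def newKryo(): Kryo = {
    // Start from super.newKryo() so user-specified KryoRegistrators are applied...
    val kryo = super.newKryo()
    // ...then layer SQL-specific registrations on top (placeholder example).
    kryo.setRegistrationRequired(false)
    kryo.register(classOf[java.math.BigDecimal])
    kryo
  }
}
{noformat}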


