[jira] [Commented] (SPARK-12449) Pushing down arbitrary logical plans to data sources
[ https://issues.apache.org/jira/browse/SPARK-12449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15151904#comment-15151904 ]

Max Seiden commented on SPARK-12449:
------------------------------------

[~stephank85] Dropping the partitioned bit makes sense for a first shot. As per the discussion above, it seems pretty useful to keep the ability to ask the source about pushdown support for plans and exprs. Also, one thing I liked in your CatalystSource model was the extra work you did to inject subqueries into the plan; getting a good design for a select over a single table / subquery seems like a reasonable starting point IMO. It would also be nice to strive for the property of rendering a sources.* plan into SQL, but that may be a bit of a stretch. :-)

> Pushing down arbitrary logical plans to data sources
> ----------------------------------------------------
>
>                 Key: SPARK-12449
>                 URL: https://issues.apache.org/jira/browse/SPARK-12449
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Stephan Kessler
>         Attachments: pushingDownLogicalPlans.pdf
>
> With the help of the DataSource API we can pull data from external sources for processing. Implementing interfaces such as {{PrunedFilteredScan}} allows us to push down filters and projections, pruning unnecessary fields and rows directly in the data source.
> However, data sources such as SQL engines are capable of doing even more preprocessing, e.g., evaluating aggregates. This is beneficial because it reduces the amount of data transferred from the source to Spark. The existing interfaces do not allow this kind of processing in the source.
> We propose to add a new interface {{CatalystSource}} that allows deferring the processing of arbitrary logical plans to the data source. We have already presented the details at Spark Summit Europe 2015: [https://spark-summit.org/eu-2015/events/the-pushdown-of-everything/]
> I will add a design document explaining the details.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-12449) Pushing down arbitrary logical plans to data sources
[ https://issues.apache.org/jira/browse/SPARK-12449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15151880#comment-15151880 ]

Max Seiden commented on SPARK-12449:
------------------------------------

Yea, that seems to be the case. There's code in the DataSourceStrategy that specifically resolves aliases, but the filtered-scan case is pretty narrow relative to a full expression tree. +1 for a generic way to avoid double execution of operations. On the flip side, a boolean check would drop a neat property of {{unhandledFilters}}, which is that it can accept a subset of what the planner tries to push down.
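To illustrate the "subset" property being discussed, here is a minimal sketch with simplified stand-in types (not the real Spark classes): a source reports back only the filters it *cannot* evaluate, so it can accept an arbitrary subset of what the planner offers, which a yes/no capability check could not express.

```scala
// Hypothetical, simplified stand-ins for the sources API, for illustration only.
sealed trait Filter
case class EqualTo(attribute: String, value: Any) extends Filter
case class GreaterThan(attribute: String, value: Any) extends Filter

trait Relation {
  // Returns the filters the source CANNOT evaluate; Spark re-applies only those.
  def unhandledFilters(filters: Array[Filter]): Array[Filter]
}

// A source that can only push down equality predicates.
object EqualityOnlyRelation extends Relation {
  def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    filters.filterNot(_.isInstanceOf[EqualTo])
}

val offered  = Array[Filter](EqualTo("id", 7), GreaterThan("ts", 100L))
val leftover = EqualityOnlyRelation.unhandledFilters(offered)
// leftover holds only the GreaterThan predicate: the planner keeps that one
// in the Spark plan and trusts the source with the equality filter.
```

A boolean "canHandle" flag would force the source into all-or-nothing decisions; the subset-returning shape lets it take what it can.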
[jira] [Commented] (SPARK-12449) Pushing down arbitrary logical plans to data sources
[ https://issues.apache.org/jira/browse/SPARK-12449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15151368#comment-15151368 ]

Max Seiden commented on SPARK-12449:
------------------------------------

Very interested in checking out that PR! It would be prudent to have a holistic high-level design for any work here too, mostly to answer a few major questions. A random sample of such questions:

+ Should there be a new trait for each new {{sources.*}} type, or a single trait that communicates capabilities to the planner (i.e. the CatalystSource design)?
  a) A new trait for each source could get unwieldy given the potential number of permutations.
  b) A single, generic trait is powerful, but it puts a lot of burden on the implementer to cover more cases than they may want.
+ Depending on the above, should source plans be a tree of operators, or a list of operators to be applied in order?
  a) The first option is more natural, but it smells a lot like Catalyst -- not a bad thing if it's a separate, stable API though.
+ The more that's pushed down via {{sources.Expression}}s, the more complex things may get for implementers.
  a) For example, if Aliases are pushed down, there's a lot more opportunity for resolution bugs in the source impl.
  b) A definitive stance would be needed for exprs like UDFs or those dealing with complex types.
  c) Without a way to signal capabilities (implicitly or explicitly) to the planner, there'd likely need to be a way to "bail out".
[jira] [Commented] (SPARK-12449) Pushing down arbitrary logical plans to data sources
[ https://issues.apache.org/jira/browse/SPARK-12449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15151116#comment-15151116 ]

Max Seiden commented on SPARK-12449:
------------------------------------

[~rxin] Given that predicate pushdown via {{sources.Filter}} is (afaik) a stable API, conceivably that model could be extended to support ever richer operations (i.e. {{sources.Expression}}, {{sources.Limit}}, {{sources.Join}}, {{sources.Aggregation}}). In this case, the stable APIs remain a derivative of the Catalyst plans, and all that needs to change between releases is the compilation from Catalyst => Sources.

cc [~marmbrus], since we talked briefly about this idea in person at Spark Summit.
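As a sketch of what "a derivative of the Catalyst plans" could look like, here is a hypothetical, Catalyst-independent algebra in the spirit of the names above ({{sources.Expression}}, {{sources.Limit}}, etc. -- the types below are illustrative, not a real Spark API). The planner would compile Catalyst plans into this stable representation, and only that compilation step would need to change between releases.

```scala
// Hypothetical stable pushdown algebra, mirroring the sources.* naming above.
sealed trait SourceExpression
case class Column(name: String) extends SourceExpression
case class Literal(value: Any) extends SourceExpression

sealed trait SourcePlan
case class Scan(table: String, columns: Seq[String]) extends SourcePlan
case class Limit(n: Int, child: SourcePlan) extends SourcePlan
case class Aggregate(groupBy: Seq[SourceExpression],
                     aggregates: Seq[(String, SourceExpression)],
                     child: SourcePlan) extends SourcePlan

// A plan a source might receive: "top 10 groups with a count per country".
val plan: SourcePlan =
  Limit(10, Aggregate(Seq(Column("country")),
                      Seq("count" -> Column("ip")),
                      Scan("events", Seq("country", "ip"))))
```

Because the algebra is a closed set of case classes rather than Catalyst's open expression tree, a source implementer can pattern-match it exhaustively and reject anything it cannot execute.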
[jira] [Created] (SPARK-7629) Unroll AggregateFunction update in the case of a single input expression
Max Seiden created SPARK-7629:
---------------------------------

             Summary: Unroll AggregateFunction update in the case of a single input expression
                 Key: SPARK-7629
                 URL: https://issues.apache.org/jira/browse/SPARK-7629
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 1.2.1
            Reporter: Max Seiden
            Priority: Minor

h3. Summary
This was observed on 1.2.1. Many of the AggregateFunctions take a list of Expressions as input, wrap them up in an InterpretedProjection, and evaluate the InterpretedProjection to get the arguments for the actual aggregate expression. In the case where there is only a single input expression, however, this leads to gross inefficiencies.

Take CountDistinctFunction as an example, and assume it has a single input expression of type String. In this case Spark uses an OpenHashSet[Any] to collect a distinct set of GenericRow(Array[Any](string)). This is hugely inefficient, since every String object must be wrapped up first in an Array and second in a GenericRow, and then inserted into the OpenHashSet, where all hashing and equality comparisons happen on the GenericRow. This means that any single OpenHashSet entry carries unnecessary overhead from the GenericRow and Array[Any], and all hashCode and equality operations must go through the Row. In the case of hashCode, every invocation requires a while loop and a pattern match. In the case of equality, Seq[Any].equals is used, which requires (for both the candidate and existing objects) a pattern match, a call to canEqual, and a call to sameElements; the last of these (in IterableLike, as far as I can tell) constructs a Scala Iterator over the Array[Any] and does element-by-element comparisons. Needless to say, this requires far too many cycles in the case of a single input Expression, has a high and unnecessary memory overhead, and generates a ton of garbage.

To give a concrete example, I am unable to compute a grand distinct on an input of 15M unique IP addresses in a 2GB JVM. After a few seconds of running, I start to see substantial GCs, and eventually the JVM gets stuck doing full GCs. Additionally, a profile of the executor thread shows that 85% of the time is spent rehashing (perhaps explainable at that scale), but that 65% of the time is spent in GenericRow.hashCode.

My proposed improvement is to unroll those aggregate functions in the case of a single input expression, so that the inefficiencies described above can be avoided altogether. The only tricky bits here are dealing with NULLs (Row does that for us) and efficiently handling both the single-input and multi-input cases within the same aggregate function impl.
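The shape of the proposed unrolling can be sketched as follows (a simplified stand-in, not the actual Spark internals: the GenericRow and hash set here are toy versions, and NULL handling is shown explicitly since the real Row machinery would otherwise do it):

```scala
import scala.collection.mutable

// Toy stand-in for Spark's GenericRow: hashCode/equals must walk the array.
class GenericRow(val values: Array[Any]) {
  override def hashCode: Int = values.foldLeft(1)((acc, v) => acc * 31 + v.hashCode)
  override def equals(o: Any): Boolean = o match {
    case r: GenericRow => r.values.sameElements(values)
    case _             => false
  }
}

// Multi-input path: one Array[Any] + GenericRow allocation per input value,
// and every hash/equality check goes through the row wrapper.
def countDistinctRows(input: Iterator[Seq[Any]]): Int = {
  val seen = mutable.HashSet[GenericRow]()
  input.foreach(vs => seen += new GenericRow(vs.toArray))
  seen.size
}

// Unrolled single-input path: hash and compare the value itself, no wrappers.
def countDistinctSingle(input: Iterator[Any]): Int = {
  val seen = mutable.HashSet[Any]()
  input.filter(_ != null).foreach(seen += _) // NULLs skipped explicitly here
  seen.size
}

countDistinctSingle(Iterator("10.0.0.1", "10.0.0.2", "10.0.0.1")) // 2
```

The single-input path allocates nothing per element beyond the hash set entry itself, which is exactly the overhead the report says dominates the 15M-IP workload.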
[jira] [Created] (SPARK-6652) SQLContext and HiveContext do not handle tricky names well
Max Seiden created SPARK-6652:
---------------------------------

             Summary: SQLContext and HiveContext do not handle tricky names well
                 Key: SPARK-6652
                 URL: https://issues.apache.org/jira/browse/SPARK-6652
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.2.1
            Reporter: Max Seiden

h3. Summary
There are cases where both the SQLContext and HiveContext fail to handle tricky names (containing UTF-8, tabs, newlines, etc.) well. For example, the following string:

{noformat}
val tricky = "Tricky-\u4E2D[x.][\,/\\n * ? é\n$(x)\t(':;#!^-Name"
{noformat}

causes the following exceptions during parsing and resolution (respectively).

h5. SQLContext parse failure
{noformat}
// pseudocode
val data = 0 until 100
val rdd = sc.parallelize(data)
val schema = StructType(StructField(tricky, IntegerType, false) :: Nil)
val schemaRDD = sqlContext.applySchema(rdd.map(i => Row(i)), schema)
schemaRDD.registerAsTable(tricky)
sqlContext.sql(s"select `$tricky` from `$tricky`")

java.lang.RuntimeException: [1.33] failure: ``UNION'' expected but ErrorToken(``' expected but found) found

select `Tricky-中[x.][,/\n * ?
é
^
	at scala.sys.package$.error(package.scala:27)
	at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33)
	at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
	at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
	at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174)
	at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173)
	at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
	at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
	at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
	at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
	at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
	at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
	at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
	at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
	at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
	at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
	at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:31)
	at org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:83)
	at org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:83)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:83)
	at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303)
{noformat}

h5. HiveContext resolution failure
{noformat}
// pseudocode
val data = 0 until 100
val rdd = sc.parallelize(data)
val schema = StructType(StructField(tricky, IntegerType, false) :: Nil)
val schemaRDD = sqlContext.applySchema(rdd.map(i => Row(i)), schema)
schemaRDD.registerAsTable(tricky)
sqlContext.sql(s"select `$tricky` from `$tricky`").collect()

// the parse is ok in this case...
15/04/01 10:41:48 WARN HiveConf: DEPRECATED: hive.metastore.ds.retry.* no longer has any effect. Use hive.hmshandler.retry.* instead
15/04/01 10:41:48 INFO ParseDriver: Parsing command: select `Tricky-中[x.][,/\n * ? é $(x) (':;#!^-Name` from `Tricky-中[x.][,/\n * ? é $(x) (':;#!^-Name`
15/04/01 10:41:48 INFO ParseDriver: Parse Completed

// but resolution fails
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'Tricky-中[x.][,/\n * ? é $(x) (':;#!^-Name, tree:
'Project ['Tricky-中[x.][,/\n * ? é $(x) (':;#!^-Name]
 Subquery tricky-中[x.][,/\n * ? é $(x) (':;#!^-name
  LogicalRDD [Tricky-中[x.][,/\n * ? é $(x) (':;#!^-Name#2], MappedRDD[16] at map at <console>:30
	at
{noformat}
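For reference, a minimal identifier-quoting helper of the sort such names need is sketched below (hypothetical; this is not an API Spark exposed at the time). Backticks embedded in the name are escaped by doubling them, Hive-style; the lexer would additionally need to accept newlines and tabs inside a quoted identifier, which is where the parse above fails.

```scala
// Hypothetical helper: wrap a name in backticks, doubling any embedded ones.
def quoteIdentifier(name: String): String =
  "`" + name.replace("`", "``") + "`"

val tricky = "Tricky-\u4E2D[x.][\\,/\\n * ? \u00e9\n$(x)\t(':;#!^-Name"
val quoted = quoteIdentifier(tricky)
// quoted now round-trips through any parser that honors doubled backticks
// and allows arbitrary characters between them.
```

This only addresses quoting; the case-folding seen in the resolution failure above (`Tricky-...` vs. `tricky-...`) is a separate issue in the analyzer.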
[jira] [Updated] (SPARK-5277) SparkSqlSerializer does not register user specified KryoRegistrators
[ https://issues.apache.org/jira/browse/SPARK-5277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Max Seiden updated SPARK-5277:
------------------------------
    Affects Version/s:     (was: 1.2.0)
                       1.2.1
                       1.3.0

> SparkSqlSerializer does not register user specified KryoRegistrators
> --------------------------------------------------------------------
>
>                 Key: SPARK-5277
>                 URL: https://issues.apache.org/jira/browse/SPARK-5277
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.2.1, 1.3.0
>            Reporter: Max Seiden
>
> Although the SparkSqlSerializer class extends the KryoSerializer in core, its overridden newKryo() does not call super.newKryo(). This results in inconsistent serializer behaviors depending on whether a KryoSerializer instance or a SparkSqlSerializer instance is used. This may also be related to the TODO in KryoResourcePool, which uses KryoSerializer instead of SparkSqlSerializer due to yet-to-be-investigated test failures.
> An example of the divergence in behavior: the Exchange operator creates a new SparkSqlSerializer instance (with an empty conf; another issue) when it is constructed, whereas the GENERIC ColumnType pulls a KryoSerializer out of the resource pool (see above). The result is that the serialized in-memory columns are created using the user-provided serializers / registrators, while serialization during exchange is not.
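The bug and its fix can be sketched with simplified stand-ins (toy classes, not the real KryoSerializer, which returns a configured Kryo instance rather than a list; the list here just makes the dropped-registration effect visible):

```scala
// Stand-in for core's KryoSerializer: newKryo() applies user-configured
// KryoRegistrators (modeled here as a list of applied registration steps).
class KryoSerializerLike {
  def newKryo(): List[String] = List("user-registrators")
}

// Buggy shape: overrides newKryo() without calling super.newKryo(),
// silently dropping the user's registrators.
class BuggySqlSerializer extends KryoSerializerLike {
  override def newKryo(): List[String] = List("sql-types")
}

// Fixed shape: starts from the parent's instance and adds SQL-specific
// registrations on top, so user registrators still take effect.
class FixedSqlSerializer extends KryoSerializerLike {
  override def newKryo(): List[String] = super.newKryo() :+ "sql-types"
}

(new FixedSqlSerializer).newKryo() // List("user-registrators", "sql-types")
```

The inconsistency described above follows directly: any code path that instantiates the buggy subclass serializes without the user's registrations, while paths using the parent class honor them.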
[jira] [Updated] (SPARK-6012) Deadlock when asking for partitions from CoalescedRDD on top of a TakeOrdered operator
[ https://issues.apache.org/jira/browse/SPARK-6012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Max Seiden updated SPARK-6012:
------------------------------
    Summary: Deadlock when asking for partitions from CoalescedRDD on top of a TakeOrdered operator  (was: Deadlock when asking for SchemaRDD partitions with TakeOrdered operator)

Deadlock when asking for partitions from CoalescedRDD on top of a TakeOrdered operator
--------------------------------------------------------------------------------------

                 Key: SPARK-6012
                 URL: https://issues.apache.org/jira/browse/SPARK-6012
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.2.1
            Reporter: Max Seiden
            Priority: Critical

h3. Summary
I've found that a deadlock occurs when asking for the partitions of a SchemaRDD that has a TakeOrdered as its terminal operator. The problem occurs when a child RDD asks the DAGScheduler for preferred partition locations (which locks the scheduler) and eventually hits the #execute() of the TakeOrdered operator, which submits tasks but is blocked when it also tries to get preferred locations (in a separate thread). It seems like the TakeOrdered op's #execute() method should not actually submit a task (it is calling #executeCollect() and creating a new RDD) and should instead stay truer to the comment: "logically apply a Limit on top of a Sort". In my particular case, I am forcing a repartition of a SchemaRDD with a terminal Limit(..., Sort(...)), which is where the CoalescedRDD comes into play.

h3. Stack Traces
h4. Task Submission
{noformat}
"main" prio=5 tid=0x7f8e7280 nid=0x1303 in Object.wait() [0x00010ed5e000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <0x0007c4c239b8> (a org.apache.spark.scheduler.JobWaiter)
	at java.lang.Object.wait(Object.java:503)
	at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
	- locked <0x0007c4c239b8> (a org.apache.spark.scheduler.JobWaiter)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:514)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1321)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1390)
	at org.apache.spark.rdd.RDD.reduce(RDD.scala:884)
	at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1161)
	at org.apache.spark.sql.execution.TakeOrdered.executeCollect(basicOperators.scala:183)
	at org.apache.spark.sql.execution.TakeOrdered.execute(basicOperators.scala:188)
	at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
	- locked <0x0007c36ce038> (a org.apache.spark.sql.hive.HiveContext$$anon$7)
	at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
	at org.apache.spark.sql.SchemaRDD.getDependencies(SchemaRDD.scala:127)
	at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:209)
	at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:207)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.dependencies(RDD.scala:207)
	at org.apache.spark.rdd.RDD.firstParent(RDD.scala:1278)
	at org.apache.spark.sql.SchemaRDD.getPartitions(SchemaRDD.scala:122)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
	at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:79)
	at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:80)
	at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:209)
	at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:207)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.dependencies(RDD.scala:207)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1333)
	at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1304)
	- locked <0x0007f55c2238> (a org.apache.spark.scheduler.DAGScheduler)
	at
{noformat}
[jira] [Updated] (SPARK-6012) Deadlock when asking for SchemaRDD partitions with TakeOrdered operator
[ https://issues.apache.org/jira/browse/SPARK-6012?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Max Seiden updated SPARK-6012:
------------------------------
    Description:

h3. Summary
I've found that a deadlock occurs when asking for the partitions of a SchemaRDD that has a TakeOrdered as its terminal operator. The problem occurs when a child RDD asks the DAGScheduler for preferred partition locations (which locks the scheduler) and eventually hits the #execute() of the TakeOrdered operator, which submits tasks but is blocked when it also tries to get preferred locations (in a separate thread). It seems like the TakeOrdered op's #execute() method should not actually submit a task (it is calling #executeCollect() and creating a new RDD) and should instead stay truer to the comment: "logically apply a Limit on top of a Sort". In my particular case, I am forcing a repartition of a SchemaRDD with a terminal Limit(..., Sort(...)), which is where the CoalescedRDD comes into play.

h3. Stack Traces
h4. Task Submission
{noformat}
"main" prio=5 tid=0x7f8e7280 nid=0x1303 in Object.wait() [0x00010ed5e000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <0x0007c4c239b8> (a org.apache.spark.scheduler.JobWaiter)
	at java.lang.Object.wait(Object.java:503)
	at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
	- locked <0x0007c4c239b8> (a org.apache.spark.scheduler.JobWaiter)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:514)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1321)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1390)
	at org.apache.spark.rdd.RDD.reduce(RDD.scala:884)
	at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1161)
	at org.apache.spark.sql.execution.TakeOrdered.executeCollect(basicOperators.scala:183)
	at org.apache.spark.sql.execution.TakeOrdered.execute(basicOperators.scala:188)
	at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
	- locked <0x0007c36ce038> (a org.apache.spark.sql.hive.HiveContext$$anon$7)
	at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
	at org.apache.spark.sql.SchemaRDD.getDependencies(SchemaRDD.scala:127)
	at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:209)
	at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:207)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.dependencies(RDD.scala:207)
	at org.apache.spark.rdd.RDD.firstParent(RDD.scala:1278)
	at org.apache.spark.sql.SchemaRDD.getPartitions(SchemaRDD.scala:122)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
	at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:79)
	at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:80)
	at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:209)
	at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:207)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.dependencies(RDD.scala:207)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1333)
	at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1304)
	- locked <0x0007f55c2238> (a org.apache.spark.scheduler.DAGScheduler)
	at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1148)
	at org.apache.spark.rdd.PartitionCoalescer.currPrefLocs(CoalescedRDD.scala:175)
	at org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$5$$anonfun$apply$2.apply(CoalescedRDD.scala:192)
	at org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$5$$anonfun$apply$2.apply(CoalescedRDD.scala:191)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
	at
{noformat}
[jira] [Created] (SPARK-6012) Deadlock when asking for SchemaRDD partitions with TakeOrdered operator
Max Seiden created SPARK-6012:
---------------------------------

             Summary: Deadlock when asking for SchemaRDD partitions with TakeOrdered operator
                 Key: SPARK-6012
                 URL: https://issues.apache.org/jira/browse/SPARK-6012
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.2.1
            Reporter: Max Seiden
            Priority: Critical

h3. Summary
I've found that a deadlock occurs when asking for the partitions of a SchemaRDD that has a TakeOrdered as its terminal operator. The problem occurs when a child RDD asks the DAGScheduler for preferred partition locations (which locks the scheduler) and eventually hits the #execute() of the TakeOrdered operator, which submits tasks but is blocked when it also tries to get preferred locations (in a separate thread). It seems like the TakeOrdered op's #execute() method should not actually submit a task (it is calling #executeCollect() and creating a new RDD) and should instead stay truer to the comment: "logically apply a Limit on top of a Sort".

h3. Stack Traces
h4. Task Submission
{noformat}
"main" prio=5 tid=0x7f8e7280 nid=0x1303 in Object.wait() [0x00010ed5e000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <0x0007c4c239b8> (a org.apache.spark.scheduler.JobWaiter)
	at java.lang.Object.wait(Object.java:503)
	at org.apache.spark.scheduler.JobWaiter.awaitResult(JobWaiter.scala:73)
	- locked <0x0007c4c239b8> (a org.apache.spark.scheduler.JobWaiter)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:514)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1321)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1390)
	at org.apache.spark.rdd.RDD.reduce(RDD.scala:884)
	at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1161)
	at org.apache.spark.sql.execution.TakeOrdered.executeCollect(basicOperators.scala:183)
	at org.apache.spark.sql.execution.TakeOrdered.execute(basicOperators.scala:188)
	at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425)
	- locked <0x0007c36ce038> (a org.apache.spark.sql.hive.HiveContext$$anon$7)
	at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425)
	at org.apache.spark.sql.SchemaRDD.getDependencies(SchemaRDD.scala:127)
	at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:209)
	at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:207)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.dependencies(RDD.scala:207)
	at org.apache.spark.rdd.RDD.firstParent(RDD.scala:1278)
	at org.apache.spark.sql.SchemaRDD.getPartitions(SchemaRDD.scala:122)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:222)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:220)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:220)
	at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:79)
	at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:80)
	at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:209)
	at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:207)
	at scala.Option.getOrElse(Option.scala:120)
	at org.apache.spark.rdd.RDD.dependencies(RDD.scala:207)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1333)
	at org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1304)
	- locked <0x0007f55c2238> (a org.apache.spark.scheduler.DAGScheduler)
	at org.apache.spark.SparkContext.getPreferredLocs(SparkContext.scala:1148)
	at org.apache.spark.rdd.PartitionCoalescer.currPrefLocs(CoalescedRDD.scala:175)
	at org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$5$$anonfun$apply$2.apply(CoalescedRDD.scala:192)
	at org.apache.spark.rdd.PartitionCoalescer$LocationIterator$$anonfun$5$$anonfun$apply$2.apply(CoalescedRDD.scala:191)
	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350)
	at
{noformat}
[jira] [Updated] (SPARK-5277) SparkSqlSerializer does not register user specified KryoRegistrators
[ https://issues.apache.org/jira/browse/SPARK-5277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Max Seiden updated SPARK-5277:
------------------------------
    Remaining Estimate:     (was: 24h)
     Original Estimate:     (was: 24h)
[jira] [Created] (SPARK-5277) SparkSqlSerializer does not register user specified KryoRegistrators
Max Seiden created SPARK-5277:
---------------------------------

             Summary: SparkSqlSerializer does not register user specified KryoRegistrators
                 Key: SPARK-5277
                 URL: https://issues.apache.org/jira/browse/SPARK-5277
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 1.2.0
            Reporter: Max Seiden

Although the SparkSqlSerializer class extends the KryoSerializer in core, its overridden newKryo() does not call super.newKryo(). This results in inconsistent serializer behaviors depending on whether a KryoSerializer instance or a SparkSqlSerializer instance is used. This may also be related to the TODO in KryoResourcePool, which uses KryoSerializer instead of SparkSqlSerializer due to yet-to-be-investigated test failures.

An example of the divergence in behavior: the Exchange operator creates a new SparkSqlSerializer instance (with an empty conf; another issue) when it is constructed, whereas the GENERIC ColumnType pulls a KryoSerializer out of the resource pool (see above). The result is that the serialized in-memory columns are created using the user-provided serializers / registrators, while serialization during exchange is not.