Re: parquet predicate / projection pushdown into unionAll

2014-10-01 Thread DB Tsai
Hi Cody and Michael,

We ran into the same issue. Each day of data is stored as one parquet
file, and we want to query against a month of parquet data. The data
for each day is around 600GB, and we use 300 executors with 8GB of
memory each. Without the patch, the query took forever and crashed in
the end.
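
The setup is roughly the following sketch (paths and names are illustrative):

// Roughly: load each day's parquet file, then unionAll them into one
// table (5 days of ~600GB each gives the ~3TB case timed below).
val days = (1 to 5).map(d => sqlContext.parquetFile(s"/data/day$d"))
val combined = days.reduce(_ unionAll _)
combined.registerTempTable("five_days")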

With the patch, a simple Hive query against a table obtained by
unionAll of 5 parquet files (around 3TB) takes 199.126 seconds, versus
64.36 seconds against an individual parquet file. Great work; this
patch solves the issue. Hoping to see this in the next 1.1 point
release.

Thanks.

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Fri, Sep 12, 2014 at 9:07 PM, Michael Armbrust wrote:
> Yeah, thanks for implementing it!
>
> Since Spark SQL is an alpha component and moving quickly, the plan is to
> backport all of master into the next point release in the 1.1 series.
>
> On Fri, Sep 12, 2014 at 9:27 AM, Cody Koeninger  wrote:
>
>> Cool, thanks for your help on this.  Any chance of adding it to the 1.1.1
>> point release, assuming there ends up being one?
>>
>> On Wed, Sep 10, 2014 at 11:39 AM, Michael Armbrust wrote:
>>
>>> Hey Cody,
>>>
>>> Thanks for doing this!  Will look at your PR later today.
>>>
>>> Michael
>>>
>>> On Wed, Sep 10, 2014 at 9:31 AM, Cody Koeninger 
>>> wrote:
>>>
 Tested the patch against a cluster with some real data.  Initial results
 seem like going from one table to a union of 2 tables is now closer to a
 doubling of query time as expected, instead of 5 to 10x.

 Let me know if you see any issues with that PR.

 On Wed, Sep 10, 2014 at 8:19 AM, Cody Koeninger 
 wrote:

> So the obvious thing I was missing is that the analyzer has already
> resolved attributes by the time the optimizer runs, so the references in
> the filter / projection need to be fixed up to match the children.
>
> Created a PR, let me know if there's a better way to do it.  I'll see
> about testing performance against some actual data sets.
>
> On Tue, Sep 9, 2014 at 6:09 PM, Cody Koeninger 
> wrote:
>
>> Ok, so looking at the optimizer code for the first time and trying the
>> simplest rule that could possibly work,
>>
>> object UnionPushdown extends Rule[LogicalPlan] {
>>   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
>>     // Push down filter into union
>>     case f @ Filter(condition, u @ Union(left, right)) =>
>>       u.copy(left = f.copy(child = left), right = f.copy(child = right))
>>
>>     // Push down projection into union
>>     case p @ Project(projectList, u @ Union(left, right)) =>
>>       u.copy(left = p.copy(child = left), right = p.copy(child = right))
>>   }
>> }
>>
>>
>> If I try manually applying that rule to a logical plan in the repl, it
>> produces the query shape I'd expect, and executing that plan results in
>> parquet pushdowns as I'd expect.
>>
>> But adding those cases to ColumnPruning results in a runtime exception
>> (below)
>>
>> I can keep digging, but it seems like I'm missing some obvious initial
>> context around naming of attributes.  If you can provide any pointers to
>> speed me on my way I'd appreciate it.
>>
>>
>> java.lang.AssertionError: assertion failed: ArrayBuffer() +
>> ArrayBuffer() != WrappedArray(name#6, age#7), List(name#9, age#10,
>> phones#11)
>> at scala.Predef$.assert(Predef.scala:179)
>> at
>> org.apache.spark.sql.parquet.ParquetTableScan.<init>(ParquetTableOperations.scala:75)
>> at
>> org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
>> at
>> org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
>> at
>> org.apache.spark.sql.SQLContext$SparkPlanner.pruneFilterProject(SQLContext.scala:367)
>> at
>> org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(SparkStrategies.scala:230)
>> at
>> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>> at
>> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>> at
>> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>> at
>> org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
>> at
>> org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
>> at
>> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282)
>> at

Re: parquet predicate / projection pushdown into unionAll

2014-09-12 Thread Michael Armbrust
Yeah, thanks for implementing it!

Since Spark SQL is an alpha component and moving quickly, the plan is to
backport all of master into the next point release in the 1.1 series.

On Fri, Sep 12, 2014 at 9:27 AM, Cody Koeninger  wrote:

> Cool, thanks for your help on this.  Any chance of adding it to the 1.1.1
> point release, assuming there ends up being one?
>
> On Wed, Sep 10, 2014 at 11:39 AM, Michael Armbrust wrote:
>
>> Hey Cody,
>>
>> Thanks for doing this!  Will look at your PR later today.
>>
>> Michael
>>
>> On Wed, Sep 10, 2014 at 9:31 AM, Cody Koeninger 
>> wrote:
>>
>>> Tested the patch against a cluster with some real data.  Initial results
>>> seem like going from one table to a union of 2 tables is now closer to a
>>> doubling of query time as expected, instead of 5 to 10x.
>>>
>>> Let me know if you see any issues with that PR.
>>>
>>> On Wed, Sep 10, 2014 at 8:19 AM, Cody Koeninger 
>>> wrote:
>>>
 So the obvious thing I was missing is that the analyzer has already
 resolved attributes by the time the optimizer runs, so the references in
 the filter / projection need to be fixed up to match the children.

 Created a PR, let me know if there's a better way to do it.  I'll see
 about testing performance against some actual data sets.

 On Tue, Sep 9, 2014 at 6:09 PM, Cody Koeninger 
 wrote:

> Ok, so looking at the optimizer code for the first time and trying the
> simplest rule that could possibly work,
>
> object UnionPushdown extends Rule[LogicalPlan] {
>   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
>     // Push down filter into union
>     case f @ Filter(condition, u @ Union(left, right)) =>
>       u.copy(left = f.copy(child = left), right = f.copy(child = right))
>
>     // Push down projection into union
>     case p @ Project(projectList, u @ Union(left, right)) =>
>       u.copy(left = p.copy(child = left), right = p.copy(child = right))
>   }
> }
>
>
> If I try manually applying that rule to a logical plan in the repl, it
> produces the query shape I'd expect, and executing that plan results in
> parquet pushdowns as I'd expect.
>
> But adding those cases to ColumnPruning results in a runtime exception
> (below)
>
> I can keep digging, but it seems like I'm missing some obvious initial
> context around naming of attributes.  If you can provide any pointers to
> speed me on my way I'd appreciate it.
>
>
> java.lang.AssertionError: assertion failed: ArrayBuffer() +
> ArrayBuffer() != WrappedArray(name#6, age#7), List(name#9, age#10,
> phones#11)
> at scala.Predef$.assert(Predef.scala:179)
> at
> org.apache.spark.sql.parquet.ParquetTableScan.<init>(ParquetTableOperations.scala:75)
> at
> org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
> at
> org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
> at
> org.apache.spark.sql.SQLContext$SparkPlanner.pruneFilterProject(SQLContext.scala:367)
> at
> org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(SparkStrategies.scala:230)
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
> at
> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
> at
> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282)
> at
> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at
> scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at
> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:282)
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>

Re: parquet predicate / projection pushdown into unionAll

2014-09-12 Thread Cody Koeninger
Cool, thanks for your help on this.  Any chance of adding it to the 1.1.1
point release, assuming there ends up being one?

On Wed, Sep 10, 2014 at 11:39 AM, Michael Armbrust 
wrote:

> Hey Cody,
>
> Thanks for doing this!  Will look at your PR later today.
>
> Michael
>
> On Wed, Sep 10, 2014 at 9:31 AM, Cody Koeninger 
> wrote:
>
>> Tested the patch against a cluster with some real data.  Initial results
>> seem like going from one table to a union of 2 tables is now closer to a
>> doubling of query time as expected, instead of 5 to 10x.
>>
>> Let me know if you see any issues with that PR.
>>
>> On Wed, Sep 10, 2014 at 8:19 AM, Cody Koeninger 
>> wrote:
>>
>>> So the obvious thing I was missing is that the analyzer has already
>>> resolved attributes by the time the optimizer runs, so the references in
>>> the filter / projection need to be fixed up to match the children.
>>>
>>> Created a PR, let me know if there's a better way to do it.  I'll see
>>> about testing performance against some actual data sets.
>>>
>>> On Tue, Sep 9, 2014 at 6:09 PM, Cody Koeninger 
>>> wrote:
>>>
 Ok, so looking at the optimizer code for the first time and trying the
 simplest rule that could possibly work,

 object UnionPushdown extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     // Push down filter into union
     case f @ Filter(condition, u @ Union(left, right)) =>
       u.copy(left = f.copy(child = left), right = f.copy(child = right))

     // Push down projection into union
     case p @ Project(projectList, u @ Union(left, right)) =>
       u.copy(left = p.copy(child = left), right = p.copy(child = right))
   }
 }


 If I try manually applying that rule to a logical plan in the repl, it
 produces the query shape I'd expect, and executing that plan results in
 parquet pushdowns as I'd expect.

 But adding those cases to ColumnPruning results in a runtime exception
 (below)

 I can keep digging, but it seems like I'm missing some obvious initial
 context around naming of attributes.  If you can provide any pointers to
 speed me on my way I'd appreciate it.


 java.lang.AssertionError: assertion failed: ArrayBuffer() +
 ArrayBuffer() != WrappedArray(name#6, age#7), List(name#9, age#10,
 phones#11)
 at scala.Predef$.assert(Predef.scala:179)
 at
 org.apache.spark.sql.parquet.ParquetTableScan.<init>(ParquetTableOperations.scala:75)
 at
 org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
 at
 org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
 at
 org.apache.spark.sql.SQLContext$SparkPlanner.pruneFilterProject(SQLContext.scala:367)
 at
 org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(SparkStrategies.scala:230)
 at
 org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
 at
 org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
 at
 scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at
 org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
 at
 org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
 at
 org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282)
 at
 org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at
 scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at
 scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
 at
 scala.collection.AbstractTraversable.map(Traversable.scala:105)
 at
 org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:282)
 at
 org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
 at
 org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
 at
 scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
 at
 org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
 at
 org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)
 at
 org.apache.spark.sql.SQLContext$Que

Re: parquet predicate / projection pushdown into unionAll

2014-09-10 Thread Michael Armbrust
Hey Cody,

Thanks for doing this!  Will look at your PR later today.

Michael

On Wed, Sep 10, 2014 at 9:31 AM, Cody Koeninger  wrote:

> Tested the patch against a cluster with some real data.  Initial results
> seem like going from one table to a union of 2 tables is now closer to a
> doubling of query time as expected, instead of 5 to 10x.
>
> Let me know if you see any issues with that PR.
>
> On Wed, Sep 10, 2014 at 8:19 AM, Cody Koeninger 
> wrote:
>
>> So the obvious thing I was missing is that the analyzer has already
>> resolved attributes by the time the optimizer runs, so the references in
>> the filter / projection need to be fixed up to match the children.
>>
>> Created a PR, let me know if there's a better way to do it.  I'll see
>> about testing performance against some actual data sets.
>>
>> On Tue, Sep 9, 2014 at 6:09 PM, Cody Koeninger 
>> wrote:
>>
>>> Ok, so looking at the optimizer code for the first time and trying the
>>> simplest rule that could possibly work,
>>>
>>> object UnionPushdown extends Rule[LogicalPlan] {
>>>   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
>>>     // Push down filter into union
>>>     case f @ Filter(condition, u @ Union(left, right)) =>
>>>       u.copy(left = f.copy(child = left), right = f.copy(child = right))
>>>
>>>     // Push down projection into union
>>>     case p @ Project(projectList, u @ Union(left, right)) =>
>>>       u.copy(left = p.copy(child = left), right = p.copy(child = right))
>>>   }
>>> }
>>>
>>>
>>> If I try manually applying that rule to a logical plan in the repl, it
>>> produces the query shape I'd expect, and executing that plan results in
>>> parquet pushdowns as I'd expect.
>>>
>>> But adding those cases to ColumnPruning results in a runtime exception
>>> (below)
>>>
>>> I can keep digging, but it seems like I'm missing some obvious initial
>>> context around naming of attributes.  If you can provide any pointers to
>>> speed me on my way I'd appreciate it.
>>>
>>>
>>> java.lang.AssertionError: assertion failed: ArrayBuffer() +
>>> ArrayBuffer() != WrappedArray(name#6, age#7), List(name#9, age#10,
>>> phones#11)
>>> at scala.Predef$.assert(Predef.scala:179)
>>> at
>>> org.apache.spark.sql.parquet.ParquetTableScan.<init>(ParquetTableOperations.scala:75)
>>> at
>>> org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
>>> at
>>> org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
>>> at
>>> org.apache.spark.sql.SQLContext$SparkPlanner.pruneFilterProject(SQLContext.scala:367)
>>> at
>>> org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(SparkStrategies.scala:230)
>>> at
>>> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>>> at
>>> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>> at
>>> org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
>>> at
>>> org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
>>> at
>>> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282)
>>> at
>>> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282)
>>> at
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>> at
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>> at
>>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>> at
>>> scala.collection.AbstractTraversable.map(Traversable.scala:105)
>>> at
>>> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:282)
>>> at
>>> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>>> at
>>> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>> at
>>> org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
>>> at
>>> org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)
>>> at
>>> org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400)
>>> at
>>> org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406)
>>> at
>>> org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406)
>>> at
>>> org.apache.spark.sql.SQLContext$Quer

Re: parquet predicate / projection pushdown into unionAll

2014-09-10 Thread Cody Koeninger
Tested the patch against a cluster with some real data.  Initial results
seem like going from one table to a union of 2 tables is now closer to a
doubling of query time as expected, instead of 5 to 10x.

Let me know if you see any issues with that PR.

On Wed, Sep 10, 2014 at 8:19 AM, Cody Koeninger  wrote:

> So the obvious thing I was missing is that the analyzer has already
> resolved attributes by the time the optimizer runs, so the references in
> the filter / projection need to be fixed up to match the children.
>
> Created a PR, let me know if there's a better way to do it.  I'll see
> about testing performance against some actual data sets.
>
> On Tue, Sep 9, 2014 at 6:09 PM, Cody Koeninger  wrote:
>
>> Ok, so looking at the optimizer code for the first time and trying the
>> simplest rule that could possibly work,
>>
>> object UnionPushdown extends Rule[LogicalPlan] {
>>   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
>>     // Push down filter into union
>>     case f @ Filter(condition, u @ Union(left, right)) =>
>>       u.copy(left = f.copy(child = left), right = f.copy(child = right))
>>
>>     // Push down projection into union
>>     case p @ Project(projectList, u @ Union(left, right)) =>
>>       u.copy(left = p.copy(child = left), right = p.copy(child = right))
>>   }
>> }
>>
>>
>> If I try manually applying that rule to a logical plan in the repl, it
>> produces the query shape I'd expect, and executing that plan results in
>> parquet pushdowns as I'd expect.
>>
>> But adding those cases to ColumnPruning results in a runtime exception
>> (below)
>>
>> I can keep digging, but it seems like I'm missing some obvious initial
>> context around naming of attributes.  If you can provide any pointers to
>> speed me on my way I'd appreciate it.
>>
>>
>> java.lang.AssertionError: assertion failed: ArrayBuffer() + ArrayBuffer()
>> != WrappedArray(name#6, age#7), List(name#9, age#10, phones#11)
>> at scala.Predef$.assert(Predef.scala:179)
>> at
>> org.apache.spark.sql.parquet.ParquetTableScan.<init>(ParquetTableOperations.scala:75)
>> at
>> org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
>> at
>> org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
>> at
>> org.apache.spark.sql.SQLContext$SparkPlanner.pruneFilterProject(SQLContext.scala:367)
>> at
>> org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(SparkStrategies.scala:230)
>> at
>> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>> at
>> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>> at
>> org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
>> at
>> org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
>> at
>> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282)
>> at
>> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>> at
>> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:282)
>> at
>> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>> at
>> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
>> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>> at
>> org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
>> at
>> org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)
>> at
>> org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400)
>> at
>> org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406)
>> at
>> org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406)
>> at
>> org.apache.spark.sql.SQLContext$QueryExecution.toString(SQLContext.scala:431)
>>
>>
>>
>>
>> On Tue, Sep 9, 2014 at 3:02 PM, Michael Armbrust 
>> wrote:
>>
>>> What Patrick said is correct.  Two other points:
>>>  - In the 1.2 release we are hoping to beef up the support for working
>>> with par

Re: parquet predicate / projection pushdown into unionAll

2014-09-10 Thread Cody Koeninger
So the obvious thing I was missing is that the analyzer has already
resolved attributes by the time the optimizer runs, so the references in
the filter / projection need to be fixed up to match the children.

Created a PR, let me know if there's a better way to do it.  I'll see about
testing performance against some actual data sets.
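
A minimal sketch of what that fix-up might look like, assuming Catalyst's
AttributeMap helper (names and structure here are illustrative, not
necessarily what the PR does). The union's output lines up with the left
child's output, so only expressions pushed to the right child need their
references rewritten:

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project, Union}
import org.apache.spark.sql.catalyst.rules.Rule

object UnionPushdown extends Rule[LogicalPlan] {
  // The union's output is the left child's output, so map each of those
  // attributes to the corresponding attribute of the right child.
  private def buildRewrites(union: Union): AttributeMap[Attribute] = {
    assert(union.left.output.size == union.right.output.size)
    AttributeMap(union.left.output.zip(union.right.output))
  }

  // Rewrite an expression so its references resolve against the right child.
  private def pushToRight[A <: Expression](e: A, rewrites: AttributeMap[Attribute]): A =
    e.transform { case a: Attribute => rewrites(a) }.asInstanceOf[A]

  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Filter(condition, u @ Union(left, right)) =>
      val rewrites = buildRewrites(u)
      Union(Filter(condition, left),
            Filter(pushToRight(condition, rewrites), right))

    case Project(projectList, u @ Union(left, right)) =>
      val rewrites = buildRewrites(u)
      Union(Project(projectList, left),
            Project(projectList.map(pushToRight(_, rewrites)), right))
  }
}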

On Tue, Sep 9, 2014 at 6:09 PM, Cody Koeninger  wrote:

> Ok, so looking at the optimizer code for the first time and trying the
> simplest rule that could possibly work,
>
> object UnionPushdown extends Rule[LogicalPlan] {
>   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
>     // Push down filter into union
>     case f @ Filter(condition, u @ Union(left, right)) =>
>       u.copy(left = f.copy(child = left), right = f.copy(child = right))
>
>     // Push down projection into union
>     case p @ Project(projectList, u @ Union(left, right)) =>
>       u.copy(left = p.copy(child = left), right = p.copy(child = right))
>   }
> }
>
>
> If I try manually applying that rule to a logical plan in the repl, it
> produces the query shape I'd expect, and executing that plan results in
> parquet pushdowns as I'd expect.
>
> But adding those cases to ColumnPruning results in a runtime exception
> (below)
>
> I can keep digging, but it seems like I'm missing some obvious initial
> context around naming of attributes.  If you can provide any pointers to
> speed me on my way I'd appreciate it.
>
>
> java.lang.AssertionError: assertion failed: ArrayBuffer() + ArrayBuffer()
> != WrappedArray(name#6, age#7), List(name#9, age#10, phones#11)
> at scala.Predef$.assert(Predef.scala:179)
> at
> org.apache.spark.sql.parquet.ParquetTableScan.<init>(ParquetTableOperations.scala:75)
> at
> org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
> at
> org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
> at
> org.apache.spark.sql.SQLContext$SparkPlanner.pruneFilterProject(SQLContext.scala:367)
> at
> org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(SparkStrategies.scala:230)
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
> at
> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282)
> at
> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at
> org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:282)
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at
> org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
> at
> org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)
> at
> org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400)
> at
> org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406)
> at
> org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406)
> at
> org.apache.spark.sql.SQLContext$QueryExecution.toString(SQLContext.scala:431)
>
>
>
>
> On Tue, Sep 9, 2014 at 3:02 PM, Michael Armbrust 
> wrote:
>
>> What Patrick said is correct.  Two other points:
>>  - In the 1.2 release we are hoping to beef up the support for working
>> with partitioned parquet independent of the metastore.
>>  - You can actually do operations like INSERT INTO for parquet tables to
>> add data.  This creates new parquet files for each insertion.  This will
>> break if there are multiple concurrent writers to the same table.
>>
>> On Tue, Sep 9, 2014 at 12:09 PM, Patrick Wendell 
>> wrote:
>>
>>> I think what Michael means is people often use this to read existing
>>> partitione

Re: parquet predicate / projection pushdown into unionAll

2014-09-09 Thread Cody Koeninger
Ok, so looking at the optimizer code for the first time and trying the
simplest rule that could possibly work,

object UnionPushdown extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Push down filter into union
    case f @ Filter(condition, u @ Union(left, right)) =>
      u.copy(left = f.copy(child = left), right = f.copy(child = right))

    // Push down projection into union
    case p @ Project(projectList, u @ Union(left, right)) =>
      u.copy(left = p.copy(child = left), right = p.copy(child = right))
  }
}


If I try manually applying that rule to a logical plan in the repl, it
produces the query shape I'd expect, and executing that plan results in
parquet pushdowns as I'd expect.
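
Applying the rule by hand looks something like this sketch (b is the
unionAll SchemaRDD from the example at the bottom of the thread, and
queryExecution is a developer API, so treat this as illustrative):

// Grab the analyzed logical plan, apply the rule directly, and inspect it.
val analyzed = b.where('age < 40).select('name).queryExecution.analyzed
val pushed = UnionPushdown(analyzed)  // a Rule is just a LogicalPlan => LogicalPlan
println(pushed)  // Filter and Project should now sit below the Union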

But adding those cases to ColumnPruning results in a runtime exception
(below)

I can keep digging, but it seems like I'm missing some obvious initial
context around naming of attributes.  If you can provide any pointers to
speed me on my way I'd appreciate it.


java.lang.AssertionError: assertion failed: ArrayBuffer() + ArrayBuffer()
!= WrappedArray(name#6, age#7), List(name#9, age#10, phones#11)
at scala.Predef$.assert(Predef.scala:179)
at org.apache.spark.sql.parquet.ParquetTableScan.<init>(ParquetTableOperations.scala:75)
at org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
at org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
at org.apache.spark.sql.SQLContext$SparkPlanner.pruneFilterProject(SQLContext.scala:367)
at org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(SparkStrategies.scala:230)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282)
at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:282)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)
at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406)
at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406)
at org.apache.spark.sql.SQLContext$QueryExecution.toString(SQLContext.scala:431)




On Tue, Sep 9, 2014 at 3:02 PM, Michael Armbrust 
wrote:

> What Patrick said is correct.  Two other points:
>  - In the 1.2 release we are hoping to beef up the support for working
> with partitioned parquet independent of the metastore.
>  - You can actually do operations like INSERT INTO for parquet tables to
> add data.  This creates new parquet files for each insertion.  This will
> break if there are multiple concurrent writers to the same table.
>
> On Tue, Sep 9, 2014 at 12:09 PM, Patrick Wendell 
> wrote:
>
>> I think what Michael means is people often use this to read existing
>> partitioned Parquet tables that are defined in a Hive metastore rather
>> than data generated directly from within Spark and then reading it
>> back as a table. I'd expect the latter case to become more common, but
>> for now most users connect to an existing metastore.
>>
>> I think you could go this route by creating a partitioned external
>> table based on the on-disk layout you create. The downside is that
>> you'd have to go through a hive metastore whereas what you are doing
>> now doesn't need hive at all.
>>
>> We should also just fix the case you are mentioning where a union

Re: parquet predicate / projection pushdown into unionAll

2014-09-09 Thread Michael Armbrust
What Patrick said is correct.  Two other points:
 - In the 1.2 release we are hoping to beef up the support for working with
partitioned parquet independent of the metastore.
 - You can actually do operations like INSERT INTO for parquet tables to
add data.  This creates new parquet files for each insertion.  This will
break if there are multiple concurrent writers to the same table.
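
A sketch of that pattern; the paths, the table name, and the newRows
SchemaRDD are assumed for illustration, with method names as of Spark 1.1:

// Register an existing parquet directory as a table...
val people = sqlContext.parquetFile("/foo/d1")
people.registerTempTable("people")

// ...then append to it. Each call writes new parquet part files under
// /foo/d1; unsafe if several writers target the same table concurrently.
newRows.insertInto("people")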

On Tue, Sep 9, 2014 at 12:09 PM, Patrick Wendell  wrote:

> I think what Michael means is people often use this to read existing
> partitioned Parquet tables that are defined in a Hive metastore rather
> than data generated directly from within Spark and then reading it
> back as a table. I'd expect the latter case to become more common, but
> for now most users connect to an existing metastore.
>
> I think you could go this route by creating a partitioned external
> table based on the on-disk layout you create. The downside is that
> you'd have to go through a hive metastore whereas what you are doing
> now doesn't need hive at all.
>
> We should also just fix the case you are mentioning where a union is
> used directly from within spark. But that's the context.
>
> - Patrick
>
> On Tue, Sep 9, 2014 at 12:01 PM, Cody Koeninger 
> wrote:
> > Maybe I'm missing something, I thought parquet was generally a write-once
> > format and the sqlContext interface to it seems that way as well.
> >
> > d1.saveAsParquetFile("/foo/d1")
> >
> > // another day, another table, with same schema
> > d2.saveAsParquetFile("/foo/d2")
> >
> > Will give a directory structure like
> >
> > /foo/d1/_metadata
> > /foo/d1/part-r-1.parquet
> > /foo/d1/part-r-2.parquet
> > /foo/d1/_SUCCESS
> >
> > /foo/d2/_metadata
> > /foo/d2/part-r-1.parquet
> > /foo/d2/part-r-2.parquet
> > /foo/d2/_SUCCESS
> >
> > // ParquetFileReader will fail, because /foo/d1 is a directory, not a
> > parquet partition
> > sqlContext.parquetFile("/foo")
> >
> > // works, but has the noted lack of pushdown
> >
> > sqlContext.parquetFile("/foo/d1").unionAll(sqlContext.parquetFile("/foo/d2"))
> >
> >
> > Is there another alternative?
> >
> >
> >
> > On Tue, Sep 9, 2014 at 1:29 PM, Michael Armbrust wrote:
> >
> >> I think usually people add these directories as multiple partitions of the
> >> same table instead of union.  This actually allows us to efficiently prune
> >> directories when reading in addition to standard column pruning.
> >>
> >> On Tue, Sep 9, 2014 at 11:26 AM, Gary Malouf 
> >> wrote:
> >>
> >>> I'm kind of surprised this was not run into before.  Do people not
> >>> segregate their data by day/week in the HDFS directory structure?
> >>>
> >>>
> >>> On Tue, Sep 9, 2014 at 2:08 PM, Michael Armbrust <mich...@databricks.com> wrote:
> >>>
>  Thanks!
> 
>  On Tue, Sep 9, 2014 at 11:07 AM, Cody Koeninger 
>  wrote:
> 
>  > Opened
>  >
>  > https://issues.apache.org/jira/browse/SPARK-3462
>  >
>  > I'll take a look at ColumnPruning and see what I can do
>  >
>  > On Tue, Sep 9, 2014 at 12:46 PM, Michael Armbrust <mich...@databricks.com> wrote:
>  >
>  >> On Tue, Sep 9, 2014 at 10:17 AM, Cody Koeninger <c...@koeninger.org> wrote:
>  >>>
>  >>> Is there a reason in general not to push projections and predicates down
>  >>> into the individual ParquetTableScans in a union?
>  >>>
>  >>
>  >> This would be a great case to add to ColumnPruning.  Would be awesome if
>  >> you could open a JIRA or even a PR :)
>  >>
>  >
>  >
> 
> >>>
> >>>
> >>
>


Re: parquet predicate / projection pushdown into unionAll

2014-09-09 Thread Patrick Wendell
I think what Michael means is people often use this to read existing
partitioned Parquet tables that are defined in a Hive metastore rather
than data generated directly from within Spark and then reading it
back as a table. I'd expect the latter case to become more common, but
for now most users connect to an existing metastore.

I think you could go this route by creating a partitioned external
table based on the on-disk layout you create. The downside is that
you'd have to go through a hive metastore whereas what you are doing
now doesn't need hive at all.

We should also just fix the case you are mentioning where a union is
used directly from within spark. But that's the context.

- Patrick

On Tue, Sep 9, 2014 at 12:01 PM, Cody Koeninger  wrote:
> Maybe I'm missing something, I thought parquet was generally a write-once
> format and the sqlContext interface to it seems that way as well.
>
> d1.saveAsParquetFile("/foo/d1")
>
> // another day, another table, with same schema
> d2.saveAsParquetFile("/foo/d2")
>
> Will give a directory structure like
>
> /foo/d1/_metadata
> /foo/d1/part-r-1.parquet
> /foo/d1/part-r-2.parquet
> /foo/d1/_SUCCESS
>
> /foo/d2/_metadata
> /foo/d2/part-r-1.parquet
> /foo/d2/part-r-2.parquet
> /foo/d2/_SUCCESS
>
> // ParquetFileReader will fail, because /foo/d1 is a directory, not a
> parquet partition
> sqlContext.parquetFile("/foo")
>
> // works, but has the noted lack of pushdown
> sqlContext.parquetFile("/foo/d1").unionAll(sqlContext.parquetFile("/foo/d2"))
>
>
> Is there another alternative?
>
>
>
> On Tue, Sep 9, 2014 at 1:29 PM, Michael Armbrust 
> wrote:
>
>> I think usually people add these directories as multiple partitions of the
>> same table instead of union.  This actually allows us to efficiently prune
>> directories when reading in addition to standard column pruning.
>>
>> On Tue, Sep 9, 2014 at 11:26 AM, Gary Malouf 
>> wrote:
>>
>>> I'm kind of surprised this was not run into before.  Do people not
>>> segregate their data by day/week in the HDFS directory structure?
>>>
>>>
>>> On Tue, Sep 9, 2014 at 2:08 PM, Michael Armbrust 
>>> wrote:
>>>
 Thanks!

 On Tue, Sep 9, 2014 at 11:07 AM, Cody Koeninger 
 wrote:

 > Opened
 >
 > https://issues.apache.org/jira/browse/SPARK-3462
 >
 > I'll take a look at ColumnPruning and see what I can do
 >
 > On Tue, Sep 9, 2014 at 12:46 PM, Michael Armbrust <mich...@databricks.com> wrote:
 >
 >> On Tue, Sep 9, 2014 at 10:17 AM, Cody Koeninger 
 >> wrote:
 >>>
 >>> Is there a reason in general not to push projections and predicates down
 >>> into the individual ParquetTableScans in a union?
 >>>
 >>
 >> This would be a great case to add to ColumnPruning.  Would be awesome if
 >> you could open a JIRA or even a PR :)
 >>
 >
 >

>>>
>>>
>>




Re: parquet predicate / projection pushdown into unionAll

2014-09-09 Thread Cody Koeninger
Maybe I'm missing something, I thought parquet was generally a write-once
format and the sqlContext interface to it seems that way as well.

d1.saveAsParquetFile("/foo/d1")

// another day, another table, with same schema
d2.saveAsParquetFile("/foo/d2")

Will give a directory structure like

/foo/d1/_metadata
/foo/d1/part-r-1.parquet
/foo/d1/part-r-2.parquet
/foo/d1/_SUCCESS

/foo/d2/_metadata
/foo/d2/part-r-1.parquet
/foo/d2/part-r-2.parquet
/foo/d2/_SUCCESS

// ParquetFileReader will fail, because /foo/d1 is a directory, not a
parquet partition
sqlContext.parquetFile("/foo")

// works, but has the noted lack of pushdown
sqlContext.parquetFile("/foo/d1").unionAll(sqlContext.parquetFile("/foo/d2"))


Is there another alternative?



On Tue, Sep 9, 2014 at 1:29 PM, Michael Armbrust 
wrote:

> I think usually people add these directories as multiple partitions of the
> same table instead of union.  This actually allows us to efficiently prune
> directories when reading in addition to standard column pruning.
>
> On Tue, Sep 9, 2014 at 11:26 AM, Gary Malouf 
> wrote:
>
>> I'm kind of surprised this was not run into before.  Do people not
>> segregate their data by day/week in the HDFS directory structure?
>>
>>
>> On Tue, Sep 9, 2014 at 2:08 PM, Michael Armbrust 
>> wrote:
>>
>>> Thanks!
>>>
>>> On Tue, Sep 9, 2014 at 11:07 AM, Cody Koeninger 
>>> wrote:
>>>
>>> > Opened
>>> >
>>> > https://issues.apache.org/jira/browse/SPARK-3462
>>> >
>>> > I'll take a look at ColumnPruning and see what I can do
>>> >
>>> > On Tue, Sep 9, 2014 at 12:46 PM, Michael Armbrust <mich...@databricks.com> wrote:
>>> >
>>> >> On Tue, Sep 9, 2014 at 10:17 AM, Cody Koeninger 
>>> >> wrote:
>>> >>>
>>> >>> Is there a reason in general not to push projections and predicates down
>>> >>> into the individual ParquetTableScans in a union?
>>> >>>
>>> >>
>>> >> This would be a great case to add to ColumnPruning.  Would be awesome if
>>> >> you could open a JIRA or even a PR :)
>>> >>
>>> >
>>> >
>>>
>>
>>
>


Re: parquet predicate / projection pushdown into unionAll

2014-09-09 Thread Michael Armbrust
I think usually people add these directories as multiple partitions of the
same table instead of union.  This actually allows us to efficiently prune
directories when reading in addition to standard column pruning.
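
A sketch of that setup for daily directories like /foo/d1 and /foo/d2; it
assumes a HiveContext and a Hive version whose DDL accepts STORED AS
PARQUET, and all names are illustrative:

val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

// One external table over all the day directories, instead of a unionAll.
hiveContext.sql("""CREATE EXTERNAL TABLE people (name STRING, age INT, phones ARRAY<STRING>)
                   PARTITIONED BY (day STRING)
                   STORED AS PARQUET""")
hiveContext.sql("ALTER TABLE people ADD PARTITION (day='d1') LOCATION '/foo/d1'")
hiveContext.sql("ALTER TABLE people ADD PARTITION (day='d2') LOCATION '/foo/d2'")

// A predicate on the partition column prunes whole directories before any
// parquet file is opened; column pruning still applies within each scan.
hiveContext.sql("SELECT name FROM people WHERE day = 'd1' AND age < 40").collect()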

On Tue, Sep 9, 2014 at 11:26 AM, Gary Malouf  wrote:

> I'm kind of surprised this was not run into before.  Do people not
> segregate their data by day/week in the HDFS directory structure?
>
>
> On Tue, Sep 9, 2014 at 2:08 PM, Michael Armbrust 
> wrote:
>
>> Thanks!
>>
>> On Tue, Sep 9, 2014 at 11:07 AM, Cody Koeninger 
>> wrote:
>>
>> > Opened
>> >
>> > https://issues.apache.org/jira/browse/SPARK-3462
>> >
>> > I'll take a look at ColumnPruning and see what I can do
>> >
>> > On Tue, Sep 9, 2014 at 12:46 PM, Michael Armbrust <mich...@databricks.com> wrote:
>> >
>> >> On Tue, Sep 9, 2014 at 10:17 AM, Cody Koeninger 
>> >> wrote:
>> >>>
>> >>> Is there a reason in general not to push projections and predicates down
>> >>> into the individual ParquetTableScans in a union?
>> >>>
>> >>
>> >> This would be a great case to add to ColumnPruning.  Would be awesome if
>> >> you could open a JIRA or even a PR :)
>> >>
>> >
>> >
>>
>
>


Re: parquet predicate / projection pushdown into unionAll

2014-09-09 Thread Gary Malouf
I'm kind of surprised this was not run into before.  Do people not
segregate their data by day/week in the HDFS directory structure?


On Tue, Sep 9, 2014 at 2:08 PM, Michael Armbrust 
wrote:

> Thanks!
>
> On Tue, Sep 9, 2014 at 11:07 AM, Cody Koeninger 
> wrote:
>
> > Opened
> >
> > https://issues.apache.org/jira/browse/SPARK-3462
> >
> > I'll take a look at ColumnPruning and see what I can do
> >
> > On Tue, Sep 9, 2014 at 12:46 PM, Michael Armbrust <mich...@databricks.com> wrote:
> >
> >> On Tue, Sep 9, 2014 at 10:17 AM, Cody Koeninger 
> >> wrote:
> >>>
> >>> Is there a reason in general not to push projections and predicates down
> >>> into the individual ParquetTableScans in a union?
> >>>
> >>
> >> This would be a great case to add to ColumnPruning.  Would be awesome if
> >> you could open a JIRA or even a PR :)
> >>
> >
> >
>


Re: parquet predicate / projection pushdown into unionAll

2014-09-09 Thread Michael Armbrust
Thanks!

On Tue, Sep 9, 2014 at 11:07 AM, Cody Koeninger  wrote:

> Opened
>
> https://issues.apache.org/jira/browse/SPARK-3462
>
> I'll take a look at ColumnPruning and see what I can do
>
> On Tue, Sep 9, 2014 at 12:46 PM, Michael Armbrust 
> wrote:
>
>> On Tue, Sep 9, 2014 at 10:17 AM, Cody Koeninger 
>> wrote:
>>>
>>> Is there a reason in general not to push projections and predicates down
>>> into the individual ParquetTableScans in a union?
>>>
>>
>> This would be a great case to add to ColumnPruning.  Would be awesome if
>> you could open a JIRA or even a PR :)
>>
>
>


Re: parquet predicate / projection pushdown into unionAll

2014-09-09 Thread Cody Koeninger
Opened

https://issues.apache.org/jira/browse/SPARK-3462

I'll take a look at ColumnPruning and see what I can do

On Tue, Sep 9, 2014 at 12:46 PM, Michael Armbrust 
wrote:

> On Tue, Sep 9, 2014 at 10:17 AM, Cody Koeninger 
> wrote:
>>
>> Is there a reason in general not to push projections and predicates down
>> into the individual ParquetTableScans in a union?
>>
>
> This would be a great case to add to ColumnPruning.  Would be awesome if
> you could open a JIRA or even a PR :)
>


Re: parquet predicate / projection pushdown into unionAll

2014-09-09 Thread Michael Armbrust
On Tue, Sep 9, 2014 at 10:17 AM, Cody Koeninger  wrote:
>
> Is there a reason in general not to push projections and predicates down
> into the individual ParquetTableScans in a union?
>

This would be a great case to add to ColumnPruning.  Would be awesome if
you could open a JIRA or even a PR :)


parquet predicate / projection pushdown into unionAll

2014-09-09 Thread Cody Koeninger
I've been looking at performance differences between Spark SQL queries
against single parquet tables vs. a unionAll of two tables.  It's a
significant difference, like 5 to 10x.

Is there a reason in general not to push projections and predicates down
into the individual ParquetTableScans in a union?

Here's an example of what I'm talking about:


scala> p.printSchema
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)
 |-- phones: array (nullable = true)
 |    |-- element: string (containsNull = true)


scala> p2.printSchema
root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)
 |-- phones: array (nullable = true)
 |    |-- element: string (containsNull = true)


scala> val b = p.unionAll(p2)


// single table, pushdown
scala> p.where('age < 40).select('name)
res36: org.apache.spark.sql.SchemaRDD =
SchemaRDD[97] at RDD at SchemaRDD.scala:103
== Query Plan ==
== Physical Plan ==
Project [name#3]
 ParquetTableScan [name#3,age#4], (ParquetRelation /var/tmp/people,
Some(Configuration: core-default.xml, core-site.xml, mapred-default.xml,
mapred-site.xml), org.apache.spark.sql.SQLContext@6d7e79f6, []), [(age#4 <
40)]


// union of 2 tables, no pushdown
scala> b.where('age < 40).select('name)
res37: org.apache.spark.sql.SchemaRDD =
SchemaRDD[99] at RDD at SchemaRDD.scala:103
== Query Plan ==
== Physical Plan ==
Project [name#3]
 Filter (age#4 < 40)
  Union [ParquetTableScan [name#3,age#4,phones#5], (ParquetRelation
/var/tmp/people, Some(Configuration: core-default.xml, core-site.xml,
mapred-default.xml, mapred-site.xml),
org.apache.spark.sql.SQLContext@6d7e79f6, []), []
,ParquetTableScan [name#0,age#1,phones#2], (ParquetRelation
/var/tmp/people2, Some(Configuration: core-default.xml, core-site.xml,
mapred-default.xml, mapred-site.xml),
org.apache.spark.sql.SQLContext@6d7e79f6, []), []
]
   ParquetTableScan [name#3,age#4,phones#5], (ParquetRelation
/var/tmp/people, Some(Configuration: core-default.xml, core-site.xml,
mapred-default.xml, mapred-site.xml), org.apache.spark.sql...