OK, so I'm looking at the optimizer code for the first time and trying the
simplest rule that could possibly work:

import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project, Union}
import org.apache.spark.sql.catalyst.rules.Rule

object UnionPushdown extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Push down filter into union
    case f @ Filter(condition, u @ Union(left, right)) =>
      u.copy(left = f.copy(child = left), right = f.copy(child = right))

    // Push down projection into union
    case p @ Project(projectList, u @ Union(left, right)) =>
      u.copy(left = p.copy(child = left), right = p.copy(child = right))
  }
}


If I manually apply that rule to a logical plan in the REPL, it
produces the query shape I'd expect, and executing that plan results in
Parquet pushdowns as I'd expect.

But adding those cases to ColumnPruning results in a runtime exception
(below).

I can keep digging, but it seems like I'm missing some obvious initial
context around naming of attributes.  If you can provide any pointers to
speed me on my way I'd appreciate it.


java.lang.AssertionError: assertion failed: ArrayBuffer() + ArrayBuffer() != WrappedArray(name#6, age#7), List(name#9, age#10, phones#11)
        at scala.Predef$.assert(Predef.scala:179)
        at org.apache.spark.sql.parquet.ParquetTableScan.<init>(ParquetTableOperations.scala:75)
        at org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
        at org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$$anonfun$9.apply(SparkStrategies.scala:234)
        at org.apache.spark.sql.SQLContext$SparkPlanner.pruneFilterProject(SQLContext.scala:367)
        at org.apache.spark.sql.execution.SparkStrategies$ParquetOperations$.apply(SparkStrategies.scala:230)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.planLater(QueryPlanner.scala:54)
        at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282)
        at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$$anonfun$12.apply(SparkStrategies.scala:282)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.spark.sql.execution.SparkStrategies$BasicOperators$.apply(SparkStrategies.scala:282)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)
        at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)
        at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400)
        at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406)
        at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406)
        at org.apache.spark.sql.SQLContext$QueryExecution.toString(SQLContext.scala:431)
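
One possible reading of the assertion, offered as a guess rather than a
confirmed diagnosis: the project list pushed to the right side still refers
to the left child's attributes (name#6, age#7), while the right relation
outputs name#9, age#10, phones#11. If that is the cause, the copy pushed to
the right child would need its attribute references remapped by position.
The sketch below illustrates the idea with toy stand-ins only: Attr,
Relation, buildRewrites and pushToRight are hypothetical names, not Catalyst
classes, and phones#8 is an invented id for the left side's third column.

```scala
// Toy stand-ins for Catalyst attributes and relations; NOT the real Spark classes.
final case class Attr(name: String, id: Int) {
  override def toString: String = s"$name#$id"
}

final case class Relation(output: Seq[Attr])

object RewriteSketch {
  // Pair each left-side attribute with the right-side attribute at the same position.
  def buildRewrites(left: Relation, right: Relation): Map[Attr, Attr] = {
    require(left.output.size == right.output.size, "union children must have the same arity")
    left.output.zip(right.output).toMap
  }

  // Replace left-side references with their right-side counterparts;
  // attributes without a mapping are left unchanged.
  def pushToRight(projectList: Seq[Attr], rewrites: Map[Attr, Attr]): Seq[Attr] =
    projectList.map(a => rewrites.getOrElse(a, a))

  def main(args: Array[String]): Unit = {
    // Ids 6, 7, 9, 10, 11 come from the assertion message; 8 is assumed.
    val left = Relation(Seq(Attr("name", 6), Attr("age", 7), Attr("phones", 8)))
    val right = Relation(Seq(Attr("name", 9), Attr("age", 10), Attr("phones", 11)))
    val projectList = Seq(Attr("name", 6), Attr("age", 7))

    val rewritten = pushToRight(projectList, buildRewrites(left, right))
    println(rewritten.mkString(", ")) // prints "name#9, age#10"
  }
}
```

In the real rule the same remapping would presumably have to be applied to
the filter condition as well as the project list before copying either onto
the right child.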




On Tue, Sep 9, 2014 at 3:02 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> What Patrick said is correct.  Two other points:
>  - In the 1.2 release we are hoping to beef up the support for working
> with partitioned parquet independent of the metastore.
>  - You can actually do operations like INSERT INTO for parquet tables to
> add data.  This creates new parquet files for each insertion.  This will
> break if there are multiple concurrent writers to the same table.
>
> On Tue, Sep 9, 2014 at 12:09 PM, Patrick Wendell <pwend...@gmail.com>
> wrote:
>
>> I think what Michael means is people often use this to read existing
>> partitioned Parquet tables that are defined in a Hive metastore rather
>> than data generated directly from within Spark and then reading it
>> back as a table. I'd expect the latter case to become more common, but
>> for now most users connect to an existing metastore.
>>
>> I think you could go this route by creating a partitioned external
>> table based on the on-disk layout you create. The downside is that
>> you'd have to go through a hive metastore whereas what you are doing
>> now doesn't need hive at all.
>>
>> We should also just fix the case you are mentioning where a union is
>> used directly from within spark. But that's the context.
>>
>> - Patrick
>>
>> On Tue, Sep 9, 2014 at 12:01 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>> > Maybe I'm missing something, I thought parquet was generally a write-once
>> > format and the sqlContext interface to it seems that way as well.
>> >
>> > d1.saveAsParquetFile("/foo/d1")
>> >
>> > // another day, another table, with same schema
>> > d2.saveAsParquetFile("/foo/d2")
>> >
>> > Will give a directory structure like
>> >
>> > /foo/d1/_metadata
>> > /foo/d1/part-r-1.parquet
>> > /foo/d1/part-r-2.parquet
>> > /foo/d1/_SUCCESS
>> >
>> > /foo/d2/_metadata
>> > /foo/d2/part-r-1.parquet
>> > /foo/d2/part-r-2.parquet
>> > /foo/d2/_SUCCESS
>> >
>> > // ParquetFileReader will fail, because /foo/d1 is a directory, not a parquet partition
>> > sqlContext.parquetFile("/foo")
>> >
>> > // works, but has the noted lack of pushdown
>> > sqlContext.parquetFile("/foo/d1").unionAll(sqlContext.parquetFile("/foo/d2"))
>> >
>> >
>> > Is there another alternative?
>> >
>> >
>> >
>> > On Tue, Sep 9, 2014 at 1:29 PM, Michael Armbrust <mich...@databricks.com>
>> > wrote:
>> >
>> >> I think usually people add these directories as multiple partitions of the
>> >> same table instead of union.  This actually allows us to efficiently prune
>> >> directories when reading in addition to standard column pruning.
>> >>
>> >> On Tue, Sep 9, 2014 at 11:26 AM, Gary Malouf <malouf.g...@gmail.com>
>> >> wrote:
>> >>
>> >>> I'm kind of surprised this was not run into before.  Do people not
>> >>> segregate their data by day/week in the HDFS directory structure?
>> >>>
>> >>>
>> >>> On Tue, Sep 9, 2014 at 2:08 PM, Michael Armbrust <mich...@databricks.com>
>> >>> wrote:
>> >>>
>> >>>> Thanks!
>> >>>>
>> >>>> On Tue, Sep 9, 2014 at 11:07 AM, Cody Koeninger <c...@koeninger.org>
>> >>>> wrote:
>> >>>>
>> >>>> > Opened
>> >>>> >
>> >>>> > https://issues.apache.org/jira/browse/SPARK-3462
>> >>>> >
>> >>>> > I'll take a look at ColumnPruning and see what I can do
>> >>>> >
>> >>>> > On Tue, Sep 9, 2014 at 12:46 PM, Michael Armbrust <mich...@databricks.com>
>> >>>> > wrote:
>> >>>> >
>> >>>> >> On Tue, Sep 9, 2014 at 10:17 AM, Cody Koeninger <c...@koeninger.org>
>> >>>> >> wrote:
>> >>>> >>>
>> >>>> >>> Is there a reason in general not to push projections and predicates down
>> >>>> >>> into the individual ParquetTableScans in a union?
>> >>>> >>>
>> >>>> >>
>> >>>> >> This would be a great case to add to ColumnPruning.  Would be awesome if
>> >>>> >> you could open a JIRA or even a PR :)
>> >>>> >>
>> >>>> >
>> >>>> >
>> >>>>
>> >>>
>> >>>
>> >>
>>
>
>
