[ 
https://issues.apache.org/jira/browse/SPARK-26450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Herman van Hovell resolved SPARK-26450.
---------------------------------------
       Resolution: Fixed
         Assignee: Bruce Robbins
    Fix Version/s: 3.0.0

> Map of schema is built too frequently in some wide queries
> ----------------------------------------------------------
>
>                 Key: SPARK-26450
>                 URL: https://issues.apache.org/jira/browse/SPARK-26450
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Bruce Robbins
>            Assignee: Bruce Robbins
>            Priority: Minor
>             Fix For: 3.0.0
>
>
> When executing queries with wide projections and wide schemas, Spark rebuilds 
> an attribute map for the same schema many times.
> For example:
> {noformat}
> select * from orctbl where id1 = 1
> {noformat}
> Assume {{orctbl}} has 6000 columns and 34 files. In that case, the above 
> query creates an AttributeSeq object 270,000 times[1]. Each AttributeSeq 
> instantiation builds a map of the entire list of 6000 attributes (though 
> not until the lazy val {{exprIdToOrdinal}} is first referenced).
> Whenever {{OrcFileFormat}} reads a new file, it generates a new unsafe 
> projection. That results in this 
> [function|https://github.com/apache/spark/blob/827383a97c11a61661440ff86ce0c3382a2a23b2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala#L319]
>  getting called:
> {code:java}
> protected def bind(in: Seq[Expression], inputSchema: Seq[Attribute]): Seq[Expression] =
>   in.map(BindReferences.bindReference(_, inputSchema))
> {code}
> For each column in the projection, this line calls {{bindReference}}. Each 
> call passes {{inputSchema}}, a {{Seq[Attribute]}}, to a parameter expecting 
> an {{AttributeSeq}}. The compiler therefore inserts an implicit conversion 
> that constructs a new AttributeSeq, which (lazily) builds a map of every 
> attribute in the schema. As a result, this function builds a map of the 
> entire schema once for each column in the projection, and it does so for 
> each input file. For the example query, this accounts for 204K of the 270K 
> instantiations of AttributeSeq.
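> The pattern can be modeled in a standalone sketch (simplified names; 
> {{buildCount}} and the conversion below are illustrative, not Spark code):
> {code:java}
> import scala.language.implicitConversions
>
> object AttributeSeqDemo {
>   var buildCount = 0 // counts how many times the schema map is built
>
>   case class Attribute(name: String)
>
>   class AttributeSeq(val attrs: Seq[Attribute]) {
>     // Lazy, but every new AttributeSeq instance builds its own map.
>     lazy val exprIdToOrdinal: Map[String, Int] = {
>       buildCount += 1
>       attrs.map(_.name).zipWithIndex.toMap
>     }
>   }
>
>   // Stand-in for the implicit conversion the compiler inserts wherever a
>   // Seq[Attribute] is passed where an AttributeSeq is expected.
>   implicit def seqToAttributeSeq(attrs: Seq[Attribute]): AttributeSeq =
>     new AttributeSeq(attrs)
>
>   def bindReference(col: Attribute, schema: AttributeSeq): Int =
>     schema.exprIdToOrdinal(col.name)
>
>   def main(args: Array[String]): Unit = {
>     val schema = (1 to 100).map(i => Attribute(s"c" + i))
>     // One conversion, and therefore one map build, per projected column.
>     schema.foreach(c => bindReference(c, schema))
>     println(s"map built $buildCount times for ${schema.size} columns")
>   }
> }
> {code}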
> Readers for CSV and JSON tables do something similar.
> In addition, {{ProjectExec}} creates an unsafe projection for each task. As 
> a result, this 
> [line|https://github.com/apache/spark/blob/827383a97c11a61661440ff86ce0c3382a2a23b2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala#L91]
>  gets called, which has the same issue:
> {code:java}
> def toBoundExprs(exprs: Seq[Expression], inputSchema: Seq[Attribute]): Seq[Expression] = {
>   exprs.map(BindReferences.bindReference(_, inputSchema))
> }
> {code}
> The above affects all wide queries that have a projection node, regardless 
> of the file reader. For the example query, ProjectExec accounts for the 
> remaining 66K instantiations of AttributeSeq.
> Spark can save time by pre-building the AttributeSeq right before the map 
> operations in {{bind}} and {{toBoundExprs}}. The time saved depends on the 
> size of the schema, the size of the projection, the number of input files 
> (for Orc), the number of file splits (for CSV and JSON tables), and the 
> number of tasks.
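> One way to express the fix is to trigger the conversion once, before the 
> map (a sketch against {{bind}}; the actual patch may differ in detail):
> {code:java}
> protected def bind(in: Seq[Expression], inputSchema: Seq[Attribute]): Seq[Expression] = {
>   // Ascribing the type converts Seq[Attribute] to AttributeSeq once, so
>   // the schema map is built a single time here rather than once per
>   // projected column.
>   val boundSchema: AttributeSeq = inputSchema
>   in.map(BindReferences.bindReference(_, boundSchema))
> }
> {code}
> The same change applies to {{toBoundExprs}}.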
> For a 6000 column CSV table with 500K records and 34 input files, the time 
> savings is only about 6%[1], because Spark doesn't create as many unsafe 
> projections for CSV tables as it does for Orc tables.
> On the other hand, for a 6000 column Orc table with 500K records and 34 input 
> files, the time savings is about 16%[1].
> [1] based on queries run in local mode with 8 executor threads on my laptop.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
