[ https://issues.apache.org/jira/browse/SPARK-26450?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Herman van Hovell resolved SPARK-26450.
---------------------------------------
    Resolution: Fixed
      Assignee: Bruce Robbins
 Fix Version/s: 3.0.0

> Map of schema is built too frequently in some wide queries
> ----------------------------------------------------------
>
>                 Key: SPARK-26450
>                 URL: https://issues.apache.org/jira/browse/SPARK-26450
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Bruce Robbins
>            Assignee: Bruce Robbins
>            Priority: Minor
>             Fix For: 3.0.0
>
> When executing queries with wide projections and wide schemas, Spark rebuilds
> an attribute map for the same schema many times. For example:
> {noformat}
> select * from orctbl where id1 = 1
> {noformat}
> Assume {{orctbl}} has 6000 columns and 34 files. In that case, the above
> query creates an AttributeSeq object 270,000 times[1]. Each AttributeSeq
> instantiation builds a map of the entire list of 6000 attributes (but not
> until the lazy val exprIdToOrdinal is referenced).
> Whenever OrcFileFormat reads a new file, it generates a new unsafe
> projection. That results in this
> [function|https://github.com/apache/spark/blob/827383a97c11a61661440ff86ce0c3382a2a23b2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeProjection.scala#L319]
> getting called:
> {code:java}
> protected def bind(in: Seq[Expression], inputSchema: Seq[Attribute]): Seq[Expression] =
>   in.map(BindReferences.bindReference(_, inputSchema))
> {code}
> For each column in the projection, this line calls bindReference. Each call
> passes inputSchema, a sequence of Attributes, to a parameter position
> expecting an AttributeSeq. The compiler implicitly calls the constructor for
> AttributeSeq, which (lazily) builds a map for every attribute in the schema.
> Therefore, this function builds a map of the entire schema once for each
> column in the projection, and it does this for each input file.
> For the above example query, this accounts for 204K of the instantiations of
> AttributeSeq. Readers for CSV and JSON tables do something similar.
> In addition, ProjectExec also creates an unsafe projection for each task. As
> a result, this
> [line|https://github.com/apache/spark/blob/827383a97c11a61661440ff86ce0c3382a2a23b2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala#L91]
> gets called, which has the same issue:
> {code:java}
> def toBoundExprs(exprs: Seq[Expression], inputSchema: Seq[Attribute]): Seq[Expression] = {
>   exprs.map(BindReferences.bindReference(_, inputSchema))
> }
> {code}
> The above affects all wide queries that have a projection node, regardless of
> the file reader. For the example query, ProjectExec accounts for the
> additional 66K instantiations of AttributeSeq.
> Spark can save time by pre-building the AttributeSeq right before the map
> operations in {{bind}} and {{toBoundExprs}}. The time saved depends on the
> size of the schema, the size of the projection, the number of input files
> (for Orc), the number of file splits (for CSV and JSON tables), and the
> number of tasks.
> For a 6000-column CSV table with 500K records and 34 input files, the time
> savings is only 6%[1], because Spark doesn't create as many unsafe
> projections for CSV tables as it does for Orc tables.
> On the other hand, for a 6000-column Orc table with 500K records and 34
> input files, the time savings is about 16%[1].
> [1] Based on queries run in local mode with 8 executor threads on my laptop.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org