spark git commit: [SPARK-15764][SQL] Replace N^2 loop in BindReferences

2016-06-06 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 d07bce49f -> 5363783af


[SPARK-15764][SQL] Replace N^2 loop in BindReferences

BindReferences contains an n^2 loop which causes performance issues when 
operating over large schemas: to determine the ordinal of an attribute 
reference, we perform a linear scan over the `input` array. Because input can 
sometimes be a `List`, the call to `input(ordinal).nullable` can also be O(n).

Instead of performing a linear scan, we can convert the input into an array and 
build a hash map to map from expression ids to ordinals. The greater up-front 
cost of the map construction is offset by the fact that an expression can 
contain multiple attribute references, so the cost of the map construction is 
amortized across a number of lookups.

Perf. benchmarks to follow. /cc ericl

Author: Josh Rosen 

Closes #13505 from JoshRosen/bind-references-improvement.

(cherry picked from commit 0b8d694999b43ada4833388cad6c285c7757cbf7)
Signed-off-by: Josh Rosen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5363783a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5363783a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5363783a

Branch: refs/heads/branch-2.0
Commit: 5363783af7d93b7597181c8b39b83800fa059543
Parents: d07bce4
Author: Josh Rosen 
Authored: Mon Jun 6 11:44:51 2016 -0700
Committer: Josh Rosen 
Committed: Mon Jun 6 11:45:35 2016 -0700

--
 .../sql/catalyst/expressions/AttributeMap.scala |  7 
 .../catalyst/expressions/BoundAttribute.scala   |  6 ++--
 .../sql/catalyst/expressions/package.scala  | 34 +++-
 .../spark/sql/catalyst/plans/QueryPlan.scala|  2 +-
 .../execution/aggregate/HashAggregateExec.scala |  2 +-
 .../columnar/InMemoryTableScanExec.scala|  4 +--
 6 files changed, 40 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5363783a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala
index ef3cc55..96a11e3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala
@@ -26,13 +26,6 @@ object AttributeMap {
   def apply[A](kvs: Seq[(Attribute, A)]): AttributeMap[A] = {
 new AttributeMap(kvs.map(kv => (kv._1.exprId, kv)).toMap)
   }
-
-  /** Given a schema, constructs an [[AttributeMap]] from [[Attribute]] to 
ordinal */
-  def byIndex(schema: Seq[Attribute]): AttributeMap[Int] = 
apply(schema.zipWithIndex)
-
-  /** Given a schema, constructs a map from ordinal to Attribute. */
-  def toIndex(schema: Seq[Attribute]): Map[Int, Attribute] =
-schema.zipWithIndex.map { case (a, i) => i -> a }.toMap
 }
 
 class AttributeMap[A](baseMap: Map[ExprId, (Attribute, A)])

http://git-wip-us.apache.org/repos/asf/spark/blob/5363783a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
index a38f1ec..7d16118 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
@@ -82,16 +82,16 @@ object BindReferences extends Logging {
 
   def bindReference[A <: Expression](
   expression: A,
-  input: Seq[Attribute],
+  input: AttributeSeq,
   allowFailures: Boolean = false): A = {
 expression.transform { case a: AttributeReference =>
   attachTree(a, "Binding attribute") {
-val ordinal = input.indexWhere(_.exprId == a.exprId)
+val ordinal = input.indexOf(a.exprId)
 if (ordinal == -1) {
   if (allowFailures) {
 a
   } else {
-sys.error(s"Couldn't find $a in ${input.mkString("[", ",", "]")}")
+sys.error(s"Couldn't find $a in ${input.attrs.mkString("[", ",", 
"]")}")
   }
 } else {
   BoundReference(ordinal, a.dataType, input(ordinal).nullable)

http://git-wip-us.apache.org/repos/asf/spark/blob/5363783a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
--
diff --git 
a/sql/catalyst/s

spark git commit: [SPARK-15764][SQL] Replace N^2 loop in BindReferences

2016-06-06 Thread joshrosen
Repository: spark
Updated Branches:
  refs/heads/master 4c74ee8d8 -> 0b8d69499


[SPARK-15764][SQL] Replace N^2 loop in BindReferences

BindReferences contains an n^2 loop which causes performance issues when 
operating over large schemas: to determine the ordinal of an attribute 
reference, we perform a linear scan over the `input` array. Because input can 
sometimes be a `List`, the call to `input(ordinal).nullable` can also be O(n).

Instead of performing a linear scan, we can convert the input into an array and 
build a hash map to map from expression ids to ordinals. The greater up-front 
cost of the map construction is offset by the fact that an expression can 
contain multiple attribute references, so the cost of the map construction is 
amortized across a number of lookups.

Perf. benchmarks to follow. /cc ericl

Author: Josh Rosen 

Closes #13505 from JoshRosen/bind-references-improvement.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0b8d6949
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0b8d6949
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0b8d6949

Branch: refs/heads/master
Commit: 0b8d694999b43ada4833388cad6c285c7757cbf7
Parents: 4c74ee8
Author: Josh Rosen 
Authored: Mon Jun 6 11:44:51 2016 -0700
Committer: Josh Rosen 
Committed: Mon Jun 6 11:44:51 2016 -0700

--
 .../sql/catalyst/expressions/AttributeMap.scala |  7 
 .../catalyst/expressions/BoundAttribute.scala   |  6 ++--
 .../sql/catalyst/expressions/package.scala  | 34 +++-
 .../spark/sql/catalyst/plans/QueryPlan.scala|  2 +-
 .../execution/aggregate/HashAggregateExec.scala |  2 +-
 .../columnar/InMemoryTableScanExec.scala|  4 +--
 6 files changed, 40 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0b8d6949/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala
index ef3cc55..96a11e3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala
@@ -26,13 +26,6 @@ object AttributeMap {
   def apply[A](kvs: Seq[(Attribute, A)]): AttributeMap[A] = {
 new AttributeMap(kvs.map(kv => (kv._1.exprId, kv)).toMap)
   }
-
-  /** Given a schema, constructs an [[AttributeMap]] from [[Attribute]] to 
ordinal */
-  def byIndex(schema: Seq[Attribute]): AttributeMap[Int] = 
apply(schema.zipWithIndex)
-
-  /** Given a schema, constructs a map from ordinal to Attribute. */
-  def toIndex(schema: Seq[Attribute]): Map[Int, Attribute] =
-schema.zipWithIndex.map { case (a, i) => i -> a }.toMap
 }
 
 class AttributeMap[A](baseMap: Map[ExprId, (Attribute, A)])

http://git-wip-us.apache.org/repos/asf/spark/blob/0b8d6949/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
index a38f1ec..7d16118 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
@@ -82,16 +82,16 @@ object BindReferences extends Logging {
 
   def bindReference[A <: Expression](
   expression: A,
-  input: Seq[Attribute],
+  input: AttributeSeq,
   allowFailures: Boolean = false): A = {
 expression.transform { case a: AttributeReference =>
   attachTree(a, "Binding attribute") {
-val ordinal = input.indexWhere(_.exprId == a.exprId)
+val ordinal = input.indexOf(a.exprId)
 if (ordinal == -1) {
   if (allowFailures) {
 a
   } else {
-sys.error(s"Couldn't find $a in ${input.mkString("[", ",", "]")}")
+sys.error(s"Couldn't find $a in ${input.attrs.mkString("[", ",", 
"]")}")
   }
 } else {
   BoundReference(ordinal, a.dataType, input(ordinal).nullable)

http://git-wip-us.apache.org/repos/asf/spark/blob/0b8d6949/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
 
b/sql/catalyst/src/main/scala/org