This is an automated email from the ASF dual-hosted git repository. viirya pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 23aec321bd8 [SPARK-41049][SQL][FOLLOWUP] Move expression initialization code to the base class 23aec321bd8 is described below commit 23aec321bd822867a698ee3bc000017b21753ce8 Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Tue Jan 3 10:46:44 2023 -0800 [SPARK-41049][SQL][FOLLOWUP] Move expression initialization code to the base class ### What changes were proposed in this pull request? This is a followup of https://github.com/apache/spark/pull/39248 , to add one more code cleanup. The expression initialization code is duplicated 6 times and we should put it in the base class. ### Why are the changes needed? code cleanup ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? existing tests Closes #39364 from cloud-fan/expr. Authored-by: Wenchen Fan <wenc...@databricks.com> Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com> --- .../spark/sql/catalyst/expressions/ExpressionsEvaluator.scala | 7 +++++++ .../sql/catalyst/expressions/InterpretedMutableProjection.scala | 5 +---- .../spark/sql/catalyst/expressions/InterpretedSafeProjection.scala | 5 +---- .../sql/catalyst/expressions/InterpretedUnsafeProjection.scala | 5 +---- .../org/apache/spark/sql/catalyst/expressions/Projection.scala | 5 +---- .../org/apache/spark/sql/catalyst/expressions/predicates.scala | 6 +----- 6 files changed, 12 insertions(+), 21 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionsEvaluator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionsEvaluator.scala index dcbc6926cd3..1fc0144fede 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionsEvaluator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpressionsEvaluator.scala @@ -42,4 +42,11 @@ trait ExpressionsEvaluator { * The default implementation does nothing. */ def initialize(partitionIndex: Int): Unit = {} + + protected def initializeExprs(exprs: Seq[Expression], partitionIndex: Int): Unit = { + exprs.foreach(_.foreach { + case n: Nondeterministic => n.initialize(partitionIndex) + case _ => + }) + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala index 682604b9bf7..01e9de085da 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala @@ -41,10 +41,7 @@ class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mutable private[this] val buffer = new Array[Any](expressions.size) override def initialize(partitionIndex: Int): Unit = { - exprs.foreach(_.foreach { - case n: Nondeterministic => n.initialize(partitionIndex) - case _ => - }) + initializeExprs(exprs, partitionIndex) } private[this] val validExprs = expressions.zipWithIndex.filter { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala index 84263d97f5d..87539e80b0b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedSafeProjection.scala @@ -101,10 +101,7 @@ class InterpretedSafeProjection(expressions: Seq[Expression]) extends Projection } override def initialize(partitionIndex: Int): Unit = { - expressions.foreach(_.foreach { - case n: Nondeterministic => n.initialize(partitionIndex) - case _ => - }) + initializeExprs(exprs, partitionIndex) } override def apply(row: InternalRow): InternalRow = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala index 9108a045c09..90a90444695 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedUnsafeProjection.scala @@ -67,10 +67,7 @@ class InterpretedUnsafeProjection(expressions: Array[Expression]) extends Unsafe } override def initialize(partitionIndex: Int): Unit = { - exprs.foreach(_.foreach { - case n: Nondeterministic => n.initialize(partitionIndex) - case _ => - }) + initializeExprs(exprs, partitionIndex) } override def apply(row: InternalRow): UnsafeRow = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala index 20969fa584a..7d993d776d1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala @@ -41,10 +41,7 @@ class InterpretedProjection(expressions: Seq[Expression]) extends Projection { } override def initialize(partitionIndex: Int): Unit = { - exprArray.foreach(_.foreach { - case n: Nondeterministic => n.initialize(partitionIndex) - case _ => - }) + initializeExprs(exprArray, partitionIndex) } def apply(input: InternalRow): InternalRow = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index 4e4ac6ee492..6a58f8d3416 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -53,11 +53,7 @@ case class InterpretedPredicate(expression: Expression) extends BasePredicate { } override def initialize(partitionIndex: Int): Unit = { - super.initialize(partitionIndex) - expr.foreach { - case n: Nondeterministic => n.initialize(partitionIndex) - case _ => - } + initializeExprs(Seq(expr), partitionIndex) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org