Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20448#discussion_r164952844 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2QueryPlan.scala --- @@ -19,50 +19,31 @@ package org.apache.spark.sql.execution.datasources.v2 import java.util.Objects -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} -import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expression} +import org.apache.spark.sql.sources.v2.DataSourceV2 /** - * A base class for data source reader holder with customized equals/hashCode methods. + * A base class for data source v2 related query plan. It defines the equals/hashCode methods + * according to some common information. */ -trait DataSourceReaderHolder { +trait DataSourceV2QueryPlan { - /** - * The full output of the data source reader, without column pruning. - */ - def fullOutput: Seq[AttributeReference] + def output: Seq[Attribute] + def sourceClass: Class[_ <: DataSourceV2] + def filters: Set[Expression] - /** - * The held data source reader. - */ - def reader: DataSourceReader - - /** - * The metadata of this data source reader that can be used for equality test. - */ - private def metadata: Seq[Any] = { - val filters: Any = reader match { - case s: SupportsPushDownCatalystFilters => s.pushedCatalystFilters().toSet - case s: SupportsPushDownFilters => s.pushedFilters().toSet - case _ => Nil - } - Seq(fullOutput, reader.getClass, reader.readSchema(), filters) - } + // The metadata of this data source relation that can be used for equality test. 
+ private def metadata: Seq[Any] = Seq(output, sourceClass, filters) def canEqual(other: Any): Boolean override def equals(other: Any): Boolean = other match { - case other: DataSourceReaderHolder => - canEqual(other) && metadata.length == other.metadata.length && - metadata.zip(other.metadata).forall { case (l, r) => l == r } + case other: DataSourceV2QueryPlan => + canEqual(other) && metadata == other.metadata case _ => false } override def hashCode(): Int = { metadata.map(Objects.hashCode).foldLeft(0)((a, b) => 31 * a + b) } - - lazy val output: Seq[Attribute] = reader.readSchema().map(_.name).map { name => --- End diff -- We don't need to do this anymore. Now that the plan is immutable, we have to create a new plan when applying push-down optimizations, and we can also update `output` at that time.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org