Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23086#discussion_r236856960
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala
 ---
    @@ -23,29 +23,28 @@ import org.apache.spark.sql.catalyst.expressions._
     import org.apache.spark.sql.catalyst.plans.physical
     import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
     import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, 
WholeStageCodegenExec}
    -import org.apache.spark.sql.execution.streaming.continuous._
     import org.apache.spark.sql.sources.v2.DataSourceV2
     import org.apache.spark.sql.sources.v2.reader._
    -import 
org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousPartitionReaderFactory,
 ContinuousReadSupport, MicroBatchReadSupport}
     
     /**
    - * Physical plan node for scanning data from a data source.
    + * Physical plan node for scanning a batch of data from a data source.
      */
     case class DataSourceV2ScanExec(
         output: Seq[AttributeReference],
         @transient source: DataSourceV2,
         @transient options: Map[String, String],
         @transient pushedFilters: Seq[Expression],
    -    @transient readSupport: ReadSupport,
    -    @transient scanConfig: ScanConfig)
    +    @transient scan: Scan,
    +    @transient partitions: Array[InputPartition],
    +    @transient readerFactory: PartitionReaderFactory)
       extends LeafExecNode with DataSourceV2StringFormat with 
ColumnarBatchScan {
     
       override def simpleString: String = "ScanV2 " + metadataString
     
       // TODO: unify the equal/hashCode implementation for all data source v2 
query plans.
       override def equals(other: Any): Boolean = other match {
    -    case other: DataSourceV2ScanExec =>
    -      output == other.output && readSupport.getClass == 
other.readSupport.getClass &&
    +    case other: DataSourceV2StreamingScanExec =>
    +      output == other.output && source.getClass == other.source.getClass &&
    --- End diff --
    
    Should this implement identity instead of equality? When would two ScanExec 
nodes be equal instead of identical?
    
    Also, I don't think that this equals implementation is correct. First, it 
should not check for the streaming class. Second, it should check whether the 
scan is equal, not whether the options and the source are the same (plus, 
source will be removed).
    
    Unfortunately, implementing true equality (not just identity) must in some 
way rely on a user-supplied class. A scan is the same if it will produce the 
same set of rows and columns in those rows. That means equality depends on the 
filter, projection, and source data (i.e. table). We can use `pushedFilters` 
and `output` for the filter and projection. But checking that the source data 
is the same requires using either the scan's `equals` method (which would also 
satisfy the filter and projection checks) or checking that the partitions are 
the same. Both `Scan` and `InputPartition` implementations are provided by 
sources, so their `equals` methods may not be implemented.
    
    Because this must depend on checking equality of user-supplied objects, I 
think it would be much easier to make this depend only on equality of the 
`Scan`:
    
    ```
      override def equals(other: Any): Boolean = other match {
        case scanExec: DataSourceV2ScanExec => scanExec.scan == this.scan
      }
    ```
    
    That may fall back to identity if the user hasn't supplied an equals 
method, but I don't see a way to avoid it.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to