Github user sadikovi commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19810#discussion_r154848470
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala ---
    @@ -193,38 +195,68 @@ case class InMemoryTableScanExec(
     
       private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning
     
    +  private def doFilterCachedBatches(
    +      rdd: RDD[CachedBatch],
    +      partitionStatsSchema: Seq[AttributeReference]): RDD[CachedBatch] = {
    +    val schemaIndex = partitionStatsSchema.zipWithIndex
    +    rdd.mapPartitionsWithIndex {
    +      case (partitionIndex, cachedBatches) =>
    +        if (inMemoryPartitionPruningEnabled) {
    +          cachedBatches.filter { cachedBatch =>
    +            val partitionFilter = newPredicate(
    +              partitionFilters.reduceOption(And).getOrElse(Literal(true)),
    +              partitionStatsSchema)
    +            partitionFilter.initialize(partitionIndex)
    +            if (!partitionFilter.eval(cachedBatch.stats)) {
    --- End diff ---
    
    All good, no need to change. I was trying to understand the code, so my
    question refers to statistics collection in general, not to the changes
    in this PR. The link points to a condition (which predates this PR) that
    could cause the iterator to be abandoned before all of its records are
    consumed, so the statistics would only be partially collected, which
    might affect any filtering that relies on them - though this is quite
    possibly handled later, or may be a purely theoretical case.
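    
    To make the concern concrete, here is a minimal, hypothetical Scala
    sketch (not code from this PR or from Spark): if an iterator that
    updates statistics as a side effect of being consumed is dropped before
    it is exhausted, the resulting stats describe only the rows that were
    actually pulled, and a pruning decision based on them could be wrong.
    
        // Hypothetical sketch only; names and types are illustrative.
        object PartialStatsSketch {
          // Loosely modeled on per-column min/max batch statistics.
          final case class ColumnStats(var min: Int = Int.MaxValue,
                                       var max: Int = Int.MinValue) {
            def update(v: Int): Unit = {
              min = math.min(min, v)
              max = math.max(max, v)
            }
          }
    
          def main(args: Array[String]): Unit = {
            val rows = Iterator(1, 5, 42, 7)
            val stats = ColumnStats()
    
            // Update stats as rows are consumed, similar in spirit to
            // collecting batch statistics while a batch is being built.
            val instrumented = rows.map { v => stats.update(v); v }
    
            // Early exit: only the first two rows are ever pulled.
            val firstTwo = instrumented.take(2).toList
            println(s"consumed = $firstTwo, stats = $stats")
            // stats.max is 5 even though the data contains 42, so a
            // filter such as "no row in this batch exceeds 10" would
            // incorrectly prune the batch.
          }
        }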


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
