Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10942#discussion_r51052123
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 ---
    @@ -251,6 +262,69 @@ private[sql] object DataSourceStrategy extends 
Strategy with Logging {
         }
       }
     
    +  // Get the bucket ID based on the bucketing values.
    +  // Restriction: Bucket pruning works iff the bucket key consists of one 
and only one column.
    +  private def getBucketId(bucketColumn: Attribute, numBuckets: Int, value: 
Any): Int = {
    +    val mutableRow = new SpecificMutableRow(Seq(bucketColumn.dataType))
    +    mutableRow(0) = Cast(Literal(value), bucketColumn.dataType).eval(null)
    +    val bucketIdGeneration = UnsafeProjection.create(
    +      HashPartitioning(bucketColumn :: Nil, 
numBuckets).partitionIdExpression :: Nil,
    +      bucketColumn :: Nil)
    +
    +    bucketIdGeneration(mutableRow).getInt(0)
    +  }
    +
    +  // Get the bucket BitSet by reading the filters that only contain 
bucketing keys.
    +  // Note: When the returned BitSet is None, no pruning is possible.
    +  // Restriction: Bucket pruning works iff the bucket key consists of one 
and only one column.
    +  private def getBuckets(
    +      filters: Seq[Expression],
    +      bucketSpec: Option[BucketSpec]): Option[BitSet] = {
    +
    +    if (bucketSpec.isEmpty ||
    +      bucketSpec.get.numBuckets == 1 ||
    +      bucketSpec.get.bucketColumnNames.length != 1) {
    +      // None means all the buckets need to be scanned
    +      return None
    +    }
    +
    +    // Just get the first one because bucket pruning only works when the 
bucket key consists of one and only one column.
    +    val bucketColumnName = bucketSpec.get.bucketColumnNames.head
    +    val numBuckets = bucketSpec.get.numBuckets
    +    val matchedBuckets = new BitSet(numBuckets)
    +    matchedBuckets.clear()
    +
    +    filters.foreach {
    +      case expressions.EqualTo(a: Attribute, Literal(v, _)) if a.name == 
bucketColumnName =>
    +        matchedBuckets.set(getBucketId(a, numBuckets, v))
    +      case expressions.EqualTo(Literal(v, _), a: Attribute) if a.name == 
bucketColumnName =>
    +        matchedBuckets.set(getBucketId(a, numBuckets, v))
    +      case expressions.EqualNullSafe(a: Attribute, Literal(v, _)) if 
a.name == bucketColumnName =>
    +        matchedBuckets.set(getBucketId(a, numBuckets, v))
    +      case expressions.EqualNullSafe(Literal(v, _), a: Attribute) if 
a.name == bucketColumnName =>
    +        matchedBuckets.set(getBucketId(a, numBuckets, v))
    +      case expressions.InSet(a: Attribute, set) =>
    +        set.foreach(e => matchedBuckets.set(getBucketId(a, numBuckets, e)))
    +      // Because we only convert In to InSet in the Optimizer when there 
are more than a certain
    +      // number of items, it is possible we still get an In expression 
here that needs to be
    +      // pushed down.
    +      case expressions.In(a: Attribute, list) if 
!list.exists(!_.isInstanceOf[Literal]) =>
    --- End diff --
    
    Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to