GitHub user maropu opened a pull request:

    https://github.com/apache/spark/pull/18576

    [SPARK-21351][SQL] Update nullability based on children's output

    ## What changes were proposed in this pull request?
    This PR adds a new optimizer rule, `UpdateNullabilityInAttributeReferences`,
    that updates the nullability of attribute references when a `Filter` contains
    `IsNotNull`. On master, optimized plans do not respect this change in
    nullability, so unnecessary null-check code is generated. For example:
    
    ```
    scala> val df = Seq((Some(1), Some(2))).toDF("a", "b")
    scala> val bIsNotNull = df.where($"b" =!= 2).select($"b")
    scala> val targetQuery = bIsNotNull.distinct
    scala> targetQuery.queryExecution.optimizedPlan.output(0).nullable
    res5: Boolean = true
    
    scala> targetQuery.debugCodegen
    Found 2 WholeStageCodegen subtrees.
    == Subtree 1 / 2 ==
    *HashAggregate(keys=[b#19], functions=[], output=[b#19])
    +- Exchange hashpartitioning(b#19, 200)
       +- *HashAggregate(keys=[b#19], functions=[], output=[b#19])
          +- *Project [_2#16 AS b#19]
             +- *Filter isnotnull(_2#16)
                +- LocalTableScan [_1#15, _2#16]
    
    Generated code:
    ...
    /* 124 */   protected void processNext() throws java.io.IOException {
    ...
    /* 132 */     // output the result
    /* 133 */
    /* 134 */     while (agg_mapIter.next()) {
    /* 135 */       wholestagecodegen_numOutputRows.add(1);
    /* 136 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
    /* 137 */       UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();
    /* 138 */
    /* 139 */       boolean agg_isNull4 = agg_aggKey.isNullAt(0);
    /* 140 */       int agg_value4 = agg_isNull4 ? -1 : (agg_aggKey.getInt(0));
    /* 141 */       agg_rowWriter1.zeroOutNullBytes();
    /* 142 */
                    // We don't need this NULL check because NULL is filtered out by `$"b" =!= 2`
    /* 143 */       if (agg_isNull4) {
    /* 144 */         agg_rowWriter1.setNullAt(0);
    /* 145 */       } else {
    /* 146 */         agg_rowWriter1.write(0, agg_value4);
    /* 147 */       }
    /* 148 */       append(agg_result1);
    /* 149 */
    /* 150 */       if (shouldStop()) return;
    /* 151 */     }
    /* 152 */
    /* 153 */     agg_mapIter.close();
    /* 154 */     if (agg_sorter == null) {
    /* 155 */       agg_hashMap.free();
    /* 156 */     }
    /* 157 */   }
    /* 158 */
    /* 159 */ }
    ```
    
    The NULL check at line 143 is unnecessary because NULLs are already filtered out by `$"b" =!= 2`.
    With this PR, the NULL check is removed:
    
    ```
    scala> targetQuery.queryExecution.optimizedPlan.output(0).nullable
    res5: Boolean = false
    
    scala> targetQuery.debugCodegen
    ...
    Generated code:
    ...
    /* 144 */   protected void processNext() throws java.io.IOException {
    ...
    /* 152 */     // output the result
    /* 153 */
    /* 154 */     while (agg_mapIter.next()) {
    /* 155 */       wholestagecodegen_numOutputRows.add(1);
    /* 156 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
    /* 157 */       UnsafeRow agg_aggBuffer = (UnsafeRow) agg_mapIter.getValue();
    /* 158 */
    /* 159 */       int agg_value4 = agg_aggKey.getInt(0);
    /* 160 */       agg_rowWriter1.write(0, agg_value4);
    /* 161 */       append(agg_result1);
    /* 162 */
    /* 163 */       if (shouldStop()) return;
    /* 164 */     }
    /* 165 */
    /* 166 */     agg_mapIter.close();
    /* 167 */     if (agg_sorter == null) {
    /* 168 */       agg_hashMap.free();
    /* 169 */     }
    /* 170 */   }
    ```
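    
    As a rough illustration of why the `nullable` flag matters for the two listings above, the toy helper below emits the null-check branch only when a column is nullable. `NullCheckSketch` and `genWriteColumn` are hypothetical names made up for this sketch, and the emitted text only mimics the shape of the generated code; this is not Spark's actual code generator.
    
    ```scala
    // Toy sketch, NOT Spark's code generator: all names here are hypothetical.
    object NullCheckSketch {
      // Returns Java-like source that copies one int column from a key row
      // into a row writer. The null-check branch is emitted only if the
      // column is declared nullable.
      def genWriteColumn(ordinal: Int, nullable: Boolean): String =
        if (nullable) {
          s"""boolean isNull = key.isNullAt($ordinal);
             |int value = isNull ? -1 : key.getInt($ordinal);
             |if (isNull) {
             |  writer.setNullAt($ordinal);
             |} else {
             |  writer.write($ordinal, value);
             |}""".stripMargin
        } else {
          // Non-nullable column: read and write the value directly.
          s"""int value = key.getInt($ordinal);
             |writer.write($ordinal, value);""".stripMargin
        }
    }
    ```
    
    Calling `NullCheckSketch.genWriteColumn(0, nullable = false)` yields only the direct read and write, which corresponds to the shorter loop body in the second listing.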
    
    ## How was this patch tested?
    Added `UpdateNullabilityInAttributeReferencesSuite` for unit tests.
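    
    The suite itself is not reproduced here. As a minimal sketch of the kind of rewrite the title describes (updating nullability based on children's output), the toy model below refreshes stale attribute references in a projection from its child's output. `Attr`, `Scan`, `Filter`, `Project`, and `UpdateNullabilitySketch` are simplified, hypothetical stand-ins and not Catalyst classes; in particular, letting the toy `Filter` report non-null columns directly is an assumption of this sketch.
    
    ```scala
    // Toy plan model, NOT Spark's Catalyst: all names here are hypothetical.
    final case class Attr(name: String, nullable: Boolean)
    
    sealed trait Plan { def output: Seq[Attr] }
    
    // Leaf relation that exposes its columns as given.
    final case class Scan(output: Seq[Attr]) extends Plan
    
    // Filter that guarantees the listed columns are non-null
    // (assumption of this sketch: it reports them as non-nullable itself).
    final case class Filter(notNullCols: Set[String], child: Plan) extends Plan {
      def output: Seq[Attr] = child.output.map { a =>
        if (notNullCols.contains(a.name)) a.copy(nullable = false) else a
      }
    }
    
    // Projection holding its own attribute references, which may be stale.
    final case class Project(refs: Seq[Attr], child: Plan) extends Plan {
      def output: Seq[Attr] = refs
    }
    
    object UpdateNullabilitySketch {
      // Bottom-up rewrite: after rewriting the child, copy the child's
      // nullability onto each reference with a matching name.
      def apply(plan: Plan): Plan = plan match {
        case s: Scan => s
        case Filter(cols, child) => Filter(cols, apply(child))
        case Project(refs, child) =>
          val newChild = apply(child)
          val nullability = newChild.output.map(a => a.name -> a.nullable).toMap
          val newRefs = refs.map { r =>
            r.copy(nullable = nullability.getOrElse(r.name, r.nullable))
          }
          Project(newRefs, newChild)
      }
    }
    ```
    
    For a plan such as `Project(Seq(Attr("b", true)), Filter(Set("b"), Scan(...)))`, the projected `b` starts out nullable and becomes non-nullable after the rewrite, mirroring the change in `output(0).nullable` shown earlier.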

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/maropu/spark SPARK-21351

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/18576.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #18576
    
----
commit fe5ecff62c0494b8c32c24b83a488bddbbbc6d89
Author: Takeshi Yamamuro <yamam...@apache.org>
Date:   2017-06-29T08:50:48Z

    Update nullability based on children's output

----

