GitHub user cloud-fan opened a pull request:

    https://github.com/apache/spark/pull/20647

    [SPARK-23303][SQL] improve the explain result for data source v2 relations

    ## What changes were proposed in this pull request?
    
    The current explain result for a data source v2 relation is unreadable:
    ```
    == Parsed Logical Plan ==
    'Filter ('i > 6)
    +- AnalysisBarrier
          +- Project [j#1]
             +- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader@3b415940
    
    == Analyzed Logical Plan ==
    j: int
    Project [j#1]
    +- Filter (i#0 > 6)
       +- Project [j#1, i#0]
          +- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader@3b415940
    
    == Optimized Logical Plan ==
    Project [j#1]
    +- Filter isnotnull(i#0)
       +- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader@3b415940
    
    == Physical Plan ==
    *(1) Project [j#1]
    +- *(1) Filter isnotnull(i#0)
       +- *(1) DataSourceV2Scan [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader@3b415940
    ```
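    
    For reference, plans like the above come from calling `explain(true)` on a query over a v2 source. A minimal sketch of such a query (the source class is the test class shown in the plans; the exact query shape is an assumption):
    ```scala
    import org.apache.spark.sql.SparkSession
    
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._
    
    // Load the v2 test source that appears in the plans above.
    val df = spark.read
      .format("org.apache.spark.sql.sources.v2.AdvancedDataSourceV2")
      .load()
    
    // explain(true) prints the parsed, analyzed, optimized and physical plans.
    df.select('j).filter('i > 6).explain(true)
    ```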
    
    After this PR:
    ```
    == Parsed Logical Plan ==
    'Project [unresolvedalias('j, None)]
    +- AnalysisBarrier
          +- Relation AdvancedDataSourceV2[i#0, j#1]
    
    == Analyzed Logical Plan ==
    j: int
    Project [j#1]
    +- Relation AdvancedDataSourceV2[i#0, j#1]
    
    == Optimized Logical Plan ==
    Relation AdvancedDataSourceV2[j#1]
    
    == Physical Plan ==
    *(1) Scan AdvancedDataSourceV2[j#1]
    ```
    -------
    Another example, with filter pushdown:
    ```
    == Analyzed Logical Plan ==
    i: int, j: int
    Filter (i#88 > 3)
    +- Relation JavaAdvancedDataSourceV2[i#88, j#89]
    
    == Optimized Logical Plan ==
    Filter isnotnull(i#88)
    +- Relation JavaAdvancedDataSourceV2[i#88, j#89] (PushedFilter: [GreaterThan(i,3)])
    
    == Physical Plan ==
    *(1) Filter isnotnull(i#88)
    +- *(1) Scan JavaAdvancedDataSourceV2[i#88, j#89] (PushedFilter: [GreaterThan(i,3)])
    ```
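    
    The `(PushedFilter: ...)` suffix makes the pushed-down predicates visible directly in the plan. As an illustrative sketch only (not the actual Spark code), a node string in this style could be assembled like:
    ```scala
    // Illustrative helper: build a "Relation Name[attrs] (PushedFilter: [...])"
    // string in the style of the output above.
    def relationString(
        sourceName: String,
        output: Seq[String],
        pushedFilters: Seq[String]): String = {
      val pushed =
        if (pushedFilters.isEmpty) ""
        else s" (PushedFilter: [${pushedFilters.mkString(",")}])"
      s"Relation $sourceName[${output.mkString(", ")}]$pushed"
    }
    
    // relationString("JavaAdvancedDataSourceV2", Seq("i#88", "j#89"), Seq("GreaterThan(i,3)"))
    // returns: Relation JavaAdvancedDataSourceV2[i#88, j#89] (PushedFilter: [GreaterThan(i,3)])
    ```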
    
    An example of a streaming query:
    ```
    == Parsed Logical Plan ==
    Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
    +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
       +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
          +- DeserializeToObject cast(value#25 as string).toString, obj#4: java.lang.String
             +- Streaming Relation FakeDataSourceV2$[value#25]
    
    == Analyzed Logical Plan ==
    value: string, count(1): bigint
    Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
    +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
       +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
          +- DeserializeToObject cast(value#25 as string).toString, obj#4: java.lang.String
             +- Streaming Relation FakeDataSourceV2$[value#25]
    
    == Optimized Logical Plan ==
    Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
    +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
       +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
          +- DeserializeToObject value#25.toString, obj#4: java.lang.String
             +- Streaming Relation FakeDataSourceV2$[value#25]
    
    == Physical Plan ==
    *(4) HashAggregate(keys=[value#6], functions=[count(1)], output=[value#6, count(1)#11L])
    +- StateStoreSave [value#6], state info [ checkpoint = *********(redacted)/cloud/dev/spark/target/tmp/temporary-549f264b-2531-4fcb-a52f-433c77347c12/state, runId = f84d9da9-2f8c-45c1-9ea1-70791be684de, opId = 0, ver = 0, numPartitions = 5], Complete, 0
       +- *(3) HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#16L])
          +- StateStoreRestore [value#6], state info [ checkpoint = *********(redacted)/cloud/dev/spark/target/tmp/temporary-549f264b-2531-4fcb-a52f-433c77347c12/state, runId = f84d9da9-2f8c-45c1-9ea1-70791be684de, opId = 0, ver = 0, numPartitions = 5]
             +- *(2) HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#16L])
                +- Exchange hashpartitioning(value#6, 5)
                   +- *(1) HashAggregate(keys=[value#6], functions=[partial_count(1)], output=[value#6, count#16L])
                      +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
                         +- *(1) MapElements <function1>, obj#5: java.lang.String
                            +- *(1) DeserializeToObject value#25.toString, obj#4: java.lang.String
                               +- *(1) Scan FakeDataSourceV2$[value#25]
    ```
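    
    The streaming plans above correspond roughly to a query like the following sketch (assuming an existing `spark` session; the source class is the test class from the plan, and the exact transformations are an assumption):
    ```scala
    import spark.implicits._
    
    // A typed map over the streamed strings followed by groupBy/count. The
    // as[String]/map pair yields the DeserializeToObject / MapElements /
    // SerializeFromObject nodes seen above; groupBy/count yields the Aggregate
    // and the stateful HashAggregate/StateStore operators in the physical plan.
    val counts = spark.readStream
      .format("org.apache.spark.sql.sources.v2.FakeDataSourceV2")
      .load()
      .as[String]
      .map(_.toString)
      .groupBy("value")
      .count()
    ```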
    ## How was this patch tested?
    
    N/A

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cloud-fan/spark explain

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20647.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20647
    
----
commit 138679ee8b713c20ac89d44a2e8e5d877772c69a
Author: Wenchen Fan <wenchen@...>
Date:   2018-02-13T05:12:22Z

    improve data source v2 explain

----

