GitHub user brkyvz opened a pull request:

    https://github.com/apache/spark/pull/19196

    [SPARK-21977] SinglePartition optimizations break certain Streaming Stateful Aggregation requirements

    ## What changes were proposed in this pull request?
    
    This is a bit hard to explain because there are several issues here, but I'll do my best. The problem occurs under the following conditions:
    1. A Structured Streaming source that can generate empty RDDs with 0 partitions.
    2. A Structured Streaming query that uses the above source, performs a stateful aggregation (`mapGroupsWithState`, `groupBy.count`, ...), and calls `coalesce(1)` before the aggregation.
    
    The crux of the problem is that when a Dataset has a `coalesce(1)` call, it receives a `SinglePartition` partitioning scheme. This scheme satisfies most of the required distributions used by aggregations such as `HashAggregateExec`, so no Exchange is planned in front of the stateful operator. This causes several problems:
    
    Symptom 1. If the input RDD has 0 partitions, the whole lineage receives 0 partitions, so nothing is executed and the state store does not create any delta files. The next trigger then fails, because the StateStore cannot load the delta file for the previous trigger.
    
    Symptom 2. Suppose instead that there was data. If you stop your stream, replace `coalesce(1)` with `coalesce(2)`, and restart it, the stream fails, because `spark.sql.shuffle.partitions - 1` StateStores cannot find their delta files.
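Both symptoms can be illustrated with a toy model of per-partition delta files. This is a minimal sketch, not Spark's StateStore implementation: `ToyCheckpoint`, `missing_stores`, `run_batch`, and the value 4 standing in for `spark.sql.shuffle.partitions` are all hypothetical. The model rests on three assumptions, each implied by the description above. First, each partition of the stateful operator must find the previous version's delta file before committing a new one. Second, a partition that runs no task writes no file. Third, after the restart in Symptom 2 the stateful operator runs with `spark.sql.shuffle.partitions` partitions.

```python
from typing import List, Set, Tuple


class ToyCheckpoint:
    """Hypothetical model of a streaming checkpoint: the state store of
    operator partition p commits one delta file per batch version.
    Illustration only, not Spark's StateStore API."""

    def __init__(self) -> None:
        self.delta_files: Set[Tuple[int, int]] = set()  # (partition, version)

    def missing_stores(self, version: int, num_partitions: int) -> List[int]:
        # Version 0 is the empty initial state and needs no file.
        prev = version - 1
        if prev == 0:
            return []
        return [p for p in range(num_partitions) if (p, prev) not in self.delta_files]

    def run_batch(self, version: int, num_partitions: int) -> None:
        missing = self.missing_stores(version, num_partitions)
        if missing:
            raise FileNotFoundError(
                f"version {version}: no delta file for version {version - 1} "
                f"in partitions {missing}")
        # Only partitions that actually run a task write a delta file.
        for p in range(num_partitions):
            self.delta_files.add((p, version))


# Symptom 1: an empty trigger with 0 partitions writes no delta files,
# so the next trigger cannot load the previous version.
ck = ToyCheckpoint()
ck.run_batch(1, num_partitions=0)
print(ck.missing_stores(2, num_partitions=1))  # [0]

# Symptom 2: state was written by a single partition; if the restarted
# query runs SHUFFLE_PARTITIONS partitions, all but partition 0 fail.
SHUFFLE_PARTITIONS = 4  # hypothetical value of spark.sql.shuffle.partitions
ck = ToyCheckpoint()
ck.run_batch(1, num_partitions=1)
ck.run_batch(2, num_partitions=1)
print(ck.missing_stores(3, num_partitions=SHUFFLE_PARTITIONS))  # [1, 2, 3]
```

In this toy, both failures disappear only if the stateful operator sees the same non-zero partition count on every trigger.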
    
    To fix these issues, we must check that the partitioning of the child of a `StatefulOperator` satisfies the following, regardless of whether `coalesce(1)` exists in the plan and whether the input RDD for the trigger has any data:
    If the grouping expressions are empty:
    a) `AllTuples` distribution
    b) a single physical partition
    If the grouping expressions are non-empty:
    a) `ClusteredDistribution`
    b) `spark.sql.shuffle.partitions` partitions
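The check above can be sketched as a small predicate. The `Partitioning` record, its `kind` labels, `satisfies_stateful`, and `needs_exchange` are hypothetical simplifications, not Spark's planner API. Here `physical_partitions` means the number of partitions the child actually produces for a trigger, and the sketch does not model how Spark enforces that at planning time. It shows why a `coalesce(1)` child still gets an Exchange once the partition count is checked along with the distribution, even though it satisfies the distribution alone.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class Partitioning:
    """Hypothetical summary of what a StatefulOperator's child produces."""
    kind: str                    # "single", "hash", or "unknown"
    physical_partitions: int     # partitions actually produced for a trigger
    keys: Tuple[str, ...] = ()   # hash-partitioning keys when kind == "hash"


def satisfies_stateful(grouping: Tuple[str, ...], child: Partitioning,
                       shuffle_partitions: int) -> bool:
    if not grouping:
        # (a) AllTuples: all rows in one partition; (b) exactly one physical partition.
        return child.kind == "single" and child.physical_partitions == 1
    # (a) Clustered (simplified): a single partition trivially qualifies,
    # as does hash partitioning on a non-empty subset of the grouping keys.
    clustered = child.kind == "single" or (
        child.kind == "hash" and len(child.keys) > 0
        and set(child.keys) <= set(grouping))
    # (b) Exactly spark.sql.shuffle.partitions partitions.
    return clustered and child.physical_partitions == shuffle_partitions


def needs_exchange(grouping: Tuple[str, ...], child: Partitioning,
                   shuffle_partitions: int) -> bool:
    return not satisfies_stateful(grouping, child, shuffle_partitions)


SHUFFLE_PARTITIONS = 200  # hypothetical setting (200 is Spark's default)
# coalesce(1) over a trigger whose input RDD has 0 partitions, no grouping keys.
print(needs_exchange((), Partitioning("single", 0), SHUFFLE_PARTITIONS))  # True
# coalesce(1) with grouping keys: clustered, but the partition count is wrong.
print(needs_exchange(("key",), Partitioning("single", 1), SHUFFLE_PARTITIONS))  # True
# Already hash-partitioned on the grouping key into 200 partitions.
print(needs_exchange(("key",), Partitioning("hash", 200, ("key",)), SHUFFLE_PARTITIONS))  # False
```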
    
    Once you fix the above problem by adding an Exchange to the plan, you run into the following bug: if you call `coalesce(1).groupBy().count()` on a streaming DataFrame and a trigger has no data, `StateStoreRestoreExec` doesn't return the prior state. However, for this specific aggregation, the `HashAggregateExec` after the restore still returns a (0, 0) row, because we're performing a count and there is no data. `StateStoreSaveExec` then stores this row, overwriting the previous counts, which are lost.
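The operator sequence above can be traced with a toy model of a running global count. This is a hypothetical illustration, not the code of `StateStoreRestoreExec`, `HashAggregateExec`, or `StateStoreSaveExec`. The `run_trigger` function and the dict-based state are assumptions, and the sketch reproduces only the failure, not the fix.

```python
from typing import Dict, List, Tuple

# Toy state store: grouping key -> running count. With groupBy() the only
# key is the empty tuple (). Hypothetical model, not Spark's operators.
State = Dict[Tuple, int]


def run_trigger(state: State, batch: List[str]) -> int:
    """One trigger of coalesce(1).groupBy().count(), modelling the bug."""
    inputs: List[int] = []
    if batch:
        inputs.append(len(batch))  # this trigger's partial count
    # Restore step: prior state is looked up only for groups that appear
    # in the input, so an empty trigger restores nothing.
    if inputs and () in state:
        inputs.append(state[()])
    # An aggregate with no grouping keys emits exactly one row even when
    # its input is empty; for a count, that row is 0.
    count = sum(inputs)
    # Save step: the emitted row replaces the stored value.
    state[()] = count
    return count


state: State = {}
print(run_trigger(state, ["a", "b", "c"]))  # 3
print(run_trigger(state, []))               # 0 (the previous count of 3 is lost)
print(run_trigger(state, ["d"]))            # 1 (a correct running count would be 4)
```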
    
    ## How was this patch tested?
    
    Regression tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/brkyvz/spark sa-0

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/19196.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #19196
    
----
commit b7aeed6af2aaf6eb347dd0a492a62e6530900eb5
Author: Burak Yavuz <brk...@gmail.com>
Date:   2017-09-08T18:36:02Z

    couldn't repro

commit 4a7d1240196cc4660d33aef33d893526da5f0ceb
Author: Burak Yavuz <brk...@gmail.com>
Date:   2017-09-11T17:44:15Z

    save

commit 00fa5923c7663f58df72937626bfadac5dc2f1fd
Author: Burak Yavuz <brk...@gmail.com>
Date:   2017-09-12T04:32:30Z

    ready for review

commit 090044ca089870befff464d37f098c4d4fd19657
Author: Burak Yavuz <brk...@gmail.com>
Date:   2017-09-12T04:33:05Z

    uncomment

----

