GitHub user henryr opened a pull request:

    https://github.com/apache/spark/pull/22211

    [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL][BACKPORT-2.1] Shuffle+Repartition on a DataFrame could lead to incorrect answers

    ## What changes were proposed in this pull request?
    
        Back port of #20393 and #22079.
    
        Currently, shuffle repartition uses RoundRobinPartitioning, so the generated result is nondeterministic because the order of the input rows is itself not deterministic.
    
        The bug can be triggered when a repartition call follows a shuffle (which produces a non-deterministic row ordering), as in the pattern below:
        upstream stage -> repartition stage -> result stage
        (-> indicates a shuffle)
        When one of the executor processes goes down, some tasks in the repartition stage are retried and produce an inconsistent row ordering, so some retried tasks in the result stage generate different data.
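    
        For intuition, here is a minimal, self-contained sketch (illustrative only, not Spark's actual RoundRobinPartitioning code) showing that a round-robin assignment is only as deterministic as the order of its input:
        ```
        // Illustrative only: round-robin maps a row to a partition based on its
        // position in the input, so a different input order after a retry yields a
        // different row -> partition mapping.
        def roundRobinAssign[T](rows: Seq[T], numPartitions: Int): Seq[(T, Int)] =
          rows.zipWithIndex.map { case (row, i) => (row, i % numPartitions) }

        roundRobinAssign(Seq("a", "b", "c"), 2)  // a -> 0, b -> 1, c -> 0
        roundRobinAssign(Seq("c", "a", "b"), 2)  // c -> 0, a -> 1, b -> 0: "a" moved
        ```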
    
        The following code returns 931532, instead of 1000000:
        ```
        import scala.sys.process._
    
        import org.apache.spark.TaskContext
        val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
          x
        }.repartition(200).map { x =>
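          // On the first attempt of tasks 0 and 1 in this stage, kill the local Java
          // processes (the executors) so the stage is retried with a different row order.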
          if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
            throw new Exception("pkill -f java".!!)
          }
          x
        }
        res.distinct().count()
        ```
    
        In this PR, we propose the most straightforward way to fix this problem: perform a local sort before partitioning. Once the input row ordering is deterministic, the function from rows to partitions is fully deterministic too.
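    
        As a toy illustration of the idea (illustrative only; the real change lives in the shuffle exchange code path), sorting the task's input locally before applying the same round-robin assignment means every retry sees the same order:
        ```
        // Illustrative only: once the per-task input order is made canonical by a
        // local sort, the row -> partition mapping is the same on every retry.
        def sortedRoundRobin[T](rows: Seq[T], numPartitions: Int)
                               (implicit ord: Ordering[T]): Seq[(T, Int)] =
          rows.sorted.zipWithIndex.map { case (row, i) => (row, i % numPartitions) }

        sortedRoundRobin(Seq("c", "a", "b"), 2)  // a -> 0, b -> 1, c -> 0
        sortedRoundRobin(Seq("a", "b", "c"), 2)  // same mapping, regardless of input order
        ```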
    
        The downside of this approach is that the extra local sort slows down repartition(), so we add a new config named `spark.sql.execution.sortBeforeRepartition` to control whether this patch is applied. It is enabled by default to be safe, but users may turn it off manually to avoid the performance regression.
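    
        For example, the extra sort can be switched off for a session via the config introduced above (shown here as a usage sketch):
        ```
        // Opt out of the extra local sort; repartition() then keeps its previous
        // (faster, but retry-unsafe) behavior.
        spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")
        ```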
    
        This patch also changes the output row ordering of repartition(), which breaks a number of test cases that compare results directly.
    
        A unit test is added in ExchangeSuite.
    
        With this patch (and `spark.sql.execution.sortBeforeRepartition` set to true), the following query returns 1000000:
        ```
        import scala.sys.process._
    
        import org.apache.spark.TaskContext
    
        spark.conf.set("spark.sql.execution.sortBeforeRepartition", "true")
    
        val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
          x
        }.repartition(200).map { x =>
          if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
            throw new Exception("pkill -f java".!!)
          }
          x
        }
        res.distinct().count()
    
        res7: Long = 1000000
        ```
    
        Author: Xingbo Jiang <xingbo.jiang@databricks.com>


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/henryr/spark spark-23207-branch-2.1

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22211.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22211
    
----
commit 5bdd9e353287c2a326138b25f80ee255d15942b0
Author: Xingbo Jiang <xingbo.jiang@...>
Date:   2018-08-23T21:22:56Z

    [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL][BACKPORT-2.1] Shuffle+Repartition on a DataFrame could lead to incorrect answers
    
    ## What changes were proposed in this pull request?
    
    Back port of #20393 and #22079.
    
    Currently, shuffle repartition uses RoundRobinPartitioning, so the generated result is nondeterministic because the order of the input rows is itself not deterministic.
    
    The bug can be triggered when a repartition call follows a shuffle (which produces a non-deterministic row ordering), as in the pattern below:
    upstream stage -> repartition stage -> result stage
    (-> indicates a shuffle)
    When one of the executor processes goes down, some tasks in the repartition stage are retried and produce an inconsistent row ordering, so some retried tasks in the result stage generate different data.
    
    The following code returns 931532, instead of 1000000:
    ```
    import scala.sys.process._
    
    import org.apache.spark.TaskContext
    val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
      x
    }.repartition(200).map { x =>
      if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
        throw new Exception("pkill -f java".!!)
      }
      x
    }
    res.distinct().count()
    ```
    
    In this PR, we propose the most straightforward way to fix this problem: perform a local sort before partitioning. Once the input row ordering is deterministic, the function from rows to partitions is fully deterministic too.
    
    The downside of this approach is that the extra local sort slows down repartition(), so we add a new config named `spark.sql.execution.sortBeforeRepartition` to control whether this patch is applied. It is enabled by default to be safe, but users may turn it off manually to avoid the performance regression.
    
    This patch also changes the output row ordering of repartition(), which breaks a number of test cases that compare results directly.
    
    A unit test is added in ExchangeSuite.
    
    With this patch (and `spark.sql.execution.sortBeforeRepartition` set to true), the following query returns 1000000:
    ```
    import scala.sys.process._
    
    import org.apache.spark.TaskContext
    
    spark.conf.set("spark.sql.execution.sortBeforeRepartition", "true")
    
    val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
      x
    }.repartition(200).map { x =>
      if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
        throw new Exception("pkill -f java".!!)
      }
      x
    }
    res.distinct().count()
    
    res7: Long = 1000000
    ```
    
    Author: Xingbo Jiang <xingbo.jiang@databricks.com>
    
    ## How was this patch tested?
    
    Ran all SBT unit tests for org.apache.spark.sql.*.
    
    Ran pyspark tests for module pyspark-sql.
    
    Closes #22079 from bersprockets/SPARK-23207.
    
    Lead-authored-by: Xingbo Jiang <xingbo.ji...@databricks.com>
    Co-authored-by: Bruce Robbins <bersprock...@gmail.com>
    Co-authored-by: Zheng RuiFeng <ruife...@foxmail.com>
    Signed-off-by: Xiao Li <gatorsm...@gmail.com>

----

