Github user jiangxb1987 commented on the issue:

    https://github.com/apache/spark/pull/21698
  
    > I see some discussion about making shuffles deterministic, but it proved 
to be very difficult. Is there a prior discussion on this you can point me to? 
Is it that even if you used fetch-to-disk and had the shuffle-fetch side read 
the map-output in a set order, you'd still have random variations in spills?
    
    Related discussion can be found at https://github.com/apache/spark/pull/20414 . Also, let me list below some of the scenarios that might generate non-deterministic row ordering:
    **Random Shuffle Block Fetch**
    We randomize the order of block fetches on purpose, to avoid all the nodes fetching from the same node at the same time. That means we send out multiple concurrent fetch requests, and the fetched blocks are processed in FIFO order. Therefore, the row ordering of the shuffle output is non-deterministic.
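    To make this concrete, here is a small stand-alone Python sketch (a simulation, not Spark code) of how randomized fetch order plus FIFO processing changes the row order between task attempts:

```python
import random

def fetch_rows(blocks, seed):
    # Simulate one reduce-task attempt: the fetch order of the
    # map-side shuffle blocks is randomized (as Spark does to avoid
    # hot-spotting a single node), and rows are then consumed in
    # arrival (FIFO) order.
    order = list(range(len(blocks)))
    random.Random(seed).shuffle(order)
    rows = []
    for i in order:
        rows.extend(blocks[i])
    return rows

# Three map-side blocks feeding one reducer.
blocks = [[1, 2], [3, 4], [5, 6]]
attempts = [fetch_rows(blocks, seed) for seed in range(10)]
# Every attempt sees the same multiset of rows...
assert all(sorted(a) == [1, 2, 3, 4, 5, 6] for a in attempts)
# ...but not necessarily in the same order.
```

    Any downstream computation that depends on row position (such as round-robin repartitioning) therefore sees different inputs on retry.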
    **Shuffle Merge With Spills**
    A shuffle that uses an Aggregator (for instance, combineByKey) uses ExternalAppendOnlyMap to combine the values. ExternalAppendOnlyMap claims to preserve row ordering, but it actually compares elements by their hashes (i.e., HashComparator). Even though the sort algorithm is stable, the map sizes can differ when spilling happens, since requests for additional memory may be granted in different orders. The spilling itself can be non-deterministic, and thus the resulting order can still be non-deterministic.
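    As an illustration (a toy model in Python, not the actual ExternalAppendOnlyMap code): the iteration order of an open-addressed hash map depends on its capacity, and the capacity at spill time depends on when the spill is triggered, so two attempts that spill at different points can emit the same keys in different orders:

```python
def bucket_order(keys, capacity):
    # Iteration order of a toy open-addressed hash map: each key's
    # slot is hash(key) % capacity, with linear probing on collision;
    # iterating the table yields keys in slot order.
    slots = [None] * capacity
    for k in keys:
        i = hash(k) % capacity
        while slots[i] is not None:
            i = (i + 1) % capacity
        slots[i] = k
    return [k for k in slots if k is not None]

keys = [10, 3, 7, 14]
# If a spill is triggered while the map is still small (capacity 8)
# versus after it has grown (capacity 16), the same keys are written
# out in different orders.
small = bucket_order(keys, 8)   # [10, 3, 14, 7]
large = bucket_order(keys, 16)  # [3, 7, 10, 14]
```

    Merging such spill files by hash then produces a final order that depends on where the spills happened.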
    **Read From External Data Source**
    Some external data sources might generate different row orderings of their output on different read requests.
    
    > since we only need to do this sort on RDDs post shuffle
    
    IIUC this is not the case for RDD.repartition(), see https://github.com/apache/spark/blob/94c67a76ec1fda908a671a47a2a1fa63b3ab1b06/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L453-L461 . It requires the input rows to be ordered and then performs a round-robin style data transformation, so I don't see what we can do if the input data type is not sortable.
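    A simplified Python sketch of that round-robin distribution (the seeding and names here are illustrative, not the exact Scala code at the link above): each input partition starts at a pseudo-random position and hands rows to output partitions in round-robin order, so a row's target partition depends on where it sits in the input ordering:

```python
import random

def distribute_partition(index, rows, num_partitions):
    # Round-robin assignment in the style of RDD.repartition(): the
    # start position is derived deterministically from the input
    # partition index, then each row goes to the next output partition.
    position = random.Random(index).randrange(num_partitions)
    out = []
    for row in rows:
        position = (position + 1) % num_partitions
        out.append((position, row))
    return out

# If the input rows arrive in a different order on a retry, the same
# row can land in a different output partition.
first = dict((row, p) for p, row in distribute_partition(0, ["a", "b", "c"], 2))
retry = dict((row, p) for p, row in distribute_partition(0, ["b", "a", "c"], 2))
# Row "a" (and "b") moved to a different partition between attempts.
```

    This is why a partial re-run after a fetch failure can silently drop or duplicate rows: the retried map task may route rows to different reducers than the original attempt did.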
    
    > on a fetch-failure in repartition, fail the entire job
    
    Currently I can't figure out a case where a customer would vote for this behavior change, especially since FetchFailure tends to occur more often in long-running jobs on big datasets than in interactive queries.
    
    > We could add logic to detect whether even an order-dependent operation 
was safe to retry -- eg. repartition just after reading from hdfs or after a 
checkpoint can be done as it is now. Each stage would need to know this based 
on extra properties of all the RDDs it was defined from.
    
    This is something I'm also trying to figure out: we should enable users to tell Spark that an RDD will generate deterministic output, so Spark doesn't have to worry about data correctness issues over those RDDs. Please also note that checkpointing actually cannot guarantee you always get the same output on each read, because you may hit an executorLost, and then the partitions have to be recomputed and may fetch different data.
    
    > Honestly I consider this bug so serious I'd consider loudly warning from 
every api which suffers from this if we can't fix -- make them deprecated and 
log a warning.
    
    We should definitely update the comments, but should we deprecate the APIs? I can't say I agree or disagree on this. I'm still trying to extend the current approach to guarantee data correctness, and the code changes shall be well flagged off. Maybe we can revisit the deprecation proposal after that?

