Github user jiangxb1987 commented on the issue:

    https://github.com/apache/spark/pull/21698

> I see some discussion about making shuffles deterministic, but it proved to be very difficult. Is there a prior discussion on this you can point me to? Is it that even if you used fetch-to-disk and had the shuffle-fetch side read the map-output in a set order, you'd still have random variations in spills?

Related discussion can be found in https://github.com/apache/spark/pull/20414 . Also, let me list below some of the scenarios that can generate a non-deterministic row ordering:

**Random Shuffle Blocks Fetch**

We randomize the ordering of block fetches on purpose, to avoid all the nodes fetching from the same node at the same time. That means we send out multiple concurrent fetch requests, and the fetched blocks are processed in FIFO order. Therefore, the row ordering of the shuffle output is non-deterministic.

**Shuffle Merge With Spills**

Shuffles that use an Aggregator (for instance, combineByKey) use ExternalAppendOnlyMap to combine the values. ExternalAppendOnlyMap claims that it keeps the row order, but it actually uses the hash to compare elements (i.e., HashComparator). Even though the sort algorithm is stable, the map sizes can differ when spilling happens, and the requests for additional memory may arrive in different orders. The spilling can therefore be non-deterministic, and thus the resulting order can still be non-deterministic.

**Read From External Data Source**

Some external data sources might generate a different row ordering of outputs on different read requests.
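To make the first scenario concrete, here is a minimal sketch (hypothetical names, not Spark's actual internals) of why randomized block fetches alone make the shuffle-read row ordering non-deterministic: the fetch order is shuffled so that reducers do not all hit the same node at once, and rows are then consumed in whatever order the blocks arrive.

```scala
import scala.util.Random

object FetchOrderSketch {
  // Three hypothetical map-output blocks, each holding a few rows.
  val blocks: Seq[Seq[Int]] = Seq(Seq(1, 2), Seq(3, 4), Seq(5, 6))

  // One simulated "shuffle read": randomized request order, rows
  // concatenated in arrival (FIFO) order.
  def fetchAll(seed: Long): Seq[Int] =
    new Random(seed).shuffle(blocks).flatten

  def main(args: Array[String]): Unit = {
    val attempt1 = fetchAll(seed = 1L)
    val attempt2 = fetchAll(seed = 2L)
    // Same multiset of rows across attempts, but the ordering may differ --
    // exactly the property that breaks a retried order-dependent stage.
    assert(attempt1.sorted == attempt2.sorted)
    println(s"attempt 1: $attempt1")
    println(s"attempt 2: $attempt2")
  }
}
```

The two attempts stand in for an original task and its recomputation after a FetchFailure: the rows are the same, but their order is not guaranteed to be.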
> since we only need to do this sort on RDDs post shuffle

IIUC this is not the case in `RDD.repartition()`, see https://github.com/apache/spark/blob/94c67a76ec1fda908a671a47a2a1fa63b3ab1b06/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L453~L461 , it requires the input rows to be ordered and then performs a round-robin style data transformation, so I don't see what we can do if the input data type is not sortable.

> on a fetch-failure in repartition, fail the entire job

Currently I can't think of a case where a user would vote for this behavior change, especially since FetchFailure tends to occur more often in long-running jobs on big datasets than in interactive queries.

> We could add logic to detect whether even an order-dependent operation was safe to retry -- eg. repartition just after reading from hdfs or after a checkpoint can be done as it is now. Each stage would need to know this based on extra properties of all the RDDs it was defined from.

This is something I'm also trying to figure out: we should enable users to tell Spark that an RDD will generate deterministic output, so they don't have to worry about data correctness issues over those RDDs. Please also note that checkpointing actually cannot guarantee that you always get the same output on each read, because you may have an executorLost, after which you have to recompute the partitions and thus may fetch different data.

> Honestly I consider this bug so serious I'd consider loudly warning from every api which suffers from this if we can't fix -- make them deprecated and log a warning.

We should definitely update the comments, but shall we make the APIs deprecated? I can't say I agree or disagree on this. I'm still trying to extend the current approach to guarantee data correctness, and the code changes shall be well flagged off. Maybe we can revisit the deprecation proposal after that?
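To illustrate why the round-robin transformation is order-dependent, here is a simplified sketch of a position-based assignment in the style of `RDD.repartition()` (the names and the seeding are hypothetical simplifications, not the actual RDD.scala code): each row's target partition depends only on its position in the input iterator, so if a recomputed task sees the same rows in a different order, rows land in different output partitions.

```scala
import scala.util.Random

object RoundRobinSketch {
  // Simplified sketch: row i of input partition `partitionIndex` goes to
  // output partition (start + i + 1) % numPartitions, where `start` is a
  // pseudo-random offset derived from the partition index. The assignment
  // is purely position-based, so it is only stable if the input row
  // ordering is stable.
  def assign(rows: Seq[String],
             partitionIndex: Int,
             numPartitions: Int): Seq[(Int, String)] = {
    var position = new Random(partitionIndex).nextInt(numPartitions)
    rows.map { row =>
      position += 1
      (position % numPartitions) -> row
    }
  }

  def main(args: Array[String]): Unit = {
    val attempt1 = assign(Seq("a", "b", "c", "d"), partitionIndex = 0, numPartitions = 2)
    // On retry, the upstream shuffle delivered the same rows in a different order:
    val attempt2 = assign(Seq("c", "a", "d", "b"), partitionIndex = 0, numPartitions = 2)
    // Row "a" moved from position 0 to position 1, so with 2 partitions it
    // now lands in the other output partition than in the first attempt.
    val aFirst  = attempt1.find(_._2 == "a").get._1
    val aSecond = attempt2.find(_._2 == "a").get._1
    assert(aFirst != aSecond)
    println(s"attempt 1: $attempt1")
    println(s"attempt 2: $attempt2")
  }
}
```

If a downstream partition produced by the first attempt is then lost and recomputed from the second attempt's output, some rows are duplicated and others are silently dropped, which is the correctness issue under discussion.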