Github user jiangxb1987 commented on the issue:

    https://github.com/apache/spark/pull/21698
  
    We fixed the DataFrame repartition correctness issue by inserting a local 
sort before the repartition, but feedback on that approach has been generally 
negative: the performance of repartition() drops significantly, and even 
queries that have no potential correctness issue still pay for the regression 
(e.g. rdd.repartition(...).map(...).collect()). That's the major reason we are 
trying to resolve the correctness issue in a different way.
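
    To make that cost concrete, here is a minimal sketch of a job with no 
placement-sensitive correctness issue that would nevertheless pay for the 
inserted local sort (names and sizes below are illustrative, not from the 
actual fix):

        import org.apache.spark.{SparkConf, SparkContext}

        object RepartitionCost {
          def main(args: Array[String]): Unit = {
            val sc = new SparkContext(
              new SparkConf().setAppName("repartition-cost").setMaster("local[*]"))
            // collect() here does not depend on which partition a row lands
            // in, yet with the insert-local-sort fix every repartition()
            // would pay an extra per-partition sort before the shuffle.
            val result = sc.parallelize(1 to 1000000)
              .repartition(100)
              .map(_ * 2)
              .collect()
            println(result.length)
            sc.stop()
          }
        }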
    
    I agree that correctness should take higher priority than performance, 
but things are not black and white; here we really care about both. We also 
want to guarantee that if you don't have a correctness issue beforehand, you 
are affected as little as possible by the proposed fix.
    
    I'm currently working on an extended approach, based on the one proposed 
in this PR, that should handle the cascading-stages issue @mridulm mentioned 
above (rdd1.zip(rdd2).map(v => (computeKey(v._1, v._2), computeValue(v._1, 
v._2))).groupByKey().map(...).save(...)). Please also note that this does not 
actually imply we will retry more stages: it's true we will retry more tasks 
to ensure correctness on FetchFailure/ExecutorLost, but we won't retry more 
stages.
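
    For reference, a self-contained sketch of that pipeline might look like 
the following (computeKey/computeValue are arbitrary placeholder functions 
and the output path is illustrative):

        import org.apache.spark.{SparkConf, SparkContext}

        object CascadingStages {
          // Placeholder user functions; any nondeterminism upstream of the
          // shuffle is what makes the retry semantics tricky here.
          def computeKey(a: Int, b: Int): Int = (a + b) % 100
          def computeValue(a: Int, b: Int): Long = a.toLong * b

          def main(args: Array[String]): Unit = {
            val sc = new SparkContext(
              new SparkConf().setAppName("cascading-stages").setMaster("local[*]"))
            // zip() requires both RDDs to have the same number of partitions
            // and the same number of elements per partition.
            val rdd1 = sc.parallelize(1 to 1000, numSlices = 10)
            val rdd2 = sc.parallelize(1001 to 2000, numSlices = 10)

            rdd1.zip(rdd2)
              .map(v => (computeKey(v._1, v._2), computeValue(v._1, v._2)))
              .groupByKey()
              .map { case (k, vs) => s"$k\t${vs.sum}" }
              .saveAsTextFile("/tmp/cascading-stages-demo") // stands in for save()
            sc.stop()
          }
        }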
    
    Having said that, IMO the best bet is to implement both approaches (the 
insert-local-sort one and the retry-all-tasks-on-failure one) and create a 
flag for each of them, so users may choose an approach based on their 
workload patterns, though it's also debatable which approach should be 
enabled by default. Unfortunately we are not able to deliver them in 2.4, but 
I'm optimistic we may include them in 3.0 and, of course, backport them to 
all the active branches.
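
    As a sketch of what that per-fix gating could look like (both flag names 
below are hypothetical, not actual Spark configuration keys):

        import org.apache.spark.sql.SparkSession

        // Hypothetical illustration only: neither key below is claimed to
        // be a real Spark config; they just show how the two fixes could be
        // toggled independently per workload.
        val spark = SparkSession.builder()
          .appName("choose-repartition-fix")
          // opt in to the insert-local-sort fix
          .config("spark.sql.execution.sortBeforeRepartition", "true")
          // opt in to the retry-all-tasks-on-failure fix
          .config("spark.scheduler.retryAllTasksOnFetchFailure", "false")
          .getOrCreate()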

