[ https://issues.apache.org/jira/browse/SPARK-38388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17500997#comment-17500997 ]
Xingbo Jiang commented on SPARK-38388:
--------------------------------------

Have you tried overriding the `getOutputDeterministicLevel` function in the customized RDD that generates your upstream data? If the data can be non-deterministic, you can return DeterministicLevel.INDETERMINATE, so that every time a child stage needs to retry, it restarts tasks for all of the partitions (not just the partitions that were lost or failed). A sketch of such an override follows at the end of this message.

> Repartition + Stage retries could lead to incorrect data
> ---------------------------------------------------------
>
>                 Key: SPARK-38388
>                 URL: https://issues.apache.org/jira/browse/SPARK-38388
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0, 3.1.1
>        Environment: Spark 2.4 and 3.1
>            Reporter: Jason Xu
>            Priority: Major
>
> Spark repartition uses RoundRobinPartitioning; the generated result is non-deterministic when the data has some randomness and stage/task retries happen.
> The bug can be triggered when the upstream data has some randomness, a repartition is called on it, and a result stage follows (there could be more stages in between).
> The pattern looks like this:
> upstream stage (data with randomness) -> (repartition shuffle) -> result stage
> When one executor goes down during the result stage, some tasks of that stage might have finished while others fail, and the shuffle files on that executor are also lost. Some tasks from the previous stage (upstream data generation, repartition) then need to rerun to regenerate the shuffle files the result stage depends on.
> Because the data has some randomness, the data regenerated by the retried upstream tasks is slightly different, so the repartition produces an inconsistent ordering, and the retried tasks of the result stage generate different data.
> This is similar to, but different from, https://issues.apache.org/jira/browse/SPARK-23207: the fix for that issue adds an extra local sort to make the row ordering deterministic, and its sorting algorithm simply compares row/record binaries. In this case the upstream data itself has some randomness, so the sort does not keep the ordering stable, and RoundRobinPartitioning introduces a non-deterministic result.
> The following code returns 986415 instead of 1000000:
> {code:java}
> import scala.sys.process._
> import org.apache.spark.TaskContext
>
> case class TestObject(id: Long, value: Double)
>
> val ds = spark.range(0, 1000 * 1000, 1)
>   .repartition(100, $"id")
>   .withColumn("val", rand())
>   .repartition(100)
>   .map { row =>
>     // Kill the JVM on the first attempt of the last result-stage tasks to
>     // simulate executor loss and force retries of the upstream stage.
>     if (TaskContext.get.stageAttemptNumber == 0 &&
>         TaskContext.get.attemptNumber == 0 &&
>         TaskContext.get.partitionId > 97) {
>       throw new Exception("pkill -f java".!!)
>     }
>     TestObject(row.getLong(0), row.getDouble(1))
>   }
>
> ds.toDF("id", "value").write.mode("overwrite").saveAsTable("tmp.test_table")
> spark.sql("select count(distinct id) from tmp.test_table").show{code}
> Command:
> {code:java}
> spark-shell --num-executors 10 (--conf spark.dynamicAllocation.enabled=false --conf spark.shuffle.service.enabled=false){code}
> To simulate the issue, the external shuffle service needs to be disabled (if it is enabled by default in your environment); this triggers the shuffle file loss and the retries of the previous stage.
> In our production environment the external shuffle service is enabled, and this data correctness issue happened when there were node losses.
> Although there is some non-determinism in the upstream data, users wouldn't expect to see an incorrect result.
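For concreteness, here is a minimal sketch of the override suggested above. It assumes Spark 3.x, where `org.apache.spark.rdd.DeterministicLevel` is public and `RDD.getOutputDeterministicLevel` is a protected developer-API method; `RandomSourceRDD` and its random payload are hypothetical stand-ins for the custom upstream RDD:

{code:java}
import scala.util.Random

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.{DeterministicLevel, RDD}

// Hypothetical upstream source whose compute() output involves randomness.
class RandomSourceRDD(sc: SparkContext, numSlices: Int) extends RDD[Long](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    Array.tabulate(numSlices) { i =>
      new Partition { override def index: Int = i }
    }

  // Non-deterministic output: every (re)compute yields different rows.
  override def compute(split: Partition, context: TaskContext): Iterator[Long] =
    Iterator.fill(1000)(Random.nextLong())

  // Declare the output INDETERMINATE so that when a child stage must retry
  // after shuffle data loss, the scheduler recomputes all partitions of this
  // RDD's stage instead of only the lost/failed ones.
  override protected def getOutputDeterministicLevel: DeterministicLevel.Value =
    DeterministicLevel.INDETERMINATE
}
{code}

With the override in place, losing shuffle output downstream of `new RandomSourceRDD(sc, 100)` rolls the retry back to the whole upstream stage, which avoids mixing rows from the old and new attempts.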
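Separately from the RDD-level override, one user-level mitigation for the repro above (a sketch, not a fix proposed in this ticket) is to cut the lineage before the round-robin shuffle with reliable checkpointing, so that a stage retry replays the persisted rows instead of regenerating different ones; the checkpoint directory path here is an assumption:

{code:java}
// Reliable checkpointing persists the non-deterministic rows to stable
// storage; a reliably checkpointed RDD is treated as DETERMINATE.
spark.sparkContext.setCheckpointDir("hdfs:///tmp/spark-checkpoints") // assumed path

val stable = spark.range(0, 1000 * 1000, 1)
  .repartition(100, $"id")
  .withColumn("val", rand())
  .checkpoint() // eager by default; materializes before the next shuffle

stable.repartition(100)
  .write.mode("overwrite").saveAsTable("tmp.test_table")
{code}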