[jira] [Comment Edited] (SPARK-38388) Repartition + Stage retries could lead to incorrect data

2022-06-23 Thread Descotte (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557965#comment-17557965
 ] 

Descotte edited comment on SPARK-38388 at 6/23/22 9:26 AM:
---

Hi,

Does this bug occur if you don't specify the number of partitions manually?

For example, if you do
spark.range(0, 1000 * 1000, 1).repartition($"id")

instead of
spark.range(0, 1000 * 1000, 1).repartition(100, $"id")

 

It seems easy to avoid repartition(n), but repartitioning by column seems a very 
important feature of Spark that may be hard to avoid.
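
For reference, a quick way to check which partitioning each variant produces (just a sketch, assuming a spark-shell session; the exact plan text can differ between Spark versions):
{code:java}
// Sketch: inspect the physical plan to see which partitioning each call uses.
// repartition(n) without columns uses round-robin partitioning (the
// non-deterministic case in this ticket); repartitioning by a column uses hash
// partitioning, which routes each row by its own key rather than its position.
val df = spark.range(0, 1000 * 1000, 1).toDF("id")

df.repartition(100).explain()        // Exchange RoundRobinPartitioning(100)
df.repartition($"id").explain()      // Exchange hashpartitioning(id, ...)
df.repartition(100, $"id").explain() // Exchange hashpartitioning(id, 100)
{code}
This only shows how rows are routed; whether it avoids the problem still depends on whether the partitioning key itself is recomputed deterministically on a retry.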


was (Author: JIRAUSER291502):
Hi,

Does this bug occur if you don't specify the number of partitions manually?

For example, if you do
spark.range(0, 1000 * 1000, 1).repartition($"id")

instead of
spark.range(0, 1000 * 1000, 1).repartition(100, $"id")

 

It seems easy to avoid repartition(n), but repartitioning by column seems a 
very important feature of Spark that may be hard to avoid.

> Repartition + Stage retries could lead to incorrect data 
> -
>
> Key: SPARK-38388
> URL: https://issues.apache.org/jira/browse/SPARK-38388
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0, 3.1.1
> Environment: Spark 2.4 and 3.1
>Reporter: Jason Xu
>Priority: Major
>  Labels: correctness, data-loss
>
> Spark repartition uses RoundRobinPartitioning; the generated result is 
> non-deterministic when the data has some randomness and stage/task retries happen.
> The bug can be triggered when upstream data has some randomness, a 
> repartition is called on it, and it is followed by a result stage (there could be 
> more stages).
> As the pattern shows below:
> upstream stage (data with randomness) -> (repartition shuffle) -> result stage
> When one executor goes down during the result stage, some tasks of that stage might 
> have finished while others fail; shuffle files on that executor are also lost, so 
> some tasks from the previous stages (upstream data generation, repartition) need to 
> rerun to regenerate the shuffle data files they depend on.
> Because the data has some randomness, the data regenerated by the retried upstream 
> tasks is slightly different, repartition then produces an inconsistent ordering, and 
> the retried tasks of the result stage generate different data.
> This is similar to, but different from, 
> https://issues.apache.org/jira/browse/SPARK-23207: the fix for that issue uses an 
> extra local sort to make the row ordering deterministic, and the sorting algorithm it 
> uses simply compares row/record hashes. But in this case the upstream data has 
> some randomness, so the sort doesn't keep the order stable, and 
> RoundRobinPartitioning still produces a non-deterministic result.
> The following code returns 986415 instead of the expected 1000000 (the range 
> contains 1000 * 1000 distinct ids):
> {code:java}
> import scala.sys.process._
> import org.apache.spark.TaskContext
> case class TestObject(id: Long, value: Double)
> val ds = spark.range(0, 1000 * 1000, 1)
>   .repartition(100, $"id")
>   .withColumn("val", rand())
>   .repartition(100)
>   .map { row =>
>     if (TaskContext.get.stageAttemptNumber == 0 &&
>         TaskContext.get.attemptNumber == 0 &&
>         TaskContext.get.partitionId > 97) {
>       throw new Exception("pkill -f java".!!)
>     }
>     TestObject(row.getLong(0), row.getDouble(1))
>   }
> ds.toDF("id", "value").write.mode("overwrite").saveAsTable("tmp.test_table")
> spark.sql("select count(distinct id) from tmp.test_table").show{code}
> Command: 
> {code:java}
> spark-shell --num-executors 10 (--conf spark.dynamicAllocation.enabled=false 
> --conf spark.shuffle.service.enabled=false){code}
> To simulate the issue, disabling the external shuffle service is needed (if it's 
> enabled by default in your environment); this is to trigger shuffle file loss and 
> previous stage retries.
> In our production environment, we have the external shuffle service enabled; this 
> data correctness issue happened when there were node losses.
> Although there's some non-deterministic factor in the upstream data, users 
> wouldn't expect to see incorrect results.






[jira] [Comment Edited] (SPARK-38388) Repartition + Stage retries could lead to incorrect data

2022-06-23 Thread Descotte (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557965#comment-17557965
 ] 

Descotte edited comment on SPARK-38388 at 6/23/22 9:26 AM:
---

Hi,

Does this bug occur if you don't specify the number of partitions manually?

For example, if you do
spark.range(0, 1000 * 1000, 1).repartition($"id")

instead of
spark.range(0, 1000 * 1000, 1).repartition(100, $"id")

 

It seems easy to avoid repartition(n), but repartitioning by column seems a 
very important feature of Spark that may be hard to avoid.


was (Author: JIRAUSER291502):
Hi,

Does this bug occur if you don't specify the number of partitions manually?

For example, if you do
spark.range(0, 1000 * 1000, 1).repartition($"id")

instead of
spark.range(0, 1000 * 1000, 1).repartition(100, $"id")

 

It seems easy to avoid repartition(n), but repartitioning by column seems a very 
important feature of Spark that may be hard to avoid.







[jira] [Comment Edited] (SPARK-38388) Repartition + Stage retries could lead to incorrect data

2022-06-23 Thread Descotte (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557965#comment-17557965
 ] 

Descotte edited comment on SPARK-38388 at 6/23/22 9:26 AM:
---

Hi,

Does this bug occur if you don't specify the number of partitions manually?

For example, if you do
spark.range(0, 1000 * 1000, 1).repartition($"id")

instead of
spark.range(0, 1000 * 1000, 1).repartition(100, $"id")

 

It seems easy to avoid repartition(n), but repartitioning by column seems a very 
important feature of Spark that may be hard to avoid.


was (Author: JIRAUSER291502):
Hi,

Does this bug occur if you don't specify the number of partitions manually?

For example, if you do
spark.range(0, 1000 * 1000, 1).repartition($"id")

 

instead of
spark.range(0, 1000 * 1000, 1).repartition(100, $"id")







[jira] [Comment Edited] (SPARK-38388) Repartition + Stage retries could lead to incorrect data

2022-06-23 Thread Descotte (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17557965#comment-17557965
 ] 

Descotte edited comment on SPARK-38388 at 6/23/22 9:25 AM:
---

Hi,

Does this bug occur if you don't specify the number of partitions manually?

For example, if you do
spark.range(0, 1000 * 1000, 1).repartition($"id")

 

instead of
spark.range(0, 1000 * 1000, 1).repartition(100, $"id")


was (Author: JIRAUSER291502):
Hi,

Does this bug occur if you don't specify the number of partitions manually?

For example, if you do
spark.range(0, 1000 * 1000, 1).repartition(100, $"id")







[jira] [Comment Edited] (SPARK-38388) Repartition + Stage retries could lead to incorrect data

2022-06-19 Thread Tom Sisso (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17556051#comment-17556051
 ] 

Tom Sisso edited comment on SPARK-38388 at 6/19/22 10:19 AM:
-

Hi,
We want to share that we experience a variant of this issue as well (at 
Taboola, using Spark 3.1.3), and to stress that it can occur in any scenario where 
the recalculation doesn't produce exactly the same rows, not only the 
randomness-related cases or the ones mentioned here.
In our case we suffered from it in multiple jobs that perform a simple 
aggregation over some double/float-typed column and then repartition to 
control the number of output files, for example:
{code:java}
sqlContext.sql(
" SELECT integerColumn, SUM(someDoubleTypeValue) AS value
  FROM data
  GROUP BY integerColumn "
).repartition(3)
.write()
…{code}
In cases where we perform such a calculation, we might get a slightly different 
value on a retry due to precision issues.
Similar to what was already explained here, such a case will again lead to a 
different hashcode for the row, which will cause rows to be mapped to different 
partitions and results in incorrect data that contains duplicate and missing rows.
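
As a minimal illustration of the precision issue (a standalone sketch, not our production job): adding the same doubles in a different order can produce a different result, so the aggregated row, and therefore its hashcode, can change between the original attempt and a retry.
{code:java}
// Sketch: floating-point addition is not associative, so an aggregation that
// combines partial sums in a different order on a retry can yield a slightly
// different double value for the "same" row.
val a = (0.1 + 0.2) + 0.3 // 0.6000000000000001
val b = 0.1 + (0.2 + 0.3) // 0.6

println(a == b) // false: same inputs, different order, different value
{code}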

It can be reproduced in a similar manner: inject a failure after some parts have 
already succeeded (and disable the external shuffle service, speculation, and dynamic 
allocation):
{code:java}
sqlContext.sql(
" SELECT integerColumn, SUM(someDoubleTypeValue) AS value
  FROM data
  GROUP BY integerColumn "
).repartition(3)
.map((MapFunction) row -> {
if (TaskContext.get().stageAttemptNumber() == 0 &&
TaskContext.get().attemptNumber() == 0 &&
TaskContext.get().partitionId() > 1) {
Thread.sleep(60_000);
System.exit(1);
}

return row;
}, RowEncoder.apply(schema))
.write()
… {code}
 

We would appreciate it if this could get higher priority, and we think it should be 
communicated to users until a proper solution is implemented, maybe with a 
warning on the repartition(numPartitions) API; we have stopped using it.
Thanks


was (Author: JIRAUSER291139):
Hi,
We want to share that we experience a variant of this issue as well (at 
Taboola, using Spark 3.1.3), and to stress that it can occur in any scenario where 
the recalculation doesn't produce exactly the same rows, not only the 
randomness-related cases or the ones mentioned here.
In our case we suffered from it in multiple jobs that perform a simple 
aggregation over some double/float-typed column and then repartition to 
control the number of output files, for example:
{code:java}
sqlContext.sql(
" SELECT integerColumn, SUM(someDoubleTypeValue) AS value
FROM data
GROUP BY integerColumn "
).repartition(3)
.write()
…{code}

In cases where we perform such a calculation, we might get a slightly different 
value on a retry due to precision issues.
Similar to what was already explained here, such a case will again lead to a 
different hashcode for the row, which will cause rows to be mapped to different 
partitions and results in incorrect data that contains duplicate and missing rows.

It can be reproduced in a similar manner: inject a failure after some parts have 
already succeeded (and disable the external shuffle service, speculation, and dynamic 
allocation):
{code:java}
sqlContext.sql(
" SELECT integerColumn, SUM(someDoubleTypeValue) AS value
FROM data
GROUP BY integerColumn "
).repartition(3)
.map((MapFunction) row -> {
if (TaskContext.get().stageAttemptNumber() == 0 &&
TaskContext.get().attemptNumber() == 0 &&
TaskContext.get().partitionId() > 1)
{ Thread.sleep(60_000); System.exit(1); }
return row;
}, RowEncoder.apply(schema))
.write()
…
{code}
 

We would appreciate it if this could get higher priority, and we think it should be 
communicated to users until a proper solution is implemented, maybe with a 
warning on the repartition(numPartitions) API; we have stopped using it.
Thanks


[jira] [Comment Edited] (SPARK-38388) Repartition + Stage retries could lead to incorrect data

2022-06-19 Thread Tom Sisso (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17556051#comment-17556051
 ] 

Tom Sisso edited comment on SPARK-38388 at 6/19/22 10:18 AM:
-

Hi,
We want to share that we experience a variant of this issue as well (at 
Taboola, using Spark 3.1.3), and to stress that it can occur in any scenario where 
the recalculation doesn't produce exactly the same rows, not only the 
randomness-related cases or the ones mentioned here.
In our case we suffered from it in multiple jobs that perform a simple 
aggregation over some double/float-typed column and then repartition to 
control the number of output files, for example:
{code:java}
sqlContext.sql(
" SELECT integerColumn, SUM(someDoubleTypeValue) AS value
FROM data
GROUP BY integerColumn "
).repartition(3)
.write()
…{code}

In cases where we perform such a calculation, we might get a slightly different 
value on a retry due to precision issues.
Similar to what was already explained here, such a case will again lead to a 
different hashcode for the row, which will cause rows to be mapped to different 
partitions and results in incorrect data that contains duplicate and missing rows.

It can be reproduced in a similar manner: inject a failure after some parts have 
already succeeded (and disable the external shuffle service, speculation, and dynamic 
allocation):
{code:java}
sqlContext.sql(
" SELECT integerColumn, SUM(someDoubleTypeValue) AS value
FROM data
GROUP BY integerColumn "
).repartition(3)
.map((MapFunction) row -> {
if (TaskContext.get().stageAttemptNumber() == 0 &&
TaskContext.get().attemptNumber() == 0 &&
TaskContext.get().partitionId() > 1)
{ Thread.sleep(60_000); System.exit(1); }
return row;
}, RowEncoder.apply(schema))
.write()
…
{code}
 

We would appreciate it if this could get higher priority, and we think it should be 
communicated to users until a proper solution is implemented, maybe with a 
warning on the repartition(numPartitions) API; we have stopped using it.
Thanks


was (Author: JIRAUSER291139):
Hi,
We want to share that we experience a variant of this issue as well (at 
Taboola, using Spark 3.1.3), and to stress that it can occur in any scenario where 
the recalculation doesn't produce exactly the same rows, not only the 
randomness-related cases or the ones mentioned here.
In our case we suffered from it in multiple jobs that perform a simple 
aggregation over some double/float-typed column and then repartition to 
control the number of output files, for example:
sqlContext.sql(
" SELECT integerColumn, SUM(someDoubleTypeValue) AS value
  FROM data
  GROUP BY integerColumn "
).repartition(3)
.write()
…
In cases where we perform such a calculation, we might get a slightly different 
value on a retry due to precision issues.
Similar to what was already explained here, such a case will again lead to a 
different hashcode for the row, which will cause rows to be mapped to different 
partitions and results in incorrect data that contains duplicate and missing rows.

It can be reproduced in a similar manner: inject a failure after some parts have 
already succeeded (and disable the external shuffle service, speculation, and dynamic 
allocation):
sqlContext.sql(
" SELECT integerColumn, SUM(someDoubleTypeValue) AS value
  FROM data
  GROUP BY integerColumn "
).repartition(3)
.map((MapFunction) row -> {
if (TaskContext.get().stageAttemptNumber() == 0 &&
TaskContext.get().attemptNumber() == 0 &&
TaskContext.get().partitionId() > 1) {
Thread.sleep(60_000);
System.exit(1);
}

return row;
}, RowEncoder.apply(schema))
.write()
…

We would appreciate it if this could get higher priority, and we think it should be 
communicated to users until a proper solution is implemented, maybe with a 
warning on the repartition(numPartitions) API; we have stopped using it.
Thanks


[jira] [Comment Edited] (SPARK-38388) Repartition + Stage retries could lead to incorrect data

2022-03-25 Thread Jason Xu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17510162#comment-17510162
 ] 

Jason Xu edited comment on SPARK-38388 at 3/26/22, 5:32 AM:


Thank you [~mridulm80]! Wenchen also suggested propagating the deterministic 
level in the dev email thread: 
[https://lists.apache.org/thread/z5b8qssg51024nmtvk6gr2skxctl6xcm]. I'm looking 
into it.


was (Author: kings129):
Thank you [~mridulm80]! Wenchen also suggested propagating the deterministic 
level in the dev email thread: 
[https://lists.apache.org/thread/z5b8qssg51024nmtvk6gr2skxctl6xcm]. I'm looking 
into it.







[jira] [Comment Edited] (SPARK-38388) Repartition + Stage retries could lead to incorrect data

2022-03-08 Thread Jason Xu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17503105#comment-17503105
 ] 

Jason Xu edited comment on SPARK-38388 at 3/9/22, 7:34 AM:
---

[~cloud_fan] [~mridul] have lots of insight into this issue from 
[https://github.com/apache/spark/pull/20393] and 
[https://github.com/apache/spark/pull/22112]; could you also help take a look? 
Much appreciated!


was (Author: kings129):
[~cloud_fan] [~mridul] have lots of insight into this issue from 
[https://github.com/apache/spark/pull/20393] and 
[https://github.com/apache/spark/pull/22112]; could you help take a look? 
Much appreciated!







[jira] [Comment Edited] (SPARK-38388) Repartition + Stage retries could lead to incorrect data

2022-03-07 Thread Jason Xu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17501894#comment-17501894
 ] 

Jason Xu edited comment on SPARK-38388 at 3/7/22, 6:08 PM:
---

[~jiangxb1987] thanks for the reply. The repro example above uses the DataFrame API 
and doesn't use RDDs directly, so I don't follow how to override the 
`getOutputDeterministicLevel` function in this case. If I missed something, 
could you suggest how to modify the repro code above?
I see the `getOutputDeterministicLevel` function in RDD was introduced in 
[https://github.com/apache/spark/pull/22112]; does it only help when users 
create a customized RDD?
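
For reference, my rough understanding of that hook (a sketch only, for a custom RDD; the class and names below are made up for illustration, and this isn't something the DataFrame repro above can call directly):
{code:java}
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.{DeterministicLevel, RDD}

// Sketch: a custom RDD whose rows are regenerated differently on a retry can
// declare that by overriding getOutputDeterministicLevel, so the scheduler
// knows the recomputed output may not match the original output.
class RandomValueRDD(prev: RDD[Long]) extends RDD[(Long, Double)](prev) {

  override def compute(split: Partition, context: TaskContext): Iterator[(Long, Double)] =
    prev.iterator(split, context).map(id => (id, scala.util.Random.nextDouble()))

  override protected def getPartitions: Array[Partition] = prev.partitions

  // The values are random, so a recomputation produces different rows.
  override protected def getOutputDeterministicLevel: DeterministicLevel.Value =
    DeterministicLevel.INDETERMINATE
}
{code}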


was (Author: kings129):
[~jiangxb1987] thanks for the reply. The repro example above uses the DataFrame API 
and doesn't use RDDs directly, so I don't follow how to override the 
`getOutputDeterministicLevel` function in this case. If I missed something, 
could you suggest how to modify the repro code above?
I see the `getOutputDeterministicLevel` function in RDD was introduced in 
[https://github.com/apache/spark/pull/22112]; does it only help when users 
create a customized RDD?



