[jira] [Commented] (SPARK-37829) An outer-join using joinWith on DataFrames returns Rows with null fields instead of null values

2023-04-13 Thread Jason Xu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-37829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17712000#comment-17712000
 ] 

Jason Xu commented on SPARK-37829:
--

I created a pull request for this issue (somehow it's not automatically linked 
here, so I'm adding the link manually):
[https://github.com/apache/spark/pull/40755]
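For reference, a minimal sketch of the behavior difference (hypothetical data, not 
the linked test; assumes a spark-shell session so {{spark}} and its implicits are in 
scope):
{code:java}
// Minimal sketch of the behavior difference (hypothetical data, not the linked test).
import spark.implicits._

case class Payment(id: Long, amount: Double)

val left  = Seq(Payment(1, 10.0)).toDF()
val right = Seq(Payment(2, 5.0)).toDF()

// Full outer join: id 1 has no match on the right, id 2 has no match on the left.
val joined = left.joinWith(right, left("id") === right("id"), "full_outer")

joined.collect().foreach { case (l, r) => println(s"left=$l, right=$r") }
// Spark 2.4.8: the unmatched side is null, e.g. "right=null"
// Spark 3.0+ : the unmatched side is a Row of nulls, e.g. "right=[null,null]"
{code}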

> An outer-join using joinWith on DataFrames returns Rows with null fields 
> instead of null values
> ---
>
> Key: SPARK-37829
> URL: https://issues.apache.org/jira/browse/SPARK-37829
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0, 3.0.1, 3.0.2, 3.0.3, 3.1.0, 3.1.1, 3.1.2, 3.2.0
>Reporter: Clément de Groc
>Priority: Major
>
> Doing an outer-join using {{joinWith}} on {{DataFrame}}s used to return 
> missing values as {{null}} in Spark 2.4.8, but returns them as {{Rows}} with 
> {{null}} values in Spark 3+.
> The issue can be reproduced with [the following 
> test|https://github.com/cdegroc/spark/commit/79f4d6a1ec6c69b10b72dbc8f92ab6490d5ef5e5]
>  that succeeds on Spark 2.4.8 but fails starting from Spark 3.0.0.
> The problem only arises when working with DataFrames: Datasets of case 
> classes work as expected as demonstrated by [this other 
> test|https://github.com/apache/spark/blob/v3.0.0/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala#L1200-L1223].
> I couldn't find an explanation for this change in the Migration guide so I'm 
> assuming this is a bug.
> A {{git bisect}} pointed me to [that 
> commit|https://github.com/apache/spark/commit/cd92f25be5a221e0d4618925f7bc9dfd3bb8cb59].
> Reverting the commit solves the problem.
> A similar solution,  but without reverting, is shown 
> [here|https://github.com/cdegroc/spark/commit/684c675bf070876a475a9b225f6c2f92edce4c8a].
> Happy to help if you think of another approach / can provide some guidance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-37829) An outer-join using joinWith on DataFrames returns Rows with null fields instead of null values

2023-04-05 Thread Jason Xu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-37829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17709013#comment-17709013
 ] 

Jason Xu commented on SPARK-37829:
--

Got it, thank you [~cdegroc]!
The codebase is kinda complicated, but I'll give it a shot.

> An outer-join using joinWith on DataFrames returns Rows with null fields 
> instead of null values
> ---
>
> Key: SPARK-37829
> URL: https://issues.apache.org/jira/browse/SPARK-37829
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0, 3.0.1, 3.0.2, 3.0.3, 3.1.0, 3.1.1, 3.1.2, 3.2.0
>Reporter: Clément de Groc
>Priority: Major
>
> Doing an outer-join using {{joinWith}} on {{DataFrame}}s used to return 
> missing values as {{null}} in Spark 2.4.8, but returns them as {{Rows}} with 
> {{null}} values in Spark 3+.
> The issue can be reproduced with [the following 
> test|https://github.com/cdegroc/spark/commit/79f4d6a1ec6c69b10b72dbc8f92ab6490d5ef5e5]
>  that succeeds on Spark 2.4.8 but fails starting from Spark 3.0.0.
> The problem only arises when working with DataFrames: Datasets of case 
> classes work as expected as demonstrated by [this other 
> test|https://github.com/apache/spark/blob/v3.0.0/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala#L1200-L1223].
> I couldn't find an explanation for this change in the Migration guide so I'm 
> assuming this is a bug.
> A {{git bisect}} pointed me to [that 
> commit|https://github.com/apache/spark/commit/cd92f25be5a221e0d4618925f7bc9dfd3bb8cb59].
> Reverting the commit solves the problem.
> A similar solution,  but without reverting, is shown 
> [here|https://github.com/cdegroc/spark/commit/684c675bf070876a475a9b225f6c2f92edce4c8a].
> Happy to help if you think of another approach / can provide some guidance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-37829) An outer-join using joinWith on DataFrames returns Rows with null fields instead of null values

2023-03-29 Thread Jason Xu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-37829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17706591#comment-17706591
 ] 

Jason Xu commented on SPARK-37829:
--


Our company encountered this issue during our migration from Spark 2.4 to 3. 
It can cause data correctness problems in our pipelines, because null is used 
to determine whether there is a matching row in a DataFrame outer join.

To unblock the migration, we would like to backport the fix from upstream. 
However, I noticed that the pull requests above have been closed due to 
inactivity.
[~cdegroc], are you planning to resume this work?

By the way, I'm happy to help in any way!

> An outer-join using joinWith on DataFrames returns Rows with null fields 
> instead of null values
> ---
>
> Key: SPARK-37829
> URL: https://issues.apache.org/jira/browse/SPARK-37829
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0, 3.0.1, 3.0.2, 3.0.3, 3.1.0, 3.1.1, 3.1.2, 3.2.0
>Reporter: Clément de Groc
>Priority: Major
>
> Doing an outer-join using {{joinWith}} on {{DataFrame}}s used to return 
> missing values as {{null}} in Spark 2.4.8, but returns them as {{Rows}} with 
> {{null}} values in Spark 3+.
> The issue can be reproduced with [the following 
> test|https://github.com/cdegroc/spark/commit/79f4d6a1ec6c69b10b72dbc8f92ab6490d5ef5e5]
>  that succeeds on Spark 2.4.8 but fails starting from Spark 3.0.0.
> The problem only arises when working with DataFrames: Datasets of case 
> classes work as expected as demonstrated by [this other 
> test|https://github.com/apache/spark/blob/v3.0.0/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala#L1200-L1223].
> I couldn't find an explanation for this change in the Migration guide so I'm 
> assuming this is a bug.
> A {{git bisect}} pointed me to [that 
> commit|https://github.com/apache/spark/commit/cd92f25be5a221e0d4618925f7bc9dfd3bb8cb59].
> Reverting the commit solves the problem.
> A similar solution,  but without reverting, is shown 
> [here|https://github.com/cdegroc/spark/commit/684c675bf070876a475a9b225f6c2f92edce4c8a].
> Happy to help if you think of another approach / can provide some guidance.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-38388) Repartition + Stage retries could lead to incorrect data

2022-06-28 Thread Jason Xu (Jira)


Jason Xu updated SPARK-38388:
-
Environment: Spark 2.4 and 3.x  (was: Spark 2.4 and 3.1)

--
This message was sent by Atlassian Jira
(v8.20.10#820010-sha1:ace47f9)



[jira] [Commented] (SPARK-38388) Repartition + Stage retries could lead to incorrect data

2022-04-27 Thread Jason Xu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17529097#comment-17529097
 ] 

Jason Xu commented on SPARK-38388:
--

Hi [~cloud_fan], could you assign this ticket to me? I have bandwidth to work 
on it in May.

Another possible solution:
Since the root cause is non-deterministic data in shuffles, could the driver keep 
checksums of all shuffle blocks? If a map task re-attempt generates a shuffle block 
with a different checksum, Spark can detect it on the fly and rerun all reduce tasks 
to avoid the correctness issue.
I feel this could be a better solution because it is transparent to users; it doesn't 
require them to explicitly mark their data as indeterminate. There are challenges 
with the other solution: 1. It wouldn't be easy to educate regular Spark users about 
the issue; they might not see the advice or might not understand the importance of 
marking the DeterministicLevel. 2. Even if they understand, it's hard for users to 
always remember to mark their data as indeterminate.

What do you think?
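For what it's worth, a rough conceptual sketch of the driver-side bookkeeping I have 
in mind (plain Scala, not Spark's actual shuffle code; the object and method names 
are made up):
{code:java}
import java.util.zip.CRC32
import scala.collection.mutable

// Conceptual sketch only: assumes the driver can be sent one checksum per
// (shuffleId, mapPartitionId) block when a map task completes.
object ShuffleChecksumRegistry {
  private val checksums = mutable.Map.empty[(Int, Int), Long]

  private def checksumOf(block: Array[Byte]): Long = {
    val crc = new CRC32()
    crc.update(block)
    crc.getValue
  }

  // Returns true if a re-attempt produced a block that differs from the recorded one,
  // i.e. the map output is indeterminate and downstream reduce tasks should be rerun.
  def registerAndDetectChange(shuffleId: Int, mapPartitionId: Int, block: Array[Byte]): Boolean = {
    val sum = checksumOf(block)
    checksums.put((shuffleId, mapPartitionId), sum) match {
      case Some(previous) => previous != sum
      case None           => false // first attempt, nothing to compare against
    }
  }
}
{code}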

> Repartition + Stage retries could lead to incorrect data 
> -
>
> Key: SPARK-38388
> URL: https://issues.apache.org/jira/browse/SPARK-38388
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0, 3.1.1
> Environment: Spark 2.4 and 3.1
>Reporter: Jason Xu
>Priority: Major
>  Labels: correctness, data-loss
>
> Spark repartition uses RoundRobinPartitioning, the generated results is 
> non-deterministic when data has some randomness and stage/task retries happen.
> The bug can be triggered when upstream data has some randomness, a 
> repartition is called on them, then followed by result stage (could be more 
> stages).
> As the pattern shows below:
> upstream stage (data with randomness) -> (repartition shuffle) -> result stage
> When one executor goes down at result stage, some tasks of that stage might 
> have finished, others would fail, shuffle files on that executor also get 
> lost, some tasks from previous stage (upstream data generation, repartition) 
> will need to rerun to generate dependent shuffle data files.
> Because data has some randomness, regenerated data in upstream retried tasks 
> is slightly different, repartition then generates inconsistent ordering, then 
> tasks at result stage will be retried generating different data.
> This is similar but different to 
> https://issues.apache.org/jira/browse/SPARK-23207, fix for it uses extra 
> local sort to make the row ordering deterministic, the sorting algorithm it 
> uses simply compares row/record hash. But in this case, upstream data has 
> some randomness, the sorting algorithm doesn't help keep the order, thus 
> RoundRobinPartitioning introduced non-deterministic result.
> The following code returns 986415, instead of 100:
> {code:java}
> import scala.sys.process._
> import org.apache.spark.TaskContext
> case class TestObject(id: Long, value: Double)
> val ds = spark.range(0, 1000 * 1000, 1).repartition(100, 
> $"id").withColumn("val", rand()).repartition(100).map { 
>   row => if (TaskContext.get.stageAttemptNumber == 0 && 
> TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId > 97) {
> throw new Exception("pkill -f java".!!)
>   }
>   TestObject(row.getLong(0), row.getDouble(1))
> }
> ds.toDF("id", "value").write.mode("overwrite").saveAsTable("tmp.test_table")
> spark.sql("select count(distinct id) from tmp.test_table").show{code}
> Command: 
> {code:java}
> spark-shell --num-executors 10 (--conf spark.dynamicAllocation.enabled=false 
> --conf spark.shuffle.service.enabled=false){code}
> To simulate the issue, disable external shuffle service is needed (if it's 
> also enabled by default in your environment),  this is to trigger shuffle 
> file loss and previous stage retries.
> In our production, we have external shuffle service enabled, this data 
> correctness issue happened when there were node losses.
> Although there's some non-deterministic factor in upstream data, user 
> wouldn't expect  to see incorrect result.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-38388) Repartition + Stage retries could lead to incorrect data

2022-03-25 Thread Jason Xu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17510162#comment-17510162
 ] 

Jason Xu edited comment on SPARK-38388 at 3/26/22, 5:32 AM:


Thank you [~mridulm80]! Wenchen also suggested propagating the deterministic 
level in the dev email thread: 
[https://lists.apache.org/thread/z5b8qssg51024nmtvk6gr2skxctl6xcm]. I'm looking 
into it.


was (Author: kings129):
Thank you [~mridulm80] ! Wenchen also suggested to propagate the deterministic 
level in dev email thread: 
[https://lists.apache.org/thread/z5b8qssg51024nmtvk6gr2skxctl6xcm. 
|https://lists.apache.org/thread/z5b8qssg51024nmtvk6gr2skxctl6xcm.]I'm looking 
into it.

> Repartition + Stage retries could lead to incorrect data 
> -
>
> Key: SPARK-38388
> URL: https://issues.apache.org/jira/browse/SPARK-38388
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0, 3.1.1
> Environment: Spark 2.4 and 3.1
>Reporter: Jason Xu
>Priority: Major
>  Labels: correctness, data-loss
>
> Spark repartition uses RoundRobinPartitioning, the generated results is 
> non-deterministic when data has some randomness and stage/task retries happen.
> The bug can be triggered when upstream data has some randomness, a 
> repartition is called on them, then followed by result stage (could be more 
> stages).
> As the pattern shows below:
> upstream stage (data with randomness) -> (repartition shuffle) -> result stage
> When one executor goes down at result stage, some tasks of that stage might 
> have finished, others would fail, shuffle files on that executor also get 
> lost, some tasks from previous stage (upstream data generation, repartition) 
> will need to rerun to generate dependent shuffle data files.
> Because data has some randomness, regenerated data in upstream retried tasks 
> is slightly different, repartition then generates inconsistent ordering, then 
> tasks at result stage will be retried generating different data.
> This is similar but different to 
> https://issues.apache.org/jira/browse/SPARK-23207, fix for it uses extra 
> local sort to make the row ordering deterministic, the sorting algorithm it 
> uses simply compares row/record hash. But in this case, upstream data has 
> some randomness, the sorting algorithm doesn't help keep the order, thus 
> RoundRobinPartitioning introduced non-deterministic result.
> The following code returns 986415, instead of 100:
> {code:java}
> import scala.sys.process._
> import org.apache.spark.TaskContext
> case class TestObject(id: Long, value: Double)
> val ds = spark.range(0, 1000 * 1000, 1).repartition(100, 
> $"id").withColumn("val", rand()).repartition(100).map { 
>   row => if (TaskContext.get.stageAttemptNumber == 0 && 
> TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId > 97) {
> throw new Exception("pkill -f java".!!)
>   }
>   TestObject(row.getLong(0), row.getDouble(1))
> }
> ds.toDF("id", "value").write.mode("overwrite").saveAsTable("tmp.test_table")
> spark.sql("select count(distinct id) from tmp.test_table").show{code}
> Command: 
> {code:java}
> spark-shell --num-executors 10 (--conf spark.dynamicAllocation.enabled=false 
> --conf spark.shuffle.service.enabled=false){code}
> To simulate the issue, disable external shuffle service is needed (if it's 
> also enabled by default in your environment),  this is to trigger shuffle 
> file loss and previous stage retries.
> In our production, we have external shuffle service enabled, this data 
> correctness issue happened when there were node losses.
> Although there's some non-deterministic factor in upstream data, user 
> wouldn't expect  to see incorrect result.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-38388) Repartition + Stage retries could lead to incorrect data

2022-03-21 Thread Jason Xu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17510162#comment-17510162
 ] 

Jason Xu commented on SPARK-38388:
--

Thank you [~mridulm80]! Wenchen also suggested propagating the deterministic 
level in the dev email thread: 
[https://lists.apache.org/thread/z5b8qssg51024nmtvk6gr2skxctl6xcm]. I'm looking 
into it.

> Repartition + Stage retries could lead to incorrect data 
> -
>
> Key: SPARK-38388
> URL: https://issues.apache.org/jira/browse/SPARK-38388
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0, 3.1.1
> Environment: Spark 2.4 and 3.1
>Reporter: Jason Xu
>Priority: Major
>  Labels: correctness, data-loss
>
> Spark repartition uses RoundRobinPartitioning, the generated results is 
> non-deterministic when data has some randomness and stage/task retries happen.
> The bug can be triggered when upstream data has some randomness, a 
> repartition is called on them, then followed by result stage (could be more 
> stages).
> As the pattern shows below:
> upstream stage (data with randomness) -> (repartition shuffle) -> result stage
> When one executor goes down at result stage, some tasks of that stage might 
> have finished, others would fail, shuffle files on that executor also get 
> lost, some tasks from previous stage (upstream data generation, repartition) 
> will need to rerun to generate dependent shuffle data files.
> Because data has some randomness, regenerated data in upstream retried tasks 
> is slightly different, repartition then generates inconsistent ordering, then 
> tasks at result stage will be retried generating different data.
> This is similar but different to 
> https://issues.apache.org/jira/browse/SPARK-23207, fix for it uses extra 
> local sort to make the row ordering deterministic, the sorting algorithm it 
> uses simply compares row/record hash. But in this case, upstream data has 
> some randomness, the sorting algorithm doesn't help keep the order, thus 
> RoundRobinPartitioning introduced non-deterministic result.
> The following code returns 986415, instead of 100:
> {code:java}
> import scala.sys.process._
> import org.apache.spark.TaskContext
> case class TestObject(id: Long, value: Double)
> val ds = spark.range(0, 1000 * 1000, 1).repartition(100, 
> $"id").withColumn("val", rand()).repartition(100).map { 
>   row => if (TaskContext.get.stageAttemptNumber == 0 && 
> TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId > 97) {
> throw new Exception("pkill -f java".!!)
>   }
>   TestObject(row.getLong(0), row.getDouble(1))
> }
> ds.toDF("id", "value").write.mode("overwrite").saveAsTable("tmp.test_table")
> spark.sql("select count(distinct id) from tmp.test_table").show{code}
> Command: 
> {code:java}
> spark-shell --num-executors 10 (--conf spark.dynamicAllocation.enabled=false 
> --conf spark.shuffle.service.enabled=false){code}
> To simulate the issue, disable external shuffle service is needed (if it's 
> also enabled by default in your environment),  this is to trigger shuffle 
> file loss and previous stage retries.
> In our production, we have external shuffle service enabled, this data 
> correctness issue happened when there were node losses.
> Although there's some non-deterministic factor in upstream data, user 
> wouldn't expect  to see incorrect result.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-38388) Repartition + Stage retries could lead to incorrect data

2022-03-12 Thread Jason Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Xu updated SPARK-38388:
-
Labels: correctness data-loss  (was: )

> Repartition + Stage retries could lead to incorrect data 
> -
>
> Key: SPARK-38388
> URL: https://issues.apache.org/jira/browse/SPARK-38388
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0, 3.1.1
> Environment: Spark 2.4 and 3.1
>Reporter: Jason Xu
>Priority: Major
>  Labels: correctness, data-loss
>
> Spark repartition uses RoundRobinPartitioning, the generated results is 
> non-deterministic when data has some randomness and stage/task retries happen.
> The bug can be triggered when upstream data has some randomness, a 
> repartition is called on them, then followed by result stage (could be more 
> stages).
> As the pattern shows below:
> upstream stage (data with randomness) -> (repartition shuffle) -> result stage
> When one executor goes down at result stage, some tasks of that stage might 
> have finished, others would fail, shuffle files on that executor also get 
> lost, some tasks from previous stage (upstream data generation, repartition) 
> will need to rerun to generate dependent shuffle data files.
> Because data has some randomness, regenerated data in upstream retried tasks 
> is slightly different, repartition then generates inconsistent ordering, then 
> tasks at result stage will be retried generating different data.
> This is similar but different to 
> https://issues.apache.org/jira/browse/SPARK-23207, fix for it uses extra 
> local sort to make the row ordering deterministic, the sorting algorithm it 
> uses simply compares row/record hash. But in this case, upstream data has 
> some randomness, the sorting algorithm doesn't help keep the order, thus 
> RoundRobinPartitioning introduced non-deterministic result.
> The following code returns 986415, instead of 100:
> {code:java}
> import scala.sys.process._
> import org.apache.spark.TaskContext
> case class TestObject(id: Long, value: Double)
> val ds = spark.range(0, 1000 * 1000, 1).repartition(100, 
> $"id").withColumn("val", rand()).repartition(100).map { 
>   row => if (TaskContext.get.stageAttemptNumber == 0 && 
> TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId > 97) {
> throw new Exception("pkill -f java".!!)
>   }
>   TestObject(row.getLong(0), row.getDouble(1))
> }
> ds.toDF("id", "value").write.mode("overwrite").saveAsTable("tmp.test_table")
> spark.sql("select count(distinct id) from tmp.test_table").show{code}
> Command: 
> {code:java}
> spark-shell --num-executors 10 (--conf spark.dynamicAllocation.enabled=false 
> --conf spark.shuffle.service.enabled=false){code}
> To simulate the issue, disable external shuffle service is needed (if it's 
> also enabled by default in your environment),  this is to trigger shuffle 
> file loss and previous stage retries.
> In our production, we have external shuffle service enabled, this data 
> correctness issue happened when there were node losses.
> Although there's some non-deterministic factor in upstream data, user 
> wouldn't expect  to see incorrect result.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-38388) Repartition + Stage retries could lead to incorrect data

2022-03-08 Thread Jason Xu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17503105#comment-17503105
 ] 

Jason Xu edited comment on SPARK-38388 at 3/9/22, 7:34 AM:
---

[~cloud_fan] [~mridul] have lots of insight on this issue in 
[https://github.com/apache/spark/pull/20393] and 
[https://github.com/apache/spark/pull/22112], could you also help take a look? 
Appreciate!


was (Author: kings129):
[~cloud_fan] [~mridul] have lots of insight on this issue in 
[https://github.com/apache/spark/pull/20393] and 
[https://github.com/apache/spark/pull/22112], could you help take a look? 
Appreciate!

> Repartition + Stage retries could lead to incorrect data 
> -
>
> Key: SPARK-38388
> URL: https://issues.apache.org/jira/browse/SPARK-38388
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0, 3.1.1
> Environment: Spark 2.4 and 3.1
>Reporter: Jason Xu
>Priority: Major
>
> Spark repartition uses RoundRobinPartitioning, the generated results is 
> non-deterministic when data has some randomness and stage/task retries happen.
> The bug can be triggered when upstream data has some randomness, a 
> repartition is called on them, then followed by result stage (could be more 
> stages).
> As the pattern shows below:
> upstream stage (data with randomness) -> (repartition shuffle) -> result stage
> When one executor goes down at result stage, some tasks of that stage might 
> have finished, others would fail, shuffle files on that executor also get 
> lost, some tasks from previous stage (upstream data generation, repartition) 
> will need to rerun to generate dependent shuffle data files.
> Because data has some randomness, regenerated data in upstream retried tasks 
> is slightly different, repartition then generates inconsistent ordering, then 
> tasks at result stage will be retried generating different data.
> This is similar but different to 
> https://issues.apache.org/jira/browse/SPARK-23207, fix for it uses extra 
> local sort to make the row ordering deterministic, the sorting algorithm it 
> uses simply compares row/record hash. But in this case, upstream data has 
> some randomness, the sorting algorithm doesn't help keep the order, thus 
> RoundRobinPartitioning introduced non-deterministic result.
> The following code returns 986415, instead of 100:
> {code:java}
> import scala.sys.process._
> import org.apache.spark.TaskContext
> case class TestObject(id: Long, value: Double)
> val ds = spark.range(0, 1000 * 1000, 1).repartition(100, 
> $"id").withColumn("val", rand()).repartition(100).map { 
>   row => if (TaskContext.get.stageAttemptNumber == 0 && 
> TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId > 97) {
> throw new Exception("pkill -f java".!!)
>   }
>   TestObject(row.getLong(0), row.getDouble(1))
> }
> ds.toDF("id", "value").write.mode("overwrite").saveAsTable("tmp.test_table")
> spark.sql("select count(distinct id) from tmp.test_table").show{code}
> Command: 
> {code:java}
> spark-shell --num-executors 10 (--conf spark.dynamicAllocation.enabled=false 
> --conf spark.shuffle.service.enabled=false){code}
> To simulate the issue, disable external shuffle service is needed (if it's 
> also enabled by default in your environment),  this is to trigger shuffle 
> file loss and previous stage retries.
> In our production, we have external shuffle service enabled, this data 
> correctness issue happened when there were node losses.
> Although there's some non-deterministic factor in upstream data, user 
> wouldn't expect  to see incorrect result.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-38388) Repartition + Stage retries could lead to incorrect data

2022-03-08 Thread Jason Xu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17503105#comment-17503105
 ] 

Jason Xu commented on SPARK-38388:
--

[~cloud_fan] [~mridul] have lots of insight on this issue in 
[https://github.com/apache/spark/pull/20393] and 
[https://github.com/apache/spark/pull/22112], could you help take a look? 
Appreciate!

> Repartition + Stage retries could lead to incorrect data 
> -
>
> Key: SPARK-38388
> URL: https://issues.apache.org/jira/browse/SPARK-38388
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0, 3.1.1
> Environment: Spark 2.4 and 3.1
>Reporter: Jason Xu
>Priority: Major
>
> Spark repartition uses RoundRobinPartitioning, the generated results is 
> non-deterministic when data has some randomness and stage/task retries happen.
> The bug can be triggered when upstream data has some randomness, a 
> repartition is called on them, then followed by result stage (could be more 
> stages).
> As the pattern shows below:
> upstream stage (data with randomness) -> (repartition shuffle) -> result stage
> When one executor goes down at result stage, some tasks of that stage might 
> have finished, others would fail, shuffle files on that executor also get 
> lost, some tasks from previous stage (upstream data generation, repartition) 
> will need to rerun to generate dependent shuffle data files.
> Because data has some randomness, regenerated data in upstream retried tasks 
> is slightly different, repartition then generates inconsistent ordering, then 
> tasks at result stage will be retried generating different data.
> This is similar but different to 
> https://issues.apache.org/jira/browse/SPARK-23207, fix for it uses extra 
> local sort to make the row ordering deterministic, the sorting algorithm it 
> uses simply compares row/record hash. But in this case, upstream data has 
> some randomness, the sorting algorithm doesn't help keep the order, thus 
> RoundRobinPartitioning introduced non-deterministic result.
> The following code returns 986415, instead of 100:
> {code:java}
> import scala.sys.process._
> import org.apache.spark.TaskContext
> case class TestObject(id: Long, value: Double)
> val ds = spark.range(0, 1000 * 1000, 1).repartition(100, 
> $"id").withColumn("val", rand()).repartition(100).map { 
>   row => if (TaskContext.get.stageAttemptNumber == 0 && 
> TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId > 97) {
> throw new Exception("pkill -f java".!!)
>   }
>   TestObject(row.getLong(0), row.getDouble(1))
> }
> ds.toDF("id", "value").write.mode("overwrite").saveAsTable("tmp.test_table")
> spark.sql("select count(distinct id) from tmp.test_table").show{code}
> Command: 
> {code:java}
> spark-shell --num-executors 10 (--conf spark.dynamicAllocation.enabled=false 
> --conf spark.shuffle.service.enabled=false){code}
> To simulate the issue, disable external shuffle service is needed (if it's 
> also enabled by default in your environment),  this is to trigger shuffle 
> file loss and previous stage retries.
> In our production, we have external shuffle service enabled, this data 
> correctness issue happened when there were node losses.
> Although there's some non-deterministic factor in upstream data, user 
> wouldn't expect  to see incorrect result.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-38388) Repartition + Stage retries could lead to incorrect data

2022-03-08 Thread Jason Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Xu updated SPARK-38388:
-
Description: 
Spark repartition uses RoundRobinPartitioning; the generated result is 
non-deterministic when the data has some randomness and stage/task retries happen.

The bug can be triggered when the upstream data has some randomness, a repartition 
is called on it, and a result stage follows (there could be more stages).
As the pattern shows below:
upstream stage (data with randomness) -> (repartition shuffle) -> result stage

When an executor goes down during the result stage, some tasks of that stage might 
have finished while others fail, and the shuffle files on that executor are also 
lost, so some tasks from the previous stage (upstream data generation, repartition) 
need to rerun to regenerate the shuffle data files the result stage depends on.
Because the data has some randomness, the data regenerated by the retried upstream 
tasks is slightly different, repartition then produces an inconsistent ordering, and 
the retried tasks at the result stage generate different data.

This is similar to, but different from, 
https://issues.apache.org/jira/browse/SPARK-23207: the fix for that issue adds an 
extra local sort to make the row ordering deterministic, and the sorting algorithm it 
uses simply compares row/record hashes. But in this case the upstream data has some 
randomness, so the sort doesn't keep the order stable, and RoundRobinPartitioning 
still produces a non-deterministic result.

The following code returns 986415 instead of 1000000:
{code:java}
import scala.sys.process._
import org.apache.spark.TaskContext

case class TestObject(id: Long, value: Double)

val ds = spark.range(0, 1000 * 1000, 1).repartition(100, 
$"id").withColumn("val", rand()).repartition(100).map { 
  row => if (TaskContext.get.stageAttemptNumber == 0 && 
TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId > 97) {
throw new Exception("pkill -f java".!!)
  }
  TestObject(row.getLong(0), row.getDouble(1))
}

ds.toDF("id", "value").write.mode("overwrite").saveAsTable("tmp.test_table")

spark.sql("select count(distinct id) from tmp.test_table").show{code}
Command: 
{code:java}
spark-shell --num-executors 10 (--conf spark.dynamicAllocation.enabled=false 
--conf spark.shuffle.service.enabled=false){code}
To simulate the issue, the external shuffle service needs to be disabled (if it's 
enabled by default in your environment); this is what triggers the shuffle file loss 
and the retries of the previous stage.
In our production environment the external shuffle service is enabled, and this data 
correctness issue happened when there were node losses.

Although there's some non-deterministic factor in the upstream data, users wouldn't 
expect to see an incorrect result.

  was:
Spark repartition uses RoundRobinPartitioning, the generated results is 
non-deterministic when data has some randomness and stage/task retries happen.

The bug can be triggered when upstream data has some randomness, a repartition 
is called on them, then followed by result stage (could be more stages).
As the pattern shows below:
upstream stage (data with randomness) -> (repartition shuffle) -> result stage

When one executor goes down at result stage, some tasks of that stage might 
have finished, others would fail, shuffle files on that executor also get lost, 
some tasks from previous stage (upstream data generation, repartition) will 
need to rerun to generate dependent shuffle data files.
Because data has some randomness, regenerated data in upstream retried tasks is 
slightly different, repartition then generates inconsistent ordering, then 
tasks at result stage will be retried generating different data.

This is similar but different to 
https://issues.apache.org/jira/browse/SPARK-23207, fix for it uses extra local 
sort to make the row ordering deterministic, the sorting algorithm it uses 
simply compares row/record binaries. But in this case, upstream data has some 
randomness, the sorting algorithm doesn't help keep the order, thus 
RoundRobinPartitioning introduced non-deterministic result.

The following code returns 986415, instead of 100:
{code:java}
import scala.sys.process._
import org.apache.spark.TaskContext

case class TestObject(id: Long, value: Double)

val ds = spark.range(0, 1000 * 1000, 1).repartition(100, 
$"id").withColumn("val", rand()).repartition(100).map { 
  row => if (TaskContext.get.stageAttemptNumber == 0 && 
TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId > 97) {
throw new Exception("pkill -f java".!!)
  }
  TestObject(row.getLong(0), row.getDouble(1))
}

ds.toDF("id", "value").write.mode("overwrite").saveAsTable("tmp.test_table")

spark.sql("select count(distinct id) from tmp.test_table").show{code}
Command: 
{code:java}
spark-shell --num-executors 10 (--conf spark.dynamicAllocation.enabled=false 
--conf spark.shuffle.service.enabled=false){code}
To simulate the issue, disable external shuffle service is n

[jira] [Commented] (SPARK-38388) Repartition + Stage retries could lead to incorrect data

2022-03-08 Thread Jason Xu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17503100#comment-17503100
 ] 

Jason Xu commented on SPARK-38388:
--

[~jiangxb1987] using `rand()` in the repro example is just to simulate the 
non-deterministic data in our production pipelines; we don't use rand() in 
production. We have seen two cases run into this incorrect-data issue: 1. using 
.groupByKey(..).mapGroups(..) to pick one of the signals that meets certain 
criteria; 2. using row_number() over a window function and then picking the first 
row (a hypothetical sketch of both patterns is below).
We are not using a customized data source; the pipelines read in 
DataFrames/Datasets and then use public APIs for transformation.
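A hypothetical sketch of those two patterns (made-up schema, not our production code; 
assumes a spark-shell session):
{code:java}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
import spark.implicits._

// `Event`, `userId` and `score` are made-up names for illustration only.
case class Event(userId: Long, score: Double)
val events = Seq(Event(1, 0.3), Event(1, 0.3), Event(2, 0.7)).toDS()

// 1. groupByKey(..).mapGroups(..) picking "one of" the rows: the iteration order
//    within a group is not guaranteed, so the chosen row can differ across attempts.
val pickedPerUser = events.groupByKey(_.userId).mapGroups((_, rows) => rows.next())

// 2. row_number() over a window whose ordering has ties: ties are broken arbitrarily,
//    so the "first row" can change when upstream tasks are retried.
val w = Window.partitionBy($"userId").orderBy($"score")
val firstPerUser = events.toDF().withColumn("rn", row_number().over(w)).filter($"rn" === 1)
{code}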

> Repartition + Stage retries could lead to incorrect data 
> -
>
> Key: SPARK-38388
> URL: https://issues.apache.org/jira/browse/SPARK-38388
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0, 3.1.1
> Environment: Spark 2.4 and 3.1
>Reporter: Jason Xu
>Priority: Major
>
> Spark repartition uses RoundRobinPartitioning, the generated results is 
> non-deterministic when data has some randomness and stage/task retries happen.
> The bug can be triggered when upstream data has some randomness, a 
> repartition is called on them, then followed by result stage (could be more 
> stages).
> As the pattern shows below:
> upstream stage (data with randomness) -> (repartition shuffle) -> result stage
> When one executor goes down at result stage, some tasks of that stage might 
> have finished, others would fail, shuffle files on that executor also get 
> lost, some tasks from previous stage (upstream data generation, repartition) 
> will need to rerun to generate dependent shuffle data files.
> Because data has some randomness, regenerated data in upstream retried tasks 
> is slightly different, repartition then generates inconsistent ordering, then 
> tasks at result stage will be retried generating different data.
> This is similar but different to 
> https://issues.apache.org/jira/browse/SPARK-23207, fix for it uses extra 
> local sort to make the row ordering deterministic, the sorting algorithm it 
> uses simply compares row/record binaries. But in this case, upstream data has 
> some randomness, the sorting algorithm doesn't help keep the order, thus 
> RoundRobinPartitioning introduced non-deterministic result.
> The following code returns 986415, instead of 100:
> {code:java}
> import scala.sys.process._
> import org.apache.spark.TaskContext
> case class TestObject(id: Long, value: Double)
> val ds = spark.range(0, 1000 * 1000, 1).repartition(100, 
> $"id").withColumn("val", rand()).repartition(100).map { 
>   row => if (TaskContext.get.stageAttemptNumber == 0 && 
> TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId > 97) {
> throw new Exception("pkill -f java".!!)
>   }
>   TestObject(row.getLong(0), row.getDouble(1))
> }
> ds.toDF("id", "value").write.mode("overwrite").saveAsTable("tmp.test_table")
> spark.sql("select count(distinct id) from tmp.test_table").show{code}
> Command: 
> {code:java}
> spark-shell --num-executors 10 (--conf spark.dynamicAllocation.enabled=false 
> --conf spark.shuffle.service.enabled=false){code}
> To simulate the issue, disable external shuffle service is needed (if it's 
> also enabled by default in your environment),  this is to trigger shuffle 
> file loss and previous stage retries.
> In our production, we have external shuffle service enabled, this data 
> correctness issue happened when there were node losses.
> Although there's some non-deterministic factor in upstream data, user 
> wouldn't expect  to see incorrect result.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-38388) Repartition + Stage retries could lead to incorrect data

2022-03-07 Thread Jason Xu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17501894#comment-17501894
 ] 

Jason Xu edited comment on SPARK-38388 at 3/7/22, 6:08 PM:
---

[~jiangxb1987] thanks for the reply. The repro example above uses the DataFrame API 
and doesn't use RDDs directly, so I don't see how to override the 
`getOutputDeterministicLevel` function in this case. If I missed something, could 
you suggest how to modify the repro code above?
I see the `getOutputDeterministicLevel` function in RDD was introduced in 
[https://github.com/apache/spark/pull/22112]; does it only help when a user creates 
a customized RDD?


was (Author: kings129):
[~jiangxb1987] thanks for reply, above repo example uses DataFrame APIs and 
doesn't use RDD directly, I don't follow how to override the 
`getOutputDeterministicLevel` function in this case. If I missed something, 
could you help suggest how to modify above repo code?
I see `getOutputDeterministicLevel` function in RDD is introduced in 
[https://github.com/apache/spark/pull/22112,] does it only help when user 
create a customized RDD?

> Repartition + Stage retries could lead to incorrect data 
> -
>
> Key: SPARK-38388
> URL: https://issues.apache.org/jira/browse/SPARK-38388
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0, 3.1.1
> Environment: Spark 2.4 and 3.1
>Reporter: Jason Xu
>Priority: Major
>
> Spark repartition uses RoundRobinPartitioning, the generated results is 
> non-deterministic when data has some randomness and stage/task retries happen.
> The bug can be triggered when upstream data has some randomness, a 
> repartition is called on them, then followed by result stage (could be more 
> stages).
> As the pattern shows below:
> upstream stage (data with randomness) -> (repartition shuffle) -> result stage
> When one executor goes down at result stage, some tasks of that stage might 
> have finished, others would fail, shuffle files on that executor also get 
> lost, some tasks from previous stage (upstream data generation, repartition) 
> will need to rerun to generate dependent shuffle data files.
> Because data has some randomness, regenerated data in upstream retried tasks 
> is slightly different, repartition then generates inconsistent ordering, then 
> tasks at result stage will be retried generating different data.
> This is similar but different to 
> https://issues.apache.org/jira/browse/SPARK-23207, fix for it uses extra 
> local sort to make the row ordering deterministic, the sorting algorithm it 
> uses simply compares row/record binaries. But in this case, upstream data has 
> some randomness, the sorting algorithm doesn't help keep the order, thus 
> RoundRobinPartitioning introduced non-deterministic result.
> The following code returns 986415, instead of 100:
> {code:java}
> import scala.sys.process._
> import org.apache.spark.TaskContext
> case class TestObject(id: Long, value: Double)
> val ds = spark.range(0, 1000 * 1000, 1).repartition(100, 
> $"id").withColumn("val", rand()).repartition(100).map { 
>   row => if (TaskContext.get.stageAttemptNumber == 0 && 
> TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId > 97) {
> throw new Exception("pkill -f java".!!)
>   }
>   TestObject(row.getLong(0), row.getDouble(1))
> }
> ds.toDF("id", "value").write.mode("overwrite").saveAsTable("tmp.test_table")
> spark.sql("select count(distinct id) from tmp.test_table").show{code}
> Command: 
> {code:java}
> spark-shell --num-executors 10 (--conf spark.dynamicAllocation.enabled=false 
> --conf spark.shuffle.service.enabled=false){code}
> To simulate the issue, disable external shuffle service is needed (if it's 
> also enabled by default in your environment),  this is to trigger shuffle 
> file loss and previous stage retries.
> In our production, we have external shuffle service enabled, this data 
> correctness issue happened when there were node losses.
> Although there's some non-deterministic factor in upstream data, user 
> wouldn't expect  to see incorrect result.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-38388) Repartition + Stage retries could lead to incorrect data

2022-03-06 Thread Jason Xu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17501894#comment-17501894
 ] 

Jason Xu commented on SPARK-38388:
--

[~jiangxb1987] thanks for the reply. The repro example above uses the DataFrame API 
and doesn't use RDDs directly, so I don't see how to override the 
`getOutputDeterministicLevel` function in this case. If I missed something, could 
you suggest how to modify the repro code above?
I see the `getOutputDeterministicLevel` function in RDD was introduced in 
[https://github.com/apache/spark/pull/22112]; does it only help when a user creates 
a customized RDD?
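Based on my reading of that PR, a hedged sketch of what the override looks like on a 
custom RDD (developer API; treat the exact signatures as my assumption):
{code:java}
import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.{DeterministicLevel, RDD}

// Sketch: marking output as indeterminate appears to require subclassing RDD,
// so it isn't reachable from a pure DataFrame/Dataset pipeline like the repro above.
class IndeterminateRDD[T: ClassTag](parent: RDD[T]) extends RDD[T](parent) {
  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    parent.iterator(split, context)

  override protected def getPartitions: Array[Partition] = parent.partitions

  // Tell the scheduler that recomputed output may differ between attempts, so after a
  // shuffle-data loss it should rerun downstream stages instead of reusing partial output.
  override protected def getOutputDeterministicLevel: DeterministicLevel.Value =
    DeterministicLevel.INDETERMINATE
}
{code}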

> Repartition + Stage retries could lead to incorrect data 
> -
>
> Key: SPARK-38388
> URL: https://issues.apache.org/jira/browse/SPARK-38388
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0, 3.1.1
> Environment: Spark 2.4 and 3.1
>Reporter: Jason Xu
>Priority: Major
>
> Spark repartition uses RoundRobinPartitioning, the generated results is 
> non-deterministic when data has some randomness and stage/task retries happen.
> The bug can be triggered when upstream data has some randomness, a 
> repartition is called on them, then followed by result stage (could be more 
> stages).
> As the pattern shows below:
> upstream stage (data with randomness) -> (repartition shuffle) -> result stage
> When one executor goes down at result stage, some tasks of that stage might 
> have finished, others would fail, shuffle files on that executor also get 
> lost, some tasks from previous stage (upstream data generation, repartition) 
> will need to rerun to generate dependent shuffle data files.
> Because data has some randomness, regenerated data in upstream retried tasks 
> is slightly different, repartition then generates inconsistent ordering, then 
> tasks at result stage will be retried generating different data.
> This is similar but different to 
> https://issues.apache.org/jira/browse/SPARK-23207, fix for it uses extra 
> local sort to make the row ordering deterministic, the sorting algorithm it 
> uses simply compares row/record binaries. But in this case, upstream data has 
> some randomness, the sorting algorithm doesn't help keep the order, thus 
> RoundRobinPartitioning introduced non-deterministic result.
> The following code returns 986415, instead of 100:
> {code:java}
> import scala.sys.process._
> import org.apache.spark.TaskContext
> case class TestObject(id: Long, value: Double)
> val ds = spark.range(0, 1000 * 1000, 1).repartition(100, 
> $"id").withColumn("val", rand()).repartition(100).map { 
>   row => if (TaskContext.get.stageAttemptNumber == 0 && 
> TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId > 97) {
> throw new Exception("pkill -f java".!!)
>   }
>   TestObject(row.getLong(0), row.getDouble(1))
> }
> ds.toDF("id", "value").write.mode("overwrite").saveAsTable("tmp.test_table")
> spark.sql("select count(distinct id) from tmp.test_table").show{code}
> Command: 
> {code:java}
> spark-shell --num-executors 10 (--conf spark.dynamicAllocation.enabled=false 
> --conf spark.shuffle.service.enabled=false){code}
> To simulate the issue, disable external shuffle service is needed (if it's 
> also enabled by default in your environment),  this is to trigger shuffle 
> file loss and previous stage retries.
> In our production, we have external shuffle service enabled, this data 
> correctness issue happened when there were node losses.
> Although there's some non-deterministic factor in upstream data, user 
> wouldn't expect  to see incorrect result.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-38388) Repartition + Stage retries could lead to incorrect data

2022-03-03 Thread Jason Xu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17500989#comment-17500989
 ] 

Jason Xu commented on SPARK-38388:
--

Hi [~jiangxb1987], would you have any suggestion on how to fix this issue? 
Thanks!

> Repartition + Stage retries could lead to incorrect data 
> -
>
> Key: SPARK-38388
> URL: https://issues.apache.org/jira/browse/SPARK-38388
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0, 3.1.1
> Environment: Spark 2.4 and 3.1
>Reporter: Jason Xu
>Priority: Major
>
> Spark repartition uses RoundRobinPartitioning, the generated results is 
> non-deterministic when data has some randomness and stage/task retries happen.
> The bug can be triggered when upstream data has some randomness, a 
> repartition is called on them, then followed by result stage (could be more 
> stages).
> As the pattern shows below:
> upstream stage (data with randomness) -> (repartition shuffle) -> result stage
> When one executor goes down at result stage, some tasks of that stage might 
> have finished, others would fail, shuffle files on that executor also get 
> lost, some tasks from previous stage (upstream data generation, repartition) 
> will need to rerun to generate dependent shuffle data files.
> Because data has some randomness, regenerated data in upstream retried tasks 
> is slightly different, repartition then generates inconsistent ordering, then 
> tasks at result stage will be retried generating different data.
> This is similar but different to 
> https://issues.apache.org/jira/browse/SPARK-23207, fix for it uses extra 
> local sort to make the row ordering deterministic, the sorting algorithm it 
> uses simply compares row/record binaries. But in this case, upstream data has 
> some randomness, the sorting algorithm doesn't help keep the order, thus 
> RoundRobinPartitioning introduced non-deterministic result.
> The following code returns 986415, instead of 100:
> {code:java}
> import scala.sys.process._
> import org.apache.spark.TaskContext
> case class TestObject(id: Long, value: Double)
> val ds = spark.range(0, 1000 * 1000, 1).repartition(100, 
> $"id").withColumn("val", rand()).repartition(100).map { 
>   row => if (TaskContext.get.stageAttemptNumber == 0 && 
> TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId > 97) {
> throw new Exception("pkill -f java".!!)
>   }
>   TestObject(row.getLong(0), row.getDouble(1))
> }
> ds.toDF("id", "value").write.mode("overwrite").saveAsTable("tmp.test_table")
> spark.sql("select count(distinct id) from tmp.test_table").show{code}
> Command: 
> {code:java}
> spark-shell --num-executors 10 (--conf spark.dynamicAllocation.enabled=false 
> --conf spark.shuffle.service.enabled=false){code}
> To simulate the issue, disable external shuffle service is needed (if it's 
> also enabled by default in your environment),  this is to trigger shuffle 
> file loss and previous stage retries.
> In our production, we have external shuffle service enabled, this data 
> correctness issue happened when there were node losses.
> Although there's some non-deterministic factor in upstream data, user 
> wouldn't expect  to see incorrect result.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-38388) Repartition + Stage retries could lead to incorrect data

2022-03-02 Thread Jason Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Xu updated SPARK-38388:
-
Description: 
Spark repartition uses RoundRobinPartitioning; the generated result is 
non-deterministic when the data has some randomness and stage/task retries happen.

The bug can be triggered when the upstream data has some randomness, a repartition 
is called on it, and a result stage follows (there could be more stages).
As the pattern shows below:
upstream stage (data with randomness) -> (repartition shuffle) -> result stage

When an executor goes down during the result stage, some tasks of that stage might 
have finished while others fail, and the shuffle files on that executor are also 
lost, so some tasks from the previous stage (upstream data generation, repartition) 
need to rerun to regenerate the shuffle data files the result stage depends on.
Because the data has some randomness, the data regenerated by the retried upstream 
tasks is slightly different, repartition then produces an inconsistent ordering, and 
the retried tasks at the result stage generate different data.

This is similar to, but different from, 
https://issues.apache.org/jira/browse/SPARK-23207: the fix for that issue adds an 
extra local sort to make the row ordering deterministic, and the sorting algorithm it 
uses simply compares row/record binaries. But in this case the upstream data has some 
randomness, so the sort doesn't keep the order stable, and RoundRobinPartitioning 
still produces a non-deterministic result.

The following code returns 986415 instead of 1000000:
{code:java}
import scala.sys.process._
import org.apache.spark.TaskContext

case class TestObject(id: Long, value: Double)

val ds = spark.range(0, 1000 * 1000, 1).repartition(100, 
$"id").withColumn("val", rand()).repartition(100).map { 
  row => if (TaskContext.get.stageAttemptNumber == 0 && 
TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId > 97) {
throw new Exception("pkill -f java".!!)
  }
  TestObject(row.getLong(0), row.getDouble(1))
}

ds.toDF("id", "value").write.mode("overwrite").saveAsTable("tmp.test_table")

spark.sql("select count(distinct id) from tmp.test_table").show{code}
Command: 
{code:java}
spark-shell --num-executors 10 (--conf spark.dynamicAllocation.enabled=false 
--conf spark.shuffle.service.enabled=false){code}
To simulate the issue, the external shuffle service needs to be disabled (if it's 
enabled by default in your environment); this is what triggers the shuffle file loss 
and the retries of the previous stage.
In our production environment the external shuffle service is enabled, and this data 
correctness issue happened when there were node losses.

Although there's some non-deterministic factor in the upstream data, users wouldn't 
expect to see an incorrect result.

  was:
Spark repartition uses RoundRobinPartitioning, the generated results is 
non-deterministic when data has some randomness and stage/task retries happen.

The bug can be triggered when upstream data has some randomness, a repartition 
is called on them, then followed by result stage (could be more stages).
As the pattern shows below:
upstream stage (data with randomness) -> (repartition shuffle) -> result stage

When one executor goes down at result stage, some tasks of that stage might 
have finished, others would fail, shuffle files on that executor also get lost, 
some tasks from previous stage (upstream data generation, repartition) will 
need to rerun to generate dependent shuffle data files.
Because data has some randomness, regenerated data in upstream retried tasks is 
slightly different, repartition then generates inconsistent ordering, then 
tasks at result stage will be retried generating different data.

This is similar but different to 
https://issues.apache.org/jira/browse/SPARK-23207, fix for it uses extra local 
sort to make the row ordering deterministic, the sorting algorithm it uses 
simply compares row/record binaries. But in this case, upstream data has some 
randomness, the sorting algorithm doesn't help keep the order, thus 
RoundRobinPartitioning introduced non-deterministic result.

The following code returns 998818, instead of 100:
{code:java}
import scala.sys.process._
import org.apache.spark.TaskContext

case class TestObject(id: Long, value: Double)

val ds = spark.range(0, 1000 * 1000, 1).repartition(100, 
$"id").withColumn("val", rand()).repartition(100).map { 
  row => if (TaskContext.get.stageAttemptNumber == 0 && 
TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId > 97) {
throw new Exception("pkill -f java".!!)
  }
  TestObject(row.getLong(0), row.getDouble(1))
}

ds.toDF("id", "value").write.mode("overwrite").saveAsTable("tmp.test_table")

spark.sql("select count(distinct id) from tmp.test_table").show{code}
Command: 
{code:java}
spark-shell --num-executors 10 (--conf spark.dynamicAllocation.enabled=false 
--conf spark.shuffle.service.enabled=false){code}
To simulate the issue, disable external shuffle service 

[jira] [Updated] (SPARK-38388) Repartition + Stage retries could lead to incorrect data

2022-03-02 Thread Jason Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Xu updated SPARK-38388:
-
Description: 
Spark repartition uses RoundRobinPartitioning; the generated result is non-deterministic when the data has some randomness and stage/task retries happen.

The bug can be triggered when the upstream data has some randomness, a repartition is called on it, and a result stage (or more stages) follows, as in the pattern below:
upstream stage (data with randomness) -> (repartition shuffle) -> result stage

When an executor goes down during the result stage, some tasks of that stage may have finished while others fail; the shuffle files on that executor are also lost, so some tasks from the previous stages (upstream data generation, repartition) need to rerun to regenerate the shuffle data files the result stage depends on.
Because the data has some randomness, the data regenerated by the retried upstream tasks is slightly different; the repartition then produces an inconsistent ordering, and the retried result-stage tasks generate different data.

This is similar to, but different from, https://issues.apache.org/jira/browse/SPARK-23207: the fix there adds an extra local sort to make the row ordering deterministic, and that sort simply compares row/record binaries. In this case, however, the upstream data has some randomness, so the sort does not preserve a stable order and RoundRobinPartitioning still introduces a non-deterministic result.

The following code returns 998818 instead of the expected 1000000:
{code:java}
import scala.sys.process._
import org.apache.spark.TaskContext

case class TestObject(id: Long, value: Double)

val ds = spark.range(0, 1000 * 1000, 1).repartition(100, 
$"id").withColumn("val", rand()).repartition(100).map { 
  row => if (TaskContext.get.stageAttemptNumber == 0 && 
TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId > 97) {
throw new Exception("pkill -f java".!!)
  }
  TestObject(row.getLong(0), row.getDouble(1))
}

ds.toDF("id", "value").write.mode("overwrite").saveAsTable("tmp.test_table")

spark.sql("select count(distinct id) from tmp.test_table").show{code}
Command: 
{code:java}
spark-shell --num-executors 10 (--conf spark.dynamicAllocation.enabled=false 
--conf spark.shuffle.service.enabled=false){code}
To simulate the issue, the external shuffle service needs to be disabled (if it is enabled by default in your environment); this is what triggers the shuffle file loss and the retries of the previous stage.
In our production environment the external shuffle service is enabled, and this data correctness issue happened when there were node losses.

Although there is a non-deterministic factor in the upstream data, users would not expect to see an incorrect result.

  was:
Spark repartition uses RoundRobinPartitioning, the generated results is 
non-deterministic when data has some randomness and stage/task retries happen.

The bug can be triggered when upstream data has some randomness, a repartition 
is called on them, then followed by result stage (could be more stages).
As the pattern shows below:
upstream stage (data with randomness) -> (repartition shuffle) -> result stage

When one executor goes down at result stage, some tasks of that stage might 
have finished, others would fail, shuffle files on that executor also get lost, 
some tasks from previous stage (upstream data generation, repartition) will 
need to rerun to generate dependent shuffle data files.
Because data has some randomness, regenerated data in upstream retried tasks is 
slightly different, repartition then generates inconsistent ordering, then 
tasks at result stage will be retried generating different data.

This is similar but different to 
https://issues.apache.org/jira/browse/SPARK-23207, fix for it uses extra local 
sort to make the row ordering deterministic, the sorting algorithm it uses 
simply compares row/record binaries. But in this case, upstream data has some 
randomness, the sorting algorithm doesn't help keep the order, thus 
RoundRobinPartitioning introduced non-deterministic result.

The following code returns 998818, instead of 100:
{code:java}
import scala.sys.process._
import org.apache.spark.TaskContext

case class TestObject(id: Long, value: Double)

val ds = spark.range(0, 1000 * 1000, 1).repartition(100, 
$"id").withColumn("val", rand()).repartition(100).map { 
  row => if (TaskContext.get.stageAttemptNumber == 0 && 
TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId > 98) {
throw new Exception("pkill -f java".!!)
  }
  TestObject(row.getLong(0), row.getDouble(1))
}

ds.toDF("id", "value").write.saveAsTable("tmp.test_table")

spark.sql("select count(distinct id) from tmp.test_table").show{code}
Command: 
{code:java}
spark-shell --num-executors 10 (--conf spark.dynamicAllocation.enabled=false 
--conf spark.shuffle.service.enabled=false){code}
To simulate the issue, disable external shuffle service is needed (if it's

[jira] [Commented] (SPARK-38388) Repartition + Stage retries could lead to incorrect data

2022-03-02 Thread Jason Xu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17499979#comment-17499979
 ] 

Jason Xu commented on SPARK-38388:
--

I think one potential solution is to not allow partial retry of the stage after a repartition: even the tasks that already succeeded need to rerun.
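Until something like that lands in the scheduler, a user-level workaround sketch (assuming a stable, reasonably well-distributed key column such as {{id}} exists) is to repartition by that key instead of round-robin, so every attempt places a given row in the same shuffle partition:
{code:java}
// Workaround sketch: hash-repartition on a deterministic key instead of
// round-robin. Retried upstream tasks may regenerate different "val" values,
// but each id always lands in the same output partition, so no ids are lost
// or duplicated.
spark.range(0, 1000 * 1000, 1)
  .withColumn("val", rand())
  .repartition(100, $"id")
  .write.mode("overwrite")
  .saveAsTable("tmp.test_table")
{code}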

> Repartition + Stage retries could lead to incorrect data 
> -
>
> Key: SPARK-38388
> URL: https://issues.apache.org/jira/browse/SPARK-38388
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0, 3.1.1
> Environment: Spark 2.4 and 3.1
>Reporter: Jason Xu
>Priority: Major
>
> Spark repartition uses RoundRobinPartitioning, the generated results is 
> non-deterministic when data has some randomness and stage/task retries happen.
> The bug can be triggered when upstream data has some randomness, a 
> repartition is called on them, then followed by result stage (could be more 
> stages).
> As the pattern shows below:
> upstream stage (data with randomness) -> (repartition shuffle) -> result stage
> When one executor goes down at result stage, some tasks of that stage might 
> have finished, others would fail, shuffle files on that executor also get 
> lost, some tasks from previous stage (upstream data generation, repartition) 
> will need to rerun to generate dependent shuffle data files.
> Because data has some randomness, regenerated data in upstream retried tasks 
> is slightly different, repartition then generates inconsistent ordering, then 
> tasks at result stage will be retried generating different data.
> This is similar but different to 
> https://issues.apache.org/jira/browse/SPARK-23207, fix for it uses extra 
> local sort to make the row ordering deterministic, the sorting algorithm it 
> uses simply compares row/record binaries. But in this case, upstream data has 
> some randomness, the sorting algorithm doesn't help keep the order, thus 
> RoundRobinPartitioning introduced non-deterministic result.
> The following code returns 998818, instead of 100:
> {code:java}
> import scala.sys.process._
> import org.apache.spark.TaskContext
> case class TestObject(id: Long, value: Double)
> val ds = spark.range(0, 1000 * 1000, 1).repartition(100, 
> $"id").withColumn("val", rand()).repartition(100).map { 
>   row => if (TaskContext.get.stageAttemptNumber == 0 && 
> TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId > 98) {
> throw new Exception("pkill -f java".!!)
>   }
>   TestObject(row.getLong(0), row.getDouble(1))
> }
> ds.toDF("id", "value").write.saveAsTable("tmp.test_table")
> spark.sql("select count(distinct id) from tmp.test_table").show{code}
> Command: 
> {code:java}
> spark-shell --num-executors 10 (--conf spark.dynamicAllocation.enabled=false 
> --conf spark.shuffle.service.enabled=false){code}
> To simulate the issue, disable external shuffle service is needed (if it's 
> also enabled by default in your environment),  this is to trigger shuffle 
> file loss and previous stage retries.
> In our production, we have external shuffle service enabled, this data 
> correctness issue happened when there were node losses.
> Although there's some non-deterministic factor in upstream data, user 
> wouldn't expect  to see incorrect result.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-38388) Repartition + Stage retries could lead to incorrect data

2022-03-02 Thread Jason Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Xu updated SPARK-38388:
-
Description: 
Spark repartition uses RoundRobinPartitioning; the generated result is non-deterministic when the data has some randomness and stage/task retries happen.

The bug can be triggered when the upstream data has some randomness, a repartition is called on it, and a result stage (or more stages) follows, as in the pattern below:
upstream stage (data with randomness) -> (repartition shuffle) -> result stage

When an executor goes down during the result stage, some tasks of that stage may have finished while others fail; the shuffle files on that executor are also lost, so some tasks from the previous stages (upstream data generation, repartition) need to rerun to regenerate the shuffle data files the result stage depends on.
Because the data has some randomness, the data regenerated by the retried upstream tasks is slightly different; the repartition then produces an inconsistent ordering, and the retried result-stage tasks generate different data.

This is similar to, but different from, https://issues.apache.org/jira/browse/SPARK-23207: the fix there adds an extra local sort to make the row ordering deterministic, and that sort simply compares row/record binaries. In this case, however, the upstream data has some randomness, so the sort does not preserve a stable order and RoundRobinPartitioning still introduces a non-deterministic result.

The following code returns 998818 instead of the expected 1000000:
{code:java}
import scala.sys.process._
import org.apache.spark.TaskContext

case class TestObject(id: Long, value: Double)

val ds = spark.range(0, 1000 * 1000, 1).repartition(100, 
$"id").withColumn("val", rand()).repartition(100).map { 
  row => if (TaskContext.get.stageAttemptNumber == 0 && 
TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId > 98) {
throw new Exception("pkill -f java".!!)
  }
  TestObject(row.getLong(0), row.getDouble(1))
}

ds.toDF("id", "value").write.saveAsTable("tmp.test_table")

spark.sql("select count(distinct id) from tmp.test_table").show{code}
Command: 
{code:java}
spark-shell --num-executors 10 (--conf spark.dynamicAllocation.enabled=false 
--conf spark.shuffle.service.enabled=false){code}
To simulate the issue, the external shuffle service needs to be disabled (if it is enabled by default in your environment); this is what triggers the shuffle file loss and the retries of the previous stage.
In our production environment the external shuffle service is enabled, and this data correctness issue happened when there were node losses.

Although there is a non-deterministic factor in the upstream data, users would not expect to see an incorrect result.

  was:
Spark repartition uses RoundRobinPartitioning, the generated results is 
non-deterministic when data has some randomness and stage/task retries happen.

The bug can be triggered when upstream data has some randomness, a repartition 
is called on them, then followed by result stage (could be more stages).
As the pattern shows below:
upstream stage (data with randomness) -> (repartition shuffle) -> result stage

When one executor goes down at result stage, some tasks of that stage might 
have finished, others would fail, shuffle files on that executor also get lost, 
some tasks from previous stage (upstream data generation, repartition) will 
need to rerun to generate dependent shuffle data files.
Because data has some randomness, regenerated data in upstream retried tasks is 
slightly different, repartition then generates inconsistent ordering, then 
tasks at result stage will be retried generating different data.

This is similar but different to 
https://issues.apache.org/jira/browse/SPARK-23207, fix for it uses extra local 
sort to make the row ordering deterministic, the sorting algorithm it uses 
simply compares row/record binaries. But in this case, upstream data has some 
randomness, the sorting algorithm doesn't help keep the order, thus 
RoundRobinPartitioning introduced non-deterministic result.

The following code returns 998818, instead of 100:
{code:java}
import scala.sys.process._
import org.apache.spark.TaskContext

case class TestObject(id: Long, value: Double)

val ds = spark.range(0, 1000 * 1000, 1).repartition(100, 
$"id").withColumn("val", rand()).repartition(100).map { row =>
if (TaskContext.get.stageAttemptNumber == 0 && 
TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId > 98) 
  {
throw new Exception("pkill -f java".!!)
  }
  TestObject(row.getLong(0), row.getDouble(1))
}

ds.toDF("id", "value").write.saveAsTable("tmp.test_table")

spark.sql("select count(distinct id) from tmp.test_table").show{code}

Command: 
{code:java}
spark-shell --num-executors 10 (--conf spark.dynamicAllocation.enabled=false 
--conf spark.shuffle.service.enabled=false){code}
To simulate the issue, disable external shuffle service is needed (if it's 

[jira] [Created] (SPARK-38388) Repartition + Stage retries could lead to incorrect data

2022-03-02 Thread Jason Xu (Jira)
Jason Xu created SPARK-38388:


 Summary: Repartition + Stage retries could lead to incorrect data 
 Key: SPARK-38388
 URL: https://issues.apache.org/jira/browse/SPARK-38388
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.1.1, 2.4.0
 Environment: Spark 2.4 and 3.1
Reporter: Jason Xu


Spark repartition uses RoundRobinPartitioning; the generated result is non-deterministic when the data has some randomness and stage/task retries happen.

The bug can be triggered when the upstream data has some randomness, a repartition is called on it, and a result stage (or more stages) follows, as in the pattern below:
upstream stage (data with randomness) -> (repartition shuffle) -> result stage

When an executor goes down during the result stage, some tasks of that stage may have finished while others fail; the shuffle files on that executor are also lost, so some tasks from the previous stages (upstream data generation, repartition) need to rerun to regenerate the shuffle data files the result stage depends on.
Because the data has some randomness, the data regenerated by the retried upstream tasks is slightly different; the repartition then produces an inconsistent ordering, and the retried result-stage tasks generate different data.

This is similar to, but different from, https://issues.apache.org/jira/browse/SPARK-23207: the fix there adds an extra local sort to make the row ordering deterministic, and that sort simply compares row/record binaries. In this case, however, the upstream data has some randomness, so the sort does not preserve a stable order and RoundRobinPartitioning still introduces a non-deterministic result.

The following code returns 998818 instead of the expected 1000000:
{code:java}
import scala.sys.process._
import org.apache.spark.TaskContext

case class TestObject(id: Long, value: Double)

val ds = spark.range(0, 1000 * 1000, 1).repartition(100, 
$"id").withColumn("val", rand()).repartition(100).map { row =>
if (TaskContext.get.stageAttemptNumber == 0 && 
TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId > 98) 
  {
throw new Exception("pkill -f java".!!)
  }
  TestObject(row.getLong(0), row.getDouble(1))
}

ds.toDF("id", "value").write.saveAsTable("tmp.test_table")

spark.sql("select count(distinct id) from tmp.test_table").show{code}

Command: 
{code:java}
spark-shell --num-executors 10 (--conf spark.dynamicAllocation.enabled=false 
--conf spark.shuffle.service.enabled=false){code}
To simulate the issue, the external shuffle service needs to be disabled (if it is enabled by default in your environment); this is what triggers the shuffle file loss and the retries of the previous stage.
In our production environment the external shuffle service is enabled, and this data correctness issue happened when there were node losses.

Although there is a non-deterministic factor in the upstream data, users would not expect to see an incorrect result.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Comment Edited] (SPARK-30873) Handling Node Decommissioning for Yarn cluster manger in Spark

2021-08-18 Thread Jason Xu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401282#comment-17401282
 ] 

Jason Xu edited comment on SPARK-30873 at 8/18/21, 6:01 PM:


Hi [~saurabhc100], I found your work useful for dealing with YARN node decommissioning. I have one question: in order to get the "DECOMMISSIONING" node state update from the RM, does it require a YARN / Hadoop version later than 3.0.1? It seems support was added in https://issues.apache.org/jira/browse/YARN-6483.
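For reference, a client-side sketch of checking the same state (assumes a Hadoop/YARN client library whose NodeState enum includes DECOMMISSIONING; this polls the RM directly rather than relying on the RM -> AM updates discussed in YARN-6483):
{code:java}
// Illustrative only: list the nodes the ResourceManager currently reports as
// draining. Requires a YARN client version that exposes the DECOMMISSIONING
// node state (newer Hadoop lines).
import scala.collection.JavaConverters._
import org.apache.hadoop.yarn.api.records.NodeState
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

val yarnClient = YarnClient.createYarnClient()
yarnClient.init(new YarnConfiguration())
yarnClient.start()

val draining = yarnClient.getNodeReports(NodeState.DECOMMISSIONING).asScala
draining.foreach(n => println(s"${n.getNodeId} -> ${n.getNodeState}"))

yarnClient.stop()
{code}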


was (Author: kings129):
Hi Saurabh, I found your work is useful in dealing with Yarn node 
decommissioning. I have one questions: In order to get "DECOMMISSIONING" node 
state update from RM, does it require using YARN / Hadoop later than version 
3.0.1? Seems support is added in 
https://issues.apache.org/jira/browse/YARN-6483.

> Handling Node Decommissioning for Yarn cluster manger in Spark
> --
>
> Key: SPARK-30873
> URL: https://issues.apache.org/jira/browse/SPARK-30873
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core, YARN
>Affects Versions: 3.1.0
>Reporter: Saurabh Chawla
>Priority: Major
>
> In many public cloud environments, the node loss (in case of AWS 
> SpotLoss,Spot blocks and GCP preemptible VMs) is a planned and informed 
> activity. 
> The cloud provider intimates the cluster manager about the possible loss of 
> node ahead of time. Few examples is listed here:
> a) Spot loss in AWS(2 min before event)
> b) GCP Pre-emptible VM loss (30 second before event)
> c) AWS Spot block loss with info on termination time (generally few tens of 
> minutes before decommission as configured in Yarn)
> This JIRA tries to make spark leverage the knowledge of the node loss in 
> future, and tries to adjust the scheduling of tasks to minimise the impact on 
> the application. 
> It is well known that when a host is lost, the executors, its running tasks, 
> their caches and also Shuffle data is lost. This could result in wastage of 
> compute and other resources.
> The focus here is to build a framework for YARN, that can be extended for 
> other cluster managers to handle such scenario.
> The framework must handle one or more of the following:-
> 1) Prevent new tasks from starting on any executors on decommissioning Nodes. 
> 2) Decide to kill the running tasks so that they can be restarted elsewhere 
> (assuming they will not complete within the deadline) OR we can allow them to 
> continue hoping they will finish within deadline.
> 3) Clear the shuffle data entry from MapOutputTracker of decommission node 
> hostname to prevent the shuffle fetchfailed exception.The most significant 
> advantage of unregistering shuffle outputs when Spark schedules the first 
> re-attempt to compute the missing blocks, it notices all of the missing 
> blocks from decommissioned nodes and recovers in only one attempt. This 
> speeds up the recovery process significantly over the scheduled Spark 
> implementation, where stages might be rescheduled multiple times to recompute 
> missing shuffles from all nodes, and prevent jobs from being stuck for hours 
> failing and recomputing.
> 4) Prevent the stage to abort due to the fetchfailed exception in case of 
> decommissioning of node. In Spark there is number of consecutive stage 
> attempts allowed before a stage is aborted.This is controlled by the config 
> spark.stage.maxConsecutiveAttempts. Not accounting fetch fails due 
> decommissioning of nodes towards stage failure improves the reliability of 
> the system.
> Main components of change
> 1) Get the ClusterInfo update from the Resource Manager -> Application Master 
> -> Spark Driver.
> 2) DecommissionTracker, resides inside driver, tracks all the decommissioned 
> nodes and take necessary action and state transition.
> 3) Based on the decommission node list add hooks at code to achieve
>  a) No new task on executor
>  b) Remove shuffle data mapping info for the node to be decommissioned from 
> the mapOutputTracker
>  c) Do not count fetchFailure from decommissioned towards stage failure
> On the receiving info that node is to be decommissioned, the below action 
> needs to be performed by DecommissionTracker on driver:
>  * Add the entry of Nodes in DecommissionTracker with termination time and 
> node state as "DECOMMISSIONING".
>  * Stop assigning any new tasks on executors on the nodes which are candidate 
> for decommission. This makes sure slowly as the tasks finish the usage of 
> this node would die down.
>  * Kill all the executors for the decommissioning nodes after configurable 
> period of time, say "spark.graceful.decommission.executor.leasetimePct". This 
> killing ensures two things. Firstly, the task failure will be attributed in 

[jira] [Commented] (SPARK-30873) Handling Node Decommissioning for Yarn cluster manger in Spark

2021-08-18 Thread Jason Xu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-30873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17401282#comment-17401282
 ] 

Jason Xu commented on SPARK-30873:
--

Hi Saurabh, I found your work useful for dealing with YARN node decommissioning. I have one question: in order to get the "DECOMMISSIONING" node state update from the RM, does it require a YARN / Hadoop version later than 3.0.1? It seems support was added in https://issues.apache.org/jira/browse/YARN-6483.

> Handling Node Decommissioning for Yarn cluster manger in Spark
> --
>
> Key: SPARK-30873
> URL: https://issues.apache.org/jira/browse/SPARK-30873
> Project: Spark
>  Issue Type: Improvement
>  Components: Spark Core, YARN
>Affects Versions: 3.1.0
>Reporter: Saurabh Chawla
>Priority: Major
>
> In many public cloud environments, the node loss (in case of AWS 
> SpotLoss,Spot blocks and GCP preemptible VMs) is a planned and informed 
> activity. 
> The cloud provider intimates the cluster manager about the possible loss of 
> node ahead of time. Few examples is listed here:
> a) Spot loss in AWS(2 min before event)
> b) GCP Pre-emptible VM loss (30 second before event)
> c) AWS Spot block loss with info on termination time (generally few tens of 
> minutes before decommission as configured in Yarn)
> This JIRA tries to make spark leverage the knowledge of the node loss in 
> future, and tries to adjust the scheduling of tasks to minimise the impact on 
> the application. 
> It is well known that when a host is lost, the executors, its running tasks, 
> their caches and also Shuffle data is lost. This could result in wastage of 
> compute and other resources.
> The focus here is to build a framework for YARN, that can be extended for 
> other cluster managers to handle such scenario.
> The framework must handle one or more of the following:-
> 1) Prevent new tasks from starting on any executors on decommissioning Nodes. 
> 2) Decide to kill the running tasks so that they can be restarted elsewhere 
> (assuming they will not complete within the deadline) OR we can allow them to 
> continue hoping they will finish within deadline.
> 3) Clear the shuffle data entry from MapOutputTracker of decommission node 
> hostname to prevent the shuffle fetchfailed exception.The most significant 
> advantage of unregistering shuffle outputs when Spark schedules the first 
> re-attempt to compute the missing blocks, it notices all of the missing 
> blocks from decommissioned nodes and recovers in only one attempt. This 
> speeds up the recovery process significantly over the scheduled Spark 
> implementation, where stages might be rescheduled multiple times to recompute 
> missing shuffles from all nodes, and prevent jobs from being stuck for hours 
> failing and recomputing.
> 4) Prevent the stage to abort due to the fetchfailed exception in case of 
> decommissioning of node. In Spark there is number of consecutive stage 
> attempts allowed before a stage is aborted.This is controlled by the config 
> spark.stage.maxConsecutiveAttempts. Not accounting fetch fails due 
> decommissioning of nodes towards stage failure improves the reliability of 
> the system.
> Main components of change
> 1) Get the ClusterInfo update from the Resource Manager -> Application Master 
> -> Spark Driver.
> 2) DecommissionTracker, resides inside driver, tracks all the decommissioned 
> nodes and take necessary action and state transition.
> 3) Based on the decommission node list add hooks at code to achieve
>  a) No new task on executor
>  b) Remove shuffle data mapping info for the node to be decommissioned from 
> the mapOutputTracker
>  c) Do not count fetchFailure from decommissioned towards stage failure
> On the receiving info that node is to be decommissioned, the below action 
> needs to be performed by DecommissionTracker on driver:
>  * Add the entry of Nodes in DecommissionTracker with termination time and 
> node state as "DECOMMISSIONING".
>  * Stop assigning any new tasks on executors on the nodes which are candidate 
> for decommission. This makes sure slowly as the tasks finish the usage of 
> this node would die down.
>  * Kill all the executors for the decommissioning nodes after configurable 
> period of time, say "spark.graceful.decommission.executor.leasetimePct". This 
> killing ensures two things. Firstly, the task failure will be attributed in 
> job failure count. Second, avoid generation on more shuffle data on the node 
> that will eventually be lost. The node state is set to 
> "EXECUTOR_DECOMMISSIONED". 
>  * Mark Shuffle data on the node as unavailable after 
> "spark.qubole.graceful.decommission.shuffedata.leasetimePct" time. This will 
> ensure that recomputation of missing shuffle partition is done early, rather 
> than 

[jira] [Resolved] (SPARK-36440) Spark3 fails to read hive table with mixed format

2021-08-06 Thread Jason Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Xu resolved SPARK-36440.
--
Fix Version/s: 3.2.0
   Resolution: Fixed

Already fixed by https://issues.apache.org/jira/browse/SPARK-36197 for future 
versions.

> Spark3 fails to read hive table with mixed format
> -
>
> Key: SPARK-36440
> URL: https://issues.apache.org/jira/browse/SPARK-36440
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0, 3.1.1, 3.1.2
>Reporter: Jason Xu
>Priority: Major
> Fix For: 3.2.0
>
>
> Spark3 fails to read hive table with mixed format with hive Serde, this is a 
> regression compares to Spark 2.4. 
> Replication steps :
>  1. In spark 3 (3.0 or 3.1) spark shell:
> {code:java}
> scala> spark.sql("create table tmp.test_table (id int, name string) 
> partitioned by (pt int) stored as rcfile")
> scala> spark.sql("insert into tmp.test_table (pt = 1) values (1, 'Alice'), 
> (2, 'Bob')")
> {code}
> 2. Run hive command to change table file format (from RCFile to Parquet).
> {code:java}
> hive (default)> alter table set tmp.test_table fileformat Parquet;
> {code}
> 3. Try to read partition (in RCFile format) with hive serde using Spark shell:
> {code:java}
> scala> spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
> scala> spark.sql("select * from tmp.test_table where pt=1").show{code}
> Exception: (anonymized file path with )
> {code:java}
> Caused by: java.lang.RuntimeException: 
> s3a:///data/part-0-22112178-5dd7-4065-89d7-2ee550296909-c000 is not 
> a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [5, 
> 96, 1, -33]
>   at 
> org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:524)
>   at 
> org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:505)
>   at 
> org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:499)
>   at 
> org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:448)
>   at 
> org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:433)
>   at 
> org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase.getSplit(ParquetRecordReaderBase.java:79)
>   at 
> org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.(ParquetRecordReaderWrapper.java:75)
>   at 
> org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.(ParquetRecordReaderWrapper.java:60)
>   at 
> org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:75)
>   at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:286)
>   at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:285)
>   at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:243)
>   at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:96)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-36440) Spark3 fails to read hive table with mixed format

2021-08-06 Thread Jason Xu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-36440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17394995#comment-17394995
 ] 

Jason Xu commented on SPARK-36440:
--

I found the root cause: [https://github.com/apache/spark/pull/23559] removed the per-partition input format support that Spark 2.4 had.
While I was creating a fix PR for this issue, I found it was recently fixed in https://issues.apache.org/jira/browse/SPARK-36197 and will be available in 3.2.0.
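For anyone hitting this before 3.2.0, a quick sketch for confirming the format mismatch (metadata queries still work even though the data read fails; illustrative only):
{code:java}
// Compare partition-level and table-level storage info: after the ALTER the
// table reports the Parquet input format while the old partition still
// reports RCFile.
spark.sql("describe formatted tmp.test_table partition (pt = 1)").show(100, false)
spark.sql("describe formatted tmp.test_table").show(100, false)
{code}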

> Spark3 fails to read hive table with mixed format
> -
>
> Key: SPARK-36440
> URL: https://issues.apache.org/jira/browse/SPARK-36440
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.0.0, 3.1.1, 3.1.2
>Reporter: Jason Xu
>Priority: Major
>
> Spark3 fails to read hive table with mixed format with hive Serde, this is a 
> regression compares to Spark 2.4. 
> Replication steps :
>  1. In spark 3 (3.0 or 3.1) spark shell:
> {code:java}
> scala> spark.sql("create table tmp.test_table (id int, name string) 
> partitioned by (pt int) stored as rcfile")
> scala> spark.sql("insert into tmp.test_table (pt = 1) values (1, 'Alice'), 
> (2, 'Bob')")
> {code}
> 2. Run hive command to change table file format (from RCFile to Parquet).
> {code:java}
> hive (default)> alter table set tmp.test_table fileformat Parquet;
> {code}
> 3. Try to read partition (in RCFile format) with hive serde using Spark shell:
> {code:java}
> scala> spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")
> scala> spark.sql("select * from tmp.test_table where pt=1").show{code}
> Exception: (anonymized file path with )
> {code:java}
> Caused by: java.lang.RuntimeException: 
> s3a:///data/part-0-22112178-5dd7-4065-89d7-2ee550296909-c000 is not 
> a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [5, 
> 96, 1, -33]
>   at 
> org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:524)
>   at 
> org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:505)
>   at 
> org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:499)
>   at 
> org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:448)
>   at 
> org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:433)
>   at 
> org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase.getSplit(ParquetRecordReaderBase.java:79)
>   at 
> org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.(ParquetRecordReaderWrapper.java:75)
>   at 
> org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.(ParquetRecordReaderWrapper.java:60)
>   at 
> org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:75)
>   at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:286)
>   at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:285)
>   at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:243)
>   at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:96)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
>   at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>   {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-36440) Spark3 fails to read hive table with mixed format

2021-08-05 Thread Jason Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Xu updated SPARK-36440:
-
Description: 
Spark 3 fails to read a Hive table with mixed formats using the Hive SerDe; this is a regression compared to Spark 2.4.

Replication steps:
 1. In a Spark 3 (3.0 or 3.1) spark-shell:
{code:java}
scala> spark.sql("create table tmp.test_table (id int, name string) partitioned 
by (pt int) stored as rcfile")

scala> spark.sql("insert into tmp.test_table (pt = 1) values (1, 'Alice'), (2, 
'Bob')")
{code}
2. Run a Hive command to change the table file format (from RCFile to Parquet).
{code:java}
hive (default)> alter table tmp.test_table set fileformat Parquet;
{code}
3. Try to read the partition (still in RCFile format) with the Hive SerDe using spark-shell:
{code:java}
scala> spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")

scala> spark.sql("select * from tmp.test_table where pt=1").show{code}
Exception (file paths anonymized):
{code:java}
Caused by: java.lang.RuntimeException: 
s3a:///data/part-0-22112178-5dd7-4065-89d7-2ee550296909-c000 is not a 
Parquet file. expected magic number at tail [80, 65, 82, 49] but found [5, 96, 
1, -33]
  at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:524)
  at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:505)
  at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:499)
  at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:448)
  at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:433)
  at 
org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase.getSplit(ParquetRecordReaderBase.java:79)
  at 
org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.(ParquetRecordReaderWrapper.java:75)
  at 
org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.(ParquetRecordReaderWrapper.java:60)
  at 
org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:75)
  at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:286)
  at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:285)
  at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:243)
  at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:96)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  {code}
 

 

 

  was:
Spark3 fails to read hive table with mixed format with hive Serde, this is a 
regression compares to Spark 2.4. 

Replication steps :
 1. In spark 3 (3.0 or 3.1) spark shell:
{code:java}
scala> spark.sql("create table tmp.test_table (id int, name string) partitioned 
by (pt int) stored as rcfile")

scala> spark.sql("insert into tmp.test_table (pt = 1) values (1, 'Alice'), (2, 
'Bob')")
{code}
2. Run hive command to change table format (from RCFile to Parquet).
{code:java}
hive (default)> alter table set tmp.test_table fileformat Parquet;
{code}
3. Try to read partition (in RCFile format) with hive serde using Spark shell:
{code:java}
scala> spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")

scala> spark.sql("select * from tmp.test_table where pt=1").show{code}
Exception: (anonymized file path with )
{code:java}
Caused by: java.lang.RuntimeException: 
s3a:///data/part-0-22112178-5dd7-4065-89d7-2ee550296909-c000 is not a 
Parquet file. expected magic number at tail [80, 65, 82, 49] but found [5, 96, 
1, -33]
  at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:524)
  at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:505)
  at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:499)
  at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:448)
  at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:433)
  at 
org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase.getSplit(ParquetRecordReaderBase.java:79)
  at 
org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.(ParquetRecordReaderWrapper.java:75)
  at 
org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.(ParquetRecordReaderWrapper.java:60)
  at 
org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:75)
  at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:286)
  at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:285)
  at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:243)

[jira] [Updated] (SPARK-36440) Spark3 fails to read hive table with mixed format

2021-08-05 Thread Jason Xu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-36440?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Xu updated SPARK-36440:
-
Description: 
Spark 3 fails to read a Hive table with mixed formats using the Hive SerDe; this is a regression compared to Spark 2.4.

Replication steps:
 1. In a Spark 3 (3.0 or 3.1) spark-shell:
{code:java}
scala> spark.sql("create table tmp.test_table (id int, name string) partitioned 
by (pt int) stored as rcfile")

scala> spark.sql("insert into tmp.test_table (pt = 1) values (1, 'Alice'), (2, 
'Bob')")
{code}
2. Run a Hive command to change the table format (from RCFile to Parquet).
{code:java}
hive (default)> alter table tmp.test_table set fileformat Parquet;
{code}
3. Try to read the partition (still in RCFile format) with the Hive SerDe using spark-shell:
{code:java}
scala> spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")

scala> spark.sql("select * from tmp.test_table where pt=1").show{code}
Exception (file paths anonymized):
{code:java}
Caused by: java.lang.RuntimeException: 
s3a:///data/part-0-22112178-5dd7-4065-89d7-2ee550296909-c000 is not a 
Parquet file. expected magic number at tail [80, 65, 82, 49] but found [5, 96, 
1, -33]
  at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:524)
  at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:505)
  at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:499)
  at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:448)
  at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:433)
  at 
org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase.getSplit(ParquetRecordReaderBase.java:79)
  at 
org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.(ParquetRecordReaderWrapper.java:75)
  at 
org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.(ParquetRecordReaderWrapper.java:60)
  at 
org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:75)
  at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:286)
  at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:285)
  at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:243)
  at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:96)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  {code}
 

 

 

  was:
Spark3 fails to read hive table with mixed format with hive Serde, this is a 
regression compares to Spark 2.4. 

Replication steps :
1. In spark 3 (3.0 or 3.1) spark shell:
{code:java}
scala> spark.sql("create table tmp.test_table (id int, name string) partitioned 
by (pt int) stored as rcfile")

scala> spark.sql("insert into tmp.test_table (pt = 1) values (1, 'Alice'), (2, 
'Bob')"
{code}
2. Run hive command to change table format (from RCFile to Parquet).
{code:java}
hive (default)> alter table set tmp.test_table fileformat Parquet;
{code}
3. Try to read partition (in RCFile format) with hive serde using Spark shell:
{code:java}
scala> spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")

scala> spark.sql("select * from tmp.test_table where pt=1").show{code}
Exception: (anonymized file path with )
{code:java}
Caused by: java.lang.RuntimeException: 
s3a:///data/part-0-22112178-5dd7-4065-89d7-2ee550296909-c000 is not a 
Parquet file. expected magic number at tail [80, 65, 82, 49] but found [5, 96, 
1, -33]
  at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:524)
  at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:505)
  at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:499)
  at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:448)
  at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:433)
  at 
org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase.getSplit(ParquetRecordReaderBase.java:79)
  at 
org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.(ParquetRecordReaderWrapper.java:75)
  at 
org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.(ParquetRecordReaderWrapper.java:60)
  at 
org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:75)
  at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:286)
  at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:285)
  at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:243)
  at o

[jira] [Created] (SPARK-36440) Spark3 fails to read hive table with mixed format

2021-08-05 Thread Jason Xu (Jira)
Jason Xu created SPARK-36440:


 Summary: Spark3 fails to read hive table with mixed format
 Key: SPARK-36440
 URL: https://issues.apache.org/jira/browse/SPARK-36440
 Project: Spark
  Issue Type: Bug
  Components: SQL
Affects Versions: 3.1.2, 3.1.1, 3.0.0
Reporter: Jason Xu


Spark 3 fails to read a Hive table with mixed formats using the Hive SerDe; this is a regression compared to Spark 2.4.

Replication steps:
1. In a Spark 3 (3.0 or 3.1) spark-shell:
{code:java}
scala> spark.sql("create table tmp.test_table (id int, name string) partitioned 
by (pt int) stored as rcfile")

scala> spark.sql("insert into tmp.test_table (pt = 1) values (1, 'Alice'), (2, 
'Bob')"
{code}
2. Run a Hive command to change the table format (from RCFile to Parquet).
{code:java}
hive (default)> alter table tmp.test_table set fileformat Parquet;
{code}
3. Try to read the partition (still in RCFile format) with the Hive SerDe using spark-shell:
{code:java}
scala> spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")

scala> spark.sql("select * from tmp.test_table where pt=1").show{code}
Exception (file paths anonymized):
{code:java}
Caused by: java.lang.RuntimeException: 
s3a:///data/part-0-22112178-5dd7-4065-89d7-2ee550296909-c000 is not a 
Parquet file. expected magic number at tail [80, 65, 82, 49] but found [5, 96, 
1, -33]
  at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:524)
  at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:505)
  at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:499)
  at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:448)
  at 
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:433)
  at 
org.apache.hadoop.hive.ql.io.parquet.ParquetRecordReaderBase.getSplit(ParquetRecordReaderBase.java:79)
  at 
org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.(ParquetRecordReaderWrapper.java:75)
  at 
org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.(ParquetRecordReaderWrapper.java:60)
  at 
org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:75)
  at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:286)
  at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:285)
  at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:243)
  at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:96)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  {code}
 

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org