[jira] [Updated] (SPARK-28699) Cache an indeterminate RDD could lead to incorrect result while stage rerun
[ https://issues.apache.org/jira/browse/SPARK-28699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuanjian Li updated SPARK-28699:
    Description:
It's another case for the indeterminate stage/RDD rerun while stage rerun happened.

We can reproduce this by the following code, thanks to Tyson for reporting this!
{code:scala}
import scala.sys.process._
import org.apache.spark.TaskContext
val res = spark.range(0, 1 * 1, 1).map{ x => (x % 1000, x)}
// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).cache.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()
{code}

  was:
It's another case for the indeterminate stage/RDD rerun while stage rerun happened. In the CachedRDDBuilder.

We can reproduce this by the following code, thanks to Tyson for reporting this!
{code:scala}
import scala.sys.process._
import org.apache.spark.TaskContext
val res = spark.range(0, 1 * 1, 1).map{ x => (x % 1000, x)}
// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).cache.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()
{code}

> Cache an indeterminate RDD could lead to incorrect result while stage rerun
> ---
>
>                 Key: SPARK-28699
>                 URL: https://issues.apache.org/jira/browse/SPARK-28699
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.3, 3.0.0, 2.4.3
>            Reporter: Yuanjian Li
>            Priority: Blocker
>              Labels: correctness
>
> It's another case for the indeterminate stage/RDD rerun while stage rerun happened.
> We can reproduce this by the following code, thanks to Tyson for reporting this!
>
> {code:scala}
> import scala.sys.process._
> import org.apache.spark.TaskContext
> val res = spark.range(0, 1 * 1, 1).map{ x => (x % 1000, x)}
> // kill an executor in the stage that performs repartition(239)
> val df = res.repartition(113).cache.repartition(239).map { x =>
>   if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
>     throw new Exception("pkill -f -n java".!!)
>   }
>   x
> }
> val r2 = df.distinct.count()
> {code}

--
This message was sent by Atlassian Jira (v8.3.2#803003)

To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
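As a rough illustration of why the reproduction above can return a wrong count, the sketch below deals rows to partitions in arrival order. It is plain Scala rather than Spark code: `RoundRobinSketch` and `roundRobin` are hypothetical names, and the assumption that rows are assigned strictly round-robin by arrival position is a simplification of what `repartition` really does. The point is only that a rerun producing the same rows in a different order fills the output partitions differently.

```scala
// Toy model only; none of these names exist in Spark.
object RoundRobinSketch {
  // Assign rows to partitions round-robin, in arrival order (simplifying assumption).
  def roundRobin[A](rows: Seq[A], numPartitions: Int): Map[Int, Seq[A]] =
    rows.zipWithIndex
      .groupBy { case (_, i) => i % numPartitions }
      .map { case (p, rs) => p -> rs.map(_._1) }

  def main(args: Array[String]): Unit = {
    val firstAttempt = Seq("a", "b", "c", "d")
    val rerun = Seq("b", "a", "d", "c") // same rows, different arrival order

    val before = roundRobin(firstAttempt, 2)
    val after = roundRobin(rerun, 2)

    // A consumer that read partition 0 before the failure and
    // partition 1 after the rerun sees a mix of the two attempts.
    val mixed = before(0) ++ after(1)
    println(mixed.distinct.size) // 2 rather than the expected 4
  }
}
```

In this toy run the consumer sees `a` and `c` twice and never sees `b` or `d`, which is the kind of mismatch that `df.distinct.count()` would expose.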
[jira] [Updated] (SPARK-28699) Cache an indeterminate RDD could lead to incorrect result while stage rerun
[ https://issues.apache.org/jira/browse/SPARK-28699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-28699:
--
    Affects Version/s: 2.3.3
                       2.4.3

> Cache an indeterminate RDD could lead to incorrect result while stage rerun
> ---
>
>                 Key: SPARK-28699
>                 URL: https://issues.apache.org/jira/browse/SPARK-28699
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.3, 3.0.0, 2.4.3
>            Reporter: Yuanjian Li
>            Priority: Blocker
>              Labels: correctness
>
> It's another case for the indeterminate stage/RDD rerun while stage rerun happened. In the CachedRDDBuilder.
> We can reproduce this by the following code, thanks to Tyson for reporting this!
>
> {code:scala}
> import scala.sys.process._
> import org.apache.spark.TaskContext
> val res = spark.range(0, 1 * 1, 1).map{ x => (x % 1000, x)}
> // kill an executor in the stage that performs repartition(239)
> val df = res.repartition(113).cache.repartition(239).map { x =>
>   if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
>     throw new Exception("pkill -f -n java".!!)
>   }
>   x
> }
> val r2 = df.distinct.count()
> {code}
[jira] [Updated] (SPARK-28699) Cache an indeterminate RDD could lead to incorrect result while stage rerun
[ https://issues.apache.org/jira/browse/SPARK-28699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuanjian Li updated SPARK-28699:
    Affects Version/s: 2.3.3
                       2.4.3

> Cache an indeterminate RDD could lead to incorrect result while stage rerun
> ---
>
>                 Key: SPARK-28699
>                 URL: https://issues.apache.org/jira/browse/SPARK-28699
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.3, 3.0.0, 2.4.3
>            Reporter: Yuanjian Li
>            Priority: Blocker
>              Labels: correctness
>
> It's another case for the indeterminate stage/RDD rerun while stage rerun happened. In the CachedRDDBuilder.
> We can reproduce this by the following code, thanks to Tyson for reporting this!
>
> {code:scala}
> import scala.sys.process._
> import org.apache.spark.TaskContext
> val res = spark.range(0, 1 * 1, 1).map{ x => (x % 1000, x)}
> // kill an executor in the stage that performs repartition(239)
> val df = res.repartition(113).cache.repartition(239).map { x =>
>   if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
>     throw new Exception("pkill -f -n java".!!)
>   }
>   x
> }
> val r2 = df.distinct.count()
> {code}
[jira] [Updated] (SPARK-28699) Cache an indeterminate RDD could lead to incorrect result while stage rerun
[ https://issues.apache.org/jira/browse/SPARK-28699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-28699:
--
     Target Version/s: 2.3.4, 2.4.4
    Affects Version/s:     (was: 2.4.3)
                           (was: 2.3.3)

> Cache an indeterminate RDD could lead to incorrect result while stage rerun
> ---
>
>                 Key: SPARK-28699
>                 URL: https://issues.apache.org/jira/browse/SPARK-28699
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Yuanjian Li
>            Priority: Blocker
>              Labels: correctness
>
> It's another case for the indeterminate stage/RDD rerun while stage rerun happened. In the CachedRDDBuilder.
> We can reproduce this by the following code, thanks to Tyson for reporting this!
>
> {code:scala}
> import scala.sys.process._
> import org.apache.spark.TaskContext
> val res = spark.range(0, 1 * 1, 1).map{ x => (x % 1000, x)}
> // kill an executor in the stage that performs repartition(239)
> val df = res.repartition(113).cache.repartition(239).map { x =>
>   if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
>     throw new Exception("pkill -f -n java".!!)
>   }
>   x
> }
> val r2 = df.distinct.count()
> {code}
[jira] [Updated] (SPARK-28699) Cache an indeterminate RDD could lead to incorrect result while stage rerun
[ https://issues.apache.org/jira/browse/SPARK-28699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-28699:
--
    Priority: Blocker  (was: Major)

> Cache an indeterminate RDD could lead to incorrect result while stage rerun
> ---
>
>                 Key: SPARK-28699
>                 URL: https://issues.apache.org/jira/browse/SPARK-28699
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Yuanjian Li
>            Priority: Blocker
>              Labels: correctness
>
> It's another case for the indeterminate stage/RDD rerun while stage rerun happened. In the CachedRDDBuilder.
> We can reproduce this by the following code, thanks to Tyson for reporting this!
>
> {code:scala}
> import scala.sys.process._
> import org.apache.spark.TaskContext
> val res = spark.range(0, 1 * 1, 1).map{ x => (x % 1000, x)}
> // kill an executor in the stage that performs repartition(239)
> val df = res.repartition(113).cache.repartition(239).map { x =>
>   if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
>     throw new Exception("pkill -f -n java".!!)
>   }
>   x
> }
> val r2 = df.distinct.count()
> {code}
[jira] [Updated] (SPARK-28699) Cache an indeterminate RDD could lead to incorrect result while stage rerun
[ https://issues.apache.org/jira/browse/SPARK-28699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-28699:
--
    Description:
It's another case for the indeterminate stage/RDD rerun while stage rerun happened. In the CachedRDDBuilder.

We can reproduce this by the following code, thanks to Tyson for reporting this!
{code:scala}
import scala.sys.process._
import org.apache.spark.TaskContext
val res = spark.range(0, 1 * 1, 1).map{ x => (x % 1000, x)}
// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).cache.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()
{code}

  was:
Related with SPARK-23207 SPARK-23243

It's another case for the indeterminate stage/RDD rerun while stage rerun happened. In the CachedRDDBuilder.

We can reproduce this by the following code, thanks to Tyson for reporting this!
{code:scala}
import scala.sys.process._
import org.apache.spark.TaskContext
val res = spark.range(0, 1 * 1, 1).map{ x => (x % 1000, x)}
// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).cache.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()
{code}

> Cache an indeterminate RDD could lead to incorrect result while stage rerun
> ---
>
>                 Key: SPARK-28699
>                 URL: https://issues.apache.org/jira/browse/SPARK-28699
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Yuanjian Li
>            Priority: Major
>              Labels: correctness
>
> It's another case for the indeterminate stage/RDD rerun while stage rerun happened. In the CachedRDDBuilder.
> We can reproduce this by the following code, thanks to Tyson for reporting this!
>
> {code:scala}
> import scala.sys.process._
> import org.apache.spark.TaskContext
> val res = spark.range(0, 1 * 1, 1).map{ x => (x % 1000, x)}
> // kill an executor in the stage that performs repartition(239)
> val df = res.repartition(113).cache.repartition(239).map { x =>
>   if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
>     throw new Exception("pkill -f -n java".!!)
>   }
>   x
> }
> val r2 = df.distinct.count()
> {code}
[jira] [Updated] (SPARK-28699) Cache an indeterminate RDD could lead to incorrect result while stage rerun
[ https://issues.apache.org/jira/browse/SPARK-28699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-28699:
--
    Description:
Related with SPARK-23207 SPARK-23243

It's another case for the indeterminate stage/RDD rerun while stage rerun happened. In the CachedRDDBuilder.

We can reproduce this by the following code, thanks to Tyson for reporting this!
{code:scala}
import scala.sys.process._
import org.apache.spark.TaskContext
val res = spark.range(0, 1 * 1, 1).map{ x => (x % 1000, x)}
// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).cache.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()
{code}

  was:
Related with SPARK-23207 SPARK-23243

It's another case for the indeterminate stage/RDD rerun while stage rerun happened. In the CachedRDDBuilder.

We can reproduce this by the following code, thanks to Tyson for reporting this!
{code:scala}
import scala.sys.process._
import org.apache.spark.TaskContext
val res = spark.range(0, 1 * 1, 1).map\{ x => (x % 1000, x)}
// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).cache.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()
{code}

> Cache an indeterminate RDD could lead to incorrect result while stage rerun
> ---
>
>                 Key: SPARK-28699
>                 URL: https://issues.apache.org/jira/browse/SPARK-28699
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Yuanjian Li
>            Priority: Major
>              Labels: correctness
>
> Related with SPARK-23207 SPARK-23243
> It's another case for the indeterminate stage/RDD rerun while stage rerun happened. In the CachedRDDBuilder.
> We can reproduce this by the following code, thanks to Tyson for reporting this!
>
> {code:scala}
> import scala.sys.process._
> import org.apache.spark.TaskContext
> val res = spark.range(0, 1 * 1, 1).map{ x => (x % 1000, x)}
> // kill an executor in the stage that performs repartition(239)
> val df = res.repartition(113).cache.repartition(239).map { x =>
>   if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
>     throw new Exception("pkill -f -n java".!!)
>   }
>   x
> }
> val r2 = df.distinct.count()
> {code}
[jira] [Updated] (SPARK-28699) Cache an indeterminate RDD could lead to incorrect result while stage rerun
[ https://issues.apache.org/jira/browse/SPARK-28699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuanjian Li updated SPARK-28699:
    Description:
Related with SPARK-23207 SPARK-23243

It's another case for the indeterminate stage/RDD rerun while stage rerun happened. In the CachedRDDBuilder.

We can reproduce this by the following code, thanks to Tyson for reporting this!
{code:scala}
import scala.sys.process._
import org.apache.spark.TaskContext
val res = spark.range(0, 1 * 1, 1).map\{ x => (x % 1000, x)}
// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).cache.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()
{code}

  was:
Related with SPARK-23207 SPARK-23243

It's another case for the indeterminate stage/RDD rerun while stage rerun happened. In the CachedRDDBuilder, we miss tracking the `isOrderSensitive` characteristic to the newly created MapPartitionsRDD.

We can reproduce this by the following code, thanks to Tyson for reporting this!
{code:scala}
import scala.sys.process._
import org.apache.spark.TaskContext
val res = spark.range(0, 1 * 1, 1).map\{ x => (x % 1000, x)}
// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).cache.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()
{code}

> Cache an indeterminate RDD could lead to incorrect result while stage rerun
> ---
>
>                 Key: SPARK-28699
>                 URL: https://issues.apache.org/jira/browse/SPARK-28699
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Yuanjian Li
>            Priority: Major
>              Labels: correctness
>
> Related with SPARK-23207 SPARK-23243
> It's another case for the indeterminate stage/RDD rerun while stage rerun happened. In the CachedRDDBuilder.
> We can reproduce this by the following code, thanks to Tyson for reporting this!
>
> {code:scala}
> import scala.sys.process._
> import org.apache.spark.TaskContext
> val res = spark.range(0, 1 * 1, 1).map\{ x => (x % 1000, x)}
> // kill an executor in the stage that performs repartition(239)
> val df = res.repartition(113).cache.repartition(239).map { x =>
>   if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
>     throw new Exception("pkill -f -n java".!!)
>   }
>   x
> }
> val r2 = df.distinct.count()
> {code}
[jira] [Updated] (SPARK-28699) Cache an indeterminate RDD could lead to incorrect result while stage rerun
[ https://issues.apache.org/jira/browse/SPARK-28699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Josh Rosen updated SPARK-28699:
---
    Labels: correctness  (was: )

> Cache an indeterminate RDD could lead to incorrect result while stage rerun
> ---
>
>                 Key: SPARK-28699
>                 URL: https://issues.apache.org/jira/browse/SPARK-28699
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Yuanjian Li
>            Priority: Major
>              Labels: correctness
>
> Related with SPARK-23207 SPARK-23243
> It's another case for the indeterminate stage/RDD rerun while stage rerun happened. In the CachedRDDBuilder, we miss tracking the `isOrderSensitive` characteristic to the newly created MapPartitionsRDD.
> We can reproduce this by the following code, thanks to Tyson for reporting this!
>
> {code:scala}
> import scala.sys.process._
> import org.apache.spark.TaskContext
> val res = spark.range(0, 1 * 1, 1).map\{ x => (x % 1000, x)}
> // kill an executor in the stage that performs repartition(239)
> val df = res.repartition(113).cache.repartition(239).map { x =>
>   if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
>     throw new Exception("pkill -f -n java".!!)
>   }
>   x
> }
> val r2 = df.distinct.count()
> {code}
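The `isOrderSensitive` remark in the description can be pictured with a toy model. This is a sketch under stated assumptions, not Spark's implementation: `DeterminismSketch`, `Level` and `mapPartitionsLevel` are hypothetical names, and the only rule encoded is that an order-sensitive per-partition function applied to input whose row order may vary between attempts should be treated as producing indeterminate output. If that flag is not carried over to a newly created RDD, the output would look safer to rerun than it really is.

```scala
// Toy model only; these types and names are illustrative, not Spark classes.
object DeterminismSketch {
  sealed trait Level
  case object Determinate extends Level   // same rows in the same order on every attempt
  case object Unordered extends Level     // same rows, but the order may differ
  case object Indeterminate extends Level // the rows themselves may differ

  // Output level of a per-partition function, given the parent's level
  // and whether the function's result depends on input row order.
  def mapPartitionsLevel(parent: Level, isOrderSensitive: Boolean): Level =
    (parent, isOrderSensitive) match {
      case (Indeterminate, _) => Indeterminate
      case (Unordered, true)  => Indeterminate
      case (other, _)         => other
    }

  def main(args: Array[String]): Unit = {
    // Flag tracked: the order-sensitive step is marked indeterminate.
    println(mapPartitionsLevel(Unordered, isOrderSensitive = true))
    // Flag lost: the same step is reported as merely unordered.
    println(mapPartitionsLevel(Unordered, isOrderSensitive = false))
  }
}
```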