[jira] [Comment Edited] (SPARK-26880) dataDF.queryExecution.toRdd corrupt rows
[ https://issues.apache.org/jira/browse/SPARK-26880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16770866#comment-16770866 ] Jungtaek Lim edited comment on SPARK-26880 at 2/18/19 8:40 AM:
---
Just submitted a PR to add a note on the proper usage of "QueryExecution.toRdd": https://github.com/apache/spark/pull/23822

Given that this issue is filed as a bug, I submitted the PR as MINOR and didn't link it to this issue. Marking this issue as 'Resolved' would give a false alarm - would marking it as "invalid" or "information provided" be OK instead?

> dataDF.queryExecution.toRdd corrupt rows
>
> Key: SPARK-26880
> URL: https://issues.apache.org/jira/browse/SPARK-26880
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Grant Henke
> Priority: Major
>
> I have seen a simple case where InternalRows returned by
> `queryExecution.toRdd` are corrupt. Some rows are duplicated while others are
> missing.
> This simple test illustrates the issue:
> {code}
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.catalyst.encoders.RowEncoder
> import org.apache.spark.sql.types.IntegerType
> import org.apache.spark.sql.types.StringType
> import org.apache.spark.sql.types.StructField
> import org.apache.spark.sql.types.StructType
> import org.junit.Assert._
> import org.junit.Test
> import org.scalatest.Matchers
> import org.scalatest.junit.JUnitSuite
> import org.slf4j.Logger
> import org.slf4j.LoggerFactory
>
> class SparkTest extends JUnitSuite with Matchers {
>   val Log: Logger = LoggerFactory.getLogger(getClass)
>
>   @Test
>   def testSparkRowCorruption(): Unit = {
>     val conf = new SparkConf()
>       .setMaster("local[*]")
>       .setAppName("test")
>       .set("spark.ui.enabled", "false")
>     val ss = SparkSession.builder().config(conf).getOrCreate()
>
>     // Setup a DataFrame for testing.
>     val data = Seq(
>       Row.fromSeq(Seq(0, "0")),
>       Row.fromSeq(Seq(25, "25")),
>       Row.fromSeq(Seq(50, "50")),
>       Row.fromSeq(Seq(75, "75")),
>       Row.fromSeq(Seq(99, "99")),
>       Row.fromSeq(Seq(100, "100")),
>       Row.fromSeq(Seq(101, "101")),
>       Row.fromSeq(Seq(125, "125")),
>       Row.fromSeq(Seq(150, "150")),
>       Row.fromSeq(Seq(175, "175")),
>       Row.fromSeq(Seq(199, "199"))
>     )
>     val dataRDD = ss.sparkContext.parallelize(data)
>     val schema = StructType(
>       Seq(
>         StructField("key", IntegerType),
>         StructField("value", StringType)
>       ))
>     val dataDF = ss.sqlContext.createDataFrame(dataRDD, schema)
>
>     // Convert to an RDD.
>     val rdd = dataDF.queryExecution.toRdd
>
>     // Collect the data to compare.
>     val resultData = rdd.collect
>     resultData.foreach { row =>
>       // Log for visualizing the corruption.
>       Log.error(s"${row.getInt(0)}")
>     }
>
>     // Ensure the keys in the original data and resulting data match.
>     val dataKeys = data.map(_.getInt(0)).toSet
>     val resultKeys = resultData.map(_.getInt(0)).toSet
>     assertEquals(dataKeys, resultKeys)
>   }
> }
> {code}
>
> That test fails with the following:
> {noformat}
> 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 0
> 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 25
> 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 75
> 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 75
> 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 99
> 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 100
> 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 125
> 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 125
> 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 150
> 10:38:26.970 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 199
> 10:38:26.970 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 199
> expected: but
> was:
> Expected :Set(101, 0, 25, 125, 150, 50, 199, 175, 99, 75, 100)
> Actual   :Set(0, 25, 125, 150, 199, 99, 75, 100)
> {noformat}
>
> If I map from an InternalRow to a Row the issue goes away:
> {code}
> val rdd = dataDF.queryExecution.toRdd.mapPartitions { internalRows =>
>   val encoder = RowEncoder.apply(schema).resolveAndBind()
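The quoted fix is truncated above; it could be completed roughly along these lines (a sketch against the Spark 2.4 API, reusing `dataDF` and `schema` from the test above, not the reporter's exact code; note `ExpressionEncoder.fromRow` was removed in later Spark versions):

```scala
// Convert each InternalRow to an external Row inside the partition,
// so every collected element is a fresh object rather than a reused one.
val safeRdd = dataDF.queryExecution.toRdd.mapPartitions { internalRows =>
  val encoder = RowEncoder.apply(schema).resolveAndBind()
  internalRows.map(encoder.fromRow)
}
val resultKeys = safeRdd.collect().map(_.getInt(0)).toSet
// resultKeys should now contain every original key exactly once.
```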
[jira] [Comment Edited] (SPARK-26880) dataDF.queryExecution.toRdd corrupt rows
[ https://issues.apache.org/jira/browse/SPARK-26880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16768824#comment-16768824 ] Jungtaek Lim edited comment on SPARK-26880 at 2/15/19 12:20 AM:
---
The failure is caused not by `queryExecution.toRdd` but by `queryExecution.toRdd.collect()`. You can modify your code to also print the values before collect (via mapPartitions) and see that they are correct.

Internally Spark optimizes by reusing the same object - as you may have experienced in MapReduce - which is fine if you iterate and process rows one at a time, but it produces hard-to-track issues when you store them in a collection and use the objects after iteration. (collect() works exactly this way.)

Given that it's due to object reuse, once you apply the conversion each iteration produces a new row (object), so it is safe to store them in a collection.

I agree there's no way to know this without digging into the internals, so it would be ideal to document or restrict this approach, but I'm not sure what the best way to do that is.
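The object-reuse behavior described in the comment above can be reproduced without Spark at all. This pure-Scala sketch (all names hypothetical, purely illustrative) shows why storing a reused object after iteration corrupts the result while copying does not:

```scala
// A mutable record that an iterator reuses across elements,
// mimicking how Spark reuses row instances in toRdd.
final class ReusedRecord(var key: Int) {
  def copyRecord(): ReusedRecord = new ReusedRecord(key)
}

// Iterator that yields the SAME object every time, mutated in place.
def reusingIterator(keys: Seq[Int]): Iterator[ReusedRecord] = {
  val shared = new ReusedRecord(0)
  keys.iterator.map { k => shared.key = k; shared }
}

val keys = Seq(0, 25, 50, 75)

// Storing the reused object: every array slot points at the same
// instance, so every entry reflects the last value written.
val corrupted = reusingIterator(keys).toArray.map(_.key).toSet

// Copying before storing yields one independent object per element.
val safe = reusingIterator(keys).map(_.copyRecord()).toArray.map(_.key).toSet

println(corrupted) // only the final key survives
println(safe)      // all keys survive
```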
[jira] [Comment Edited] (SPARK-26880) dataDF.queryExecution.toRdd corrupt rows
[ https://issues.apache.org/jira/browse/SPARK-26880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16768754#comment-16768754 ] Dilip Biswal edited comment on SPARK-26880 at 2/14/19 10:00 PM:
Hello,
Shouldn't we use `dataDF.rdd` as opposed to `dataDF.queryExecution.toRdd` to get RDD[Row]? In my understanding, the former returns the RDD after converting the rows from the internal format to the external one. The latter is the internal version of the RDD, i.e. RDD[InternalRow]. cc [~sro...@scient.com] [~cloud_fan]
Regards,
-- Dilip
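The `dataDF.rdd` suggestion can be sketched as a self-contained example (assumes Spark 2.4 on the classpath and a local session; the data is a shortened version of the set from the issue description):

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("toRdd-vs-rdd")
  .config("spark.ui.enabled", "false")
  .getOrCreate()

val schema = StructType(Seq(
  StructField("key", IntegerType),
  StructField("value", StringType)))
val data = Seq(Row(0, "0"), Row(25, "25"), Row(50, "50"), Row(75, "75"))
val dataDF = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)

// dataDF.rdd performs the internal-to-external conversion per element,
// so the collected Row objects are independent and safe to store.
val resultKeys = dataDF.rdd.collect().map(_.getInt(0)).toSet
println(resultKeys)
spark.stop()
```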