[jira] [Comment Edited] (SPARK-26880) dataDF.queryExecution.toRdd corrupt rows

2019-02-18 Thread Jungtaek Lim (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16770866#comment-16770866
 ] 

Jungtaek Lim edited comment on SPARK-26880 at 2/18/19 8:40 AM:
---

Just submitted a PR to add a note on the proper usage of "QueryExecution.toRdd":
https://github.com/apache/spark/pull/23822

Given that this issue is filed as a bug, I submitted the PR as MINOR and didn't
link it to this issue. Marking this issue as 'Resolved' would give a false
alarm; would resolving it as 'Invalid' or 'Information Provided' be OK instead?


was (Author: kabhwan):
Just submitted a PR to add note on proper usage of "QueryExecution.toRdd".
https://github.com/apache/spark/pull/23822

Given this issue is filed as a bug, I just submitted PR as MINOR and don't link 
my PR to this issue. Marking this issue as 'Resolved' would provide false-alarm.

> dataDF.queryExecution.toRdd corrupt rows
> 
>
> Key: SPARK-26880
> URL: https://issues.apache.org/jira/browse/SPARK-26880
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.4.0
>Reporter: Grant Henke
>Priority: Major
>
> I have seen a simple case where the InternalRows returned by
> `queryExecution.toRdd` are corrupt: some rows are duplicated while others are
> missing.
> This simple test illustrates the issue:
> {code}
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.catalyst.encoders.RowEncoder
> import org.apache.spark.sql.types.IntegerType
> import org.apache.spark.sql.types.StringType
> import org.apache.spark.sql.types.StructField
> import org.apache.spark.sql.types.StructType
> import org.junit.Assert._
> import org.junit.Test
> import org.scalatest.Matchers
> import org.scalatest.junit.JUnitSuite
> import org.slf4j.Logger
> import org.slf4j.LoggerFactory
>
> class SparkTest extends JUnitSuite with Matchers {
>   val Log: Logger = LoggerFactory.getLogger(getClass)
>
>   @Test
>   def testSparkRowCorruption(): Unit = {
>     val conf = new SparkConf()
>       .setMaster("local[*]")
>       .setAppName("test")
>       .set("spark.ui.enabled", "false")
>     val ss = SparkSession.builder().config(conf).getOrCreate()
>
>     // Setup a DataFrame for testing.
>     val data = Seq(
>       Row.fromSeq(Seq(0, "0")),
>       Row.fromSeq(Seq(25, "25")),
>       Row.fromSeq(Seq(50, "50")),
>       Row.fromSeq(Seq(75, "75")),
>       Row.fromSeq(Seq(99, "99")),
>       Row.fromSeq(Seq(100, "100")),
>       Row.fromSeq(Seq(101, "101")),
>       Row.fromSeq(Seq(125, "125")),
>       Row.fromSeq(Seq(150, "150")),
>       Row.fromSeq(Seq(175, "175")),
>       Row.fromSeq(Seq(199, "199"))
>     )
>     val dataRDD = ss.sparkContext.parallelize(data)
>     val schema = StructType(
>       Seq(
>         StructField("key", IntegerType),
>         StructField("value", StringType)
>       ))
>     val dataDF = ss.sqlContext.createDataFrame(dataRDD, schema)
>
>     // Convert to an RDD.
>     val rdd = dataDF.queryExecution.toRdd
>
>     // Collect the data to compare.
>     val resultData = rdd.collect
>     resultData.foreach { row =>
>       // Log for visualizing the corruption.
>       Log.error(s"${row.getInt(0)}")
>     }
>
>     // Ensure the keys in the original data and resulting data match.
>     val dataKeys = data.map(_.getInt(0)).toSet
>     val resultKeys = resultData.map(_.getInt(0)).toSet
>     assertEquals(dataKeys, resultKeys)
>   }
> }
> {code}
> That test fails with the following:
> {noformat}
> 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 0
> 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 25
> 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 75
> 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 75
> 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 99
> 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 100
> 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 125
> 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 125
> 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 150
> 10:38:26.970 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 199
> 10:38:26.970 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 199
> expected:<Set(101, 0, 25, 125, 150, 50, 199, 175, 99, 75, 100)> but
> was:<Set(0, 25, 125, 150, 199, 99, 75, 100)>
> Expected :Set(101, 0, 25, 125, 150, 50, 199, 175, 99, 75, 100)
> Actual   :Set(0, 25, 125, 150, 199, 99, 75, 100)
> {noformat}
> If I map from an InternalRow to a Row the issue goes away:
> {code}
> val rdd = dataDF.queryExecution.toRdd.mapPartitions { internalRows =>
>   val encoder = RowEncoder.apply(schema).resolveAndBind()
>   // fromRow materializes a fresh external Row per InternalRow.
>   internalRows.map(encoder.fromRow)
> }
> {code}


[jira] [Comment Edited] (SPARK-26880) dataDF.queryExecution.toRdd corrupt rows

2019-02-14 Thread Jungtaek Lim (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16768824#comment-16768824
 ] 

Jungtaek Lim edited comment on SPARK-26880 at 2/15/19 12:20 AM:


The failure is caused not by `queryExecution.toRdd` itself but by
`queryExecution.toRdd.collect()`. You can modify your code to also print the
values before collect (via mapPartitions) and see that they are correct.

Internally, Spark optimizes by reusing the same object - as you may have
experienced in MapReduce - which is fine if you iterate and process rows one
at a time, but it produces hard-to-track issues when you store them into a
collection and use the objects after iteration. (collect() works exactly this
way.)

Given that the problem is object reuse, once you apply the conversion, a new
row (object) is produced per iteration, so the rows are safe to store into a
collection.

I agree there's no way to know this without digging into Spark internals, so
ideally we should document this or restrict the approach, but I'm not sure
which is the best way to do that.
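
For illustration, copying each row before materializing also avoids the
aliasing (a minimal sketch against the reporter's `dataDF`, assuming Spark
2.4's `InternalRow.copy()`, which returns an independent row):

{code}
// Each element of the RDD may point at a reused buffer; copy() materializes
// an independent InternalRow, so collect() no longer sees duplicated rows.
val copied = dataDF.queryExecution.toRdd.map(_.copy()).collect()
copied.foreach(row => Log.error(s"${row.getInt(0)}"))
{code}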


was (Author: kabhwan):
The reason of failure is not because of `queryExecution.toRdd` but because of 
`queryExecution.toRdd.collect()`. You can modify your code to also print out 
value before collect (via mapPartitions), and see the values are correct.

Internally Spark optimizes on reusing same object - like you may be experienced 
in MapReduce - which is OK if you iterate and process them, but it produces 
hard-to-track issues when we try to store them into collection and use object 
after iteration.

Given that it's due to reusing object, once you apply conversion they would 
produce new row (object) per iteration so safe to store into collection.

I agree there's no way to have knowledge unless playing with internal, so it's 
ideal to inform or restrict taking this approach, but not sure which is the 
best way to do that.


[jira] [Comment Edited] (SPARK-26880) dataDF.queryExecution.toRdd corrupt rows

2019-02-14 Thread Dilip Biswal (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-26880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16768754#comment-16768754
 ] 

Dilip Biswal edited comment on SPARK-26880 at 2/14/19 10:00 PM:


Hello,

Shouldn't we use `dataDF.rdd` rather than `dataDF.queryExecution.toRdd` to get
an RDD[Row]? In my understanding, the former returns the RDD after converting
the rows from the internal format to the external one, while the latter is the
internal version of the RDD, i.e. RDD[InternalRow].
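
For example (a sketch reusing the `dataDF` from the description; `Dataset.rdd`
inserts the internal-to-external conversion automatically):

{code}
// dataDF.rdd is an RDD[Row]: each element is a freshly converted external
// Row, so it is safe to collect and hold onto.
val externalRows = dataDF.rdd.collect()
val keys = externalRows.map(_.getInt(0)).toSet
{code}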

cc [~sro...@scient.com] [~cloud_fan]

Regards,

– Dilip


was (Author: dkbiswal):
Hello,

Shouldn't we use `dataDF.rdd` as opposed to ` dataDF.queryExecution.toRdd` to 
get RDD[Rows] ? The former one
 returns the rdd after converting the rows from internal format to external 
one. The later one is
 the internal version of RDD i.e RDD[InternalRow].

cc [~sro...@scient.com] [~cloud_fan]

Regards,

-- Dilip
