[ 
https://issues.apache.org/jira/browse/SPARK-26880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16768824#comment-16768824
 ] 

Jungtaek Lim edited comment on SPARK-26880 at 2/15/19 12:20 AM:
----------------------------------------------------------------

The failure is caused not by `queryExecution.toRdd` itself but by 
`queryExecution.toRdd.collect()`. You can modify your code to also print each 
value before the collect (via mapPartitions) and see that the values are correct.
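
A rough sketch of that check, assuming a DataFrame whose first column is an 
integer key, as in the test from the report. The object and helper names are 
made up for illustration and are not part of Spark:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.InternalRow

object BeforeCollectCheck {
  // Hypothetical helper: prints each key while the partition is being
  // iterated, i.e. before collect() gathers any row references.
  // Assumes column 0 of the DataFrame is an IntegerType key.
  def logKeysBeforeCollect(df: DataFrame): RDD[InternalRow] =
    df.queryExecution.toRdd.mapPartitions { rows =>
      rows.map { row =>
        println(s"before collect: ${row.getInt(0)}")
        row // the same (possibly reused) object is passed on unchanged
      }
    }
}
```

Calling `BeforeCollectCheck.logKeysBeforeCollect(dataDF).collect()` with a 
`local[*]` master should print every key to the test's stdout, even when the 
collected array itself still shows duplicates.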

Internally, Spark optimizes by reusing the same object, as you may have 
experienced in MapReduce. That is fine if you iterate and process the rows as 
you go, but it produces hard-to-track issues when you store them in a 
collection and use the objects after the iteration. (collect() works exactly 
this way.)
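
To make the effect concrete, here is a deliberately simplified model in plain 
Scala. It is not Spark's implementation: `MutableRow` and `ReusingIterator` are 
hypothetical stand-ins for a reused row buffer and the iterator that fills it.

```scala
// Hypothetical stand-in for a row object that gets overwritten in place.
final class MutableRow(var key: Int)

// Hands out the SAME MutableRow instance on every call to next(),
// overwriting its contents each time.
final class ReusingIterator(keys: Seq[Int]) extends Iterator[MutableRow] {
  private val underlying = keys.iterator
  private val buffer = new MutableRow(0)

  override def hasNext: Boolean = underlying.hasNext

  override def next(): MutableRow = {
    buffer.key = underlying.next()
    buffer
  }
}

object ReuseDemo {
  val keys: Seq[Int] = Seq(0, 25, 50, 75)

  // Reading the value while iterating sees each key in turn.
  val streamed: List[Int] = new ReusingIterator(keys).map(_.key).toList

  // Storing the references first keeps four pointers to one object,
  // so only the last value written to the buffer is visible afterwards.
  val stored: List[Int] = new ReusingIterator(keys).toList.map(_.key)

  def main(args: Array[String]): Unit = {
    println(streamed) // List(0, 25, 50, 75)
    println(stored)   // List(75, 75, 75, 75)
  }
}
```

In this model every stored element ends up identical; the output in the report 
shows a more irregular pattern of duplicates, so treat this only as an 
illustration of the mechanism.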

Because the problem comes from object reuse, applying a conversion avoids it: 
the conversion produces a new row (object) per iteration, so the results are 
safe to store in a collection.
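
If the rows need to stay as InternalRow, copying each row before collecting 
should have the same effect as a conversion. The sketch below relies on 
`InternalRow.copy()`; the object and helper names are made up for 
illustration, and this variant is not something the report itself tried:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.InternalRow

object CopyBeforeCollect {
  // Hypothetical helper: copy() gives every element its own InternalRow,
  // so the collected array no longer points at a reused buffer.
  def collectCopied(df: DataFrame): Array[InternalRow] =
    df.queryExecution.toRdd.map(_.copy()).collect()
}
```

With the `dataDF` from the report, 
`CopyBeforeCollect.collectCopied(dataDF).map(_.getInt(0)).toSet` would then be 
expected to match the original keys. The copy costs one extra allocation per 
row, which is the price of holding on to the rows after iteration.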

I agree there's no way to know this without digging into the internals, so it 
would be ideal to either document or restrict this approach, but I'm not sure 
which is the best way to do that.


was (Author: kabhwan):
The failure is caused not by `queryExecution.toRdd` itself but by 
`queryExecution.toRdd.collect()`. You can modify your code to also print each 
value before the collect (via mapPartitions) and see that the values are correct.

Internally, Spark optimizes by reusing the same object, as you may have 
experienced in MapReduce. That is fine if you iterate and process the rows as 
you go, but it produces hard-to-track issues when we store them in a 
collection and use the objects after the iteration.

Because the problem comes from object reuse, applying a conversion avoids it: 
the conversion produces a new row (object) per iteration, so the results are 
safe to store in a collection.

I agree there's no way to know this without digging into the internals, so it 
would be ideal to either document or restrict this approach, but I'm not sure 
which is the best way to do that.

> dataDF.queryExecution.toRdd corrupt rows
> ----------------------------------------
>
>                 Key: SPARK-26880
>                 URL: https://issues.apache.org/jira/browse/SPARK-26880
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Grant Henke
>            Priority: Major
>
> I have seen a simple case where InternalRows returned by 
> `queryExecution.toRdd` are corrupt. Some rows are duplicated while others are 
> missing. 
> This simple test illustrates the issue:
> {code}
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.catalyst.encoders.RowEncoder
> import org.apache.spark.sql.types.IntegerType
> import org.apache.spark.sql.types.StringType
> import org.apache.spark.sql.types.StructField
> import org.apache.spark.sql.types.StructType
> import org.junit.Assert._
> import org.junit.Test
> import org.scalatest.Matchers
> import org.scalatest.junit.JUnitSuite
> import org.slf4j.Logger
> import org.slf4j.LoggerFactory
> class SparkTest extends JUnitSuite with Matchers {
>   val Log: Logger = LoggerFactory.getLogger(getClass)
>   @Test
>   def testSparkRowCorruption(): Unit = {
>     val conf = new SparkConf()
>       .setMaster("local[*]")
>       .setAppName("test")
>       .set("spark.ui.enabled", "false")
>     val ss = SparkSession.builder().config(conf).getOrCreate()
>     // Setup a DataFrame for testing.
>     val data = Seq(
>       Row.fromSeq(Seq(0, "0")),
>       Row.fromSeq(Seq(25, "25")),
>       Row.fromSeq(Seq(50, "50")),
>       Row.fromSeq(Seq(75, "75")),
>       Row.fromSeq(Seq(99, "99")),
>       Row.fromSeq(Seq(100, "100")),
>       Row.fromSeq(Seq(101, "101")),
>       Row.fromSeq(Seq(125, "125")),
>       Row.fromSeq(Seq(150, "150")),
>       Row.fromSeq(Seq(175, "175")),
>       Row.fromSeq(Seq(199, "199"))
>     )
>     val dataRDD = ss.sparkContext.parallelize(data)
>     val schema = StructType(
>       Seq(
>         StructField("key", IntegerType),
>         StructField("value", StringType)
>       ))
>     val dataDF = ss.sqlContext.createDataFrame(dataRDD, schema)
>     // Convert to an RDD.
>     val rdd = dataDF.queryExecution.toRdd
>     
>     // Collect the data to compare.
>     val resultData = rdd.collect
>     resultData.foreach { row =>
>       // Log for visualizing the corruption.
>       Log.error(s"${row.getInt(0)}")
>     }
>     // Ensure the keys in the original data and resulting data match.
>     val dataKeys = data.map(_.getInt(0)).toSet
>     val resultKeys = resultData.map(_.getInt(0)).toSet
>     assertEquals(dataKeys, resultKeys)
>   }
> }
> {code}
> That test fails with the following:
> {noformat}
> 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 0
> 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 25
> 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 75
> 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 75
> 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 99
> 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 100
> 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 125
> 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 125
> 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 150
> 10:38:26.970 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 199
> 10:38:26.970 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 199
> expected:<Set(101, 0, 25, 125, 150, 50, 199, 175, 99, 75, 100)> but 
> was:<Set(0, 25, 125, 150, 199, 99, 75, 100)>
> Expected :Set(101, 0, 25, 125, 150, 50, 199, 175, 99, 75, 100)
> Actual   :Set(0, 25, 125, 150, 199, 99, 75, 100)
> {noformat}
> If I map from an InternalRow to a Row the issue goes away:
> {code}
> val rdd = dataDF.queryExecution.toRdd.mapPartitions { internalRows =>
>    val encoder = RowEncoder.apply(schema).resolveAndBind()
>    internalRows.map(encoder.fromRow)
> }
> {code}
> Converting with CatalystTypeConverters also appears to resolve the issue:
> {code}
> import org.apache.spark.sql.catalyst.CatalystTypeConverters
> val rdd = dataDF.queryExecution.toRdd.mapPartitions { internalRows =>
>    val typeConverter = CatalystTypeConverters.createToScalaConverter(schema)
>    internalRows.map(ir => typeConverter(ir).asInstanceOf[Row])
> }
> {code}


