[ https://issues.apache.org/jira/browse/SPARK-26880?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16770866#comment-16770866 ]
Jungtaek Lim commented on SPARK-26880:
--------------------------------------

I just submitted a PR that adds a note on the proper usage of "QueryExecution.toRdd":
https://github.com/apache/spark/pull/23822

Because this issue is filed as a bug, I submitted the PR as MINOR and did not link it to this issue. Marking this issue as 'Resolved' would be a false alarm.

> dataDF.queryExecution.toRdd corrupt rows
> ----------------------------------------
>
>                 Key: SPARK-26880
>                 URL: https://issues.apache.org/jira/browse/SPARK-26880
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.0
>            Reporter: Grant Henke
>            Priority: Major
>
> I have seen a simple case where the InternalRows returned by
> `queryExecution.toRdd` are corrupt. Some rows are duplicated while others are
> missing.
> This simple test illustrates the issue:
> {code}
> import org.apache.spark.SparkConf
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.catalyst.encoders.RowEncoder
> import org.apache.spark.sql.types.IntegerType
> import org.apache.spark.sql.types.StringType
> import org.apache.spark.sql.types.StructField
> import org.apache.spark.sql.types.StructType
> import org.junit.Assert._
> import org.junit.Test
> import org.scalatest.Matchers
> import org.scalatest.junit.JUnitSuite
> import org.slf4j.Logger
> import org.slf4j.LoggerFactory
>
> class SparkTest extends JUnitSuite with Matchers {
>   val Log: Logger = LoggerFactory.getLogger(getClass)
>
>   @Test
>   def testSparkRowCorruption(): Unit = {
>     val conf = new SparkConf()
>       .setMaster("local[*]")
>       .setAppName("test")
>       .set("spark.ui.enabled", "false")
>     val ss = SparkSession.builder().config(conf).getOrCreate()
>
>     // Setup a DataFrame for testing.
>     val data = Seq(
>       Row.fromSeq(Seq(0, "0")),
>       Row.fromSeq(Seq(25, "25")),
>       Row.fromSeq(Seq(50, "50")),
>       Row.fromSeq(Seq(75, "75")),
>       Row.fromSeq(Seq(99, "99")),
>       Row.fromSeq(Seq(100, "100")),
>       Row.fromSeq(Seq(101, "101")),
>       Row.fromSeq(Seq(125, "125")),
>       Row.fromSeq(Seq(150, "150")),
>       Row.fromSeq(Seq(175, "175")),
>       Row.fromSeq(Seq(199, "199"))
>     )
>     val dataRDD = ss.sparkContext.parallelize(data)
>     val schema = StructType(
>       Seq(
>         StructField("key", IntegerType),
>         StructField("value", StringType)
>       ))
>     val dataDF = ss.sqlContext.createDataFrame(dataRDD, schema)
>
>     // Convert to an RDD.
>     val rdd = dataDF.queryExecution.toRdd
>
>     // Collect the data to compare.
>     val resultData = rdd.collect
>     resultData.foreach { row =>
>       // Log for visualizing the corruption.
>       Log.error(s"${row.getInt(0)}")
>     }
>
>     // Ensure the keys in the original data and resulting data match.
>     val dataKeys = data.map(_.getInt(0)).toSet
>     val resultKeys = resultData.map(_.getInt(0)).toSet
>     assertEquals(dataKeys, resultKeys)
>   }
> }
> {code}
> That test fails with the following:
> {noformat}
> 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 0
> 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 25
> 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 75
> 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 75
> 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 99
> 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 100
> 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 125
> 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 125
> 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 150
> 10:38:26.970 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 199
> 10:38:26.970 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 199
> expected:<Set(101, 0, 25, 125, 150, 50, 199, 175, 99, 75, 100)> but was:<Set(0, 25, 125, 150, 199, 99, 75, 100)>
> Expected :Set(101, 0, 25, 125, 150, 50, 199, 175, 99, 75, 100)
> Actual :Set(0, 25, 125, 150, 199, 99, 75, 100)
> {noformat}
> If I map from an InternalRow to a Row, the issue goes away:
> {code}
> val rdd = dataDF.queryExecution.toRdd.mapPartitions { internalRows =>
>   val encoder = RowEncoder.apply(schema).resolveAndBind()
>   internalRows.map(encoder.fromRow)
> }
> {code}
> Converting with CatalystTypeConverters also appears to resolve the issue:
> {code}
> val rdd = dataDF.queryExecution.toRdd.mapPartitions { internalRows =>
>   val typeConverter = CatalystTypeConverters.createToScalaConverter(schema)
>   internalRows.map(ir => typeConverter(ir).asInstanceOf[Row])
> }
> {code}
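
For readers following along, here is a minimal plain-Scala sketch of why both workarounds in the report help. It assumes the duplicates come from the iterator handing back one mutable row object that is overwritten on every step, which is my understanding of what the usage note is about. MutableRow, RowReuseDemo, reusingIterator, buffered and copied are hypothetical names for illustration only; nothing here is Spark code.

```scala
// Hypothetical stand-in for a mutable row; not a Spark class.
final class MutableRow(var key: Int) {
  // Returns an independent snapshot of the current contents.
  def copy(): MutableRow = new MutableRow(key)
}

object RowReuseDemo {
  // Yields the SAME MutableRow instance for every element and
  // overwrites its field on each step (assumed object reuse).
  def reusingIterator(keys: Seq[Int]): Iterator[MutableRow] = {
    val shared = new MutableRow(0)
    keys.iterator.map { k => shared.key = k; shared }
  }

  // Buffers references to the shared row, then reads them afterwards:
  // every buffered element shows whatever was written last.
  def buffered(keys: Seq[Int]): List[Int] =
    reusingIterator(keys).toList.map(_.key)

  // Copies each row while it is still valid, then buffers the copies.
  def copied(keys: Seq[Int]): List[Int] =
    reusingIterator(keys).map(_.copy()).toList.map(_.key)

  def main(args: Array[String]): Unit = {
    val keys = Seq(0, 25, 50, 75)
    println(buffered(keys)) // List(75, 75, 75, 75)
    println(copied(keys))   // List(0, 25, 50, 75)
  }
}
```

Under that assumption, converting each InternalRow to a Row (as both workarounds do) produces a new object per element, which has the same effect as copied above. In Spark itself, calling InternalRow.copy() on each row before collecting, or using dataDF.rdd instead of queryExecution.toRdd, should behave similarly. The reported log is consistent with this picture if the 11 rows were split across several partitions: within each partition the earlier rows show the key of the last one (50 shows as 75, 101 as 125, 175 as 199).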