[jira] [Updated] (SPARK-26880) dataDF.queryExecution.toRdd corrupt rows
[ https://issues.apache.org/jira/browse/SPARK-26880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Grant Henke updated SPARK-26880: Description: I have seen a simple case where InternalRows returned by `queryExecution.toRdd` are corrupt. Some rows are duplicated while others are missing. This simple test illustrates the issue: {code} import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.types.IntegerType import org.apache.spark.sql.types.StringType import org.apache.spark.sql.types.StructField import org.apache.spark.sql.types.StructType import org.junit.Assert._ import org.junit.Test import org.scalatest.Matchers import org.scalatest.junit.JUnitSuite import org.slf4j.Logger import org.slf4j.LoggerFactory class SparkTest extends JUnitSuite with Matchers { val Log: Logger = LoggerFactory.getLogger(getClass) @Test def testSparkRowCorruption(): Unit = { val conf = new SparkConf() .setMaster("local[*]") .setAppName("test") .set("spark.ui.enabled", "false") val ss = SparkSession.builder().config(conf).getOrCreate() // Setup a DataFrame for testing. val data = Seq( Row.fromSeq(Seq(0, "0")), Row.fromSeq(Seq(25, "25")), Row.fromSeq(Seq(50, "50")), Row.fromSeq(Seq(75, "75")), Row.fromSeq(Seq(99, "99")), Row.fromSeq(Seq(100, "100")), Row.fromSeq(Seq(101, "101")), Row.fromSeq(Seq(125, "125")), Row.fromSeq(Seq(150, "150")), Row.fromSeq(Seq(175, "175")), Row.fromSeq(Seq(199, "199")) ) val dataRDD = ss.sparkContext.parallelize(data) val schema = StructType( Seq( StructField("key", IntegerType), StructField("value", StringType) )) val dataDF = ss.sqlContext.createDataFrame(dataRDD, schema) // Convert to an RDD. val rdd = dataDF.queryExecution.toRdd // Collect the data to compare. val resultData = rdd.collect resultData.foreach { row => // Log for visualizing the corruption. 
Log.error(s"${row.getInt(0)}") } // Ensure the keys in the original data and resulting data match. val dataKeys = data.map(_.getInt(0)).toSet val resultKeys = resultData.map(_.getInt(0)).toSet assertEquals(dataKeys, resultKeys) } } {code} That test fails with the following: {noformat} 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 0 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 25 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 75 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 75 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 99 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 100 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 125 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 125 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 150 10:38:26.970 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 199 10:38:26.970 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 199 expected:<Set(101, 0, 25, 125, 150, 50, 199, 175, 99, 75, 100)> but was:<Set(0, 25, 125, 150, 199, 99, 75, 100)> Expected :Set(101, 0, 25, 125, 150, 50, 199, 175, 99, 75, 100) Actual :Set(0, 25, 125, 150, 199, 99, 75, 100) {noformat} If I map from an InternalRow to a Row the issue goes away: {code} val rdd = dataDF.queryExecution.toRdd.mapPartitions { internalRows => val encoder = RowEncoder.apply(schema).resolveAndBind() internalRows.map(encoder.fromRow) } {code} Converting with CatalystTypeConverters also appears to resolve the issue: {code} val rdd = dataDF.queryExecution.toRdd.mapPartitions { internalRows => val typeConverter = CatalystTypeConverters.createToScalaConverter(schema) internalRows.map(ir => typeConverter(ir).asInstanceOf[Row]) } {code} was: I have seen a simple case where InternalRows returned by `queryExecution.toRdd` are corrupt. Some rows are duplicated while others are missing. 
This simple test illustrates the issue: {code} import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.types.IntegerType import org.apache.spark.sql.types.StringType import org.apache.spark.sql.types.StructField import org.apache.spark.sql.types.StructType import org.junit.Assert._ import org.junit.Test import org.scalatest.Matchers import org.scalatest.junit.JUnitSuite import org.slf4j.Logger import org.slf4j.LoggerFactory class SparkTest extends JUnitSuite with Matchers { val Log: Logger = LoggerFactory.getLogger(getClass) @Test def testSparkRowCorruption(): Unit = { val conf = new SparkConf() .setMaster("local[*]") .setAppName("test") .set("
[jira] [Updated] (SPARK-26880) dataDF.queryExecution.toRdd corrupt rows
[ https://issues.apache.org/jira/browse/SPARK-26880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Grant Henke updated SPARK-26880: Description: I have seen a simple case where InternalRows returned by `queryExecution.toRdd` are corrupt. Some rows are duplicated while others are missing. This simple test illustrates the issue: {code} import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.types.IntegerType import org.apache.spark.sql.types.StringType import org.apache.spark.sql.types.StructField import org.apache.spark.sql.types.StructType import org.junit.Assert._ import org.junit.Test import org.scalatest.Matchers import org.scalatest.junit.JUnitSuite import org.slf4j.Logger import org.slf4j.LoggerFactory class SparkTest extends JUnitSuite with Matchers { val Log: Logger = LoggerFactory.getLogger(getClass) @Test def testSparkRowCorruption(): Unit = { val conf = new SparkConf() .setMaster("local[*]") .setAppName("test") .set("spark.ui.enabled", "false") val ss = SparkSession.builder().config(conf).getOrCreate() // Setup a DataFrame for testing. val data = Seq( Row.fromSeq(Seq(0, "0")), Row.fromSeq(Seq(25, "25")), Row.fromSeq(Seq(50, "50")), Row.fromSeq(Seq(75, "75")), Row.fromSeq(Seq(99, "99")), Row.fromSeq(Seq(100, "100")), Row.fromSeq(Seq(101, "101")), Row.fromSeq(Seq(125, "125")), Row.fromSeq(Seq(150, "150")), Row.fromSeq(Seq(175, "175")), Row.fromSeq(Seq(199, "199")) ) val dataRDD = ss.sparkContext.parallelize(data) val schema = StructType( Seq( StructField("key", IntegerType), StructField("value", StringType) )) val dataDF = ss.sqlContext.createDataFrame(dataRDD, schema) // Convert to an RDD. val rdd = dataDF.queryExecution.toRdd // Collect the data to compare. val resultData = rdd.collect resultData.foreach { row => // Log for visualizing the corruption. 
Log.error(s"${row.getInt(0)}") } // Ensure the keys in the original data and resulting data match. val dataKeys = data.map(_.getInt(0)).toSet val resultKeys = resultData.map(_.getInt(0)).toSet assertEquals(dataKeys, resultKeys) } } {code} That test fails with the following: {noformat} 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 0 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 25 10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 75 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 75 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 99 10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 100 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 125 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 125 10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 150 10:38:26.970 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 199 10:38:26.970 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 199 expected:<Set(101, 0, 25, 125, 150, 50, 199, 175, 99, 75, 100)> but was:<Set(0, 25, 125, 150, 199, 99, 75, 100)> Expected :Set(101, 0, 25, 125, 150, 50, 199, 175, 99, 75, 100) Actual :Set(0, 25, 125, 150, 199, 99, 75, 100) {noformat} If I map from an InternalRow to a Row the issue goes away: {code} val rdd = dataDF.queryExecution.toRdd.mapPartitions { internalRows => val encoder = RowEncoder.apply(schema).resolveAndBind() internalRows.map(encoder.fromRow) } {code} was: I have seen a simple case where InternalRows returned by `queryExecution.toRdd` are corrupt. Some rows are duplicated while others are missing. 
This simple test illustrates the issue: {code:scala} package org.apache.kudu.spark.kudu import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.encoders.RowEncoder import org.apache.spark.sql.types.IntegerType import org.apache.spark.sql.types.StringType import org.apache.spark.sql.types.StructField import org.apache.spark.sql.types.StructType import org.junit.Assert._ import org.junit.Test import org.scalatest.Matchers import org.scalatest.junit.JUnitSuite import org.slf4j.Logger import org.slf4j.LoggerFactory class SparkTest extends JUnitSuite with Matchers { val Log: Logger = LoggerFactory.getLogger(getClass) @Test def testSparkRowCorruption(): Unit = { val conf = new SparkConf() .setMaster("local[*]") .setAppName("test") .set("spark.ui.enabled", "false") val ss = SparkSession.builder().config(conf).getOrCreate() // Setup a DataFrame for testing. val data = Seq( Row.fromSeq(Seq(0, "0")), Row.fromSeq(Seq(25, "25")), Row.fromSeq(Seq(50, "50")),