[jira] [Updated] (SPARK-26880) dataDF.queryExecution.toRdd corrupt rows

2019-02-14 Thread Grant Henke (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke updated SPARK-26880:

Description: 
I have seen a simple case where InternalRows returned by `queryExecution.toRdd` 
are corrupt. Some rows are duplicated while others are missing. 

This simple test illustrates the issue:
{code}
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.junit.Assert._
import org.junit.Test
import org.scalatest.Matchers
import org.scalatest.junit.JUnitSuite
import org.slf4j.Logger
import org.slf4j.LoggerFactory

class SparkTest extends JUnitSuite with Matchers {
  val Log: Logger = LoggerFactory.getLogger(getClass)

  /**
   * Reproduces SPARK-26880: collects the InternalRows returned by
   * `dataDF.queryExecution.toRdd` and asserts that their keys match the keys
   * of the input rows.
   *
   * NOTE(review): a likely explanation for the duplicated/missing keys is that
   * Spark may hand out the same mutable InternalRow instance for successive
   * rows of a partition, so collecting without copying (for example
   * `toRdd.map(_.copy())`) retains several references to one object. This is
   * consistent with the per-row conversion workaround described in this
   * report making the problem disappear — confirm against the
   * `QueryExecution.toRdd` documentation.
   */
  @Test
  def testSparkRowCorruption(): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("test")
      .set("spark.ui.enabled", "false")
    val ss = SparkSession.builder().config(conf).getOrCreate()

    // Always stop the session, including when the assertion below fails, so
    // the local Spark session created here is not left running afterwards.
    try {
      // Setup a DataFrame for testing.
      val data = Seq(
        Row.fromSeq(Seq(0, "0")),
        Row.fromSeq(Seq(25, "25")),
        Row.fromSeq(Seq(50, "50")),
        Row.fromSeq(Seq(75, "75")),
        Row.fromSeq(Seq(99, "99")),
        Row.fromSeq(Seq(100, "100")),
        Row.fromSeq(Seq(101, "101")),
        Row.fromSeq(Seq(125, "125")),
        Row.fromSeq(Seq(150, "150")),
        Row.fromSeq(Seq(175, "175")),
        Row.fromSeq(Seq(199, "199"))
      )
      val dataRDD = ss.sparkContext.parallelize(data)
      val schema = StructType(
        Seq(
          StructField("key", IntegerType),
          StructField("value", StringType)
        ))
      val dataDF = ss.sqlContext.createDataFrame(dataRDD, schema)

      // Convert to an RDD.
      val rdd = dataDF.queryExecution.toRdd

      // Collect the data to compare.
      val resultData = rdd.collect()
      resultData.foreach { row =>
        // Log for visualizing the corruption.
        Log.error(s"${row.getInt(0)}")
      }

      // Ensure the keys in the original data and resulting data match.
      val dataKeys = data.map(_.getInt(0)).toSet
      val resultKeys = resultData.map(_.getInt(0)).toSet
      assertEquals(dataKeys, resultKeys)
    } finally {
      ss.stop()
    }
  }

}
{code}
That test fails with the following:
{noformat}
10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 0
10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 25
10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 75
10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 75
10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 99
10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 100
10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 125
10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 125
10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 150
10:38:26.970 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 199
10:38:26.970 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 199

expected:<Set(101, 0, 25, 125, 150, 50, 199, 175, 99, 75, 100)> but was:<Set(0, 25, 125, 150, 199, 99, 75, 100)>
Expected :Set(101, 0, 25, 125, 150, 50, 199, 175, 99, 75, 100)
Actual   :Set(0, 25, 125, 150, 199, 99, 75, 100)
{noformat}
If I map from an InternalRow to a Row, the issue goes away:
{code}
// Workaround: deserialize every InternalRow into an external Row inside the
// partition; the encoder is resolved and bound once per partition.
val rdd = dataDF.queryExecution.toRdd.mapPartitions { rows =>
  val boundEncoder = RowEncoder(schema).resolveAndBind()
  rows.map(row => boundEncoder.fromRow(row))
}
{code}

Converting with CatalystTypeConverters also appears to resolve the issue:
{code}
// Workaround: convert every InternalRow to a Scala Row using a converter that
// is built once per partition.
// NOTE(review): the imports listed in the test above do not include
// `CatalystTypeConverters` (presumably
// `org.apache.spark.sql.catalyst.CatalystTypeConverters`) — add it to compile.
val rdd = dataDF.queryExecution.toRdd.mapPartitions { rows =>
  val toScala = CatalystTypeConverters.createToScalaConverter(schema)
  rows.map { row => toScala(row).asInstanceOf[Row] }
}
{code}

  was:
I have seen a simple case where InternalRows returned by `queryExecution.toRdd` 
are corrupt. Some rows are duplicated while others are missing. 

This simple test illustrates the issue:
{code}
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.junit.Assert._
import org.junit.Test
import org.scalatest.Matchers
import org.scalatest.junit.JUnitSuite
import org.slf4j.Logger
import org.slf4j.LoggerFactory

class SparkTest extends JUnitSuite with Matchers {
  val Log: Logger = LoggerFactory.getLogger(getClass)

  @Test
  def testSparkRowCorruption(): Unit = {
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("test")
  .set("

[jira] [Updated] (SPARK-26880) dataDF.queryExecution.toRdd corrupt rows

2019-02-14 Thread Grant Henke (JIRA)


 [ 
https://issues.apache.org/jira/browse/SPARK-26880?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Grant Henke updated SPARK-26880:

Description: 
I have seen a simple case where InternalRows returned by `queryExecution.toRdd` 
are corrupt. Some rows are duplicated while others are missing. 

This simple test illustrates the issue:
{code}
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.junit.Assert._
import org.junit.Test
import org.scalatest.Matchers
import org.scalatest.junit.JUnitSuite
import org.slf4j.Logger
import org.slf4j.LoggerFactory

class SparkTest extends JUnitSuite with Matchers {
  val Log: Logger = LoggerFactory.getLogger(getClass)

  /**
   * Reproduces SPARK-26880: collects the InternalRows returned by
   * `dataDF.queryExecution.toRdd` and asserts that their keys match the keys
   * of the input rows.
   *
   * NOTE(review): a likely explanation for the duplicated/missing keys is that
   * Spark may hand out the same mutable InternalRow instance for successive
   * rows of a partition, so collecting without copying (for example
   * `toRdd.map(_.copy())`) retains several references to one object. This is
   * consistent with the per-row conversion workaround described in this
   * report making the problem disappear — confirm against the
   * `QueryExecution.toRdd` documentation.
   */
  @Test
  def testSparkRowCorruption(): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("test")
      .set("spark.ui.enabled", "false")
    val ss = SparkSession.builder().config(conf).getOrCreate()

    // Always stop the session, including when the assertion below fails, so
    // the local Spark session created here is not left running afterwards.
    try {
      // Setup a DataFrame for testing.
      val data = Seq(
        Row.fromSeq(Seq(0, "0")),
        Row.fromSeq(Seq(25, "25")),
        Row.fromSeq(Seq(50, "50")),
        Row.fromSeq(Seq(75, "75")),
        Row.fromSeq(Seq(99, "99")),
        Row.fromSeq(Seq(100, "100")),
        Row.fromSeq(Seq(101, "101")),
        Row.fromSeq(Seq(125, "125")),
        Row.fromSeq(Seq(150, "150")),
        Row.fromSeq(Seq(175, "175")),
        Row.fromSeq(Seq(199, "199"))
      )
      val dataRDD = ss.sparkContext.parallelize(data)
      val schema = StructType(
        Seq(
          StructField("key", IntegerType),
          StructField("value", StringType)
        ))
      val dataDF = ss.sqlContext.createDataFrame(dataRDD, schema)

      // Convert to an RDD.
      val rdd = dataDF.queryExecution.toRdd

      // Collect the data to compare.
      val resultData = rdd.collect()
      resultData.foreach { row =>
        // Log for visualizing the corruption.
        Log.error(s"${row.getInt(0)}")
      }

      // Ensure the keys in the original data and resulting data match.
      val dataKeys = data.map(_.getInt(0)).toSet
      val resultKeys = resultData.map(_.getInt(0)).toSet
      assertEquals(dataKeys, resultKeys)
    } finally {
      ss.stop()
    }
  }

}
{code}
That test fails with the following:
{noformat}
10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 0
10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 25
10:38:26.967 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 75
10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 75
10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 99
10:38:26.968 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 100
10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 125
10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 125
10:38:26.969 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 150
10:38:26.970 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 199
10:38:26.970 [ERROR - Test worker] (PartitionByInternalRowTest.scala:57) 199

expected:<Set(101, 0, 25, 125, 150, 50, 199, 175, 99, 75, 100)> but was:<Set(0, 25, 125, 150, 199, 99, 75, 100)>
Expected :Set(101, 0, 25, 125, 150, 50, 199, 175, 99, 75, 100)
Actual   :Set(0, 25, 125, 150, 199, 99, 75, 100)
{noformat}
If I map from an InternalRow to a Row, the issue goes away:
{code}
// Workaround: deserialize every InternalRow into an external Row inside the
// partition; the encoder is resolved and bound once per partition.
val rdd = dataDF.queryExecution.toRdd.mapPartitions { rows =>
  val boundEncoder = RowEncoder(schema).resolveAndBind()
  rows.map(row => boundEncoder.fromRow(row))
}
{code}

  was:
I have seen a simple case where InternalRows returned by `queryExecution.toRdd` 
are corrupt. Some rows are duplicated while others are missing. 

This simple test illustrates the issue:
{code:scala}
package org.apache.kudu.spark.kudu
import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.junit.Assert._
import org.junit.Test
import org.scalatest.Matchers
import org.scalatest.junit.JUnitSuite
import org.slf4j.Logger
import org.slf4j.LoggerFactory

class SparkTest extends JUnitSuite with Matchers {
  val Log: Logger = LoggerFactory.getLogger(getClass)

  @Test
  def testSparkRowCorruption(): Unit = {
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("test")
  .set("spark.ui.enabled", "false")
val ss = SparkSession.builder().config(conf).getOrCreate()

// Setup a DataFrame for testing.
val data = Seq(
  Row.fromSeq(Seq(0, "0")),
  Row.fromSeq(Seq(25, "25")),
  Row.fromSeq(Seq(50, "50")),