Michael Munday created SPARK-32929:
--------------------------------------

             Summary: StreamSuite failure on IBM Z: - SPARK-20432: union one stream with itself
                 Key: SPARK-32929
                 URL: https://issues.apache.org/jira/browse/SPARK-32929
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.0.1
         Environment: openjdk version "11.0.8" 2020-07-14
OpenJDK Runtime Environment AdoptOpenJDK (build 11.0.8+10)
OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.8+10, mixed mode)

Linux 4.15.0-117-generic #118-Ubuntu SMP Fri Sep 4 20:00:20 UTC 2020 s390x s390x s390x GNU/Linux

            Reporter: Michael Munday


I am getting all zeros in the output of this test on IBM Z, which is a big-endian system; see the error below.

I think this issue is related to the use of {{IntegerType}} in the schema for {{FakeDefaultSource}}. Modifying the schema to use {{LongType}} fixes the issue. Another workaround is to remove the {{.select("a")}} call (see the alternative patch below).

My working theory is that the long data produced by Range is being read using unsafe int operations (matching the declared schema). This would appear to 'work' on little-endian systems, where the low-order word of a long comes first in memory, but not on big-endian systems, where the high-order word comes first. I'm still working to figure out what the exact mechanism is and I'd appreciate any hints or insights. The sketch below illustrates the byte-order effect.
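To make the suspected effect concrete, here is a minimal standalone sketch using plain {{java.nio}} (illustrative only, not the actual Spark code path): it stores a long in native byte order, roughly the way an {{UnsafeRow}} word is laid out, then reads back only the first four bytes, as an int accessor at the same offset would.
{code:java}
import java.nio.{ByteBuffer, ByteOrder}

object EndianSketch extends App {
  // Store the long value 5 in native byte order (UnsafeRow fields are
  // stored as native-order 8-byte words).
  val buf = ByteBuffer.allocate(8).order(ByteOrder.nativeOrder())
  buf.putLong(0, 5L)

  // Read back only the first 4 bytes, as an int read at the same offset would.
  val asInt = buf.getInt(0)

  // On little-endian (e.g. x86-64): prints 5, because the low-order word of
  // the long happens to come first.
  // On big-endian (e.g. s390x): prints 0, because the high-order word comes
  // first, and for small values like 0-10 the high word is all zeros.
  println(s"long 5 read back as int: $asInt")
}
{code}
This would match the observed output: every expected value fits in the low word of a long, so a 4-byte read at offset 0 on s390x always sees the zero high word.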

The error looks like this:
{noformat}
- SPARK-20432: union one stream with itself *** FAILED ***
  Decoded objects do not match expected objects:
  expected: WrappedArray(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  actual:   WrappedArray(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
  assertnotnull(upcast(getcolumnbyordinal(0, LongType), LongType, - root class: "scala.Long"))
  +- upcast(getcolumnbyordinal(0, LongType), LongType, - root class: "scala.Long")
     +- getcolumnbyordinal(0, LongType) (QueryTest.scala:88)
{noformat}
This change fixes the issue: 
{code:java}
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -45,7 +45,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.StreamSourceProvider
 import org.apache.spark.sql.streaming.util.{BlockOnStopSourceProvider, StreamManualClock}
-import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
+import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}
 import org.apache.spark.util.Utils

 class StreamSuite extends StreamTest {
@@ -1265,7 +1265,7 @@ class StreamSuite extends StreamTest {
 }

 abstract class FakeSource extends StreamSourceProvider {
-  private val fakeSchema = StructType(StructField("a", IntegerType) :: Nil)
+  private val fakeSchema = StructType(StructField("a", LongType) :: Nil)

   override def sourceSchema(
       spark: SQLContext,
@@ -1287,7 +1287,7 @@ class FakeDefaultSource extends FakeSource {
     new Source {
       private var offset = -1L

-      override def schema: StructType = StructType(StructField("a", IntegerType) :: Nil)
+      override def schema: StructType = StructType(StructField("a", LongType) :: Nil)

       override def getOffset: Option[Offset] = {
         if (offset >= 10) {

{code}
Alternatively, this change also fixes the issue:
{code:java}
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -154,7 +154,7 @@ class StreamSuite extends StreamTest {
   }
 
   test("SPARK-20432: union one stream with itself") {
-    val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load().select("a")
+    val df = spark.readStream.format(classOf[FakeDefaultSource].getName).load()
     val unioned = df.union(df)
     withTempDir { outputDir =>
       withTempDir { checkpointDir =>
{code}
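For anyone trying to reproduce this, the test should be runnable on its own with the usual Spark sbt pattern (the ScalaTest {{-z}} flag filters tests by substring; exact invocation may vary by build setup):
{noformat}
build/sbt "sql/testOnly *StreamSuite -- -z \"union one stream with itself\""
{noformat}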


