[ 
https://issues.apache.org/jira/browse/SPARK-23848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Joseph K. Bradley resolved SPARK-23848.
---------------------------------------
    Resolution: Not A Problem

> Structured Streaming fails with nested UDTs
> -------------------------------------------
>
>                 Key: SPARK-23848
>                 URL: https://issues.apache.org/jira/browse/SPARK-23848
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Joseph K. Bradley
>            Priority: Major
>
> While trying to write a test for org.apache.spark.ml.feature.MinHashLSHModel 
> with Structured Streaming (for prediction in a streaming job), I ran into a 
> bug which seems to indicate that nested UDTs don't work with streaming.
> Here's a simplified version of the code:
> {code}
> package org.apache.spark.ml.feature
> import org.apache.spark.ml.linalg.{Vector, Vectors}
> import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row}
> import org.apache.spark.sql.execution.streaming.MemoryStream
> import org.apache.spark.sql.streaming.StreamTest
> class MinHashLSHSuite extends StreamTest {
>   @transient var dataset: Dataset[_] = _
>   override def beforeAll(): Unit = {
>     super.beforeAll()
>     val data = {
>       for (i <- 0 to 95) yield Vectors.sparse(100, (i until i + 5).map((_, 
> 1.0)))
>     }
>     dataset = spark.createDataFrame(data.map(Tuple1.apply)).toDF("keys")
>   }
>   test("a test") {
>     val localSpark = spark
>     import localSpark.implicits._
>     val df = Seq[(Int, Array[Vector])](
>       (1, Array(Vectors.dense(1.0, 2.0))),
>       (2, Array(Vectors.dense(1.1, 2.1)))
>     ).toDF("a", "b")
>     df.show()  // THIS SUCCEEDS
>     df.collect().foreach(println)  // THIS SUCCEEDS
>     testTransformerOnStreamData[(Int, Array[Vector])](df) { rows =>  // THIS 
> FAILS
>       rows.foreach {
>         case Row(a: Int, b: Array[_]) =>
>       }
>     }
>   }
>   def testTransformerOnStreamData[A : Encoder](
>       dataframe: DataFrame)
>     (globalCheckFunction: Seq[Row] => Unit): Unit = {
>     val stream = MemoryStream[A]
>     val streamDF = stream.toDS().toDF("a", "b")
>     val data = dataframe.as[A].collect()
>     val streamOutput = streamDF
>       .select("a", "b")
>     testStream(streamOutput) (
>       AddData(stream, data: _*),
>       CheckAnswer(globalCheckFunction)
>     )
>   }
> }
> {code}
> The streaming test fails with stack trace:
> {code}
> [info] - a test *** FAILED *** (2 seconds, 325 milliseconds)
> [info]   scala.MatchError: [1,WrappedArray([1.0,2.0])] (of class 
> org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
> [info]   
> [info]   == Progress ==
> [info]      AddData to MemoryStream[_1#24,_2#25]: 
> (1,[Lorg.apache.spark.ml.linalg.Vector;@5abf84a9),(2,[Lorg.apache.spark.ml.linalg.Vector;@4b4198ba)
> [info]   => CheckAnswerByFunc
> [info]   
> [info]   == Stream ==
> [info]   Output Mode: Append
> [info]   Stream state: {MemoryStream[_1#24,_2#25]: 0}
> [info]   Thread state: alive
> [info]   Thread stack trace: java.lang.Thread.sleep(Native Method)
> [info]   
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:163)
> [info]   
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
> [info]   
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:131)
> [info]   
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
> [info]   
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> [info]   
> [info]   
> [info]   == Sink ==
> [info]   0: [1,WrappedArray([1.0,2.0])] [2,WrappedArray([1.1,2.1])]
> [info]   
> [info]   
> [info]   == Plan ==
> [info]   == Parsed Logical Plan ==
> [info]   Project [a#27, b#28]
> [info]   +- Project [_1#24 AS a#27, _2#25 AS b#28]
> [info]      +- Project [_1#36 AS _1#24, _2#37 AS _2#25]
> [info]         +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37]
> [info]   
> [info]   == Analyzed Logical Plan ==
> [info]   a: int, b: array<vector>
> [info]   Project [a#27, b#28]
> [info]   +- Project [_1#24 AS a#27, _2#25 AS b#28]
> [info]      +- Project [_1#36 AS _1#24, _2#37 AS _2#25]
> [info]         +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37]
> [info]   
> [info]   == Optimized Logical Plan ==
> [info]   Project [_1#36 AS a#27, _2#37 AS b#28]
> [info]   +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37]
> [info]   
> [info]   == Physical Plan ==
> [info]   *(1) Project [_1#36 AS a#27, _2#37 AS b#28]
> [info]   +- *(1) ScanV2 MemoryStreamDataSource[_1#36, _2#37] 
> (StreamTest.scala:430)
> [info]   org.scalatest.exceptions.TestFailedException:
> [info]   at 
> org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
> [info]   at 
> org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
> [info]   at org.scalatest.Assertions$class.fail(Assertions.scala:1089)
> [info]   at org.scalatest.FunSuite.fail(FunSuite.scala:1560)
> [info]   at 
> org.apache.spark.sql.streaming.StreamTest$class.failTest$1(StreamTest.scala:430)
> [info]   at 
> org.apache.spark.sql.streaming.StreamTest$class.executeAction$1(StreamTest.scala:683)
> [info]   at 
> org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:704)
> [info]   at 
> org.apache.spark.sql.streaming.StreamTest$$anonfun$liftedTree1$1$1.apply(StreamTest.scala:693)
> [info]   at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> [info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> [info]   at 
> org.apache.spark.sql.streaming.StreamTest$class.liftedTree1$1(StreamTest.scala:693)
> [info]   at 
> org.apache.spark.sql.streaming.StreamTest$class.testStream(StreamTest.scala:692)
> [info]   at 
> org.apache.spark.ml.feature.MinHashLSHSuite.testStream(MinHashLSHSuite.scala:28)
> [info]   at 
> org.apache.spark.ml.feature.MinHashLSHSuite.testTransformerOnStreamData(MinHashLSHSuite.scala:201)
> [info]   at 
> org.apache.spark.ml.feature.MinHashLSHSuite$$anonfun$1.apply$mcV$sp(MinHashLSHSuite.scala:184)
> [info]   at 
> org.apache.spark.ml.feature.MinHashLSHSuite$$anonfun$1.apply(MinHashLSHSuite.scala:174)
> [info]   at 
> org.apache.spark.ml.feature.MinHashLSHSuite$$anonfun$1.apply(MinHashLSHSuite.scala:174)
> [info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
> [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
> [info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
> [info]   at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)
> [info]   at 
> org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:103)
> [info]   at 
> org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)
> [info]   at 
> org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
> [info]   at 
> org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)
> [info]   at org.scalatest.SuperEngine.runTestImpl(Engine.scala:289)
> [info]   at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)
> [info]   at 
> org.apache.spark.ml.feature.MinHashLSHSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(MinHashLSHSuite.scala:28)
> [info]   at 
> org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)
> [info]   at 
> org.apache.spark.ml.feature.MinHashLSHSuite.runTest(MinHashLSHSuite.scala:28)
> [info]   at 
> org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
> [info]   at 
> org.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)
> [info]   at 
> org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)
> [info]   at 
> org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)
> [info]   at scala.collection.immutable.List.foreach(List.scala:381)
> [info]   at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)
> [info]   at 
> org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)
> [info]   at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)
> [info]   at org.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)
> [info]   at org.scalatest.FunSuite.runTests(FunSuite.scala:1560)
> [info]   at org.scalatest.Suite$class.run(Suite.scala:1147)
> [info]   at 
> org.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)
> [info]   at 
> org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
> [info]   at 
> org.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)
> [info]   at org.scalatest.SuperEngine.runImpl(Engine.scala:521)
> [info]   at org.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)
> [info]   at 
> org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:52)
> [info]   at 
> org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)
> [info]   at 
> org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)
> [info]   at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:52)
> [info]   at 
> org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:314)
> [info]   at 
> org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:480)
> [info]   at sbt.ForkMain$Run$2.call(ForkMain.java:296)
> [info]   at sbt.ForkMain$Run$2.call(ForkMain.java:286)
> [info]   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> [info]   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> [info]   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> [info]   at java.lang.Thread.run(Thread.java:745)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to