[jira] [Commented] (SPARK-23848) Structured Streaming fails with nested UDTs
[ https://issues.apache.org/jira/browse/SPARK-23848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423084#comment-16423084 ] Joseph K. Bradley commented on SPARK-23848: --- Whoops! Sorry, I should have caught that. I'll close this... > Structured Streaming fails with nested UDTs > --- > > Key: SPARK-23848 > URL: https://issues.apache.org/jira/browse/SPARK-23848 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Joseph K. Bradley >Priority: Major > > While trying to write a test for org.apache.spark.ml.feature.MinHashLSHModel > with Structured Streaming (for prediction in a streaming job), I ran into a > bug which seems to indicate that nested UDTs don't work with streaming. > Here's a simplified version of the code: > {code} > package org.apache.spark.ml.feature > import org.apache.spark.ml.linalg.{Vector, Vectors} > import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row} > import org.apache.spark.sql.execution.streaming.MemoryStream > import org.apache.spark.sql.streaming.StreamTest > class MinHashLSHSuite extends StreamTest { > @transient var dataset: Dataset[_] = _ > override def beforeAll(): Unit = { > super.beforeAll() > val data = { > for (i <- 0 to 95) yield Vectors.sparse(100, (i until i + 5).map((_, > 1.0))) > } > dataset = spark.createDataFrame(data.map(Tuple1.apply)).toDF("keys") > } > test("a test") { > val localSpark = spark > import localSpark.implicits._ > val df = Seq[(Int, Array[Vector])]( > (1, Array(Vectors.dense(1.0, 2.0))), > (2, Array(Vectors.dense(1.1, 2.1))) > ).toDF("a", "b") > df.show() // THIS SUCCEEDS > df.collect().foreach(println) // THIS SUCCEEDS > testTransformerOnStreamData[(Int, Array[Vector])](df) { rows => // THIS > FAILS > rows.foreach { > case Row(a: Int, b: Array[_]) => > } > } > } > def testTransformerOnStreamData[A : Encoder]( > dataframe: DataFrame) > (globalCheckFunction: Seq[Row] => Unit): Unit = { > val stream = MemoryStream[A] > val streamDF = 
stream.toDS().toDF("a", "b") > val data = dataframe.as[A].collect() > val streamOutput = streamDF > .select("a", "b") > testStream(streamOutput) ( > AddData(stream, data: _*), > CheckAnswer(globalCheckFunction) > ) > } > } > {code} > The streaming test fails with stack trace: > {code} > [info] - a test *** FAILED *** (2 seconds, 325 milliseconds) > [info] scala.MatchError: [1,WrappedArray([1.0,2.0])] (of class > org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema) > [info] > [info] == Progress == > [info] AddData to MemoryStream[_1#24,_2#25]: > (1,[Lorg.apache.spark.ml.linalg.Vector;@5abf84a9),(2,[Lorg.apache.spark.ml.linalg.Vector;@4b4198ba) > [info] => CheckAnswerByFunc > [info] > [info] == Stream == > [info] Output Mode: Append > [info] Stream state: {MemoryStream[_1#24,_2#25]: 0} > [info] Thread state: alive > [info] Thread stack trace: java.lang.Thread.sleep(Native Method) > [info] > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:163) > [info] > org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) > [info] > org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:131) > [info] > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279) > [info] > org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189) > [info] > [info] > [info] == Sink == > [info] 0: [1,WrappedArray([1.0,2.0])] [2,WrappedArray([1.1,2.1])] > [info] > [info] > [info] == Plan == > [info] == Parsed Logical Plan == > [info] Project [a#27, b#28] > [info] +- Project [_1#24 AS a#27, _2#25 AS b#28] > [info] +- Project [_1#36 AS _1#24, _2#37 AS _2#25] > [info] +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37] > [info] > [info] == Analyzed Logical Plan == > [info] a: int, b: 
array<vector> > [info] Project [a#27, b#28] > [info] +- Project [_1#24 AS a#27, _2#25 AS b#28] > [info] +- Project [_1#36 AS _1#24, _2#37 AS _2#25] > [info] +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37] > [info] > [info] == Optimized Logical Plan == > [info] Project [_1#36 AS a#27, _2#37 AS b#28] > [info] +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37] > [info] > [info] == Physical Plan == > [info] *(1) Project [_1#36 AS a#27, _2#37 AS b#28] > [info] +- *(1) ScanV2 MemoryStreamDataSource[_1#36, _2#37] >
[jira] [Commented] (SPARK-23848) Structured Streaming fails with nested UDTs
[ https://issues.apache.org/jira/browse/SPARK-23848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423071#comment-16423071 ] Shixiong Zhu commented on SPARK-23848: -- To fix your code, you can just change "case Row(a: Int, b: Array[_]) =>" to "case Row(a: Int, b: Seq[_]) =>". > Structured Streaming fails with nested UDTs > --- > > Key: SPARK-23848 > URL: https://issues.apache.org/jira/browse/SPARK-23848 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Joseph K. Bradley >Priority: Major > > While trying to write a test for org.apache.spark.ml.feature.MinHashLSHModel > with Structured Streaming (for prediction in a streaming job), I ran into a > bug which seems to indicate that nested UDTs don't work with streaming. > Here's a simplified version of the code: > {code} > package org.apache.spark.ml.feature > import org.apache.spark.ml.linalg.{Vector, Vectors} > import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row} > import org.apache.spark.sql.execution.streaming.MemoryStream > import org.apache.spark.sql.streaming.StreamTest > class MinHashLSHSuite extends StreamTest { > @transient var dataset: Dataset[_] = _ > override def beforeAll(): Unit = { > super.beforeAll() > val data = { > for (i <- 0 to 95) yield Vectors.sparse(100, (i until i + 5).map((_, > 1.0))) > } > dataset = spark.createDataFrame(data.map(Tuple1.apply)).toDF("keys") > } > test("a test") { > val localSpark = spark > import localSpark.implicits._ > val df = Seq[(Int, Array[Vector])]( > (1, Array(Vectors.dense(1.0, 2.0))), > (2, Array(Vectors.dense(1.1, 2.1))) > ).toDF("a", "b") > df.show() // THIS SUCCEEDS > df.collect().foreach(println) // THIS SUCCEEDS > testTransformerOnStreamData[(Int, Array[Vector])](df) { rows => // THIS > FAILS > rows.foreach { > case Row(a: Int, b: Array[_]) => > } > } > } > def testTransformerOnStreamData[A : Encoder]( > dataframe: DataFrame) > (globalCheckFunction: Seq[Row] => Unit): Unit = { > val 
stream = MemoryStream[A] > val streamDF = stream.toDS().toDF("a", "b") > val data = dataframe.as[A].collect() > val streamOutput = streamDF > .select("a", "b") > testStream(streamOutput) ( > AddData(stream, data: _*), > CheckAnswer(globalCheckFunction) > ) > } > } > {code} > The streaming test fails with stack trace: > {code} > [info] - a test *** FAILED *** (2 seconds, 325 milliseconds) > [info] scala.MatchError: [1,WrappedArray([1.0,2.0])] (of class > org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema) > [info] > [info] == Progress == > [info] AddData to MemoryStream[_1#24,_2#25]: > (1,[Lorg.apache.spark.ml.linalg.Vector;@5abf84a9),(2,[Lorg.apache.spark.ml.linalg.Vector;@4b4198ba) > [info] => CheckAnswerByFunc > [info] > [info] == Stream == > [info] Output Mode: Append > [info] Stream state: {MemoryStream[_1#24,_2#25]: 0} > [info] Thread state: alive > [info] Thread stack trace: java.lang.Thread.sleep(Native Method) > [info] > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:163) > [info] > org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) > [info] > org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:131) > [info] > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279) > [info] > org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189) > [info] > [info] > [info] == Sink == > [info] 0: [1,WrappedArray([1.0,2.0])] [2,WrappedArray([1.1,2.1])] > [info] > [info] > [info] == Plan == > [info] == Parsed Logical Plan == > [info] Project [a#27, b#28] > [info] +- Project [_1#24 AS a#27, _2#25 AS b#28] > [info] +- Project [_1#36 AS _1#24, _2#37 AS _2#25] > [info] +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37] > [info] > [info] == 
Analyzed Logical Plan == > [info] a: int, b: array<vector> > [info] Project [a#27, b#28] > [info] +- Project [_1#24 AS a#27, _2#25 AS b#28] > [info] +- Project [_1#36 AS _1#24, _2#37 AS _2#25] > [info] +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37] > [info] > [info] == Optimized Logical Plan == > [info] Project [_1#36 AS a#27, _2#37 AS b#28] > [info] +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37] > [info] > [info] == Physical Plan == > [info] *(1) Project [_1#36 AS a#27, _2#37 AS b#28] > [info] +- *(1) ScanV2
[jira] [Commented] (SPARK-23848) Structured Streaming fails with nested UDTs
[ https://issues.apache.org/jira/browse/SPARK-23848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423070#comment-16423070 ] Shixiong Zhu commented on SPARK-23848: -- [~josephkb] unfortunately, this is because DataFrame is not a type-safe API: it always converts an array to a Seq (the WrappedArray reported in the error message). Here is the converter, if you are curious: https://github.com/apache/spark/blob/7fdacbc77bbcf98c2c045a1873e749129769dcc0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala#L176 > Structured Streaming fails with nested UDTs > --- > > Key: SPARK-23848 > URL: https://issues.apache.org/jira/browse/SPARK-23848 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 2.3.0 >Reporter: Joseph K. Bradley >Priority: Major > > While trying to write a test for org.apache.spark.ml.feature.MinHashLSHModel > with Structured Streaming (for prediction in a streaming job), I ran into a > bug which seems to indicate that nested UDTs don't work with streaming. 
> Here's a simplified version of the code: > {code} > package org.apache.spark.ml.feature > import org.apache.spark.ml.linalg.{Vector, Vectors} > import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row} > import org.apache.spark.sql.execution.streaming.MemoryStream > import org.apache.spark.sql.streaming.StreamTest > class MinHashLSHSuite extends StreamTest { > @transient var dataset: Dataset[_] = _ > override def beforeAll(): Unit = { > super.beforeAll() > val data = { > for (i <- 0 to 95) yield Vectors.sparse(100, (i until i + 5).map((_, > 1.0))) > } > dataset = spark.createDataFrame(data.map(Tuple1.apply)).toDF("keys") > } > test("a test") { > val localSpark = spark > import localSpark.implicits._ > val df = Seq[(Int, Array[Vector])]( > (1, Array(Vectors.dense(1.0, 2.0))), > (2, Array(Vectors.dense(1.1, 2.1))) > ).toDF("a", "b") > df.show() // THIS SUCCEEDS > df.collect().foreach(println) // THIS SUCCEEDS > testTransformerOnStreamData[(Int, Array[Vector])](df) { rows => // THIS > FAILS > rows.foreach { > case Row(a: Int, b: Array[_]) => > } > } > } > def testTransformerOnStreamData[A : Encoder]( > dataframe: DataFrame) > (globalCheckFunction: Seq[Row] => Unit): Unit = { > val stream = MemoryStream[A] > val streamDF = stream.toDS().toDF("a", "b") > val data = dataframe.as[A].collect() > val streamOutput = streamDF > .select("a", "b") > testStream(streamOutput) ( > AddData(stream, data: _*), > CheckAnswer(globalCheckFunction) > ) > } > } > {code} > The streaming test fails with stack trace: > {code} > [info] - a test *** FAILED *** (2 seconds, 325 milliseconds) > [info] scala.MatchError: [1,WrappedArray([1.0,2.0])] (of class > org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema) > [info] > [info] == Progress == > [info] AddData to MemoryStream[_1#24,_2#25]: > (1,[Lorg.apache.spark.ml.linalg.Vector;@5abf84a9),(2,[Lorg.apache.spark.ml.linalg.Vector;@4b4198ba) > [info] => CheckAnswerByFunc > [info] > [info] == Stream == > [info] Output Mode: Append > 
[info] Stream state: {MemoryStream[_1#24,_2#25]: 0} > [info] Thread state: alive > [info] Thread stack trace: java.lang.Thread.sleep(Native Method) > [info] > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:163) > [info] > org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) > [info] > org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:131) > [info] > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279) > [info] > org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189) > [info] > [info] > [info] == Sink == > [info] 0: [1,WrappedArray([1.0,2.0])] [2,WrappedArray([1.1,2.1])] > [info] > [info] > [info] == Plan == > [info] == Parsed Logical Plan == > [info] Project [a#27, b#28] > [info] +- Project [_1#24 AS a#27, _2#25 AS b#28] > [info] +- Project [_1#36 AS _1#24, _2#37 AS _2#25] > [info] +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37] > [info] > [info] == Analyzed Logical Plan == > [info] a: int, b: array<vector> > [info] Project [a#27, b#28] > [info] +- Project [_1#24 AS a#27, _2#25 AS b#28] > [info] +- Project [_1#36 AS _1#24, _2#37 AS _2#25] > [info] +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37] > [info] > [info] == Optimized Logical Plan