[jira] [Commented] (SPARK-23848) Structured Streaming fails with nested UDTs

2018-04-02 Thread Joseph K. Bradley (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-23848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423084#comment-16423084 ]

Joseph K. Bradley commented on SPARK-23848:
---

Whoops!  Sorry, I should have caught that.  I'll close this...

> Structured Streaming fails with nested UDTs
> ---
>
> Key: SPARK-23848
> URL: https://issues.apache.org/jira/browse/SPARK-23848
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 2.3.0
>Reporter: Joseph K. Bradley
>Priority: Major
>
> While trying to write a test for org.apache.spark.ml.feature.MinHashLSHModel 
> with Structured Streaming (for prediction in a streaming job), I ran into a 
> bug which seems to indicate that nested UDTs don't work with streaming.
> Here's a simplified version of the code:
> {code}
> package org.apache.spark.ml.feature
>
> import org.apache.spark.ml.linalg.{Vector, Vectors}
> import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Row}
> import org.apache.spark.sql.execution.streaming.MemoryStream
> import org.apache.spark.sql.streaming.StreamTest
>
> class MinHashLSHSuite extends StreamTest {
>
>   @transient var dataset: Dataset[_] = _
>
>   override def beforeAll(): Unit = {
>     super.beforeAll()
>     val data = {
>       for (i <- 0 to 95) yield Vectors.sparse(100, (i until i + 5).map((_, 1.0)))
>     }
>     dataset = spark.createDataFrame(data.map(Tuple1.apply)).toDF("keys")
>   }
>
>   test("a test") {
>     val localSpark = spark
>     import localSpark.implicits._
>     val df = Seq[(Int, Array[Vector])](
>       (1, Array(Vectors.dense(1.0, 2.0))),
>       (2, Array(Vectors.dense(1.1, 2.1)))
>     ).toDF("a", "b")
>     df.show()  // THIS SUCCEEDS
>     df.collect().foreach(println)  // THIS SUCCEEDS
>     testTransformerOnStreamData[(Int, Array[Vector])](df) { rows =>  // THIS FAILS
>       rows.foreach {
>         case Row(a: Int, b: Array[_]) =>
>       }
>     }
>   }
>
>   def testTransformerOnStreamData[A : Encoder](
>       dataframe: DataFrame)
>     (globalCheckFunction: Seq[Row] => Unit): Unit = {
>     val stream = MemoryStream[A]
>     val streamDF = stream.toDS().toDF("a", "b")
>     val data = dataframe.as[A].collect()
>     val streamOutput = streamDF
>       .select("a", "b")
>     testStream(streamOutput) (
>       AddData(stream, data: _*),
>       CheckAnswer(globalCheckFunction)
>     )
>   }
> }
> {code}
> The streaming test fails with stack trace:
> {code}
> [info] - a test *** FAILED *** (2 seconds, 325 milliseconds)
> [info]   scala.MatchError: [1,WrappedArray([1.0,2.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema)
> [info]
> [info]   == Progress ==
> [info]  AddData to MemoryStream[_1#24,_2#25]: (1,[Lorg.apache.spark.ml.linalg.Vector;@5abf84a9),(2,[Lorg.apache.spark.ml.linalg.Vector;@4b4198ba)
> [info]   => CheckAnswerByFunc
> [info]
> [info]   == Stream ==
> [info]   Output Mode: Append
> [info]   Stream state: {MemoryStream[_1#24,_2#25]: 0}
> [info]   Thread state: alive
> [info]   Thread stack trace: java.lang.Thread.sleep(Native Method)
> [info]   org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:163)
> [info]   org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
> [info]   org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:131)
> [info]   org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
> [info]   org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> [info]
> [info]
> [info]   == Sink ==
> [info]   0: [1,WrappedArray([1.0,2.0])] [2,WrappedArray([1.1,2.1])]
> [info]
> [info]
> [info]   == Plan ==
> [info]   == Parsed Logical Plan ==
> [info]   Project [a#27, b#28]
> [info]   +- Project [_1#24 AS a#27, _2#25 AS b#28]
> [info]  +- Project [_1#36 AS _1#24, _2#37 AS _2#25]
> [info] +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37]
> [info]
> [info]   == Analyzed Logical Plan ==
> [info]   a: int, b: array
> [info]   Project [a#27, b#28]
> [info]   +- Project [_1#24 AS a#27, _2#25 AS b#28]
> [info]  +- Project [_1#36 AS _1#24, _2#37 AS _2#25]
> [info] +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37]
> [info]
> [info]   == Optimized Logical Plan ==
> [info]   Project [_1#36 AS a#27, _2#37 AS b#28]
> [info]   +- Streaming RelationV2 MemoryStreamDataSource[_1#36, _2#37]
> [info]
> [info]   == Physical Plan ==
> [info]   *(1) Project [_1#36 AS a#27, _2#37 AS b#28]
> [info]   +- *(1) ScanV2 MemoryStreamDataSource[_1#36, _2#37]
> {code}

[jira] [Commented] (SPARK-23848) Structured Streaming fails with nested UDTs

2018-04-02 Thread Shixiong Zhu (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-23848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423071#comment-16423071 ]

Shixiong Zhu commented on SPARK-23848:
--

To fix your code, you can just change "case Row(a: Int, b: Array[_]) =>" to "case Row(a: Int, b: Seq[_]) =>".


[jira] [Commented] (SPARK-23848) Structured Streaming fails with nested UDTs

2018-04-02 Thread Shixiong Zhu (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-23848?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16423070#comment-16423070 ]

Shixiong Zhu commented on SPARK-23848:
--

[~josephkb] this is unfortunately because DataFrame is not a type-safe API. It always converts an array to a Seq (the WrappedArray reported in the error message). This is the converter, if you are curious: 
https://github.com/apache/spark/blob/7fdacbc77bbcf98c2c045a1873e749129769dcc0/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala#L176
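
For illustration, a small standalone sketch of that conversion (assumes Spark with MLlib on the classpath; the local session and its app name are placeholders, not from the ticket):

{code}
// Sketch only: shows that an Array[Vector] column comes back from a DataFrame
// as a Seq (WrappedArray at runtime), which is why the Array[_] pattern never matches.
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.sql.{Row, SparkSession}

val spark = SparkSession.builder().master("local[1]").appName("array-to-seq-demo").getOrCreate()
import spark.implicits._

val df = Seq((1, Array(Vectors.dense(1.0, 2.0)))).toDF("a", "b")
val row: Row = df.collect().head

println(row.get(1).isInstanceOf[Array[_]])  // false
println(row.get(1).isInstanceOf[Seq[_]])    // true (a WrappedArray at runtime)
println(row.getSeq[Vector](1))              // the vectors are still usable as a Seq

spark.stop()
{code}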
