[ https://issues.apache.org/jira/browse/SPARK-50661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

ASF GitHub Bot updated SPARK-50661:
-----------------------------------
    Labels: pull-request-available  (was: )

> Scala Streaming foreachBatch doesn't work for input type Dataset[T] due to missing encoder for T.
> -------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-50661
>                 URL: https://issues.apache.org/jira/browse/SPARK-50661
>             Project: Spark
>          Issue Type: Bug
>          Components: Connect, Structured Streaming
>    Affects Versions: 4.0.0, 3.5.4
>            Reporter: Haiyang Sun
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, the Spark Connect implementation of Scala streaming `foreachBatch` does not
> carry the encoder for T when the input is Dataset[T]. As a result, a function passed
> as `(Dataset[T], Long) => Unit` is executed as `(DataFrame, Long) => Unit`, which
> leads to a runtime exception.
>  
> We can reproduce the failure with the following test:
>  
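> ```
> // Run in a Spark Connect Scala shell, where `spark` is predefined.
> import org.apache.spark.sql._
> import org.apache.spark.sql.streaming._
> import spark.implicits._
>
> val inputPath = "/tmp/input"
> case class C(id: Int, name: String, age: Int)
>
> // Write a small CSV file to serve as the streaming source.
> val data = Seq((1, "Alice", 29), (2, "Bob", 35), (3, "Cathy", 26),
>   (4, "David", 40), (5, "Eve", 30)).toDF("id", "name", "age")
> data.write.format("csv").mode("overwrite").save(inputPath)
>
> // Read it back as a typed Dataset[C], the input type this issue is about.
> val ds = spark.readStream.format("csv")
>   .schema("id INT, name STRING, age INT")
>   .load(inputPath)
>   .as[C]
>
> // The batch function expects Dataset[C]; without the encoder for C it is
> // executed as if it took a DataFrame and fails at runtime.
> val query = ds.writeStream.foreachBatch { (batch: Dataset[C], batchId: Long) =>
>   print(batch.collectAsList())
> }.trigger(Trigger.ProcessingTime("1 seconds")).start()
> Thread.sleep(2000)
> query.stop()
> ```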
>  
> The fix should pass the encoder for T together with the function definition (as is
> already done for other Scala UDFs) and then restore the encoder in
> `SparkConnectPlanner`.
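
The failure mode and the proposed fix can be modeled in plain Scala without Spark. This is an illustration under stated assumptions, not the Spark Connect implementation: a `Map[String, Any]` row stands in for the untyped batch the server holds, and a `Row => T` decoder stands in for the encoder for T. The names `Row`, `decodeC`, `TypedBatchFunction`, `runWithoutEncoder` and `runWithEncoder` are hypothetical. Running the typed function as if it took untyped rows passes the cast (because of type erasure) and only fails with a `ClassCastException` once the function reads an element; shipping the decoder with the function lets the receiving side rebuild `Seq[C]` first.

```scala
// Toy model only: Scala collections stand in for Spark's Dataset, Row and
// Encoder. Every name below is hypothetical, not part of the Spark API.
object ForeachBatchEncoderSketch {
  // An untyped row, standing in for what the server holds for each micro-batch.
  type Row = Map[String, Any]

  final case class C(id: Int, name: String, age: Int)

  // Stand-in for the encoder for C: rebuilds a typed C from an untyped row.
  val decodeC: Row => C = r =>
    C(r("id").asInstanceOf[Int], r("name").asInstanceOf[String], r("age").asInstanceOf[Int])

  // What the client would ship: the user's batch function plus the decoder for T.
  final case class TypedBatchFunction[T](fn: (Seq[T], Long) => Unit, decode: Row => T)

  // Buggy path: the typed function is invoked as if it accepted untyped rows.
  // Type erasure lets this cast succeed; the ClassCastException only surfaces
  // when `fn` reads an element and the cast the compiler inserted inside `fn` fails.
  def runWithoutEncoder[T](fn: (Seq[T], Long) => Unit, rows: Seq[Row], batchId: Long): Unit = {
    val untyped = fn.asInstanceOf[(Seq[Row], Long) => Unit]
    untyped(rows, batchId)
  }

  // Fixed path: decode every row into T first, then invoke the typed function.
  def runWithEncoder[T](payload: TypedBatchFunction[T], rows: Seq[Row], batchId: Long): Unit =
    payload.fn(rows.map(payload.decode), batchId)
}
```

For a `fn: (Seq[C], Long) => Unit` that reads `_.name` from each element, `runWithoutEncoder(fn, rows, 0L)` throws `ClassCastException`, while `runWithEncoder(TypedBatchFunction(fn, decodeC), rows, 0L)` hands `fn` a proper `Seq[C]`.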



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
