[ https://issues.apache.org/jira/browse/SPARK-50661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated SPARK-50661:
-----------------------------------
    Labels: pull-request-available  (was: )

> Scala Streaming foreachBatch doesn't work for input type Dataset[T] due to
> missing encoder for T.
> -------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-50661
>                 URL: https://issues.apache.org/jira/browse/SPARK-50661
>             Project: Spark
>          Issue Type: Bug
>          Components: Connect, Structured Streaming
>    Affects Versions: 4.0.0, 3.5.4
>            Reporter: Haiyang Sun
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, the Spark Connect implementation of Scala streaming does not carry the
> encoder for T when the input is Dataset[T]. This leads to a runtime exception when
> the function passed is `(Dataset[T], Long) => Unit`, because it is executed as
> `(DataFrame, Long) => Unit`.
>
> The failure can be reproduced with the following test:
>
> ```
> import org.apache.spark.sql._
> import org.apache.spark.sql.streaming._
>
> val inputPath = "/tmp/input"
> case class C(id: Int, name: String, age: Int)
> val data = Seq((1, "Alice", 29), (2, "Bob", 35), (3, "Cathy", 26),
>   (4, "David", 40), (5, "Eve", 30)).toDF("id", "name", "age")
> data.write.format("csv").mode("overwrite").save(inputPath)
>
> val df = spark.readStream.format("csv")
>   .schema("id INT, name STRING, age INT")
>   .load(inputPath)
> val query = df.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
>   print(batchDF.collectAsList())
> }.trigger(Trigger.ProcessingTime("1 seconds")).start()
> Thread.sleep(2000)
> query.stop()
> ```
>
> The fix should address this by passing the encoder for T together with the
> function definition (as is done for other Scala UDFs), and then restoring the
> encoder in SparkConnectPlanner.
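The failure mode described above can be illustrated without Spark: a typed function is erased on the JVM, so if the server side invokes it with untyped rows instead of decoded instances of T, the crash surfaces only at call time. This is a minimal JVM-only sketch, not Spark code; `MyRow` and `Person` are hypothetical stand-ins for Spark's `Row` and the user's type T.

```scala
// Stand-in for an untyped org.apache.spark.sql.Row (hypothetical).
case class MyRow(values: Seq[Any])
// Stand-in for the user's type T in Dataset[T] (hypothetical).
case class Person(id: Int, name: String)

// The user writes a typed batch function over a collection of T...
val typedFn: (Seq[Person], Long) => Unit = (batch, batchId) =>
  batch.foreach(p => println(s"batch $batchId: ${p.name}"))

// ...but without the encoder for T, the executing side only has the
// function erased to (Seq[Any], Long) => Unit and feeds it raw rows.
val erased = typedFn.asInstanceOf[(Seq[Any], Long) => Unit]
val rawBatch: Seq[Any] = Seq(MyRow(Seq(1, "Alice")))

try erased(rawBatch, 0L)
catch {
  case e: ClassCastException =>
    // The checkcast to Person inside the lambda fails at runtime.
    println(s"runtime failure: $e")
}
```

Carrying the encoder alongside the serialized function lets the planner decode each batch into `Dataset[T]` before invocation, so the cast above never happens.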
--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org