[ 
https://issues.apache.org/jira/browse/SPARK-43796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Herman van Hövell resolved SPARK-43796.
---------------------------------------
    Fix Version/s: 3.5.0
       Resolution: Fixed

> Streaming ForeachWriter can't accept custom user defined class
> --------------------------------------------------------------
>
>                 Key: SPARK-43796
>                 URL: https://issues.apache.org/jira/browse/SPARK-43796
>             Project: Spark
>          Issue Type: Bug
>          Components: Connect, Structured Streaming
>    Affects Versions: 3.5.0
>            Reporter: Wei Liu
>            Assignee: Herman van Hövell
>            Priority: Major
>             Fix For: 3.5.0
>
>
> [https://github.com/apache/spark/pull/41129]
> The last example in the PR description doesn't work with the current REPL 
> implementation. 
> Code:
>  
> {code:java}
> import org.apache.spark.sql.{ForeachWriter, Row} 
> import java.io._ 
> val filePath = "/home/wei.liu/test_foreach/output-custom" 
> case class MyTestClass(value: Int) {
>       override def toString: String = value.toString
> }
> val writer = new ForeachWriter[MyTestClass] {
>     var fileWriter: FileWriter = _
>     def open(partitionId: Long, version: Long): Boolean = {
>       fileWriter = new FileWriter(filePath, true)
>       true
>     }
>     def process(row: MyTestClass): Unit = {
>       fileWriter.write(row.toString)
>       fileWriter.write("\n")
>     }
>     def close(errorOrNull: Throwable): Unit = {
>       fileWriter.close()
>     }
> }
> val df = spark.readStream
> .format("rate")
> .option("rowsPerSecond", "10")
> .load()
> val query = df
> .selectExpr("CAST(value AS INT)")
> .as[MyTestClass]
> .writeStream
> .foreach(writer)
> .outputMode("update")
> .start()
> {code}
> Error:
> {code:java}
> 23/05/24 19:17:31 ERROR Utils: Aborting task
> java.lang.NoClassDefFoundError: Could not initialize class 
> ammonite.$sess.cmd4$
>       at java.lang.Class.forName0(Native Method)
>       at java.lang.Class.forName(Class.java:348)
>       at 
> org.apache.spark.util.SparkClassUtils.classForName(SparkClassUtils.scala:35)
>       at 
> org.apache.spark.util.SparkClassUtils.classForName$(SparkClassUtils.scala:30)
>       at org.apache.spark.util.Utils$.classForName(Utils.scala:94)
>       at 
> org.apache.spark.sql.catalyst.encoders.OuterScopes$.$anonfun$getOuterScope$1(OuterScopes.scala:59)
>       at 
> org.apache.spark.sql.catalyst.expressions.objects.NewInstance.$anonfun$doGenCode$1(objects.scala:598)
>       at scala.Option.map(Option.scala:230)
>       at 
> org.apache.spark.sql.catalyst.expressions.objects.NewInstance.doGenCode(objects.scala:598)
>       at 
> org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:201)
>       at scala.Option.getOrElse(Option.scala:189)
>       at 
> org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:196)
>       at 
> org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.$anonfun$create$1(GenerateSafeProjection.scala:156)
>       at scala.collection.immutable.List.map(List.scala:293)
>       at 
> org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:153)
>       at 
> org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:39)
>       at 
> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:1369)
>       at 
> org.apache.spark.sql.catalyst.expressions.SafeProjection$.createCodeGeneratedObject(Projection.scala:171)
>       at 
> org.apache.spark.sql.catalyst.expressions.SafeProjection$.createCodeGeneratedObject(Projection.scala:168)
>       at 
> org.apache.spark.sql.catalyst.expressions.CodeGeneratorWithInterpretedFallback.createObject(CodeGeneratorWithInterpretedFallback.scala:51)
>       at 
> org.apache.spark.sql.catalyst.expressions.SafeProjection$.create(Projection.scala:194)
>       at 
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:173)
>       at 
> org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:166)
>       at 
> org.apache.spark.sql.execution.streaming.sources.ForeachDataWriter.write(ForeachWriterTable.scala:147)
>       at 
> org.apache.spark.sql.execution.streaming.sources.ForeachDataWriter.write(ForeachWriterTable.scala:132)
>       at 
> org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.write(WriteToDataSourceV2Exec.scala:493)
>       at 
> org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:448)
>       at 
> org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1521)
>       at 
> org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:486)
>       at 
> org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:425)
>       at 
> org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:491)
>       at 
> org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:388)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
>       at 
> org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
>       at org.apache.spark.scheduler.Task.run(Task.scala:139)
>       at 
> org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1487)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748) {code}
> This issue is similar to SPARK-43198
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to