[ https://issues.apache.org/jira/browse/SPARK-43796?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Herman van Hövell resolved SPARK-43796. --------------------------------------- Fix Version/s: 3.5.0 Resolution: Fixed > Streaming ForeachWriter can't accept custom user defined class > -------------------------------------------------------------- > > Key: SPARK-43796 > URL: https://issues.apache.org/jira/browse/SPARK-43796 > Project: Spark > Issue Type: Bug > Components: Connect, Structured Streaming > Affects Versions: 3.5.0 > Reporter: Wei Liu > Assignee: Herman van Hövell > Priority: Major > Fix For: 3.5.0 > > > [https://github.com/apache/spark/pull/41129] > The last example in the PR description doesn't work with current REPL > implementation. > Code: > > {code:java} > import org.apache.spark.sql.{ForeachWriter, Row} > import java.io._ > val filePath = "/home/wei.liu/test_foreach/output-custom" > case class MyTestClass(value: Int) { > override def toString: String = value.toString > } > val writer = new ForeachWriter[MyTestClass] { > var fileWriter: FileWriter = _ > def open(partitionId: Long, version: Long): Boolean = { > fileWriter = new FileWriter(filePath, true) > true > } > def process(row: MyTestClass): Unit = { > fileWriter.write(row.toString) > fileWriter.write("\n") > } > def close(errorOrNull: Throwable): Unit = { > fileWriter.close() > } > } > val df = spark.readStream .format("rate") .option("rowsPerSecond", "10") > .load() > val query = df .selectExpr("CAST(value AS INT)") .as[MyTestClass] > .writeStream .foreach(writer) .outputMode("update") .start() > {code} > Error: > {code:java} > 23/05/24 19:17:31 ERROR Utils: Aborting task > java.lang.NoClassDefFoundError: Could not initialize class > ammonite.$sess.cmd4$ > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Class.java:348) > at > org.apache.spark.util.SparkClassUtils.classForName(SparkClassUtils.scala:35) > at > org.apache.spark.util.SparkClassUtils.classForName$(SparkClassUtils.scala:30) > at org.apache.spark.util.Utils$.classForName(Utils.scala:94) > at > 
org.apache.spark.sql.catalyst.encoders.OuterScopes$.$anonfun$getOuterScope$1(OuterScopes.scala:59) > at > org.apache.spark.sql.catalyst.expressions.objects.NewInstance.$anonfun$doGenCode$1(objects.scala:598) > at scala.Option.map(Option.scala:230) > at > org.apache.spark.sql.catalyst.expressions.objects.NewInstance.doGenCode(objects.scala:598) > at > org.apache.spark.sql.catalyst.expressions.Expression.$anonfun$genCode$3(Expression.scala:201) > at scala.Option.getOrElse(Option.scala:189) > at > org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:196) > at > org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.$anonfun$create$1(GenerateSafeProjection.scala:156) > at scala.collection.immutable.List.map(List.scala:293) > at > org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:153) > at > org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:39) > at > org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:1369) > at > org.apache.spark.sql.catalyst.expressions.SafeProjection$.createCodeGeneratedObject(Projection.scala:171) > at > org.apache.spark.sql.catalyst.expressions.SafeProjection$.createCodeGeneratedObject(Projection.scala:168) > at > org.apache.spark.sql.catalyst.expressions.CodeGeneratorWithInterpretedFallback.createObject(CodeGeneratorWithInterpretedFallback.scala:51) > at > org.apache.spark.sql.catalyst.expressions.SafeProjection$.create(Projection.scala:194) > at > org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:173) > at > org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:166) > at > org.apache.spark.sql.execution.streaming.sources.ForeachDataWriter.write(ForeachWriterTable.scala:147) > at > 
org.apache.spark.sql.execution.streaming.sources.ForeachDataWriter.write(ForeachWriterTable.scala:132) > at > org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.write(WriteToDataSourceV2Exec.scala:493) > at > org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.$anonfun$run$1(WriteToDataSourceV2Exec.scala:448) > at > org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1521) > at > org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run(WriteToDataSourceV2Exec.scala:486) > at > org.apache.spark.sql.execution.datasources.v2.WritingSparkTask.run$(WriteToDataSourceV2Exec.scala:425) > at > org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:491) > at > org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:388) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92) > at > org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) > at org.apache.spark.scheduler.Task.run(Task.scala:139) > at > org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554) > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1487) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) {code} > This issue is similar to SPARK-43198 > -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org For additional commands, e-mail: issues-help@spark.apache.org