[ https://issues.apache.org/jira/browse/SPARK-27330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-27330:
------------------------------------

    Assignee:     (was: Apache Spark)

> ForeachWriter is not being closed once a batch is aborted
> ---------------------------------------------------------
>
>                 Key: SPARK-27330
>                 URL: https://issues.apache.org/jira/browse/SPARK-27330
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Eyal Zituny
>            Priority: Major
>
> In cases where a micro-batch is killed (interrupted) outside the actual processing done by {{ForeachDataWriter}} (i.e. not while iterating over the input iterator), {{DataWritingSparkTask}} handles the interruption and calls {{dataWriter.abort()}}.
> The problem is that {{ForeachDataWriter}} has an empty implementation of the abort method.
> Because of that, tasks which use the foreach writer and, following the [documentation|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreach], open connections in the "open" method and close them in the "close" method never release those connections, since "close" is never called.
> This was not the behavior before Spark 2.4.
> My suggestion is to call {{ForeachWriter.close()}} when {{DataWriter.abort()}} is called, in order to notify the foreach writer that the task has failed.
>
> Stack trace from the exception I encountered:
> {code:java}
> org.apache.spark.TaskKilledException: null
>     at org.apache.spark.TaskContextImpl.killTaskIfInterrupted(TaskContextImpl.scala:149)
>     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
>     at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
>     at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
>     at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
>     at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
>     at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
>     at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
> {code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
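The open/close lifecycle described in the report, and the suggested fix (delegating {{DataWriter.abort()}} to {{ForeachWriter.close()}}), can be sketched as follows. This is a simplified, self-contained model, not Spark's actual source: the names {{ForeachWriter}} and {{ForeachDataWriter}} mirror the real Spark APIs, but every body below is a hypothetical stand-in written only to illustrate the proposal.

```scala
// Simplified stand-in for org.apache.spark.sql.ForeachWriter (sketch, not Spark source).
trait ForeachWriter[T] {
  def open(partitionId: Long, epochId: Long): Boolean
  def process(value: T): Unit
  def close(errorOrNull: Throwable): Unit
}

// A writer that tracks whether its "connection" was released, as in the
// open-a-connection / close-a-connection pattern from the programming guide.
class ConnectionWriter extends ForeachWriter[String] {
  var connectionOpen = false
  override def open(partitionId: Long, epochId: Long): Boolean = {
    connectionOpen = true   // acquire the connection
    true
  }
  override def process(value: String): Unit = ()  // write a row over the connection
  override def close(errorOrNull: Throwable): Unit = {
    connectionOpen = false  // release the connection
  }
}

// Simplified stand-in for ForeachDataWriter with the suggested fix applied:
// abort() calls ForeachWriter.close() instead of doing nothing, so a killed
// task still releases whatever "open" acquired.
class ForeachDataWriter[T](writer: ForeachWriter[T], partitionId: Long, epochId: Long) {
  private val opened = writer.open(partitionId, epochId)
  def write(record: T): Unit = if (opened) writer.process(record)
  def commit(): Unit = writer.close(null)
  def abort(): Unit = writer.close(new InterruptedException("task aborted"))
}
```

With the empty abort() that 2.4.0 ships, a {{TaskKilledException}} raised while iterating (as in the stack trace above) leaves {{connectionOpen}} true forever; with the sketched abort(), the writer is notified and can clean up.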