[ https://issues.apache.org/jira/browse/SPARK-27330?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Eyal Zituny updated SPARK-27330:
--------------------------------
    Description: 
When a micro-batch is killed (interrupted) outside the actual processing done 
by the {{ForeachDataWriter}}, i.e. while iterating the iterator, 
{{DataWritingSparkTask}} handles the interruption and calls 
{{dataWriter.abort()}}.

The problem is that {{ForeachDataWriter}} has an empty implementation of the 
{{abort}} method.

As a result, my tasks that use the foreach writer leak connections. Following the 
[documentation|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreach], 
they open connections in the "open" method and close them in the "close" 
method, but since "close" is never called, the connections are never closed.

This was not the behavior before Spark 2.4.

My suggestion is to call {{ForeachWriter.abort()}} when {{DataWriter.abort()}} 
is called, in order to notify the foreach writer that the task has failed.
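
A minimal, self-contained sketch of the leak described above and of the kind of change being suggested. It does not use Spark: {{SketchForeachWriter}}, {{SketchDataWriter}}, {{ConnectionWriter}}, {{killedIterator}} and {{runTask}} are hypothetical stand-ins that only model the open/process/close lifecycle and an {{abort}} that is either empty or notifies the writer. Forwarding the abort to {{close}} with the error is an assumption made for illustration; the actual fix in Spark may look different.

```java
import java.util.Iterator;

final class ForeachAbortSketch {
    // Hypothetical stand-in for the foreach writer lifecycle (not Spark's class).
    abstract static class SketchForeachWriter<T> {
        abstract boolean open(long partitionId, long epochId);
        abstract void process(T value);
        abstract void close(Throwable errorOrNull);
    }

    // Hypothetical stand-in for the data writer wrapping a foreach writer.
    static final class SketchDataWriter<T> {
        private final SketchForeachWriter<T> writer;
        private final boolean closeOnAbort;
        private final boolean opened;

        SketchDataWriter(SketchForeachWriter<T> writer, boolean closeOnAbort) {
            this.writer = writer;
            this.closeOnAbort = closeOnAbort;
            this.opened = writer.open(0L, 0L);
        }

        void write(T record) {
            if (opened) {
                writer.process(record);
            }
        }

        void commit() {
            writer.close(null);
        }

        void abort(Throwable cause) {
            // closeOnAbort == false models the empty abort() from the report;
            // closeOnAbort == true models notifying the writer of the failure.
            if (closeOnAbort) {
                writer.close(cause);
            }
        }
    }

    // Writer that tracks whether its pretend connection is still open.
    static final class ConnectionWriter extends SketchForeachWriter<String> {
        boolean connectionOpen = false;

        @Override boolean open(long partitionId, long epochId) {
            connectionOpen = true;
            return true;
        }

        @Override void process(String value) { /* would send the row */ }

        @Override void close(Throwable errorOrNull) {
            connectionOpen = false;
        }
    }

    // Iterator whose second hasNext() call throws, standing in for a task
    // that is killed while the iterator is being advanced.
    static Iterator<String> killedIterator() {
        return new Iterator<String>() {
            private int calls = 0;

            @Override public boolean hasNext() {
                if (++calls > 1) {
                    throw new IllegalStateException("task killed");
                }
                return true;
            }

            @Override public String next() {
                return "row";
            }
        };
    }

    // Runs one simulated task; returns true if the connection was left open.
    static boolean runTask(boolean closeOnAbort) {
        ConnectionWriter writer = new ConnectionWriter();
        SketchDataWriter<String> dataWriter = new SketchDataWriter<>(writer, closeOnAbort);
        try {
            Iterator<String> rows = killedIterator();
            while (rows.hasNext()) {
                dataWriter.write(rows.next());
            }
            dataWriter.commit();
        } catch (RuntimeException e) {
            dataWriter.abort(e);
        }
        return writer.connectionOpen;
    }

    public static void main(String[] args) {
        System.out.println("leaked with empty abort: " + runTask(false));        // true
        System.out.println("leaked when abort notifies writer: " + runTask(true)); // false
    }
}
```

In this model the connection stays open only when {{abort}} does nothing, which is the situation the report describes for the interrupted micro-batch.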

 
Stack trace from the exception I have encountered:
{code:java}
org.apache.spark.TaskKilledException: null
 at org.apache.spark.TaskContextImpl.killTaskIfInterrupted(TaskContextImpl.scala:149)
 at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
 at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
 at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
 at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
 at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
 at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
 at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
{code}
 

  was:
When a micro-batch is killed (interrupted) outside the actual processing done 
by the {{ForeachDataWriter}}, i.e. while iterating the iterator, 
{{DataWritingSparkTask}} handles the interruption and calls 
{{dataWriter.abort()}}.

The problem is that {{ForeachDataWriter}} has an empty implementation of the 
{{abort}} method.

As a result, my tasks that use the foreach writer leak connections. Following the 
[documentation|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreach], 
they open connections in the "open" method and close them in the "close" 
method, but since "close" is never called, the connections are never closed.

This was not the behavior before Spark 2.4.

My suggestion is to call {{ForeachWriter.close()}} when {{DataWriter.abort()}} 
is called, and to provide the exception as well, in order to notify the foreach 
writer that the task has failed.

 
Stack trace from the exception I have encountered:
{code}
org.apache.spark.TaskKilledException: null
 at org.apache.spark.TaskContextImpl.killTaskIfInterrupted(TaskContextImpl.scala:149)
 at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36)
 at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
 at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:117)
 at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
 at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
 at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
 at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
 at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
{code}

 


> ForeachWriter is not being closed once a batch is aborted
> ---------------------------------------------------------
>
>                 Key: SPARK-27330
>                 URL: https://issues.apache.org/jira/browse/SPARK-27330
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.4.0
>            Reporter: Eyal Zituny
>            Priority: Major



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
