[ 
https://issues.apache.org/jira/browse/SPARK-35915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17371210#comment-17371210
 ] 

Jungtaek Lim commented on SPARK-35915:
--------------------------------------

releaseConsumer just returns the Kafka consumer to the pool. We don't destroy 
the consumer instance there.

The possible case I can imagine is that InternalKafkaConsumer.close() is called 
(either automatically as it implements Closeable, or manually) from somewhere 
outside of pool. It's only expected to be called in destroying object in the 
pool.

> Kafka doesn't recover from data loss
> ------------------------------------
>
>                 Key: SPARK-35915
>                 URL: https://issues.apache.org/jira/browse/SPARK-35915
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.1.1
>            Reporter: Yuval Yellin
>            Priority: Major
>
> I configured a strcutured streaming source for kafka with 
> failOnDataLoss=false, 
> Getting this error when checkopint offsets are not found :
>  
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: 
> Task 7 in stage 5.0 failed 1 times, most recent failure: Lost task 7.0 in 
> stage 5.0 (TID 113) ( executor driver): java.lang.IllegalStateException: This 
> consumer has already been closed.
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2439)
>   at 
> org.apache.kafka.clients.consumer.KafkaConsumer.seekToBeginning(KafkaConsumer.java:1656)
>   at 
> org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumer.getAvailableOffsetRange(KafkaDataConsumer.scala:108)
>   at 
> org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.getEarliestAvailableOffsetBetween(KafkaDataConsumer.scala:385)
>   at 
> org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.$anonfun$get$1(KafkaDataConsumer.scala:332)
>   at 
> org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
>   at 
> org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:604)
>   at 
> org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.get(KafkaDataConsumer.scala:287)
>   at 
> org.apache.spark.sql.kafka010.KafkaBatchPartitionReader.next(KafkaBatchPartitionReader.scala:63)
>   at 
> org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
>   at 
> org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
>   at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> {code}
>  
> The issue seems to me to be related to the OffsetOutOfRange exception in 
> (line (323 in KafkaDataConsumer): 
>  
> {code:java}
>  case e: OffsetOutOfRangeException =>
>     // When there is some error thrown, it's better to use a new consumer to 
> drop all cached
>     // states in the old consumer. We don't need to worry about the 
> performance because this
>     // is not a common path.
>     releaseConsumer()
>     fetchedData.reset()
>     reportDataLoss(topicPartition, groupId, failOnDataLoss,
>       s"Cannot fetch offset $toFetchOffset", e)
>     toFetchOffset = getEarliestAvailableOffsetBetween(consumer, 
> toFetchOffset, untilOffset)
> }
> {code}
> seems like releaseConsumer will destoy the consumer , which later is used ...
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to