[ https://issues.apache.org/jira/browse/SPARK-35915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17371210#comment-17371210 ]
Jungtaek Lim commented on SPARK-35915:
--------------------------------------

releaseConsumer just returns the Kafka consumer to the pool. It does not destroy the consumer instance.

The only case I can imagine is that InternalKafkaConsumer.close() is called (either automatically, since it implements Closeable, or manually) from somewhere outside of the pool. It is only expected to be called when the pool destroys the object.

> Kafka doesn't recover from data loss
> ------------------------------------
>
>                 Key: SPARK-35915
>                 URL: https://issues.apache.org/jira/browse/SPARK-35915
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.1.1
>            Reporter: Yuval Yellin
>            Priority: Major
>
> I configured a structured streaming source for Kafka with failOnDataLoss=false.
> I get this error when checkpoint offsets are not found:
>
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 5.0 failed 1 times, most recent failure: Lost task 7.0 in stage 5.0 (TID 113) ( executor driver): java.lang.IllegalStateException: This consumer has already been closed.
>   at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2439)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.seekToBeginning(KafkaConsumer.java:1656)
>   at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumer.getAvailableOffsetRange(KafkaDataConsumer.scala:108)
>   at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.getEarliestAvailableOffsetBetween(KafkaDataConsumer.scala:385)
>   at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.$anonfun$get$1(KafkaDataConsumer.scala:332)
>   at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
>   at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:604)
>   at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.get(KafkaDataConsumer.scala:287)
>   at org.apache.spark.sql.kafka010.KafkaBatchPartitionReader.next(KafkaBatchPartitionReader.scala:63)
>   at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
>   at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
>   at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> {code}
>
> The issue seems to me to be related to the OffsetOutOfRangeException handling (line 323 in KafkaDataConsumer):
>
> {code:java}
> case e: OffsetOutOfRangeException =>
>   // When there is some error thrown, it's better to use a new consumer to drop all cached
>   // states in the old consumer. We don't need to worry about the performance because this
>   // is not a common path.
>   releaseConsumer()
>   fetchedData.reset()
>   reportDataLoss(topicPartition, groupId, failOnDataLoss,
>     s"Cannot fetch offset $toFetchOffset", e)
>   toFetchOffset = getEarliestAvailableOffsetBetween(consumer, toFetchOffset, untilOffset)
> }
> {code}
>
> It seems like releaseConsumer will destroy the consumer, which is later used ...
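
To illustrate the distinction made in the comment (release returns an instance to the pool, while only the pool is expected to close it), here is a minimal sketch. It is not Spark's implementation: ConsumerPoolSketch, FakeConsumer, Pool, and tryUse are hypothetical names, and FakeConsumer only imitates the "already been closed" check seen in the stack trace.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ConsumerPoolSketch {

    /** Hypothetical stand-in for a pooled consumer; not the real InternalKafkaConsumer. */
    static final class FakeConsumer implements AutoCloseable {
        private boolean closed = false;

        /** Imitates the open-check that fails in the reported stack trace. */
        long seekToBeginning() {
            if (closed) {
                throw new IllegalStateException("This consumer has already been closed.");
            }
            return 0L;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Hypothetical pool: release() keeps the instance alive, destroyAll() closes it. */
    static final class Pool {
        private final Deque<FakeConsumer> idle = new ArrayDeque<>();

        FakeConsumer borrow() {
            FakeConsumer c = idle.poll();
            return c != null ? c : new FakeConsumer();
        }

        /** Returns the instance to the pool without closing it. */
        void release(FakeConsumer c) {
            idle.push(c);
        }

        /** The only place where pooled instances are expected to be closed. */
        void destroyAll() {
            while (!idle.isEmpty()) {
                idle.pop().close();
            }
        }
    }

    /** Returns "ok" if the consumer is usable, otherwise the exception message. */
    static String tryUse(FakeConsumer c) {
        try {
            c.seekToBeginning();
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        Pool pool = new Pool();

        // Release followed by borrow hands back the same, still-open instance.
        FakeConsumer first = pool.borrow();
        pool.release(first);
        FakeConsumer again = pool.borrow();
        System.out.println(tryUse(again)); // prints "ok"

        // Closing outside the pool and then releasing leaves a dead instance in the pool.
        again.close();
        pool.release(again);
        System.out.println(tryUse(pool.borrow())); // prints "This consumer has already been closed."
    }
}
```

In this sketch the failure only appears when close() is called by something other than the pool, which matches the scenario suspected in the comment. Whether that is what actually happens in the reported job is not established here.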