[ https://issues.apache.org/jira/browse/SPARK-35915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17371160#comment-17371160 ]
Hyukjin Kwon commented on SPARK-35915:
--------------------------------------

cc [~kabhwan] FYI

> Kafka doesn't recover from data loss
> ------------------------------------
>
>                 Key: SPARK-35915
>                 URL: https://issues.apache.org/jira/browse/SPARK-35915
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 3.1.1
>            Reporter: Yuval Yellin
>            Priority: Major
>
> I configured a structured streaming source for Kafka with failOnDataLoss=false.
> I get this error when the checkpoint offsets are not found:
>
> {code:java}
> Caused by: org.apache.spark.SparkException: Job aborted due to stage failure:
> Task 7 in stage 5.0 failed 1 times, most recent failure: Lost task 7.0 in
> stage 5.0 (TID 113) (executor driver): java.lang.IllegalStateException: This
> consumer has already been closed.
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2439)
> 	at org.apache.kafka.clients.consumer.KafkaConsumer.seekToBeginning(KafkaConsumer.java:1656)
> 	at org.apache.spark.sql.kafka010.consumer.InternalKafkaConsumer.getAvailableOffsetRange(KafkaDataConsumer.scala:108)
> 	at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.getEarliestAvailableOffsetBetween(KafkaDataConsumer.scala:385)
> 	at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.$anonfun$get$1(KafkaDataConsumer.scala:332)
> 	at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
> 	at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.runUninterruptiblyIfPossible(KafkaDataConsumer.scala:604)
> 	at org.apache.spark.sql.kafka010.consumer.KafkaDataConsumer.get(KafkaDataConsumer.scala:287)
> 	at org.apache.spark.sql.kafka010.KafkaBatchPartitionReader.next(KafkaBatchPartitionReader.scala:63)
> 	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
> 	at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
> 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> {code}
>
> The issue seems to be related to the OffsetOutOfRangeException handler (around line 323 of KafkaDataConsumer):
>
> {code:java}
> case e: OffsetOutOfRangeException =>
>   // When there is some error thrown, it's better to use a new consumer to drop all cached
>   // states in the old consumer. We don't need to worry about the performance because this
>   // is not a common path.
>   releaseConsumer()
>   fetchedData.reset()
>   reportDataLoss(topicPartition, groupId, failOnDataLoss,
>     s"Cannot fetch offset $toFetchOffset", e)
>   toFetchOffset = getEarliestAvailableOffsetBetween(consumer,
>     toFetchOffset, untilOffset)
> {code}
>
> It looks like releaseConsumer() destroys the consumer, which is then used afterwards.
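For reference, a minimal standalone sketch (not from the ticket) of the failure mode in the stack trace: calling seekToBeginning() on a KafkaConsumer that has already been closed fails in acquireAndEnsureOpen() with the same "This consumer has already been closed." IllegalStateException. This mirrors releaseConsumer() tearing down the cached consumer before getEarliestAvailableOffsetBetween() reuses it; the topic name, group id, and bootstrap address below are placeholders.

{code:scala}
import java.util.{Collections, Properties}

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

object ClosedConsumerRepro {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    // No broker needs to be running; the KafkaConsumer constructor does not connect eagerly.
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "repro-group")
    props.put("key.deserializer", classOf[StringDeserializer].getName)
    props.put("value.deserializer", classOf[StringDeserializer].getName)

    val consumer = new KafkaConsumer[String, String](props)

    // Corresponds to releaseConsumer() closing the underlying consumer.
    consumer.close()

    // Corresponds to getAvailableOffsetRange() -> seekToBeginning() in the stack trace:
    // acquireAndEnsureOpen() throws IllegalStateException("This consumer has already been closed.")
    consumer.seekToBeginning(Collections.singleton(new TopicPartition("some-topic", 0)))
  }
}
{code}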