[ https://issues.apache.org/jira/browse/SPARK-19976?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Taukir updated SPARK-19976:
---------------------------
Description:

I am using the following code. When data on the Kafka topic has been deleted (i.e. the retention period has expired), it throws an exception and the application crashes.

def functionToCreateContext(sc: SparkContext): StreamingContext = {
  val kafkaParams = new mutable.HashMap[String, Object]()
  kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers)
  kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConsumerGrp)
  kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
  kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
  kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
  kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topic, kafkaParams)
  val kafkaStream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, consumerStrategy)
}

Spark throws an error and crashes once OffsetOutOfRangeException is thrown:

WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0 :
org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {test-2=127287}
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
        at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:98)
        at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
        at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)
        at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
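For completeness, here is a minimal self-contained version of the setup above, as a sketch only: the broker address, consumer group, topic name, batch interval, and the StreamingContext/main wiring are assumed values, since the report does not include them.

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object OffsetOutOfRangeRepro {
  def main(args: Array[String]): Unit = {
    // Placeholder values -- the report does not give concrete ones.
    val kafkaBrokers = "localhost:9092"
    val groupId      = "test-consumer-group"
    val topics       = Set("test")

    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG        -> kafkaBrokers,
      ConsumerConfig.GROUP_ID_CONFIG                 -> groupId,
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG   -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG       -> "true",
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG        -> "earliest"
    )

    val conf = new SparkConf()
      .setAppName("OffsetOutOfRangeRepro")
      .setIfMissing("spark.master", "local[2]")        // for local testing only
    val ssc = new StreamingContext(conf, Seconds(10))  // batch interval assumed

    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
    )

    // If records behind the tracked offsets are removed by retention while the
    // stream is running, the executor-side fetch fails with
    // OffsetOutOfRangeException and the batch (and the application) fails.
    kafkaStream.map(record => (record.key, record.value)).print()

    ssc.start()
    ssc.awaitTermination()
  }
}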
was:
I am using the following code. When data on the Kafka topic has been deleted (i.e. the retention period has expired), it throws an exception and the application crashes.

def functionToCreateContext(sc: SparkContext): StreamingContext = {
  val kafkaParams = new mutable.HashMap[String, Object]()
  kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers)
  kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConsumerGrp)
  kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
  kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer])
  kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
  kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topic.split(",").map(_.trim).filter(!_.isEmpty).toSet, kafkaParams)
  val kafkaStream = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, consumerStrategy)
}

Spark throws an error and crashes once OffsetOutOfRangeException is thrown (same stack trace as above).


> DirectStream API throws OffsetOutOfRange Exception
> --------------------------------------------------
>
>                 Key: SPARK-19976
>                 URL: https://issues.apache.org/jira/browse/SPARK-19976
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.0.0
>            Reporter: Taukir
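As a possible partial workaround, not a confirmed fix: the message above reports "no configured reset policy" on the executor side even though auto.offset.reset is set to "earliest" in the params, so one option is to look up the currently valid earliest offsets on the driver with a plain KafkaConsumer and seed the Subscribe strategy with them. This is a sketch only; the helper object EarliestOffsets is hypothetical, it reuses kafkaParams/topics/ssc from the sketch above, and it only guards the starting offsets, not data deleted by retention while the stream is already running.

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object EarliestOffsets {
  // Ask the broker for the earliest still-available offset of every partition
  // of the given topics, using a short-lived driver-side consumer.
  def fetch(kafkaParams: Map[String, Object], topics: Set[String]): Map[TopicPartition, Long] = {
    val consumer = new KafkaConsumer[String, String](kafkaParams.asJava)
    try {
      val partitions = topics.flatMap { t =>
        consumer.partitionsFor(t).asScala.map(pi => new TopicPartition(pi.topic, pi.partition))
      }
      consumer.assign(partitions.toList.asJava)
      consumer.seekToBeginning(partitions.toList.asJava)
      partitions.map(tp => tp -> consumer.position(tp)).toMap
    } finally {
      consumer.close()
    }
  }
}

and then, in the driver code of the sketch above:

val kafkaStream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams, EarliestOffsets.fetch(kafkaParams, topics))
)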
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)