Structured Streaming AUTOMATICALLY saves the offsets in the checkpoint directory that you provide. When you start the query again with the same directory, it will just pick up where it left off.
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
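To illustrate the resume behaviour described above, here is a toy, stdlib-only Python model. It is not Spark code. In Spark you only pass the directory (for example `.option("checkpointLocation", "/some/path")` on `writeStream`), and Spark manages the files inside it itself. The function `run_batches`, the in-memory list standing in for a Kafka topic, and the `next_offset.json` file name are all hypothetical. The model records the next offset only after a batch has been processed, so restarting with the same directory continues from that offset instead of starting over.

```python
import json
import os
import tempfile


def run_batches(topic, checkpoint_dir, batch_size, max_batches):
    """Toy model (not Spark code): read up to max_batches batches from an
    in-memory list standing in for a Kafka topic, resuming from the offset
    stored in checkpoint_dir if a previous run left one there."""
    os.makedirs(checkpoint_dir, exist_ok=True)
    # Hypothetical file name; Spark uses its own layout inside the directory.
    path = os.path.join(checkpoint_dir, "next_offset.json")

    # Resume from the saved offset, or start at 0 on a fresh directory.
    start = 0
    if os.path.exists(path):
        with open(path) as f:
            start = json.load(f)["next_offset"]

    processed = []
    for _ in range(max_batches):
        end = min(start + batch_size, len(topic))
        if end == start:
            break  # no new records available
        processed.extend(topic[start:end])  # stand-in for real processing

        # Save progress only after the batch has been processed. Writing to a
        # temp file and renaming it means an interrupted write leaves the
        # previous checkpoint file in place.
        tmp = path + ".tmp"
        with open(tmp, "w") as f:
            json.dump({"next_offset": end}, f)
        os.replace(tmp, path)
        start = end

    return processed


if __name__ == "__main__":
    topic = list(range(10))
    ckpt = tempfile.mkdtemp()
    # First run stops after two batches, as if the application had died.
    print(run_batches(topic, ckpt, batch_size=3, max_batches=2))   # prints [0, 1, 2, 3, 4, 5]
    # Restarting with the same directory picks up at offset 6.
    print(run_batches(topic, ckpt, batch_size=3, max_batches=10))  # prints [6, 7, 8, 9]
```

Pointing the second call at a new, empty directory would instead start again from offset 0, which is why the real query must be restarted with the same checkpoint directory.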
On Thu, Mar 22, 2018 at 8:06 PM, M Singh <mans2si...@yahoo.com.invalid> wrote:
> Hi:
>
> I am working with Spark (2.2.1) and Kafka (0.10) on AWS EMR, and for the
> last few days, after running the application for 30-60 minutes, I get the
> exception from the Kafka consumer included below.
>
> The structured streaming application is processing 1 minute's worth of data
> from a Kafka topic. I've tried increasing request.timeout.ms from the
> 40000 ms default to 45000 ms and receive.buffer.bytes to 1 MB, but I still
> get the same exception.
>
> Is there any Spark/Kafka configuration that can save the offset and retry
> it next time rather than throwing an exception and killing the application?
>
> I've tried googling but have not found a substantial
> solution/recommendation. If anyone has any suggestions or a different
> version etc., please let me know.
>
> Thanks
>
> Here is the exception stack trace.
>
> java.util.concurrent.TimeoutException: Cannot fetch record for offset <offset#> in 120000 milliseconds
>     at org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:219)
>     at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117)
>     at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
>     at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
>     at org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
>     at org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
>     at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157)
>     at