Structured Streaming AUTOMATICALLY saves the offsets in a checkpoint
directory that you provide. And when you start the query again with the
same directory it will just pick up where it left off.

On Thu, Mar 22, 2018 at 8:06 PM, M Singh <>

> Hi:
> I am working with Spark (2.2.1) and Kafka (0.10) on AWS EMR and for the
> last few days, after running the application for 30-60 minutes get
> exception from Kafka Consumer included below.
> The structured streaming application is processing 1 minute worth of data
> from kafka topic. So I've tried increasing from 40000
> seconds default to 45000 seconds and receive.buffer.bytes to 1mb but still
> get the same exception.
> Is there any spark/kafka configuration that can save the offset and retry
> it next time rather than throwing an exception and killing the application.
> I've tried googling but have not found substantial
> solution/recommendation.  If anyone has any suggestions or a different
> version etc, please let me know.
> Thanks
> Here is the exception stack trace.
> java.util.concurrent.TimeoutException: Cannot fetch record for offset
> <offset#> in 120000 milliseconds
> at$
> apache$spark$sql$kafka010$CachedKafkaConsumer$$
> fetchData(CachedKafkaConsumer.scala:219)
> at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(
> CachedKafkaConsumer.scala:117)
> at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(
> CachedKafkaConsumer.scala:106)
> at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(
> UninterruptibleThread.scala:85)
> at org.apache.spark.sql.kafka010.CachedKafkaConsumer.
> runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
> at org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(
> CachedKafkaConsumer.scala:106)
> at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.
> getNext(KafkaSourceRDD.scala:157)
> at

Reply via email to