Structured Streaming AUTOMATICALLY saves the offsets in the checkpoint
directory that you provide. When you restart the query with the same
directory, it picks up where it left off.
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing
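
For illustration, here is a minimal PySpark sketch of what that looks like,
assuming a console sink. The function names, broker address, topic, and
checkpoint path are placeholders I made up, not anything from your
application. Kafka consumer settings such as request.timeout.ms are passed to
the source with a "kafka." prefix, and the checkpoint directory is set on the
writer.

```python
def kafka_source_options(bootstrap_servers, topic, request_timeout_ms=45000):
    """Build options for Spark's Kafka source.

    Plain Kafka consumer settings are forwarded to the consumer when they
    carry the 'kafka.' prefix. The 45000 default here just mirrors the value
    mentioned in the question; it is not a recommendation.
    """
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "kafka.request.timeout.ms": str(request_timeout_ms),
    }


def start_query(spark, bootstrap_servers, topic, checkpoint_dir):
    """Start a streaming query that records its progress in checkpoint_dir.

    `spark` is an existing SparkSession. Restarting with the same
    checkpoint_dir resumes from the offsets saved there.
    """
    stream = (
        spark.readStream.format("kafka")
        .options(**kafka_source_options(bootstrap_servers, topic))
        .load()
    )
    return (
        stream.selectExpr("CAST(value AS STRING) AS value")
        .writeStream.format("console")  # assumed sink, for illustration only
        .option("checkpointLocation", checkpoint_dir)
        .start()
    )
```

You would call it as something like
start_query(spark, "broker:9092", "events", "s3://my-bucket/checkpoints/q1")
with your own values. Note that the query still stops when the exception is
thrown; the checkpoint only ensures that the next start with the same
directory resumes from the saved offsets, so something (you or a supervisor)
has to start it again.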

On Thu, Mar 22, 2018 at 8:06 PM, M Singh <mans2si...@yahoo.com.invalid>
wrote:

> Hi:
>
> I am working with Spark (2.2.1) and Kafka (0.10) on AWS EMR. For the
> last few days, after the application has run for 30-60 minutes, I get the
> exception from the Kafka consumer included below.
>
> The structured streaming application processes 1 minute's worth of data
> from a Kafka topic. I've tried increasing request.timeout.ms from the
> 40000 ms default to 45000 ms and receive.buffer.bytes to 1 MB, but I still
> get the same exception.
>
> Is there any Spark/Kafka configuration that can save the offset and retry
> it next time, rather than throwing an exception and killing the application?
>
> I've tried googling but have not found a substantial
> solution/recommendation. If anyone has any suggestions, or recommends a
> different version, etc., please let me know.
>
> Thanks
>
> Here is the exception stack trace.
>
> java.util.concurrent.TimeoutException: Cannot fetch record for offset <offset#> in 120000 milliseconds
> at org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:219)
> at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:117)
> at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
> at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:85)
> at org.apache.spark.sql.kafka010.CachedKafkaConsumer.runUninterruptiblyIfPossible(CachedKafkaConsumer.scala:68)
> at org.apache.spark.sql.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:106)
> at org.apache.spark.sql.kafka010.KafkaSourceRDD$$anon$1.getNext(KafkaSourceRDD.scala:157)
> at
>
