It would be great if you could also try it!
As for the retention policy, I came across an issue with version 0.8.1
where "retention.ms" is in milliseconds but the actual server property is
"log.retention.minutes", so would the server take the value as minutes?
Is that true?
Anyway, I have updated the retention to 2 hours, expressed in
milliseconds. I will update this thread if the problem still occurs.
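
For reference, 2 hours is 2 * 60 * 60 * 1000 = 7,200,000 ms, so the alter
command (assuming the same topic as in my earlier message) would be along
the lines of:
./kafka-topics.sh --alter --zookeeper XXX:2181 --topic MyTopic
--config retention.ms=7200000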

On Thu, Aug 6, 2015 at 4:58 PM, Grant Henke <ghe...@cloudera.com> wrote:

> Looks like this is likely very similar to the case Parth mentioned Storm
> users have seen, where processing falls behind the retention period.
>
> Perhaps Spark and Kafka can handle this scenario more gracefully. I would
> be happy to do some investigation/testing and report back with findings and
> potentially open a Jira to track any fix.
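>
> As a partial workaround in the meantime (a sketch, untested; ssc and the
> broker list below are placeholders), the direct stream can at least be
> started from the earliest offset still on the broker by setting
> auto.offset.reset=smallest in the Kafka parameters:
>
>     import kafka.serializer.StringDecoder
>     import org.apache.spark.streaming.kafka.KafkaUtils
>
>     val kafkaParams = Map(
>       "metadata.broker.list" -> "broker1:9092", // placeholder broker
>       "auto.offset.reset" -> "smallest")        // start from earliest
>     val stream = KafkaUtils.createDirectStream[
>       String, String, StringDecoder, StringDecoder](
>       ssc, kafkaParams, Set("MyTopic"))
>
> Note this only controls where a (re)started stream begins; offsets that
> expire mid-job can still raise the exception, which is the part that
> would need the more graceful handling.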
>
> On Thu, Aug 6, 2015 at 6:48 PM, Parth Brahmbhatt <
> pbrahmbh...@hortonworks.com> wrote:
>
> > retention.ms is actually in milliseconds; you want a value much larger
> > than 1440, which translates to only 1.4 seconds.
> >
> >
> > On 8/6/15, 4:35 PM, "Cassa L" <lcas...@gmail.com> wrote:
> >
> > >Hi Grant,
> > >Yes, I saw the exception in both Spark and Kafka. In the Kafka server
> > >logs I get this exception:
> > >kafka.common.OffsetOutOfRangeException: Request for offset 2823 but we
> > >only have log segments in the range 2824 to 2824.
> > >        at kafka.log.Log.read(Log.scala:380)
> > >        at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
> > >        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
> > >        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
> > >        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > >        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > >        at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
> > >        at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > >        at scala.collection.immutable.Map$Map1.map(Map.scala:93)
> > >        at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
> > >        at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:783)
> > >        at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:765)
> > >        at kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.sca
> > >
> > >A similar exception shows up in the Spark job.
> > >
> > >Here are my versions:
> > >        Spark - 1.4.1
> > >        Kafka - 0.8.1
> > >
> > >I changed the retention on the topic config using this command:
> > >./kafka-topics.sh --alter --zookeeper XXX:2181 --topic MyTopic
> > >--config retention.ms=1440  (I believe this is in minutes)
> > >
> > >I am also noticing something in Kafka. When I run the below command on
> > >the broker:
> > >./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list
> > >vdc-vm8.apple.com:9092 --topic MyTopic --time -2
> > >the earliest offset is being set to the latest within just a few
> > >seconds. Am I correlating this issue correctly?
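> > >
> > >For comparison, the same tool reports the latest offset with --time -1
> > >(same broker and topic assumed):
> > >./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list
> > >vdc-vm8.apple.com:9092 --topic MyTopic --time -1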
> > >
> > >Here is an example on a new topic. The initial output of this command:
> > >./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list
> > >vdc-vm8.apple.com:9092 --topic MyTopic --time -2
> > >SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> > >SLF4J: Defaulting to no-operation (NOP) logger implementation
> > >SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
> > >further details.
> > >MyTopic:0:60
> > >
> > >I published 4 messages to Kafka. Just a few seconds later, the command
> > >output is:
> > >          MyTopic:0:64
> > >Isn't this supposed to stay at 60 for a longer time, based on the
> > >retention policy?
> > >
> > >
> > >Thanks,
> > >Leena
> > >
> > >
> > >On Thu, Aug 6, 2015 at 12:09 PM, Grant Henke <ghe...@cloudera.com>
> > >wrote:
> > >
> > >> Does this Spark Jira match up with what you are seeing, or sound
> > >> related? https://issues.apache.org/jira/browse/SPARK-8474
> > >>
> > >> What versions of Spark and Kafka are you using? Can you include more
> > >> of the Spark log? Any errors shown in the Kafka log?
> > >>
> > >> Thanks,
> > >> Grant
> > >>
> > >> On Thu, Aug 6, 2015 at 1:17 PM, Cassa L <lcas...@gmail.com> wrote:
> > >>
> > >> > Hi,
> > >> > Has anyone tried the streaming API of Spark with Kafka? I am
> > >> > experimenting with the new Spark API to read from Kafka:
> > >> > KafkaUtils.createDirectStream(...)
> > >> >
> > >> > Every now and then I get the following error,
> > >> > "kafka.common.OffsetOutOfRangeException", and my Spark script stops
> > >> > working. I have a simple topic with just one partition.
> > >> >
> > >> > I would appreciate any clues on how to debug this issue.
> > >> >
> > >> > Thanks,
> > >> > LCassa
> > >> >
> > >>
> > >>
> > >>
> > >> --
> > >> Grant Henke
> > >> Software Engineer | Cloudera
> > >> gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
> > >>
> >
> >
>
>
> --
> Grant Henke
> Software Engineer | Cloudera
> gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
>
