That would be great if you could also try it! As for the retention policy: I had come across an issue with version 0.8.1 where the topic-level "retention.ms" config is in milliseconds, but the corresponding server property is "log.retention.minutes" — so would the server interpret the value as minutes? Is that true? Anyway, I have updated retention to 2 hours, expressed in milliseconds. I will update this thread if the problem still occurs.
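For reference, a minimal sketch of the conversion, assuming the topic-level `retention.ms` override is always interpreted in milliseconds (the topic name and ZooKeeper address in the comment are placeholders, carried over from the commands quoted below):

```python
def hours_to_ms(hours: int) -> int:
    """Convert a retention window in hours to the milliseconds
    expected by the topic-level retention.ms config."""
    return hours * 60 * 60 * 1000

# 2 hours of retention:
retention_ms = hours_to_ms(2)
print(retention_ms)  # 7200000

# The resulting value would then go into the alter command, e.g.:
# ./kafka-topics.sh --alter --zookeeper XXX:2181 --topic MyTopic \
#     --config retention.ms=7200000
```

Note the contrast with the `retention.ms=1440` used earlier in this thread, which (taken as milliseconds) is well under 2 seconds of retention.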
On Thu, Aug 6, 2015 at 4:58 PM, Grant Henke <ghe...@cloudera.com> wrote:

> Looks like this is likely a case very similar to the case Parth mentioned
> storm users have seen, when processing falls behind the retention period.
>
> Perhaps Spark and Kafka can handle this scenario more gracefully. I would
> be happy to do some investigation/testing and report back with findings
> and potentially open a Jira to track any fix.
>
> On Thu, Aug 6, 2015 at 6:48 PM, Parth Brahmbhatt <
> pbrahmbh...@hortonworks.com> wrote:
>
> > retention.ms is actually in milliseconds; you want a value much larger
> > than 1440, which translates to about 1.4 seconds.
> >
> > On 8/6/15, 4:35 PM, "Cassa L" <lcas...@gmail.com> wrote:
> >
> > > Hi Grant,
> > > Yes, I saw exceptions in both Spark and Kafka. In the Kafka server
> > > logs I get this exception:
> > > kafka.common.OffsetOutOfRangeException: Request for offset 2823 but
> > > we only have log segments in the range 2824 to 2824.
> > >     at kafka.log.Log.read(Log.scala:380)
> > >     at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
> > >     at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
> > >     at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
> > >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > >     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
> > >     at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
> > >     at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
> > >     at scala.collection.immutable.Map$Map1.map(Map.scala:93)
> > >     at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
> > >     at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:783)
> > >     at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:765)
> > >     at kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.sca
> > >
> > > A similar kind of exception shows up in the Spark job.
> > >
> > > Here are my versions:
> > > Spark - 1.4.1
> > > Kafka - 0.8.1
> > >
> > > I changed retention on the topic config using this command:
> > > ./kafka-topics.sh --alter --zookeeper XXX:2181 --topic MyTopic --config
> > > retention.ms=1440 (I believe this is in minutes)
> > >
> > > I am also noticing something in Kafka. When I run the command below
> > > on the broker:
> > > ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list
> > > vdc-vm8.apple.com:9092 --topic MyTopic --time -2
> > > the earliest offset is set to the latest within just a few seconds.
> > > Am I correlating this issue correctly?
> > >
> > > Here is my example on a new topic. Initial output of this command:
> > > ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list
> > > vdc-vm8.apple.com:9092 --topic MyTopic --time -2
> > > SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> > > SLF4J: Defaulting to no-operation (NOP) logger implementation
> > > SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for
> > > further details.
> > > MyTopic:0:60
> > >
> > > I published 4 messages to Kafka. Just a few seconds later, the
> > > command output is:
> > > MyTopic:0:64
> > > Isn't this supposed to stay at 60 for a longer time, based on the
> > > retention policy?
> > >
> > > Thanks,
> > > Leena
> > >
> > > On Thu, Aug 6, 2015 at 12:09 PM, Grant Henke <ghe...@cloudera.com>
> > > wrote:
> > >
> > > > Does this Spark Jira match up with what you are seeing or sound
> > > > related?
> > > > https://issues.apache.org/jira/browse/SPARK-8474
> > > >
> > > > What versions of Spark and Kafka are you using? Can you include
> > > > more of the Spark log? Any errors shown in the Kafka log?
> > > >
> > > > Thanks,
> > > > Grant
> > > >
> > > > On Thu, Aug 6, 2015 at 1:17 PM, Cassa L <lcas...@gmail.com> wrote:
> > > >
> > > > > Hi,
> > > > > Has anyone tried the streaming API of Spark with Kafka? I am
> > > > > experimenting with the new Spark API to read from Kafka:
> > > > > KafkaUtils.createDirectStream(...)
> > > > >
> > > > > Every now and then, I get the following error, "spark
> > > > > kafka.common.OffsetOutOfRangeException", and my Spark script
> > > > > stops working. I have a simple topic with just one partition.
> > > > >
> > > > > I would appreciate any clues on how to debug this issue.
> > > > >
> > > > > Thanks,
> > > > > LCassa
> > > > >
> > > >
> > > > --
> > > > Grant Henke
> > > > Software Engineer | Cloudera
> > > > gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
>
> --
> Grant Henke
> Software Engineer | Cloudera
> gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
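On the "handle this scenario more gracefully" point above, a hypothetical sketch (not Spark's or Kafka's actual implementation) of how a consumer could recover when its saved offset falls outside the broker's retained range, as happens when retention expires segments faster than the job consumes them:

```python
def clamp_offset(requested: int, earliest: int, latest: int) -> int:
    """Return a fetchable offset, resetting to the log's earliest
    retained offset when the requested one has already been deleted.

    earliest/latest correspond to what GetOffsetShell reports with
    --time -2 and --time -1 respectively."""
    if requested < earliest:
        # Data was aged out by retention; skip ahead instead of failing
        # with OffsetOutOfRangeException. Messages in [requested, earliest)
        # are lost and this loss should be logged/monitored.
        return earliest
    if requested > latest:
        # Requested beyond the log end (e.g. stale metadata).
        return latest
    return requested

# With the offsets from the error quoted above (log range 2824..2824,
# request for 2823):
print(clamp_offset(2823, 2824, 2824))  # 2824
```

This is only a recovery policy sketch; whether silently skipping to the earliest offset is acceptable depends on whether the job can tolerate dropped messages.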