Re: OffsetOutOfRangeError with Kafka-Spark streaming
Ok, the problem is resolved: it went away once I increased the retention period for the topic. But now I see that whenever I restart the Spark job, some old messages are pulled in again by the Spark stream. With the new Spark direct stream API, do we need to keep track of offsets ourselves?

LCassa

On Thu, Aug 6, 2015 at 4:58 PM, Grant Henke <ghe...@cloudera.com> wrote:
> [...]
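To the offset question above: with the direct stream API, Spark does not commit offsets to ZooKeeper, so a plain `createDirectStream` restart begins wherever `auto.offset.reset` points rather than where the previous run stopped. A common pattern is to record each batch's offset ranges and pass them back as `fromOffsets` on restart. A minimal sketch against the Spark 1.4 / Kafka 0.8 classes; `loadOffsets`/`saveOffsets` and the broker address are hypothetical placeholders for whatever durable store you choose:

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectStreamWithOffsets {
  // Hypothetical helpers: persist offsets in any durable store (ZK, DB, HDFS).
  def loadOffsets(): Map[TopicAndPartition, Long] = ???
  def saveOffsets(offsets: Map[TopicAndPartition, Long]): Unit = ???

  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("direct"), Seconds(10))
    val kafkaParams = Map("metadata.broker.list" -> "broker:9092")

    // Resume from the offsets the previous run recorded.
    val fromOffsets = loadOffsets()
    val messageHandler =
      (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

    val stream = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder, (String, String)](
      ssc, kafkaParams, fromOffsets, messageHandler)

    stream.foreachRDD { rdd =>
      // Offset ranges are exposed on the RDDs produced by the direct stream.
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... process rdd ...
      saveOffsets(ranges.map(r =>
        TopicAndPartition(r.topic, r.partition) -> r.untilOffset).toMap)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Alternatively, `ssc.checkpoint(...)` plus `StreamingContext.getOrCreate` restores offsets from the checkpoint, at the cost of tying recovery to the checkpointed serialized code.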
Re: OffsetOutOfRangeError with Kafka-Spark streaming
That would be great if you can also try it! As for the retention policy: I had come across an issue with the 0.8.1 version where retention.ms is documented in milliseconds, but the corresponding server property is log.retention.minutes — would the server interpret the value as minutes? Is that true? Anyway, I have updated retention to 2 hours, expressed in milliseconds. I will update this thread if the problem still occurs.

On Thu, Aug 6, 2015 at 4:58 PM, Grant Henke <ghe...@cloudera.com> wrote:
> [...]
Re: OffsetOutOfRangeError with Kafka-Spark streaming
In Apache Storm, some users reported the same issue a few months ago [1][2][3]. This was an unusual situation which, in our experience, only happened when the Storm topology was asking for offsets that had already been trimmed by Kafka. There are multiple pathological cases that can land users in that situation: too low a retention period, too slow a topology, or a poison-pill message that kept being retried until it was finally trimmed by Kafka and was no longer available.

Storm, to an extent, allows users to control what they want to do in this situation; I am not so sure about Spark Streaming. If Spark does not handle this right now, your best bet is to ensure your Kafka retention period is high enough that your processing never falls so far behind that the data gets trimmed (which you should do anyway, to avoid data loss), and to ensure that failed messages are thrown away or pushed to a DLQ after some small number of retries, instead of being retried forever.

[1] https://issues.apache.org/jira/browse/STORM-511
[2] https://issues.apache.org/jira/browse/STORM-586
[3] https://issues.apache.org/jira/browse/STORM-643

Thanks
Parth

On 8/6/15, 12:09 PM, Grant Henke <ghe...@cloudera.com> wrote:
> [...]
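The "small number of retries, then dead-letter queue" pattern described above can be sketched generically. This is not a Spark or Kafka API — `process` and `sendToDlq` are hypothetical application hooks:

```scala
// Bounded retries with a dead-letter fallback; `process` and `sendToDlq`
// are hypothetical application hooks, not Spark/Kafka APIs.
def handleWithRetries[M](msg: M,
                         process: M => Unit,
                         sendToDlq: (M, Throwable) => Unit,
                         maxRetries: Int = 3): Unit = {
  var attempt = 0
  var done = false
  while (!done) {
    try { process(msg); done = true }
    catch {
      case _: Exception if attempt < maxRetries =>
        attempt += 1            // retry a bounded number of times
      case e: Exception =>
        sendToDlq(msg, e)       // give up: park the poison pill in the DLQ
        done = true
    }
  }
}
```

Bounding retries ensures a single bad message cannot stall consumption long enough for the rest of the partition to age out of retention.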
Re: OffsetOutOfRangeError with Kafka-Spark streaming
retention.ms is actually in milliseconds, so you want a value much larger than 1440, which translates to only 1.4 seconds.

On 8/6/15, 4:35 PM, Cassa L <lcas...@gmail.com> wrote:
> [...]
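Concretely, the point above means the configured value should be the desired retention expressed in milliseconds. A sketch, reusing the ZooKeeper placeholder and topic name from the thread (the alter command itself is commented out, since it needs a running broker):

```shell
# 2 hours of retention, expressed in milliseconds (retention.ms is ms, not minutes)
RETENTION_MS=$((2 * 60 * 60 * 1000))
echo "retention.ms=${RETENTION_MS}"   # 7200000

# Then alter the topic:
# ./kafka-topics.sh --alter --zookeeper XXX:2181 --topic MyTopic \
#   --config retention.ms=${RETENTION_MS}
```

With retention.ms=1440, segments become eligible for deletion after 1.4 seconds, which matches the behavior observed above where the earliest offset advanced within seconds.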
Re: OffsetOutOfRangeError with Kafka-Spark streaming
Hi Grant,

Yes, I saw the exception in both Spark and Kafka. In the Kafka server logs I get this exception:

kafka.common.OffsetOutOfRangeException: Request for offset 2823 but we only have log segments in the range 2824 to 2824.
        at kafka.log.Log.read(Log.scala:380)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSet(KafkaApis.scala:530)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:476)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$readMessageSets$1.apply(KafkaApis.scala:471)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206)
        at scala.collection.immutable.Map$Map1.foreach(Map.scala:105)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:206)
        at scala.collection.immutable.Map$Map1.map(Map.scala:93)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$readMessageSets(KafkaApis.scala:471)
        at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:783)
        at kafka.server.KafkaApis$FetchRequestPurgatory.expire(KafkaApis.scala:765)
        at kafka.server.RequestPurgatory$ExpiredRequestReaper.run(RequestPurgatory.sca

A similar kind of exception shows up in the Spark job.

Here are my versions:
Spark - 1.4.1
Kafka - 0.8.1

I changed retention on the topic config using this command (I believe this is in minutes):

./kafka-topics.sh --alter --zookeeper XXX:2181 --topic MyTopic --config retention.ms=1440

I am also noticing something in Kafka. When I run the command below on the broker, the earliest offset is being set to the latest within just a few seconds. Am I correlating this issue correctly? Here is my example on a new topic. The initial output of this command is:

./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list vdc-vm8.apple.com:9092 --topic MyTopic --time -2
SLF4J: Failed to load class org.slf4j.impl.StaticLoggerBinder.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
MyTopic:0:60

I published 4 messages to Kafka. Just a few seconds later, the command output is:

MyTopic:0:64

Isn't this supposed to stay at 60 for a longer time, based on the retention policy?

Thanks,
Leena

On Thu, Aug 6, 2015 at 12:09 PM, Grant Henke <ghe...@cloudera.com> wrote:
> [...]
Re: OffsetOutOfRangeError with Kafka-Spark streaming
Looks like this is likely a case very similar to the one Parth mentioned Storm users have seen, when processing falls behind the retention period. Perhaps Spark and Kafka can handle this scenario more gracefully. I would be happy to do some investigation/testing and report back with findings, and potentially open a Jira to track any fix.

On Thu, Aug 6, 2015 at 6:48 PM, Parth Brahmbhatt <pbrahmbh...@hortonworks.com> wrote:
> [...]

--
Grant Henke
Software Engineer | Cloudera
gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke
OffsetOutOfRangeError with Kafka-Spark streaming
Hi,

Has anyone tried the streaming API of Spark with Kafka? I am experimenting with the new Spark API to read from Kafka:

KafkaUtils.createDirectStream(...)

Every now and then I get the following error, and my Spark script stops working:

kafka.common.OffsetOutOfRangeException

I have a simple topic with just one partition. I would appreciate any clues on how to debug this issue.

Thanks,
LCassa
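For context on the call in question: a minimal direct-stream setup against the Spark 1.4 / Kafka 0.8 APIs might look like the sketch below. The broker address and topic name are placeholders:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MinimalDirectStream {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(
      new SparkConf().setAppName("kafka-direct"), Seconds(10))

    // auto.offset.reset controls where consumption starts when no valid
    // offset is available; "smallest" starts at the earliest retained offset.
    // An OffsetOutOfRangeException means a requested offset no longer exists
    // on the broker (typically deleted by the retention policy).
    val kafkaParams = Map(
      "metadata.broker.list" -> "broker:9092",
      "auto.offset.reset" -> "smallest")

    val stream = KafkaUtils.createDirectStream[
      String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("MyTopic"))

    stream.map(_._2).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```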
Re: OffsetOutOfRangeError with Kafka-Spark streaming
Does this Spark Jira match up with what you are seeing, or sound related?
https://issues.apache.org/jira/browse/SPARK-8474

What versions of Spark and Kafka are you using? Can you include more of the Spark log? Any errors shown in the Kafka log?

Thanks,
Grant

On Thu, Aug 6, 2015 at 1:17 PM, Cassa L <lcas...@gmail.com> wrote:
> [...]

--
Grant Henke
Software Engineer | Cloudera
gr...@cloudera.com | twitter.com/gchenke | linkedin.com/in/granthenke