I see the assertion error when I compare the offset ranges as shown below. How do I log the offset for each message?
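One way to log an offset for each individual message (rather than only the range boundaries, as the snippet below does) is to derive it from the partition's starting offset plus the message's position in the partition iterator. A minimal self-contained sketch of that arithmetic — note `OffsetRange` here is a local stand-in, not Spark's `org.apache.spark.streaming.kafka.OffsetRange`:

```scala
// Local stand-in for Spark's OffsetRange, for illustration only.
case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

object PerMessageOffsets {
  // Pair each message pulled from a partition (in order) with the
  // offset it was read from: fromOffset + index within the partition.
  def withOffsets[T](range: OffsetRange, messages: Seq[T]): Seq[(Long, T)] =
    messages.zipWithIndex.map { case (msg, i) => (range.fromOffset + i, msg) }

  def main(args: Array[String]): Unit = {
    val range = OffsetRange("hubble_stream", 88, 221563725L, 221563728L)
    val msgs  = Seq("a", "b", "c")
    withOffsets(range, msgs).foreach { case (off, msg) =>
      println(s"${range.topic} ${range.partition} offset=$off msg=$msg")
    }
  }
}
```

Inside a `mapPartitionsWithIndex` over the direct stream's RDD, the same arithmetic applies with `offsetRanges(i).fromOffset` as the base, so each message can be logged with the exact offset it came from.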
var offsetRanges = Array[OffsetRange]()  // populated in transform, read in foreachRDD

kafkaStream.transform { rdd =>
  // Get the offset ranges in the RDD
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD { rdd =>
  for (o <- offsetRanges) {
    LOGGER.info(s"Queried offsets: ${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
  val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
    // For each partition, get the size of the offset range in the partition
    // and the number of items actually in the partition
    val off = offsetRanges(i)
    val all = iter.toSeq
    val partSize = all.size
    val rangeSize = off.untilOffset - off.fromOffset
    Iterator((partSize, rangeSize))
  }.collect

  // Verify that the number of elements in each partition matches
  // the corresponding offset range
  collected.foreach { case (partSize, rangeSize) =>
    assert(partSize == rangeSize, "offset ranges are wrong")
  }
}

On Tue, Nov 24, 2015 at 8:33 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Anything's possible, but that sounds pretty unlikely to me.
> Are the partitions it's failing for all on the same leader?
> Have there been any leader rebalances?
> Do you have enough log retention?
> If you log the offset for each message as it's processed, when do you
> see the problem?
>
> On Tue, Nov 24, 2015 at 10:28 AM, swetha kasireddy
> <swethakasire...@gmail.com> wrote:
>
>> Is it possible that the Kafka offset API is somehow returning the wrong
>> offsets? Each time, the job fails for different partitions with an error
>> similar to the one below.
>>
>> Job aborted due to stage failure: Task 20 in stage 117.0 failed 4 times,
>> most recent failure: Lost task 20.3 in stage 117.0 (TID 2114,
>> 10.227.64.52):
>> java.lang.AssertionError: assertion failed: Ran out of messages before
>> reaching ending offset 221572238 for topic hubble_stream partition 88
>> start 221563725.
>> This should not happen, and indicates that messages may have been lost
>>
>> On Tue, Nov 24, 2015 at 6:31 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> No, the direct stream only communicates with Kafka brokers, not
>>> ZooKeeper directly. It asks the leader for each topic-partition what
>>> the highest available offsets are, using the Kafka offset API.
>>>
>>> On Mon, Nov 23, 2015 at 11:36 PM, swetha kasireddy
>>> <swethakasire...@gmail.com> wrote:
>>>
>>>> Does the Kafka direct stream query the offsets from ZooKeeper
>>>> directly? Where does it get the offsets from? There is data at those
>>>> offsets, but somehow the direct stream does not seem to pick it up.
>>>> Other consumers that use the ZooKeeper quorum for that stream seem to
>>>> be fine; only the direct stream has issues. How does the direct stream
>>>> know which offsets to query after getting the initial batches from
>>>> "auto.offset.reset" -> "largest"?
>>>>
>>>> On Mon, Nov 23, 2015 at 11:01 AM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>>
>>>>> No, it means that at the time the batch was scheduled, the Kafka
>>>>> leader reported the ending offset was 221572238, but during
>>>>> processing, Kafka stopped returning messages before reaching that
>>>>> ending offset.
>>>>>
>>>>> That probably means something got screwed up with Kafka - e.g. you
>>>>> lost a leader and lost messages in the process.
>>>>>
>>>>> On Mon, Nov 23, 2015 at 12:57 PM, swetha <swethakasire...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I see the following error in my Spark Kafka direct stream. Would
>>>>>> this mean that the direct stream is not able to catch up with the
>>>>>> messages and is failing?
>>>>>>
>>>>>> Job aborted due to stage failure: Task 20 in stage 117.0 failed 4
>>>>>> times, most recent failure: Lost task 20.3 in stage 117.0 (TID 2114,
>>>>>> 10.227.64.52):
>>>>>> java.lang.AssertionError: assertion failed: Ran out of messages
>>>>>> before reaching ending offset 221572238 for topic hubble_stream
>>>>>> partition 88 start 221563725. This should not happen, and indicates
>>>>>> that messages may have been lost
>>>>>>
>>>>>> Thanks,
>>>>>> Swetha
>>>>>>
>>>>>> --
>>>>>> View this message in context:
>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Kafka-Direct-Error-tp25454.html
>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>> Nabble.com.
>>>>>>
>>>>>> ---------------------------------------------------------------------
>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
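To make the failure mode in this thread concrete: the direct stream schedules each batch against the untilOffset the leader reported at scheduling time, and then asserts that it actually received that many messages. A self-contained sketch of that invariant, with `OffsetRange` modeled locally rather than using Spark's class:

```scala
// Local stand-in for Spark's OffsetRange, for illustration only.
case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

object RangeCheck {
  // Number of messages the scheduled range promised for this partition.
  def expectedCount(r: OffsetRange): Long = r.untilOffset - r.fromOffset

  // Mirrors the failing assertion: true only if the consumer received
  // every message the leader reported available at scheduling time.
  def rangeSatisfied(r: OffsetRange, consumed: Long): Boolean =
    consumed == expectedCount(r)

  def main(args: Array[String]): Unit = {
    // The range from the error in this thread: partition 88 of hubble_stream.
    val r = OffsetRange("hubble_stream", 88, 221563725L, 221572238L)
    println(expectedCount(r))          // 8513 messages promised
    println(rangeSatisfied(r, 8513L))  // true: all promised messages arrived
    println(rangeSatisfied(r, 8000L))  // false: the "Ran out of messages" case
  }
}
```

If messages in the scheduled range disappear between scheduling and processing (for example after a leader failure, or because log retention expired them), the consumed count falls short of the promised count and the assertion fires, which matches Cody's diagnosis above.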