I see the assertion error when I compare the offset ranges against the
partition sizes, as shown below. How do I log the offset for each message as
it's processed?


import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Declared outside the closures so the ranges captured in transform
// are visible later in foreachRDD
var offsetRanges = Array.empty[OffsetRange]

kafkaStream.transform { rdd =>
  // Get the offset ranges in the RDD
  offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd
}.foreachRDD { rdd =>
  for (o <- offsetRanges) {
    LOGGER.info(s"Queried offsets: ${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }

  val collected = rdd.mapPartitionsWithIndex { (i, iter) =>
    // For each partition, get the size of the offset range in the partition
    // and the number of items actually in the partition
    val off = offsetRanges(i)
    val all = iter.toSeq
    val partSize = all.size
    val rangeSize = off.untilOffset - off.fromOffset
    Iterator((partSize, rangeSize))
  }.collect()

  // Verify that the number of elements in each partition
  // matches the corresponding offset range
  collected.foreach { case (partSize, rangeSize) =>
    assert(partSize == rangeSize, "offset ranges are wrong")
  }
}
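
To log the offset for each message as it's processed, one option is to derive
it inside the existing foreachRDD: the direct stream maps RDD partition i to
offsetRanges(i) and preserves offset order within a partition, so a message's
offset is the range's fromOffset plus its position in the partition iterator.
A minimal sketch under those assumptions:

rdd.mapPartitionsWithIndex { (i, iter) =>
  // Create the logger inside the closure: this runs on the executors,
  // so the output lands in the executor logs, not the driver log.
  val log = org.apache.log4j.Logger.getLogger("offset-audit")
  val off = offsetRanges(i)
  iter.zipWithIndex.map { case (msg, idx) =>
    log.info(s"Processing ${off.topic} ${off.partition} offset ${off.fromOffset + idx}")
    msg
  }
}.count() // an action is needed here, or the logging never runs

Alternatively, the createDirectStream overload that takes a
messageHandler: MessageAndMetadata[K, V] => R exposes mmd.offset for every
record at the point of consumption, at the cost of supplying fromOffsets
explicitly.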


On Tue, Nov 24, 2015 at 8:33 AM, Cody Koeninger <c...@koeninger.org> wrote:

> Anything's possible, but that sounds pretty unlikely to me.
> Are the partitions it's failing for all on the same leader?
> Have there been any leader rebalances?
> Do you have enough log retention?
> If you log the offset for each message as it's processed, when do you see
> the problem?
>
> On Tue, Nov 24, 2015 at 10:28 AM, swetha kasireddy <
> swethakasire...@gmail.com> wrote:
>
>> Is it possible that the Kafka offset API is somehow returning the wrong
>> offsets? Each time, the job fails for different partitions with an error
>> similar to the one below.
>>
>> Job aborted due to stage failure: Task 20 in stage 117.0 failed 4 times,
>> most recent failure: Lost task 20.3 in stage 117.0 (TID 2114, 10.227.64.52):
>> java.lang.AssertionError: assertion failed: Ran out of messages before
>> reaching ending offset 221572238 for topic hubble_stream partition 88
>> start 221563725. This should not happen, and indicates that messages may
>> have been lost
>>
>> On Tue, Nov 24, 2015 at 6:31 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> No, the direct stream only communicates with the Kafka brokers, not
>>> ZooKeeper directly. It asks the leader for each topic-partition what the
>>> highest available offsets are, using the Kafka offset API.
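>>>
>>> For reference, a rough sketch of that query using the 0.8 SimpleConsumer
>>> offset API (broker host, port, and client id here are placeholders, and
>>> the request must go to the current leader for the partition):
>>>
>>> import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
>>> import kafka.common.TopicAndPartition
>>> import kafka.consumer.SimpleConsumer
>>>
>>> val consumer = new SimpleConsumer("leader-host", 9092, 100000, 64 * 1024, "offset-check")
>>> val tp = TopicAndPartition("hubble_stream", 88)
>>> // LatestTime (-1) asks for the highest available offset
>>> val req = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(OffsetRequest.LatestTime, 1)))
>>> val latest = consumer.getOffsetsBefore(req).partitionErrorAndOffsets(tp).offsets.head
>>> consumer.close()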
>>>
>>> On Mon, Nov 23, 2015 at 11:36 PM, swetha kasireddy <
>>> swethakasire...@gmail.com> wrote:
>>>
>>>> Does Kafka Direct query the offsets from ZooKeeper directly? From
>>>> where does it get the offsets? There is data at those offsets, but somehow
>>>> Kafka Direct does not seem to pick it up. Other consumers that use the
>>>> ZooKeeper quorum for that stream seem to be fine; only Kafka Direct seems
>>>> to have issues. How does Kafka Direct know which offsets to query after
>>>> getting the initial batches from "auto.offset.reset" -> "largest"?
>>>>
>>>> On Mon, Nov 23, 2015 at 11:01 AM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>>
>>>>> No, that means that at the time the batch was scheduled, the Kafka
>>>>> leader reported that the ending offset was 221572238, but during
>>>>> processing, Kafka stopped returning messages before reaching that ending
>>>>> offset.
>>>>>
>>>>> That probably means something got screwed up with Kafka - e.g. you
>>>>> lost a leader and lost messages in the process.
>>>>>
>>>>> On Mon, Nov 23, 2015 at 12:57 PM, swetha <swethakasire...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I see the following error in my Spark Kafka Direct. Would this mean that
>>>>>> Kafka Direct is not able to catch up with the messages and is failing?
>>>>>>
>>>>>> Job aborted due to stage failure: Task 20 in stage 117.0 failed 4 times,
>>>>>> most recent failure: Lost task 20.3 in stage 117.0 (TID 2114, 10.227.64.52):
>>>>>> java.lang.AssertionError: assertion failed: Ran out of messages before
>>>>>> reaching ending offset 221572238 for topic hubble_stream partition 88
>>>>>> start 221563725. This should not happen, and indicates that messages may
>>>>>> have been lost
>>>>>>
>>>>>> Thanks,
>>>>>> Swetha
>>>>>>
>>>>>
>>>>
>>>
>>
>
