Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r170278931
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala ---
    @@ -71,25 +69,62 @@ class CachedKafkaConsumer[K, V] private(
         }
     
         if (!buffer.hasNext()) { poll(timeout) }
    -    assert(buffer.hasNext(),
    +    require(buffer.hasNext(),
          s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
         var record = buffer.next()
     
         if (record.offset != offset) {
           logInfo(s"Buffer miss for $groupId $topic $partition $offset")
           seek(offset)
           poll(timeout)
    -      assert(buffer.hasNext(),
    +      require(buffer.hasNext(),
            s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout")
           record = buffer.next()
    -      assert(record.offset == offset,
     -        s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset")
    +      require(record.offset == offset,
     +        s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset " +
     +          s"got offset ${record.offset} instead. If this is a compacted topic, consider enabling " +
     +          "spark.streaming.kafka.allowNonConsecutiveOffsets"
    +      )
         }
     
         nextOffset = offset + 1
         record
       }
     
    +  /**
    +   * Start a batch on a compacted topic
    +   */
    +  def compactedStart(offset: Long, timeout: Long): Unit = {
     +    logDebug(s"compacted start $groupId $topic $partition starting $offset")
     +    // This seek may not be necessary, but it's hard to tell due to gaps in compacted topics
    +    if (offset != nextOffset) {
     +      logInfo(s"Initial fetch for compacted $groupId $topic $partition $offset")
    +      seek(offset)
    +      poll(timeout)
    +    }
    +  }
    +
    +  /**
    +   * Get the next record in the batch from a compacted topic.
    +   * Assumes compactedStart has been called first, and ignores gaps.
    +   */
    +  def compactedNext(timeout: Long): ConsumerRecord[K, V] = {
    +    if (!buffer.hasNext()) { poll(timeout) }
    --- End diff --
    
    Nit: I'd expand this onto two lines


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to