Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r170277915
  
    --- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
 ---
    @@ -172,57 +187,138 @@ private[spark] class KafkaRDD[K, V](
     
       override def compute(thePart: Partition, context: TaskContext): 
Iterator[ConsumerRecord[K, V]] = {
         val part = thePart.asInstanceOf[KafkaRDDPartition]
    -    assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
    +    require(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
         if (part.fromOffset == part.untilOffset) {
           logInfo(s"Beginning offset ${part.fromOffset} is the same as ending 
offset " +
             s"skipping ${part.topic} ${part.partition}")
           Iterator.empty
         } else {
    -      new KafkaRDDIterator(part, context)
    +      logInfo(s"Computing topic ${part.topic}, partition ${part.partition} 
" +
    +        s"offsets ${part.fromOffset} -> ${part.untilOffset}")
    +      if (compacted) {
    +        new CompactedKafkaRDDIterator[K, V](
    +          part,
    +          context,
    +          kafkaParams,
    +          useConsumerCache,
    +          pollTimeout,
    +          cacheInitialCapacity,
    +          cacheMaxCapacity,
    +          cacheLoadFactor
    +        )
    +      } else {
    +        new KafkaRDDIterator[K, V](
    +          part,
    +          context,
    +          kafkaParams,
    +          useConsumerCache,
    +          pollTimeout,
    +          cacheInitialCapacity,
    +          cacheMaxCapacity,
    +          cacheLoadFactor
    +        )
    +      }
         }
       }
    +}
     
    -  /**
    -   * An iterator that fetches messages directly from Kafka for the offsets 
in partition.
    -   * Uses a cached consumer where possible to take advantage of prefetching
    -   */
    -  private class KafkaRDDIterator(
    -      part: KafkaRDDPartition,
    -      context: TaskContext) extends Iterator[ConsumerRecord[K, V]] {
    -
    -    logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " 
+
    -      s"offsets ${part.fromOffset} -> ${part.untilOffset}")
    -
    -    val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
    +/**
    + * An iterator that fetches messages directly from Kafka for the offsets 
in partition.
    + * Uses a cached consumer where possible to take advantage of prefetching
    + */
    +private class KafkaRDDIterator[K, V](
    +  part: KafkaRDDPartition,
    +  context: TaskContext,
    +  kafkaParams: ju.Map[String, Object],
    +  useConsumerCache: Boolean,
    +  pollTimeout: Long,
    +  cacheInitialCapacity: Int,
    +  cacheMaxCapacity: Int,
    +  cacheLoadFactor: Float
    +) extends Iterator[ConsumerRecord[K, V]] {
    +
    +  val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
    +
    +  context.addTaskCompletionListener{ context => closeIfNeeded() }
    --- End diff --
    
    This could be `...(_ => closeIfNeeded())` — the listener's parameter is unused here, and naming it `context` shadows the enclosing `context: TaskContext` constructor parameter, so the underscore placeholder is both shorter and less error-prone.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to