Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20572#discussion_r169490350
  
    --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala ---
    @@ -178,51 +196,128 @@ private[spark] class KafkaRDD[K, V](
             s"skipping ${part.topic} ${part.partition}")
           Iterator.empty
         } else {
    -      new KafkaRDDIterator(part, context)
    +      logInfo(s"Computing topic ${part.topic}, partition ${part.partition} 
" +
    +        s"offsets ${part.fromOffset} -> ${part.untilOffset}")
    +      if (compacted) {
    +        new CompactedKafkaRDDIterator[K, V](
    +          part,
    +          context,
    +          kafkaParams,
    +          useConsumerCache,
    +          pollTimeout,
    +          cacheInitialCapacity,
    +          cacheMaxCapacity,
    +          cacheLoadFactor
    +        )
    +      } else {
    +        new KafkaRDDIterator[K, V](
    +          part,
    +          context,
    +          kafkaParams,
    +          useConsumerCache,
    +          pollTimeout,
    +          cacheInitialCapacity,
    +          cacheMaxCapacity,
    +          cacheLoadFactor
    +        )
    +      }
         }
       }
    +}
     
    -  /**
    -   * An iterator that fetches messages directly from Kafka for the offsets in partition.
    -   * Uses a cached consumer where possible to take advantage of prefetching
    -   */
    -  private class KafkaRDDIterator(
    -      part: KafkaRDDPartition,
    -      context: TaskContext) extends Iterator[ConsumerRecord[K, V]] {
    -
    -    logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " 
+
    -      s"offsets ${part.fromOffset} -> ${part.untilOffset}")
    -
    -    val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
    -
    -    context.addTaskCompletionListener{ context => closeIfNeeded() }
    -
    -    val consumer = if (useConsumerCache) {
    -      CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
    -      if (context.attemptNumber >= 1) {
    -        // just in case the prior attempt failures were cache related
    -        CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
    -      }
    -      CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams)
    -    } else {
    -      CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams)
    +/**
    + * An iterator that fetches messages directly from Kafka for the offsets in partition.
    + * Uses a cached consumer where possible to take advantage of prefetching
    + */
    +private class KafkaRDDIterator[K, V](
    +  part: KafkaRDDPartition,
    +  context: TaskContext,
    +  kafkaParams: ju.Map[String, Object],
    +  useConsumerCache: Boolean,
    +  pollTimeout: Long,
    +  cacheInitialCapacity: Int,
    +  cacheMaxCapacity: Int,
    +  cacheLoadFactor: Float
    +) extends Iterator[ConsumerRecord[K, V]] {
    +
    +  val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
    +
    +  context.addTaskCompletionListener{ context => closeIfNeeded() }
    +
    +  val consumer = if (useConsumerCache) {
    +    CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
    +    if (context.attemptNumber >= 1) {
    +      // just in case the prior attempt failures were cache related
    +      CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
         }
    +    CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams)
    +  } else {
    +    CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams)
    +  }
     
    -    var requestOffset = part.fromOffset
    +  var requestOffset = part.fromOffset
     
    -    def closeIfNeeded(): Unit = {
    -      if (!useConsumerCache && consumer != null) {
    -        consumer.close
    -      }
    +  def closeIfNeeded(): Unit = {
    +    if (!useConsumerCache && consumer != null) {
    +      consumer.close
    --- End diff ---
    
    Just nits here, but I'd write `close()` with parentheses, since it clearly has side effects.
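    
    For illustration, a minimal sketch of the suggested change (the surrounding logic is exactly what the diff adds; only the call style differs):
    
        def closeIfNeeded(): Unit = {
          if (!useConsumerCache && consumer != null) {
            // Empty parentheses signal that close() performs a side effect
            // (releasing the underlying Kafka consumer).
            consumer.close()
          }
        }
    
    Declaring and invoking a side-effecting method with `()` follows the usual Scala convention, so the effect is visible at the call site.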


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
