HeartSaVioR commented on a change in pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaDataConsumer
URL: https://github.com/apache/spark/pull/22138#discussion_r317610953
 
 

 ##########
 File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ##########
 @@ -445,197 +529,68 @@ private[kafka010] case class InternalKafkaConsumer(
    * Throw an exception or log a warning as per `failOnDataLoss`.
    */
   private def reportDataLoss(
+      topicPartition: TopicPartition,
+      groupId: String,
       failOnDataLoss: Boolean,
       message: String,
       cause: Throwable = null): Unit = {
-    val finalMessage = s"$message ${additionalMessage(failOnDataLoss)}"
+    val finalMessage = s"$message ${additionalMessage(topicPartition, groupId, 
failOnDataLoss)}"
     reportDataLoss0(failOnDataLoss, finalMessage, cause)
   }
 
-  def close(): Unit = consumer.close()
-
-  private def seek(offset: Long): Unit = {
-    logDebug(s"Seeking to $groupId $topicPartition $offset")
-    consumer.seek(topicPartition, offset)
-  }
-
-  /**
-   * Poll messages from Kafka starting from `offset` and update `fetchedData`. `fetchedData` may be
-   * empty if the Kafka consumer fetches some messages but all of them are not visible messages
-   * (either transaction messages, or aborted messages when `isolation.level` is `read_committed`).
-   *
-   * @throws OffsetOutOfRangeException if `offset` is out of range.
-   * @throws TimeoutException if the consumer position is not changed after polling. It means the
-   *                          consumer polls nothing before timeout.
-   */
-  private def fetchData(offset: Long, pollTimeoutMs: Long): Unit = {
-    // Seek to the offset because we may call seekToBeginning or seekToEnd before this.
-    seek(offset)
-    val p = consumer.poll(pollTimeoutMs)
-    val r = p.records(topicPartition)
-    logDebug(s"Polled $groupId ${p.partitions()}  ${r.size}")
-    val offsetAfterPoll = consumer.position(topicPartition)
-    logDebug(s"Offset changed from $offset to $offsetAfterPoll after polling")
-    fetchedData.withNewPoll(r.listIterator, offsetAfterPoll)
-    if (!fetchedData.hasNext) {
-      // We cannot fetch anything after `poll`. Two possible cases:
-      // - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will
-      //   be thrown.
-      // - Cannot fetch any data before timeout. `TimeoutException` will be thrown.
-      // - Fetched something but all of them are not invisible. This is a valid case and let the
-      //   caller handles this.
-      val range = getAvailableOffsetRange()
-      if (offset < range.earliest || offset >= range.latest) {
-        throw new OffsetOutOfRangeException(
-          Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava)
-      } else if (offset == offsetAfterPoll) {
-        throw new TimeoutException(
-          s"Cannot fetch record for offset $offset in $pollTimeoutMs 
milliseconds")
-      }
-    }
+  private def runUninterruptiblyIfPossible[T](body: => T): T = Thread.currentThread match {
+    case ut: UninterruptibleThread =>
+      ut.runUninterruptibly(body)
+    case _ =>
+      logWarning("KafkaDataConsumer is not running in UninterruptibleThread. " 
+
+        "It may hang when KafkaDataConsumer's methods are interrupted because 
of KAFKA-1894")
+      body
   }
 }
 
-
 private[kafka010] object KafkaDataConsumer extends Logging {
+  val UNKNOWN_OFFSET = -2L
 
   case class AvailableOffsetRange(earliest: Long, latest: Long)
 
-  private case class CachedKafkaDataConsumer(internalConsumer: InternalKafkaConsumer)
-    extends KafkaDataConsumer {
-    assert(internalConsumer.inUse) // make sure this has been set to true
-    override def release(): Unit = { KafkaDataConsumer.release(internalConsumer) }
-  }
-
-  private case class NonCachedKafkaDataConsumer(internalConsumer: InternalKafkaConsumer)
-    extends KafkaDataConsumer {
-    override def release(): Unit = { internalConsumer.close() }
-  }
-
-  private case class CacheKey(groupId: String, topicPartition: TopicPartition) {
+  case class CacheKey(groupId: String, topicPartition: TopicPartition) {
     def this(topicPartition: TopicPartition, kafkaParams: ju.Map[String, Object]) =
       this(kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String], topicPartition)
   }
 
-  // This cache has the following important properties.
-  // - We make a best-effort attempt to maintain the max size of the cache as configured capacity.
-  //   The capacity is not guaranteed to be maintained, especially when there are more active
-  //   tasks simultaneously using consumers than the capacity.
-  private lazy val cache = {
-    val conf = SparkEnv.get.conf
-    val capacity = conf.get(CONSUMER_CACHE_CAPACITY)
-    new ju.LinkedHashMap[CacheKey, InternalKafkaConsumer](capacity, 0.75f, true) {
-      override def removeEldestEntry(
-        entry: ju.Map.Entry[CacheKey, InternalKafkaConsumer]): Boolean = {
-
-        // Try to remove the least-used entry if its currently not in use.
-        //
-        // If you cannot remove it, then the cache will keep growing. In the worst case,
-        // the cache will grow to the max number of concurrent tasks that can run in the executor,
-        // (that is, number of tasks slots) after which it will never reduce. This is unlikely to
-        // be a serious problem because an executor with more than 64 (default) tasks slots is
-        // likely running on a beefy machine that can handle a large number of simultaneously
-        // active consumers.
-
-        if (!entry.getValue.inUse && this.size > capacity) {
-          logWarning(
-            s"KafkaConsumer cache hitting max capacity of $capacity, " +
-              s"removing consumer for ${entry.getKey}")
-          try {
-            entry.getValue.close()
-          } catch {
-            case e: SparkException =>
-              logError(s"Error closing earliest Kafka consumer for 
${entry.getKey}", e)
-          }
-          true
-        } else {
-          false
-        }
-      }
+  private val consumerPool = InternalKafkaConsumerPool.build
+  private val fetchedDataPool = FetchedDataPool.build
+
+  ShutdownHookManager.addShutdownHook { () =>
+    try {
+      fetchedDataPool.shutdown()
+      consumerPool.close()
+    } catch {
+      case e: Throwable =>
+        logWarning("Ignoring Exception while shutting down pools from shutdown 
hook", e)
     }
   }
 
   /**
-   * Get a cached consumer for groupId, assigned to topic and partition.
+   * Get a data reader for groupId, assigned to topic and partition.
   * If matching consumer doesn't already exist, will be created using kafkaParams.
-   * The returned consumer must be released explicitly using [[KafkaDataConsumer.release()]].
-   *
-   * Note: This method guarantees that the consumer returned is not currently in use by any one
-   * else. Within this guarantee, this method will make a best effort attempt to re-use consumers by
-   * caching them and tracking when they are in use.
+   * The returned data reader must be released explicitly.
    */
   def acquire(
       topicPartition: TopicPartition,
-      kafkaParams: ju.Map[String, Object],
-      useCache: Boolean): KafkaDataConsumer = synchronized {
-    val key = new CacheKey(topicPartition, kafkaParams)
-    val existingInternalConsumer = cache.get(key)
-
-    lazy val newInternalConsumer = new InternalKafkaConsumer(topicPartition, kafkaParams)
-
+      kafkaParams: ju.Map[String, Object]): KafkaDataConsumer = {
     if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) {
-      // If this is reattempt at running the task, then invalidate cached consumer if any and
-      // start with a new one.
-      if (existingInternalConsumer != null) {
-        // Consumer exists in cache. If its in use, mark it for closing later, or close it now.
-        if (existingInternalConsumer.inUse) {
-          existingInternalConsumer.markedForClose = true
-        } else {
-          existingInternalConsumer.close()
-        }
-      }
-      cache.remove(key)  // Invalidate the cache in any case
-      NonCachedKafkaDataConsumer(newInternalConsumer)
-
-    } else if (!useCache) {
-      // If planner asks to not reuse consumers, then do not use it, return a new consumer
-      NonCachedKafkaDataConsumer(newInternalConsumer)
+      // If this is reattempt at running the task, then invalidate cached consumer if any.
 
-    } else if (existingInternalConsumer == null) {
-      // If consumer is not already cached, then put a new in the cache and return it
-      cache.put(key, newInternalConsumer)
-      newInternalConsumer.inUse = true
-      CachedKafkaDataConsumer(newInternalConsumer)
+      val cacheKey = new CacheKey(topicPartition, kafkaParams)
 
-    } else if (existingInternalConsumer.inUse) {
-      // If consumer is already cached but is currently in use, then return a new consumer
-      NonCachedKafkaDataConsumer(newInternalConsumer)
-
-    } else {
-      // If consumer is already cached and is currently not in use, then return that consumer
-      existingInternalConsumer.inUse = true
-      CachedKafkaDataConsumer(existingInternalConsumer)
+      // invalidate all fetched data for the key
+      // sadly we can't pinpoint specific data and invalidate cause we don't have unique id
+      fetchedDataPool.invalidate(cacheKey)
 
 Review comment:
   OK, let's follow up on #25582 and see whether the tests pass with the current master (we may want to verify this first) and also with the current patch.
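   
   For anyone skimming the thread, here is a rough usage sketch of the pooled API as exposed by the signatures in this hunk. The topic, partition, group id, and object names below are made-up placeholders; only `acquire(topicPartition, kafkaParams)` and the "must be released explicitly" contract come from the diff itself:
   
   ```scala
   import java.{util => ju}
   import org.apache.kafka.clients.consumer.ConsumerConfig
   import org.apache.kafka.common.TopicPartition
   
   // Assumes this lives in org.apache.spark.sql.kafka010 so KafkaDataConsumer is visible.
   object PooledConsumerUsageSketch {
     def readOnce(): Unit = {
       // Placeholder topic/partition/params, for illustration only.
       val topicPartition = new TopicPartition("some-topic", 0)
       val kafkaParams: ju.Map[String, Object] = new ju.HashMap[String, Object]()
       kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "spark-kafka-source-example")
   
       // acquire() hands out a pooled data reader keyed by (groupId, topicPartition);
       // it must be released explicitly so the consumer / fetched data go back to the pools.
       val consumer = KafkaDataConsumer.acquire(topicPartition, kafkaParams)
       try {
         // read records through the data reader here
       } finally {
         consumer.release()
       }
     }
   }
   ```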

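   The pool internals aren't visible in this hunk, so as a point of reference only, here is a minimal sketch of the general commons-pool2 keyed-pool pattern the patch title refers to. Every name below (Key, PooledConsumer, ConsumerObjectFactory, ConsumerPoolSketch) is illustrative and is not the actual InternalKafkaConsumerPool implementation:
   
   ```scala
   import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, PooledObject}
   import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericKeyedObjectPool}
   import org.apache.kafka.common.TopicPartition
   
   // Stand-ins for CacheKey / InternalKafkaConsumer, just to keep the sketch self-contained.
   case class Key(groupId: String, topicPartition: TopicPartition)
   class PooledConsumer(val key: Key) { def close(): Unit = () }
   
   // The factory tells the pool how to create, wrap, and destroy pooled objects per key.
   class ConsumerObjectFactory extends BaseKeyedPooledObjectFactory[Key, PooledConsumer] {
     override def create(key: Key): PooledConsumer = new PooledConsumer(key)
     override def wrap(value: PooledConsumer): PooledObject[PooledConsumer] =
       new DefaultPooledObject(value)
     override def destroyObject(key: Key, p: PooledObject[PooledConsumer]): Unit =
       p.getObject.close()
   }
   
   object ConsumerPoolSketch {
     private val pool = new GenericKeyedObjectPool[Key, PooledConsumer](new ConsumerObjectFactory)
     pool.setMaxTotalPerKey(1) // one consumer per (groupId, topicPartition); illustrative setting
   
     def borrow(key: Key): PooledConsumer = pool.borrowObject(key)
     def release(key: Key, c: PooledConsumer): Unit = pool.returnObject(key, c)
     def invalidateKey(key: Key): Unit = pool.clear(key) // drop idle instances for this key
     def shutdown(): Unit = pool.close()
   }
   ```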