gaborgsomogyi commented on a change in pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to KafkaDataConsumer URL: https://github.com/apache/spark/pull/22138#discussion_r317597605
########## File path: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ########## @@ -445,197 +529,68 @@ private[kafka010] case class InternalKafkaConsumer( * Throw an exception or log a warning as per `failOnDataLoss`. */ private def reportDataLoss( + topicPartition: TopicPartition, + groupId: String, failOnDataLoss: Boolean, message: String, cause: Throwable = null): Unit = { - val finalMessage = s"$message ${additionalMessage(failOnDataLoss)}" + val finalMessage = s"$message ${additionalMessage(topicPartition, groupId, failOnDataLoss)}" reportDataLoss0(failOnDataLoss, finalMessage, cause) } - def close(): Unit = consumer.close() - - private def seek(offset: Long): Unit = { - logDebug(s"Seeking to $groupId $topicPartition $offset") - consumer.seek(topicPartition, offset) - } - - /** - * Poll messages from Kafka starting from `offset` and update `fetchedData`. `fetchedData` may be - * empty if the Kafka consumer fetches some messages but all of them are not visible messages - * (either transaction messages, or aborted messages when `isolation.level` is `read_committed`). - * - * @throws OffsetOutOfRangeException if `offset` is out of range. - * @throws TimeoutException if the consumer position is not changed after polling. It means the - * consumer polls nothing before timeout. - */ - private def fetchData(offset: Long, pollTimeoutMs: Long): Unit = { - // Seek to the offset because we may call seekToBeginning or seekToEnd before this. - seek(offset) - val p = consumer.poll(pollTimeoutMs) - val r = p.records(topicPartition) - logDebug(s"Polled $groupId ${p.partitions()} ${r.size}") - val offsetAfterPoll = consumer.position(topicPartition) - logDebug(s"Offset changed from $offset to $offsetAfterPoll after polling") - fetchedData.withNewPoll(r.listIterator, offsetAfterPoll) - if (!fetchedData.hasNext) { - // We cannot fetch anything after `poll`. Two possible cases: - // - `offset` is out of range so that Kafka returns nothing. `OffsetOutOfRangeException` will - // be thrown. - // - Cannot fetch any data before timeout. `TimeoutException` will be thrown. - // - Fetched something but all of them are not invisible. This is a valid case and let the - // caller handles this. - val range = getAvailableOffsetRange() - if (offset < range.earliest || offset >= range.latest) { - throw new OffsetOutOfRangeException( - Map(topicPartition -> java.lang.Long.valueOf(offset)).asJava) - } else if (offset == offsetAfterPoll) { - throw new TimeoutException( - s"Cannot fetch record for offset $offset in $pollTimeoutMs milliseconds") - } - } + private def runUninterruptiblyIfPossible[T](body: => T): T = Thread.currentThread match { + case ut: UninterruptibleThread => + ut.runUninterruptibly(body) + case _ => + logWarning("KafkaDataConsumer is not running in UninterruptibleThread. " + + "It may hang when KafkaDataConsumer's methods are interrupted because of KAFKA-1894") + body } } - private[kafka010] object KafkaDataConsumer extends Logging { + val UNKNOWN_OFFSET = -2L case class AvailableOffsetRange(earliest: Long, latest: Long) - private case class CachedKafkaDataConsumer(internalConsumer: InternalKafkaConsumer) - extends KafkaDataConsumer { - assert(internalConsumer.inUse) // make sure this has been set to true - override def release(): Unit = { KafkaDataConsumer.release(internalConsumer) } - } - - private case class NonCachedKafkaDataConsumer(internalConsumer: InternalKafkaConsumer) - extends KafkaDataConsumer { - override def release(): Unit = { internalConsumer.close() } - } - - private case class CacheKey(groupId: String, topicPartition: TopicPartition) { + case class CacheKey(groupId: String, topicPartition: TopicPartition) { def this(topicPartition: TopicPartition, kafkaParams: ju.Map[String, Object]) = this(kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String], topicPartition) } - // This cache has the following important properties. - // - We make a best-effort attempt to maintain the max size of the cache as configured capacity. - // The capacity is not guaranteed to be maintained, especially when there are more active - // tasks simultaneously using consumers than the capacity. - private lazy val cache = { - val conf = SparkEnv.get.conf - val capacity = conf.get(CONSUMER_CACHE_CAPACITY) - new ju.LinkedHashMap[CacheKey, InternalKafkaConsumer](capacity, 0.75f, true) { - override def removeEldestEntry( - entry: ju.Map.Entry[CacheKey, InternalKafkaConsumer]): Boolean = { - - // Try to remove the least-used entry if its currently not in use. - // - // If you cannot remove it, then the cache will keep growing. In the worst case, - // the cache will grow to the max number of concurrent tasks that can run in the executor, - // (that is, number of tasks slots) after which it will never reduce. This is unlikely to - // be a serious problem because an executor with more than 64 (default) tasks slots is - // likely running on a beefy machine that can handle a large number of simultaneously - // active consumers. - - if (!entry.getValue.inUse && this.size > capacity) { - logWarning( - s"KafkaConsumer cache hitting max capacity of $capacity, " + - s"removing consumer for ${entry.getKey}") - try { - entry.getValue.close() - } catch { - case e: SparkException => - logError(s"Error closing earliest Kafka consumer for ${entry.getKey}", e) - } - true - } else { - false - } - } + private val consumerPool = InternalKafkaConsumerPool.build + private val fetchedDataPool = FetchedDataPool.build + + ShutdownHookManager.addShutdownHook { () => + try { + fetchedDataPool.shutdown() + consumerPool.close() + } catch { + case e: Throwable => + logWarning("Ignoring Exception while shutting down pools from shutdown hook", e) } } /** - * Get a cached consumer for groupId, assigned to topic and partition. + * Get a data reader for groupId, assigned to topic and partition. * If matching consumer doesn't already exist, will be created using kafkaParams. - * The returned consumer must be released explicitly using [[KafkaDataConsumer.release()]]. - * - * Note: This method guarantees that the consumer returned is not currently in use by any one - * else. Within this guarantee, this method will make a best effort attempt to re-use consumers by - * caching them and tracking when they are in use. + * The returned data reader must be released explicitly. */ def acquire( topicPartition: TopicPartition, - kafkaParams: ju.Map[String, Object], - useCache: Boolean): KafkaDataConsumer = synchronized { - val key = new CacheKey(topicPartition, kafkaParams) - val existingInternalConsumer = cache.get(key) - - lazy val newInternalConsumer = new InternalKafkaConsumer(topicPartition, kafkaParams) - + kafkaParams: ju.Map[String, Object]): KafkaDataConsumer = { if (TaskContext.get != null && TaskContext.get.attemptNumber >= 1) { - // If this is reattempt at running the task, then invalidate cached consumer if any and - // start with a new one. - if (existingInternalConsumer != null) { - // Consumer exists in cache. If its in use, mark it for closing later, or close it now. - if (existingInternalConsumer.inUse) { - existingInternalConsumer.markedForClose = true - } else { - existingInternalConsumer.close() - } - } - cache.remove(key) // Invalidate the cache in any case - NonCachedKafkaDataConsumer(newInternalConsumer) - - } else if (!useCache) { - // If planner asks to not reuse consumers, then do not use it, return a new consumer - NonCachedKafkaDataConsumer(newInternalConsumer) + // If this is reattempt at running the task, then invalidate cached consumer if any. - } else if (existingInternalConsumer == null) { - // If consumer is not already cached, then put a new in the cache and return it - cache.put(key, newInternalConsumer) - newInternalConsumer.inUse = true - CachedKafkaDataConsumer(newInternalConsumer) + val cacheKey = new CacheKey(topicPartition, kafkaParams) - } else if (existingInternalConsumer.inUse) { - // If consumer is already cached but is currently in use, then return a new consumer - NonCachedKafkaDataConsumer(newInternalConsumer) - - } else { - // If consumer is already cached and is currently not in use, then return that consumer - existingInternalConsumer.inUse = true - CachedKafkaDataConsumer(existingInternalConsumer) + // invalidate all fetched data for the key + // sadly we can't pinpoint specific data and invalidate cause we don't have unique id + fetchedDataPool.invalidate(cacheKey) Review comment: I've filed https://github.com/apache/spark/pull/25582 to fill this gap and we can make sure this still works. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org