Yuval Itzchakov created SPARK-21873: ---------------------------------------
             Summary: CachedKafkaConsumer throws NonLocalReturnControl during fetching from Kafka
                 Key: SPARK-21873
                 URL: https://issues.apache.org/jira/browse/SPARK-21873
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.2.0, 2.1.1, 2.1.0
            Reporter: Yuval Itzchakov
            Priority: Minor


In Scala, using `return` inside a function causes a `NonLocalReturnControl` exception to be thrown and caught in order to escape the current scope. Profiling Structured Streaming in production clearly shows this:

!https://user-images.githubusercontent.com/3448320/29743366-4149ef78-8a99-11e7-94d6-f0cbb691134a.png!

The capture above covers a 1-minute profiling session on a single executor. The offending code is:

{code:scala}
while (toFetchOffset != UNKNOWN_OFFSET) {
  try {
    return fetchData(toFetchOffset, untilOffset, pollTimeoutMs, failOnDataLoss)
  } catch {
    case e: OffsetOutOfRangeException =>
      // When there is some error thrown, it's better to use a new consumer to drop all cached
      // states in the old consumer. We don't need to worry about the performance because this
      // is not a common path.
      resetConsumer()
      reportDataLoss(failOnDataLoss, s"Cannot fetch offset $toFetchOffset", e)
      toFetchOffset = getEarliestAvailableOffsetBetween(toFetchOffset, untilOffset)
  }
}
{code}

The exception is thrown because this method body is converted to a closure that is run inside:

{code:scala}
private def runUninterruptiblyIfPossible[T](body: => T): T
{code}

We should avoid using `return` in general, and here specifically, as this is a hot path for applications using Kafka.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
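As a minimal standalone sketch (hypothetical names, not Spark's actual code), the following shows the mechanism: a `return` inside a by-name argument compiles to a thrown `scala.runtime.NonLocalReturnControl` that is caught at the enclosing method's boundary, whereas simply letting the block's last expression be its value involves no exception at all. `runBody` here merely stands in for the shape of `runUninterruptiblyIfPossible`:

```scala
import scala.runtime.NonLocalReturnControl

object NonLocalReturnSketch {
  // Stands in for runUninterruptiblyIfPossible: evaluates a by-name body.
  def runBody[T](body: => T): T = body

  // `return` inside the by-name block is compiled to a throw of
  // NonLocalReturnControl, caught by a synthetic handler at the
  // boundary of this method.
  def withReturn(): Int = runBody { return 1 }

  // An explicit catch placed between the `return` and the method
  // boundary intercepts the control-flow exception, proving it is
  // really thrown at runtime.
  def intercepted(): Int =
    try runBody { return 1 }
    catch { case _: NonLocalReturnControl[_] => 2 }

  // A `return`-free version: the value of the block is the result.
  def withoutReturn(): Int = runBody { 1 }

  def main(args: Array[String]): Unit = {
    println(withReturn())    // 1, delivered via a thrown and caught NonLocalReturnControl
    println(intercepted())   // 2, the control-flow exception was caught on the way out
    println(withoutReturn()) // 1, no exception involved
  }
}
```

The `return`-free shape is the fix suggested above: instead of `return fetchData(...)` inside the loop, the loop can assign the fetched result to a local and exit normally (or be rewritten as a tail-recursive helper whose result becomes the method's value), so no `NonLocalReturnControl` is allocated and thrown on the hot path.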