Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22138#discussion_r214917284

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala ---
    @@ -414,17 +468,37 @@ private[kafka010] case class InternalKafkaConsumer(
           }
         }
     
    -  /** Create a new consumer and reset cached states */
    -  private def resetConsumer(): Unit = {
    -    consumer.close()
    -    consumer = createConsumer
    -    fetchedData.reset()
    +  /**
    +   * Poll messages from Kafka starting from `offset` and update `fetchedData`. `fetchedData` may be
    +   * empty if the Kafka consumer fetches some messages but all of them are not visible messages
    +   * (either transaction messages, or aborted messages when `isolation.level` is `read_committed`).
    +   *
    +   * @throws OffsetOutOfRangeException if `offset` is out of range.
    +   * @throws TimeoutException if the consumer position is not changed after polling. It means the
    +   *                          consumer polls nothing before timeout.
    +   */
    +  private def fetchData(offset: Long, pollTimeoutMs: Long): Unit = {
    +    val (records, offsetAfterPoll) = consumer.fetch(offset, pollTimeoutMs)
    +    fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
    +  }
    +
    +  private def ensureConsumerAvailable(): Unit = {
    +    if (consumer == null) {
    --- End diff --

    This is defined as a `var`, so this is just to avoid additional wrapping here. Same as above: if we prefer `Option` I'm happy to change it, but I'm not sure about it.
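
    To illustrate the trade-off being discussed, here is a minimal, self-contained sketch of the two alternatives: keeping a nullable `var` with an explicit null check (as in the PR) versus wrapping the field in an `Option`. `DummyConsumer` and `createConsumer` are hypothetical stand-ins, not the real Kafka consumer types from the PR.

    ```scala
    object ConsumerFieldSketch {

      // Hypothetical placeholder for the cached consumer; not the real
      // InternalKafkaConsumer from the PR.
      final class DummyConsumer {
        def close(): Unit = ()
      }

      private def createConsumer(): DummyConsumer = new DummyConsumer

      // Variant 1: nullable var. ensureConsumerAvailable lazily re-creates the
      // consumer after it has been released (set to null), with no extra wrapping
      // at the use sites.
      private var consumer: DummyConsumer = _

      private def ensureConsumerAvailable(): Unit = {
        if (consumer == null) {
          consumer = createConsumer()
        }
      }

      // Variant 2: Option wrapper. Avoids null entirely, but every use site has
      // to unwrap the Option, which is the "additional wrapping" the comment
      // refers to.
      private var consumerOpt: Option[DummyConsumer] = None

      private def ensureConsumerAvailableOpt(): Unit = {
        if (consumerOpt.isEmpty) {
          consumerOpt = Some(createConsumer())
        }
      }

      def main(args: Array[String]): Unit = {
        ensureConsumerAvailable()
        ensureConsumerAvailableOpt()
        println(s"var initialized: ${consumer != null}, option initialized: ${consumerOpt.isDefined}")
      }
    }
    ```

    Either variant works; the choice is mostly about whether the explicitness of `Option` is worth the extra unwrapping in a field that is already mutable.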