Ngone51 commented on a change in pull request #31495:
URL: https://github.com/apache/spark/pull/31495#discussion_r571817596

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/OffsetSeqLog.scala
##########
@@ -46,6 +47,23 @@ import org.apache.spark.sql.connector.read.streaming.{Offset => OffsetV2}
 class OffsetSeqLog(sparkSession: SparkSession, path: String)
   extends HDFSMetadataLog[OffsetSeq](sparkSession, path) {
 
+  private val cachedMetadata = new ju.TreeMap[Long, OffsetSeq]()
+
+  override def add(batchId: Long, metadata: OffsetSeq): Boolean = {
+    val added = super.add(batchId, metadata)
+    if (added) {
+      // cache metadata as it will be read again
+      cachedMetadata.put(batchId, metadata)
+      // we don't access metadata for (batchId - 2) batches; evict them

Review comment:
   OK, I see where the problem was with my test. You're right, the latency is trivial. I'm not against your solution here. But since we've come this far, I'd like to mention one more thing: TreeMap tends to create a new `AscendingSubMap` object for each batch, while an Array doesn't. Although, that might also be trivial.
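
   To make the allocation point concrete, here is a minimal sketch, not the PR's code. It assumes the eviction is done through `headMap(batchId - 2, true).clear()`, which is a guess because the hunk above is cut off before the eviction line. `Meta`, `TreeMapCache`, and `ArrayCache` are hypothetical names, with `Meta` standing in for `OffsetSeq`. That the view returned by `headMap` is an `AscendingSubMap` is an OpenJDK implementation detail rather than part of the `TreeMap` contract.

```scala
import java.{util => ju}

object EvictionSketch {
  // Hypothetical stand-in for OffsetSeq; the real type lives in Spark.
  final case class Meta(desc: String)

  /** TreeMap-backed cache: each eviction goes through a headMap view object. */
  final class TreeMapCache {
    private val cached = new ju.TreeMap[Long, Meta]()

    def put(batchId: Long, m: Meta): Unit = {
      cached.put(batchId, m)
      // Assumed eviction: headMap returns a view of all keys <= batchId - 2,
      // and clearing the view removes those entries from the backing map.
      cached.headMap(batchId - 2, true).clear()
    }

    def get(batchId: Long): Option[Meta] = Option(cached.get(batchId))

    def size: Int = cached.size()
  }

  /** Array-backed alternative: fixed slots that are overwritten, so no view object per batch. */
  final class ArrayCache(capacity: Int = 2) {
    private val ids = Array.fill[Long](capacity)(-1L)
    private val slots = new Array[Meta](capacity)

    // Assumes batch IDs are non-negative and arrive in increasing order.
    def put(batchId: Long, m: Meta): Unit = {
      val i = (batchId % capacity).toInt
      ids(i) = batchId
      slots(i) = m
    }

    def get(batchId: Long): Option[Meta] = {
      if (batchId < 0) None
      else {
        val i = (batchId % capacity).toInt
        if (ids(i) == batchId) Option(slots(i)) else None
      }
    }
  }
}
```

   Both variants keep only the two most recent batches. The difference is that the TreeMap version asks for a fresh view on every `put`, while the array version overwrites slots in place. As noted in the comment above, that extra allocation may well be negligible in practice.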