Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21700#discussion_r202933053 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala --- @@ -270,11 +273,42 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit } else Iterator.empty } + /** This method is intended to be only used for unit test(s). DO NOT TOUCH ELEMENTS IN MAP! */ + private[state] def getClonedLoadedMaps(): util.SortedMap[Long, MapType] = synchronized { + // shallow copy as a minimal guard + loadedMaps.clone().asInstanceOf[util.SortedMap[Long, MapType]] + } + + private def putStateIntoStateCache(newVersion: Long, map: MapType): Unit = synchronized { + if (numberOfVersionsToRetainInMemory <= 0) { + if (loadedMaps.size() > 0) loadedMaps.clear() + return + } + + while (loadedMaps.size() > numberOfVersionsToRetainInMemory) { + loadedMaps.remove(loadedMaps.lastKey()) + } + + val size = loadedMaps.size() + if (size == numberOfVersionsToRetainInMemory) { + val versionIdForLastKey = loadedMaps.lastKey() + if (versionIdForLastKey > newVersion) { + // this is the only case which put doesn't need --- End diff -- Will update the comment to clarify a bit more. We just avoid the case where the new element would be added as the last entry of the map and then have to be evicted right away.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org