Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/19611#discussion_r147826631 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala --- @@ -297,17 +297,41 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit /** Load the required version of the map data from the backing files */ private def loadMap(version: Long): MapType = { - if (version <= 0) return new MapType - synchronized { loadedMaps.get(version) }.getOrElse { - val mapFromFile = readSnapshotFile(version).getOrElse { - val prevMap = loadMap(version - 1) - val newMap = new MapType(prevMap) - updateFromDeltaFile(version, newMap) - newMap + + // Shortcut if the map for this version is already there to avoid a redundant put. + val currentVersionMap = + synchronized { loadedMaps.get(version) }.orElse(readSnapshotFile(version)) + if (currentVersionMap.isDefined) { + return currentVersionMap.get + } + + + // Find the most recent map before this version that we can. + // [SPARK-22305] This must be done iteratively to avoid stack overflow. + var lastAvailableVersion = version + var lastAvailableMap: Option[MapType] = None + while (lastAvailableMap.isEmpty) { + lastAvailableVersion -= 1 + + if (lastAvailableVersion <= 0) { + // Use an empty map for versions 0 or less. + lastAvailableMap = Some(new MapType) + } else { + lastAvailableMap = + synchronized { loadedMaps.get(lastAvailableVersion) } + .orElse(readSnapshotFile(lastAvailableVersion)) } - loadedMaps.put(version, mapFromFile) - mapFromFile } + + // Load all the deltas from the version after the last available one up to the target version. + // The last available version is the one with a full snapshot, so it doesn't need deltas. 
+ val resultMap = lastAvailableMap.get + for (deltaVersion <- lastAvailableVersion + 1 to version) { + updateFromDeltaFile(deltaVersion, resultMap) + } + + loadedMaps.put(version, resultMap) --- End diff -- Please change `loadedMaps.put(version, resultMap)` to `synchronized { loadedMaps.put(version, resultMap) }`, so that this write is guarded the same way as the `synchronized { loadedMaps.get(...) }` reads earlier in this method. This is a separate issue from the one this PR addresses, but since you are already touching this code, it would be better to fix it here as well.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org