Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21700#discussion_r201866930 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala --- @@ -99,43 +102,84 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider] assert(restoreOriginValues(loadedMaps.get(2L)) === Map("a" -> 2)) assert(restoreOriginValues(loadedMaps.get(1L)) === Map("a" -> 1)) - updateVersionTo(3) + // this trigger exceeding cache and 1 will be evicted + currentVersion = updateVersionTo(provider, currentVersion, 3) assert(getData(provider) === Set("a" -> 3)) loadedMaps = provider.getClonedLoadedMaps() - assert(loadedMaps.size() === 3) + assert(loadedMaps.size() === 2) assert(loadedMaps.firstKey() === 3L) - assert(loadedMaps.lastKey() === 1L) + assert(loadedMaps.lastKey() === 2L) assert(restoreOriginValues(loadedMaps.get(3L)) === Map("a" -> 3)) assert(restoreOriginValues(loadedMaps.get(2L)) === Map("a" -> 2)) + } + + test("failure after committing with MAX_BATCHES_TO_RETAIN_IN_MEMORY set to 1") { + val provider = newStoreProvider(opId = Random.nextInt, partition = 0, + numOfVersToRetainInMemory = 1) + + var currentVersion = 0 + + def restoreOriginValues(map: provider.MapType): Map[String, Int] = { --- End diff -- I've allowed the redundant function definition because there's no way to use `provider.MapType` as a parameter type unless `provider` is defined. If we really want to get rid of the redundant function definition, we may have to change the parameter type to ConcurrentMap directly.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org