GitHub user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21506#discussion_r194293251
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
    @@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
         if (loadedCurrentVersionMap.isDefined) {
           return loadedCurrentVersionMap.get
         }
    -    val snapshotCurrentVersionMap = readSnapshotFile(version)
    -    if (snapshotCurrentVersionMap.isDefined) {
    -      synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
    -      return snapshotCurrentVersionMap.get
    -    }
     
    -    // Find the most recent map before this version that we can.
    -    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
    -    var lastAvailableVersion = version
    -    var lastAvailableMap: Option[MapType] = None
    -    while (lastAvailableMap.isEmpty) {
    -      lastAvailableVersion -= 1
    +    logWarning(s"The state for version $version doesn't exist in loadedMaps. " +
    +      "Reading snapshot file and delta files if needed..." +
    +      "Note that this is normal for the first batch of starting query.")
     
    -      if (lastAvailableVersion <= 0) {
    -        // Use an empty map for versions 0 or less.
    -        lastAvailableMap = Some(new MapType)
    -      } else {
    -        lastAvailableMap =
    -          synchronized { loadedMaps.get(lastAvailableVersion) }
    -            .orElse(readSnapshotFile(lastAvailableVersion))
    +    val (result, elapsedMs) = Utils.timeTakenMs {
    --- End diff --
    
    Yup, right. Most of the code change is just wrapping the existing code in timeTakenMs.
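
    For illustration, here is a minimal sketch of what such a timing wrapper might look like. It assumes the helper takes a by-name block and returns the block's result together with the elapsed milliseconds, which is the shape the destructuring in the diff suggests. `TimingSketch` and `loadMap` are hypothetical names made up for this example, not code from the PR.

```scala
import java.util.concurrent.TimeUnit

// Hypothetical stand-alone sketch; not the PR's or Spark's actual code.
object TimingSketch {

  // Evaluates `body` exactly once and returns (result, elapsed milliseconds).
  def timeTakenMs[T](body: => T): (T, Long) = {
    val startNs = System.nanoTime()
    val result = body
    val elapsedNs = System.nanoTime() - startNs
    // Clamp to zero so callers never see a negative duration.
    (result, math.max(TimeUnit.NANOSECONDS.toMillis(elapsedNs), 0L))
  }

  // Hypothetical stand-in for the wrapped loading logic:
  // an empty map for versions 0 or less, a one-entry map otherwise.
  def loadMap(version: Long): Map[String, Long] =
    if (version <= 0) Map.empty else Map("version" -> version)

  def main(args: Array[String]): Unit = {
    val (result, elapsedMs) = timeTakenMs { loadMap(3L) }
    println(s"Loaded ${result.size} entries in $elapsedMs ms")
  }
}
```

    Because the block is passed by name, the existing body can be moved inside the braces unchanged, which is why the diff looks larger than the actual behavioral change.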

