Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21506#discussion_r194295068
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala ---
    @@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
         if (loadedCurrentVersionMap.isDefined) {
           return loadedCurrentVersionMap.get
         }
    -    val snapshotCurrentVersionMap = readSnapshotFile(version)
    -    if (snapshotCurrentVersionMap.isDefined) {
    -      synchronized { loadedMaps.put(version, 
snapshotCurrentVersionMap.get) }
    -      return snapshotCurrentVersionMap.get
    -    }
     
    -    // Find the most recent map before this version that we can.
    -    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
    -    var lastAvailableVersion = version
    -    var lastAvailableMap: Option[MapType] = None
    -    while (lastAvailableMap.isEmpty) {
    -      lastAvailableVersion -= 1
    +    logWarning(s"The state for version $version doesn't exist in 
loadedMaps. " +
    +      "Reading snapshot file and delta files if needed..." +
    +      "Note that this is normal for the first batch of starting query.")
     
    -      if (lastAvailableVersion <= 0) {
    -        // Use an empty map for versions 0 or less.
    -        lastAvailableMap = Some(new MapType)
    -      } else {
    -        lastAvailableMap =
    -          synchronized { loadedMaps.get(lastAvailableVersion) }
    -            .orElse(readSnapshotFile(lastAvailableVersion))
    +    val (result, elapsedMs) = Utils.timeTakenMs {
    +      val snapshotCurrentVersionMap = readSnapshotFile(version)
    +      if (snapshotCurrentVersionMap.isDefined) {
    +        synchronized { loadedMaps.put(version, 
snapshotCurrentVersionMap.get) }
    +        return snapshotCurrentVersionMap.get
    +      }
    +
    +      // Find the most recent map before this version that we can.
    +      // [SPARK-22305] This must be done iteratively to avoid stack 
overflow.
    +      var lastAvailableVersion = version
    +      var lastAvailableMap: Option[MapType] = None
    +      while (lastAvailableMap.isEmpty) {
    +        lastAvailableVersion -= 1
    +
    +        if (lastAvailableVersion <= 0) {
    +          // Use an empty map for versions 0 or less.
    +          lastAvailableMap = Some(new MapType)
    +        } else {
    +          lastAvailableMap =
    +            synchronized { loadedMaps.get(lastAvailableVersion) }
    +              .orElse(readSnapshotFile(lastAvailableVersion))
    +        }
    +      }
    +
    +      // Load all the deltas from the version after the last available one 
up to the target version.
    +      // The last available version is the one with a full snapshot, so it 
doesn't need deltas.
    +      val resultMap = new MapType(lastAvailableMap.get)
    +      for (deltaVersion <- lastAvailableVersion + 1 to version) {
    +        updateFromDeltaFile(deltaVersion, resultMap)
           }
    -    }
     
    -    // Load all the deltas from the version after the last available one 
up to the target version.
    -    // The last available version is the one with a full snapshot, so it 
doesn't need deltas.
    -    val resultMap = new MapType(lastAvailableMap.get)
    -    for (deltaVersion <- lastAvailableVersion + 1 to version) {
    -      updateFromDeltaFile(deltaVersion, resultMap)
    +      synchronized { loadedMaps.put(version, resultMap) }
    +      resultMap
         }
     
    -    synchronized { loadedMaps.put(version, resultMap) }
    -    resultMap
    +    logWarning(s"Loading state for $version takes $elapsedMs ms.")
    --- End diff --
    
    Changed log level to DEBUG.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to