Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21506#discussion_r194293481
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
 ---
    @@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider 
extends StateStoreProvider wit
         if (loadedCurrentVersionMap.isDefined) {
           return loadedCurrentVersionMap.get
         }
    -    val snapshotCurrentVersionMap = readSnapshotFile(version)
    -    if (snapshotCurrentVersionMap.isDefined) {
    -      synchronized { loadedMaps.put(version, 
snapshotCurrentVersionMap.get) }
    -      return snapshotCurrentVersionMap.get
    -    }
     
    -    // Find the most recent map before this version that we can.
    -    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
    -    var lastAvailableVersion = version
    -    var lastAvailableMap: Option[MapType] = None
    -    while (lastAvailableMap.isEmpty) {
    -      lastAvailableVersion -= 1
    +    logWarning(s"The state for version $version doesn't exist in 
loadedMaps. " +
    +      "Reading snapshot file and delta files if needed..." +
    +      "Note that this is normal for the first batch of starting query.")
     
    -      if (lastAvailableVersion <= 0) {
    -        // Use an empty map for versions 0 or less.
    -        lastAvailableMap = Some(new MapType)
    -      } else {
    -        lastAvailableMap =
    -          synchronized { loadedMaps.get(lastAvailableVersion) }
    -            .orElse(readSnapshotFile(lastAvailableVersion))
    +    val (result, elapsedMs) = Utils.timeTakenMs {
    +      val snapshotCurrentVersionMap = readSnapshotFile(version)
    +      if (snapshotCurrentVersionMap.isDefined) {
    +        synchronized { loadedMaps.put(version, 
snapshotCurrentVersionMap.get) }
    +        return snapshotCurrentVersionMap.get
    +      }
    +
    +      // Find the most recent map before this version that we can.
    +      // [SPARK-22305] This must be done iteratively to avoid stack 
overflow.
    +      var lastAvailableVersion = version
    +      var lastAvailableMap: Option[MapType] = None
    +      while (lastAvailableMap.isEmpty) {
    +        lastAvailableVersion -= 1
    +
    +        if (lastAvailableVersion <= 0) {
    +          // Use an empty map for versions 0 or less.
    +          lastAvailableMap = Some(new MapType)
    +        } else {
    +          lastAvailableMap =
    +            synchronized { loadedMaps.get(lastAvailableVersion) }
    +              .orElse(readSnapshotFile(lastAvailableVersion))
    +        }
    +      }
    +
    +      // Load all the deltas from the version after the last available one 
up to the target version.
    +      // The last available version is the one with a full snapshot, so it 
doesn't need deltas.
    +      val resultMap = new MapType(lastAvailableMap.get)
    +      for (deltaVersion <- lastAvailableVersion + 1 to version) {
    +        updateFromDeltaFile(deltaVersion, resultMap)
           }
    -    }
     
    -    // Load all the deltas from the version after the last available one 
up to the target version.
    -    // The last available version is the one with a full snapshot, so it 
doesn't need deltas.
    -    val resultMap = new MapType(lastAvailableMap.get)
    -    for (deltaVersion <- lastAvailableVersion + 1 to version) {
    -      updateFromDeltaFile(deltaVersion, resultMap)
    +      synchronized { loadedMaps.put(version, resultMap) }
    +      resultMap
         }
     
    -    synchronized { loadedMaps.put(version, resultMap) }
    -    resultMap
    +    logWarning(s"Loading state for $version takes $elapsedMs ms.")
    --- End diff --
    
    I just thought about pairing the warning message above with this one, 
but since we are guiding end users to turn on the DEBUG level to see information 
regarding additional latencies, turning this to DEBUG would also be OK.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to