GitHub user lw-lin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11645#discussion_r55794512
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala ---
    @@ -0,0 +1,462 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.streaming.state
    +
    +import java.util.{Timer, TimerTask}
    +
    +import scala.collection.mutable
    +import scala.reflect.ClassTag
    +import scala.util.Random
    +import scala.util.control.NonFatal
    +
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FileStatus, Path}
    +
    +import org.apache.spark.{Logging, SparkConf, SparkEnv}
    +import org.apache.spark.serializer.{DeserializationStream, KryoSerializer, SerializationStream}
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.catalyst.expressions.JoinedRow
    +import org.apache.spark.util.{CompletionIterator, RpcUtils, Utils}
    +
    +case class StateStoreId(operatorId: Long, partitionId: Int)
    +
    +private[state] object StateStore extends Logging {
    +
    +  sealed trait Update
    +  case class ValueUpdated(key: InternalRow, value: InternalRow) extends Update
    +  case class KeyRemoved(key: InternalRow) extends Update
    +
    +  private val loadedStores = new mutable.HashMap[StateStoreId, StateStore]()
    +  private val managementTimer = new Timer("StateStore Timer", true)
    +  @volatile private var managementTask: TimerTask = null
    +
    +  def get(storeId: StateStoreId, directory: String): StateStore = {
    +    val store = loadedStores.synchronized {
    +      startIfNeeded()
    +      loadedStores.getOrElseUpdate(storeId, new StateStore(storeId, directory))
    +    }
    +    reportActiveInstance(storeId)
    +    store
    +  }
    +
    +  def clearAll(): Unit = loadedStores.synchronized {
    +    loadedStores.clear()
    +    if (managementTask != null) {
    +      managementTask.cancel()
    +      managementTask = null
    +    }
    +  }
    +
    +  private def remove(storeId: StateStoreId): Unit = {
    +    loadedStores.remove(storeId)
    +  }
    +
    +  private def reportActiveInstance(storeId: StateStoreId): Unit = {
    +    val host = SparkEnv.get.blockManager.blockManagerId.host
    +    val executorId = SparkEnv.get.blockManager.blockManagerId.executorId
    +    askCoordinator[Boolean](ReportActiveInstance(storeId, host, executorId))
    +  }
    +
    +  private def verifyIfInstanceActive(storeId: StateStoreId): Boolean = {
    +    val executorId = SparkEnv.get.blockManager.blockManagerId.executorId
    +    askCoordinator[Boolean](VerifyIfInstanceActive(storeId, executorId)).getOrElse(false)
    +  }
    +
    +  private def askCoordinator[T: ClassTag](message: StateStoreCoordinatorMessage): Option[T] = {
    +    try {
    +      val env = SparkEnv.get
    +      if (env != null) {
    +        val coordinatorRef = RpcUtils.makeDriverRef("StateStoreCoordinator", env.conf, env.rpcEnv)
    +        Some(coordinatorRef.askWithRetry[T](message))
    +      } else {
    +        None
    +      }
    +    } catch {
    +      case NonFatal(e) =>
    +        clearAll()
    +        None
    +    }
    +  }
    +
    +  private def startIfNeeded(): Unit = loadedStores.synchronized {
    +    if (managementTask == null) {
    +      managementTask = new TimerTask {
    +        override def run(): Unit = { manageFiles() }
    +      }
    +      managementTimer.schedule(managementTask, 10000, 10000)
    +    }
    +  }
    +
    +  private def manageFiles(): Unit = {
    +    loadedStores.synchronized { loadedStores.values.toSeq }.foreach { store =>
    +      try {
    +        store.manageFiles()
    +      } catch {
    +        case NonFatal(e) =>
    +          logWarning(s"Error performing snapshot and cleaning up store ${store.id}")
    +      }
    +    }
    +  }
    +}
    +
    +private[sql] class StateStore(
    +    val id: StateStoreId,
    +    val directory: String,
    +    numBatchesToRetain: Int = 2,
    +    maxDeltaChainForSnapshots: Int = 10
    +  ) extends Logging {
    +  type MapType = mutable.HashMap[InternalRow, InternalRow]
    +
    +  import StateStore._
    +
    +  private val loadedMaps = new mutable.HashMap[Long, MapType]
    +  private val baseDir = new Path(directory, s"${id.operatorId}/${id.partitionId.toString}")
    +  private val fs = baseDir.getFileSystem(new Configuration())
    +  private val serializer = new KryoSerializer(new SparkConf)
    +
    +  @volatile private var uncommittedDelta: UncommittedUpdates = null
    +
    +  initialize()
    +
    +  /**
    +   * Prepare for updates to create a new `version` of the map. The store ensures that updates
    +   * are applied on top of the `version - 1` store data. If `version` already exists, it will
    +   * be overwritten when the updates are committed.
    +   */
    +  private[state] def prepareForUpdates(version: Long): Unit = synchronized {
    +    require(version >= 0)
    +    if (uncommittedDelta != null) {
    +      cancelUpdates()
    +    }
    +    val newMap = new MapType()
    +    if (version > 0) {
    +      val oldMap = loadMap(version - 1)
    +      newMap ++= oldMap
    +    }
    +    uncommittedDelta = new UncommittedUpdates(version, newMap)
    +  }
    +
    --- End diff ---
    
    Missing `newVersion()`?
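    
    For reference, a minimal sketch of the kind of helper the scaladoc seems to
    imply; the name `newVersion()` and the body below are hypothetical, a guess at
    the intent rather than part of this PR:
    
    ```scala
    // Hypothetical sketch, not in this PR: derive the next version to prepare
    // updates for. Versions are chained (updates for version N are applied on a
    // copy of the committed map for version N - 1), so the next version is the
    // largest loaded version plus one, or 0 for an empty store.
    private[state] def newVersion(): Long = synchronized {
      if (loadedMaps.isEmpty) 0L else loadedMaps.keys.max + 1
    }
    ```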

