ericm-db commented on code in PR #47490:
URL: https://github.com/apache/spark/pull/47490#discussion_r1746325537


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala:
##########
@@ -358,3 +358,119 @@ class OperatorStateMetadataV2Reader(
     }
   }
 }
+
+/**
+ * A helper class to manage the metadata files for the operator state 
checkpoint.
+ * This class is used to manage the metadata files for 
OperatorStateMetadataV2, and
+ * provides utils to purge the oldest files such that we only keep the 
metadata files
+ * for which a commit log is present
+ * @param checkpointLocation The root path of the checkpoint directory
+ * @param sparkSession The sparkSession that is used to access the hadoopConf
+ * @param stateStoreWriter The operator that this fileManager is being created 
for
+ */
+class OperatorStateMetadataV2FileManager(
+    checkpointLocation: Path,
+    sparkSession: SparkSession,
+    stateStoreWriter: StateStoreWriter) extends Logging {
+
+  private val hadoopConf = sparkSession.sessionState.newHadoopConf()
+  private val stateCheckpointPath = new Path(checkpointLocation, "state")
+  private val stateOpIdPath = new Path(
+    stateCheckpointPath, stateStoreWriter.getStateInfo.operatorId.toString)
+  private val commitLog =
+    new CommitLog(sparkSession, new Path(checkpointLocation, 
"commits").toString)
+  private val stateSchemaPath = stateStoreWriter.stateSchemaDirPath()
+  private val metadataDirPath = 
OperatorStateMetadataV2.metadataDirPath(stateOpIdPath)
+  private lazy val fm = CheckpointFileManager.create(metadataDirPath, 
hadoopConf)
+
+  protected def isBatchFile(path: Path): Boolean = {
+    try {
+      path.getName.toLong
+      true
+    } catch {
+      case _: NumberFormatException => false
+    }
+  }
+
+  /**
+   * A `PathFilter` to filter only batch files
+   */
+  protected val batchFilesFilter: PathFilter = (path: Path) => 
isBatchFile(path)
+
+  private def pathToBatchId(path: Path): Long = {
+    path.getName.toLong
+  }
+
+  def purgeMetadataFiles(): Unit = {
+    val thresholdBatchId = findThresholdBatchId()
+    if (thresholdBatchId != -1) {
+      val earliestBatchIdKept = deleteMetadataFiles(thresholdBatchId)
+      // we need to delete everything from 0 to (earliestBatchIdKept - 1), 
inclusive
+      deleteSchemaFiles(earliestBatchIdKept - 1)
+    }
+  }
+
+  // We only want to keep the metadata and schema files for which the commit
+  // log is present, so we will delete any file that precedes the batch for 
the oldest
+  // commit log
+  private def findThresholdBatchId(): Long = {
+    commitLog.listBatchesOnDisk.headOption.getOrElse(0L) - 1L
+  }
+
+  private def deleteSchemaFiles(thresholdBatchId: Long): Unit = {
+    val schemaFiles = fm.list(stateSchemaPath).sorted.map(_.getPath)
+
+    if (schemaFiles.length > 1) {
+      val filesBeforeThreshold = schemaFiles.filter { path =>
+        val batchIdInPath = path.getName.split("_").head.toLong
+        batchIdInPath <= thresholdBatchId
+      }
+      filesBeforeThreshold.foreach { path =>
+        fm.delete(path)
+      }
+    }
+  }
+
+  // Deletes all metadata files that are below a thresholdBatchId, except
+  // for the latest metadata file so that we have at least 1 metadata and 
schema
+  // file at all times per each stateful query
+  // Returns the batchId of the earliest schema file we want to keep
+  private def deleteMetadataFiles(thresholdBatchId: Long): Long = {
+    val metadataFiles = fm.list(metadataDirPath, batchFilesFilter)
+
+    if (metadataFiles.isEmpty) {
+      return -1L // No files to delete
+    }
+
+    val sortedBatchIds = metadataFiles.map(file => 
pathToBatchId(file.getPath)).sorted
+    val latestBatchId = sortedBatchIds.last
+    var highestDeletedBatchId = -1L
+
+    metadataFiles.foreach { batchFile =>
+      val batchId = pathToBatchId(batchFile.getPath)
+      if (batchId <= thresholdBatchId && batchId < latestBatchId) {
+        fm.delete(batchFile.getPath)
+        highestDeletedBatchId = math.max(highestDeletedBatchId, batchId)
+      }
+    }
+    // Find the next batch id immediately greater than threshold batchId.
+    // We use this to find the metadata and schema files we want to keep
+    val nextBatchId = sortedBatchIds.find(_ > 
thresholdBatchId).getOrElse(latestBatchId)

Review Comment:
   latestBatchId is the very last batchId of this list



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to