anishshri-db commented on code in PR #47490:
URL: https://github.com/apache/spark/pull/47490#discussion_r1746151607


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala:
##########
@@ -358,3 +359,101 @@ class OperatorStateMetadataV2Reader(
     }
   }
 }
+
+/**
+ * A helper class to manage the metadata files for the operator state checkpoint.
+ * This class is used to manage the metadata files for OperatorStateMetadataV2, and
+ * provides utils to purge the oldest files such that we only keep the metadata files
+ * for which a commit log is present
+ * @param stateCheckpointPath The root path of the state checkpoint directory
+ * @param stateSchemaPath The path where the schema files are stored
+ * @param hadoopConf The Hadoop configuration to create the file manager
+ */
+class OperatorStateMetadataV2FileManager(
+    stateCheckpointPath: Path,
+    stateSchemaPath: Path,
+    commitLog: CommitLog,
+    hadoopConf: Configuration) extends Logging {
+
+  private val metadataDirPath = 
OperatorStateMetadataV2.metadataDirPath(stateCheckpointPath)
+  private lazy val fm = CheckpointFileManager.create(metadataDirPath, 
hadoopConf)
+
+  protected def isBatchFile(path: Path): Boolean = {
+    try {
+      path.getName.toLong
+      true
+    } catch {
+      case _: NumberFormatException => false
+    }
+  }
+
+  /**
+   * A `PathFilter` to filter only batch files
+   */
+  protected val batchFilesFilter: PathFilter = (path: Path) => 
isBatchFile(path)
+
+  /** List the available batches on file system. */
+  protected def listBatches: Array[Long] = {
+    val batchIds = fm.list(metadataDirPath, batchFilesFilter)
+      // Batches must be files
+      .filter(f => f.isFile)
+      .map(f => pathToBatchId(f.getPath))
+    logInfo(log"BatchIds found from listing: ${MDC(BATCH_ID, 
batchIds.sorted.mkString(", "))}")
+
+    batchIds.sorted
+  }
+
+  private def pathToBatchId(path: Path): Long = {
+    path.getName.toLong
+  }
+
+  def purgeMetadataFiles(): Unit = {
+    val thresholdBatchId = findThresholdBatchId()
+    if (thresholdBatchId != -1) {
+      deleteSchemaFiles(thresholdBatchId)
+      deleteMetadataFiles(thresholdBatchId)
+    }
+  }
+
+  // We only want to keep the metadata and schema files for which the commit
+  // log is present, so we will delete any file that precedes the batch for the oldest
+  // commit log
+  private def findThresholdBatchId(): Long = {
+    commitLog.listBatchesOnDisk.headOption.getOrElse(0L) - 1L
+  }
+
+  private def deleteSchemaFiles(thresholdBatchId: Long): Unit = {
+    val schemaFiles = fm.list(stateSchemaPath).sorted.map(_.getPath)
+
+    if (schemaFiles.length > 1) {

Review Comment:
   Discussed offline - but I think this condition might not be sufficient



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to