Yicong-Huang commented on code in PR #5643: URL: https://github.com/apache/texera/pull/5643#discussion_r3408645419
########## file-service/src/main/scala/org/apache/texera/service/util/StagedFileCleanupJob.scala: ########## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.service.util + +import com.typesafe.scalalogging.LazyLogging +import io.dropwizard.lifecycle.Managed +import io.lakefs.clients.sdk.ApiException +import io.lakefs.clients.sdk.model.Diff +import org.apache.texera.amber.core.storage.util.LakeFSStorageClient +import org.apache.texera.dao.SqlServer +import org.apache.texera.dao.jooq.generated.tables.Dataset.DATASET +import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSession.DATASET_UPLOAD_SESSION + +import java.time.OffsetDateTime +import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit} +import scala.jdk.CollectionConverters._ + +/** + * Summary of one cleanup round. + * + * @param sessionsDeleted Number of abandoned upload session rows deleted. + * @param objectsReset Number of staged (uncommitted) objects reset in LakeFS. + * @param errors Number of failures encountered (each is retried next round). 
+ */ +case class CleanupReport(sessionsDeleted: Int, objectsReset: Int, errors: Int) + +/** + * Periodically cleans up uploaded but uncommitted dataset files: + * 1. Aborts and deletes abandoned multipart upload sessions older than the retention window. + * 2. Resets staged (uncommitted) LakeFS objects older than the retention window, skipping + * objects that belong to still-active upload sessions. + * + * @param retentionHours Age (in hours) after which uncommitted uploads are cleaned up. + * @param intervalMinutes Delay (in minutes) between cleanup rounds. + */ +class StagedFileCleanupJob(retentionHours: Int, intervalMinutes: Int) + extends Managed + with LazyLogging { + + private var executor: ScheduledExecutorService = _ + + override def start(): Unit = { + executor = Executors.newSingleThreadScheduledExecutor((runnable: Runnable) => { + val thread = new Thread(runnable, "staged-file-cleanup") + thread.setDaemon(true) + thread + }) Review Comment: If we run this on a separate thread (which I think is the right way to go), then we have to be careful that the destructive operations fired in this thread do not affect other threads (e.g., creating or uploading a dataset). We need to make sure of two things - all the DB operations in this thread are atomic and won't create a partial state (e.g., by using DB transactions), though this may be hard to do for LakeFS, AND - if any operation fails and leaves a partial state, that partial state won't affect other threads, and it can still be cleaned up in the next round. A more concrete example: if we removed the session record and then the deletion on LakeFS failed for some reason, it could result in: 1. **Affect other threads:** The next time a user wants to create a new dataset with the same name, there is no session record, so we allow the user to create it, but it always fails because the LakeFS entry is still there. 2. 
**Uncleaned partial state:** The lakefs entry will always stay and never be removed, because we don't know there is an untracked lakefs entry as the sessions record is gone. ########## file-service/src/main/scala/org/apache/texera/service/util/StagedFileCleanupJob.scala: ########## @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.texera.service.util + +import com.typesafe.scalalogging.LazyLogging +import io.dropwizard.lifecycle.Managed +import io.lakefs.clients.sdk.ApiException +import io.lakefs.clients.sdk.model.Diff +import org.apache.texera.amber.core.storage.util.LakeFSStorageClient +import org.apache.texera.dao.SqlServer +import org.apache.texera.dao.jooq.generated.tables.Dataset.DATASET +import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSession.DATASET_UPLOAD_SESSION + +import java.time.OffsetDateTime +import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit} +import scala.jdk.CollectionConverters._ + +/** + * Summary of one cleanup round. + * + * @param sessionsDeleted Number of abandoned upload session rows deleted. + * @param objectsReset Number of staged (uncommitted) objects reset in LakeFS. 
+ * @param errors Number of failures encountered (each is retried next round). + */ +case class CleanupReport(sessionsDeleted: Int, objectsReset: Int, errors: Int) + +/** + * Periodically cleans up uploaded but uncommitted dataset files: + * 1. Aborts and deletes abandoned multipart upload sessions older than the retention window. + * 2. Resets staged (uncommitted) LakeFS objects older than the retention window, skipping + * objects that belong to still-active upload sessions. + * + * @param retentionHours Age (in hours) after which uncommitted uploads are cleaned up. + * @param intervalMinutes Delay (in minutes) between cleanup rounds. + */ +class StagedFileCleanupJob(retentionHours: Int, intervalMinutes: Int) + extends Managed + with LazyLogging { + + private var executor: ScheduledExecutorService = _ + + override def start(): Unit = { + executor = Executors.newSingleThreadScheduledExecutor((runnable: Runnable) => { + val thread = new Thread(runnable, "staged-file-cleanup") + thread.setDaemon(true) + thread + }) + executor.scheduleWithFixedDelay( + () => { + try { + runCleanupOnce() + } catch { + // An exception must never kill the schedule. + case t: Throwable => logger.error("Staged file cleanup round failed", t) + } + }, + // Small fixed initial delay so a restart doesn't postpone backlog cleanup by up to a + // full interval. + 1L, + intervalMinutes.toLong, + TimeUnit.MINUTES + ) + } + + override def stop(): Unit = { + if (executor != null) { + executor.shutdown() + } + } + + /** + * Runs a single cleanup round. Idempotent: rows/objects already cleaned up are not + * revisited, and failures are retried on the next round. + * + * @param now The reference time used to evaluate the retention window. + * @return Summary counts for this round. 
+ */ + def runCleanupOnce(now: OffsetDateTime = OffsetDateTime.now()): CleanupReport = { + val cutoff = now.minusHours(retentionHours.toLong) + var sessionsDeleted = 0 + var objectsReset = 0 + var errors = 0 + + val ctx = SqlServer.getInstance().createDSLContext() + + // Map each dataset id to its LakeFS repository name (same mapping DatasetResource uses + // via dataset.getRepositoryName). + val repoNameByDid: Map[Integer, String] = ctx + .select(DATASET.DID, DATASET.REPOSITORY_NAME) + .from(DATASET) + .where(DATASET.REPOSITORY_NAME.isNotNull) + .fetch() + .asScala + .map(record => record.get(DATASET.DID) -> record.get(DATASET.REPOSITORY_NAME)) + .toMap Review Comment: OK, since you are performing multiple operations on the DB, including deletions, please combine these steps into one transaction: - repoName lookup - abort partial upload (I assume this also writes to LakeFS's catalog) - delete session record - active path lookup - reset uncommitted objects That way, a failure in any step aborts and rolls back the transaction; otherwise it may end up in a partial state. ########## file-service/src/test/scala/org/apache/texera/service/util/StagedFileCleanupJobSpec.scala: ########## @@ -0,0 +1,716 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.texera.service.util + +import io.lakefs.clients.sdk.ApiException +import org.apache.texera.amber.core.storage.util.LakeFSStorageClient +import org.apache.texera.auth.SessionUser +import org.apache.texera.dao.MockTexeraDB +import org.apache.texera.dao.jooq.generated.enums.UserRoleEnum +import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSession.DATASET_UPLOAD_SESSION +import org.apache.texera.dao.jooq.generated.tables.DatasetUploadSessionPart.DATASET_UPLOAD_SESSION_PART +import org.apache.texera.dao.jooq.generated.tables.daos.{DatasetDao, UserDao} +import org.apache.texera.dao.jooq.generated.tables.pojos.{Dataset, User} +import org.apache.texera.service.MockLakeFS +import org.apache.texera.service.resource.DatasetResource +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach} + +import java.io.ByteArrayInputStream +import java.net.URLEncoder +import java.nio.charset.StandardCharsets +import java.time.OffsetDateTime +import java.util.Optional + +/** + * Spec for [[StagedFileCleanupJob]] (issue #3681 — automated cleanup of uploaded but + * uncommitted files). + * + * Contract under test: + * - `runCleanupOnce(now)` deletes DATASET_UPLOAD_SESSION rows whose created_at is older + * than `retentionHours` relative to the injected `now` (aborting their LakeFS multipart + * first; part rows go away via ON DELETE CASCADE), + * - resets LakeFS staged (uncommitted) objects whose mtime exceeds retention, + * - skips staged objects that belong to a non-expired upload session, + * - never touches committed objects, + * - counts per-item failures in `errors` without aborting the batch, + * - is idempotent. + * + * Tests never sleep to age things: sessions are aged either by passing a future `now` + * (everything created "now" is then older than retention) or by writing an explicit + * created_at via jOOQ. 
Staged-object mtimes cannot be faked, so object-expiry tests pass a + * future `now`, and "fresh staged object" semantics under a future `now` are exercised via + * the session-protection rule (created_at moved next to the future `now`). + */ +class StagedFileCleanupJobSpec + extends AnyFlatSpec + with Matchers + with MockTexeraDB + with MockLakeFS + with BeforeAndAfterAll + with BeforeAndAfterEach { + + // --------------------------------------------------------------------------- + // Job configuration under test + // --------------------------------------------------------------------------- + private val RetentionHours = 24 + private val IntervalMinutes = 60 + + private lazy val job = new StagedFileCleanupJob(RetentionHours, IntervalMinutes) + + /** A `now` far enough in the future that anything created at real wall-clock time is expired. */ + private def farFuture: OffsetDateTime = + OffsetDateTime.now().plusHours(RetentionHours.toLong + 1L) + + // --------------------------------------------------------------------------- + // Fixtures (minimal copies of the DatasetResourceSpec idioms) + // --------------------------------------------------------------------------- + private val ownerUser: User = { + val user = new User + user.setName("cleanup_test_user") + user.setPassword("123") + user.setEmail("[email protected]") + user.setRole(UserRoleEnum.ADMIN) + user + } + + private val repoName: String = s"cleanup-ds-${System.nanoTime()}" + + private val cleanupDataset: Dataset = { + val dataset = new Dataset + dataset.setName("cleanup-ds") + dataset.setRepositoryName(repoName) + dataset.setIsPublic(true) + dataset.setIsDownloadable(true) + dataset.setDescription("dataset for staged-file cleanup tests") + dataset + } + + /** Object committed once up-front; must survive every cleanup run (safety pin). 
*/ + private val PinnedCommittedPath = "pinned/committed-pin.bin" + + private lazy val sessionUser = new SessionUser(ownerUser) + private lazy val datasetResource = new DatasetResource() + + private var lakeFsReady = false + + // --------------------------------------------------------------------------- + // Lifecycle + // --------------------------------------------------------------------------- + override protected def beforeAll(): Unit = { + super.beforeAll() + + initializeDBAndReplaceDSLContext() + + new UserDao(getDSLContext.configuration()).insert(ownerUser) + cleanupDataset.setOwnerUid(ownerUser.getUid) + new DatasetDao(getDSLContext.configuration()).insert(cleanupDataset) + } + + // Containers (MockLakeFS) only become reachable after the suite starts running, so all + // LakeFS setup happens lazily here rather than in beforeAll — same reason + // DatasetResourceSpec initializes its repo in beforeEach. + override protected def beforeEach(): Unit = { + super.beforeEach() + + if (!lakeFsReady) { + try LakeFSStorageClient.initRepo(repoName) + catch { + case e: ApiException if e.getCode == 409 => // already exists, fine + } + // Commit one object up-front: cleanup must NEVER touch committed objects. + LakeFSStorageClient.writeFileToRepo( + repoName, + PinnedCommittedPath, + new ByteArrayInputStream("pinned".getBytes(StandardCharsets.UTF_8)) + ) + LakeFSStorageClient.createCommit(repoName, "main", "pin committed object") + lakeFsReady = true + } + + // Clean slate so report counts are exact and independent of test order. + // (Deliberately NOT done via the job under test, to keep fixtures independent of it.) 
+ getDSLContext.deleteFrom(DATASET_UPLOAD_SESSION).execute() + LakeFSStorageClient + .retrieveUncommittedObjects(repoName) + .foreach(diff => LakeFSStorageClient.resetObjectUploadOrDeletion(repoName, diff.getPath)) + } + + override protected def afterAll(): Unit = { + try shutdownDB() + finally super.afterAll() + } + + // --------------------------------------------------------------------------- + // Helpers + // --------------------------------------------------------------------------- + private def urlEnc(raw: String): String = + URLEncoder.encode(raw, StandardCharsets.UTF_8.name()) + + private def uniquePath(prefix: String): String = + s"$prefix/${System.nanoTime()}.bin" + + /** Creates a real upload session (valid uploadId + physicalAddress) and returns its uploadId. */ + private def initSession(filePath: String): String = { + val resp = datasetResource.multipartUpload( + "init", + ownerUser.getEmail, + cleanupDataset.getName, + urlEnc(filePath), + Optional.of(java.lang.Long.valueOf(16L)), + Optional.of(java.lang.Long.valueOf(32L)), // single part + Optional.empty(), + sessionUser + ) + resp.getStatus shouldEqual 200 + val record = fetchSession(filePath) + record should not be null + record.getUploadId + } + + private def fetchSession(filePath: String) = + getDSLContext + .selectFrom(DATASET_UPLOAD_SESSION) + .where( + DATASET_UPLOAD_SESSION.UID + .eq(ownerUser.getUid) + .and(DATASET_UPLOAD_SESSION.DID.eq(cleanupDataset.getDid)) + .and(DATASET_UPLOAD_SESSION.FILE_PATH.eq(filePath)) + ) + .fetchOne() + + private def countPartRows(uploadId: String): Int = + getDSLContext + .selectCount() + .from(DATASET_UPLOAD_SESSION_PART) + .where(DATASET_UPLOAD_SESSION_PART.UPLOAD_ID.eq(uploadId)) + .fetchOne(0, classOf[Int]) + + /** Pins a session's age precisely — the injectable-clock counterpart on the DB side. 
*/ + private def setSessionCreatedAt(uploadId: String, createdAt: OffsetDateTime): Unit = + getDSLContext + .update(DATASET_UPLOAD_SESSION) + .set(DATASET_UPLOAD_SESSION.CREATED_AT, createdAt) + .where(DATASET_UPLOAD_SESSION.UPLOAD_ID.eq(uploadId)) + .execute() + + /** Uploads an object to the repo branch WITHOUT committing (a staged/uncommitted object). */ + private def stageObject(filePath: String, content: String = "staged-bytes"): Unit = + LakeFSStorageClient.writeFileToRepo( + repoName, + filePath, + new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)) + ) + + private def uncommittedPaths(): List[String] = + LakeFSStorageClient.retrieveUncommittedObjects(repoName).map(_.getPath) + + private def committedPaths(): List[String] = + LakeFSStorageClient.retrieveObjectsOfVersion(repoName, "main").map(_.getPath) + + /** Inserts a session row whose LakeFS multipart does not exist (forces a per-item failure). */ + private def insertBogusSession(filePath: String): Unit = + getDSLContext + .insertInto(DATASET_UPLOAD_SESSION) + .set(DATASET_UPLOAD_SESSION.DID, cleanupDataset.getDid) + .set(DATASET_UPLOAD_SESSION.UID, ownerUser.getUid) + .set(DATASET_UPLOAD_SESSION.FILE_PATH, filePath) + .set(DATASET_UPLOAD_SESSION.UPLOAD_ID, s"bogus-upload-${System.nanoTime()}") + .set( + DATASET_UPLOAD_SESSION.PHYSICAL_ADDRESS, + "s3://nonexistent-bucket/nonexistent-key" + ) + .set(DATASET_UPLOAD_SESSION.NUM_PARTS_REQUESTED, Int.box(1)) + .set(DATASET_UPLOAD_SESSION.FILE_SIZE_BYTES, java.lang.Long.valueOf(16L)) + .set(DATASET_UPLOAD_SESSION.PART_SIZE_BYTES, java.lang.Long.valueOf(32L)) + .execute() + + // =========================================================================== + // 1. 
Expired session is cleaned + // =========================================================================== + "StagedFileCleanupJob.runCleanupOnce" should "delete an expired upload session and its part rows" in { + val filePath = uniquePath("expired-session") + val uploadId = initSession(filePath) + countPartRows(uploadId) shouldEqual 1 // placeholder created at init + + val report = job.runCleanupOnce(farFuture) + + report.sessionsDeleted shouldEqual 1 + report.errors shouldEqual 0 + fetchSession(filePath) shouldBe null + countPartRows(uploadId) shouldEqual 0 // gone via ON DELETE CASCADE + } + + // =========================================================================== + // 2. Fresh session survives + // =========================================================================== + it should "keep a fresh (non-expired) upload session" in { + val filePath = uniquePath("fresh-session") + val uploadId = initSession(filePath) + + val report = job.runCleanupOnce(OffsetDateTime.now()) + + report.sessionsDeleted shouldEqual 0 + report.errors shouldEqual 0 + fetchSession(filePath) should not be null + countPartRows(uploadId) shouldEqual 1 + } + + // =========================================================================== + // 3. Expired staged object is reset (committed object untouched) + // =========================================================================== + it should "reset an expired staged object but never a committed one" in { + val stagedPath = uniquePath("expired-staged") + stageObject(stagedPath) + uncommittedPaths() should contain(stagedPath) + + val report = job.runCleanupOnce(farFuture) + + report.objectsReset should be >= 1 + report.errors shouldEqual 0 + uncommittedPaths() should not contain stagedPath + // safety pin: the committed object survives every cleanup + committedPaths() should contain(PinnedCommittedPath) + } + + // =========================================================================== + // 4. 
Fresh staged object survives + // =========================================================================== + it should "keep a freshly staged object when run with the real current time" in { + val stagedPath = uniquePath("fresh-staged") + stageObject(stagedPath) + + val report = job.runCleanupOnce(OffsetDateTime.now()) + + report.objectsReset shouldEqual 0 + report.errors shouldEqual 0 + uncommittedPaths() should contain(stagedPath) + } + + // =========================================================================== + // 5. Idempotence + // =========================================================================== + it should "be idempotent: a second run with the same now reports all zeros" in { + val sessionPath = uniquePath("idempotent-session") + val stagedPath = uniquePath("idempotent-staged") + initSession(sessionPath) + stageObject(stagedPath) + + val now = farFuture + + val first = job.runCleanupOnce(now) + first.sessionsDeleted shouldEqual 1 + first.objectsReset should be >= 1 + first.errors shouldEqual 0 + + val second = job.runCleanupOnce(now) + second.sessionsDeleted shouldEqual 0 + second.objectsReset shouldEqual 0 + second.errors shouldEqual 0 + } + + // =========================================================================== + // 6. Active upload is not touched while other items expire + // =========================================================================== + it should "not touch a non-expired session or its staged object while expiring other items" in { + val now = farFuture + + // Protected: a session that is fresh RELATIVE TO the injected now, with its staged + // file present on the branch. The skip rule must protect the object even though its + // real mtime is "older" than retention relative to the future now. 
+ val protectedPath = uniquePath("active-upload") + val protectedUploadId = initSession(protectedPath) + stageObject(protectedPath) + setSessionCreatedAt(protectedUploadId, now.minusMinutes(5)) + + // Expirees: another session and an orphan staged object, both created at real now, + // i.e. older than retention relative to the future now. + val expiredSessionPath = uniquePath("expired-other-session") + initSession(expiredSessionPath) + val expiredStagedPath = uniquePath("expired-other-staged") + stageObject(expiredStagedPath) + + val report = job.runCleanupOnce(now) + + report.sessionsDeleted shouldEqual 1 + report.objectsReset shouldEqual 1 + report.errors shouldEqual 0 + + // survivors + fetchSession(protectedPath) should not be null + uncommittedPaths() should contain(protectedPath) + // expirees + fetchSession(expiredSessionPath) shouldBe null + uncommittedPaths() should not contain expiredStagedPath + } + + // =========================================================================== + // 7. 
Report counting on a mixed batch + // =========================================================================== + it should "report exact counts for a mix of expired and fresh items" in { + val now = farFuture + + // 2 expired sessions + val expired1 = uniquePath("mix-expired-1") + val expired2 = uniquePath("mix-expired-2") + initSession(expired1) + initSession(expired2) + + // 1 fresh session protecting 1 fresh staged object (fresh relative to the injected now) + val freshPath = uniquePath("mix-fresh") + val freshUploadId = initSession(freshPath) + stageObject(freshPath) + setSessionCreatedAt(freshUploadId, now.minusMinutes(5)) + + // 1 expired staged object with no session + val expiredStaged = uniquePath("mix-expired-staged") + stageObject(expiredStaged) + + val report = job.runCleanupOnce(now) + + report.sessionsDeleted shouldEqual 2 + report.objectsReset shouldEqual 1 + report.errors shouldEqual 0 + + fetchSession(expired1) shouldBe null + fetchSession(expired2) shouldBe null + fetchSession(freshPath) should not be null + uncommittedPaths() should contain(freshPath) + uncommittedPaths() should not contain expiredStaged + } + + // =========================================================================== + // 8. 
Retention boundary (precision via injectable clock + explicit created_at) + // =========================================================================== + it should "clean a session just past retention but keep one just inside it" in { + val now = OffsetDateTime.now() + val cutoff = now.minusHours(RetentionHours.toLong) + + val survivorPath = uniquePath("boundary-survivor") + val survivorUploadId = initSession(survivorPath) + setSessionCreatedAt(survivorUploadId, cutoff.plusMinutes(1)) // retention - epsilon + + val expiredPath = uniquePath("boundary-expired") + val expiredUploadId = initSession(expiredPath) + setSessionCreatedAt(expiredUploadId, cutoff.minusMinutes(1)) // retention + epsilon + + val report = job.runCleanupOnce(now) + + report.sessionsDeleted shouldEqual 1 + report.errors shouldEqual 0 + fetchSession(survivorPath) should not be null + fetchSession(expiredPath) shouldBe null + } + + // =========================================================================== + // 9. Committed objects are never touched (dedicated, with a fresh commit) + // =========================================================================== + it should "leave committed objects intact while resetting expired staged objects" in { + // Commit a new object in this test, alongside an expired staged object. + val committedPath = uniquePath("committed-safe") + stageObject(committedPath, content = "committed-bytes") + LakeFSStorageClient.createCommit(repoName, "main", "commit object that cleanup must keep") + + val expiredStaged = uniquePath("doomed-staged") + stageObject(expiredStaged) + + val report = job.runCleanupOnce(farFuture) + + report.errors shouldEqual 0 + uncommittedPaths() should not contain expiredStaged + committedPaths() should contain(committedPath) + committedPaths() should contain(PinnedCommittedPath) + } + + // =========================================================================== + // 10. 
Already-aborted multipart (LakeFS 404) is treated as cleaned, not an error + // =========================================================================== + it should "delete a session whose multipart was already aborted in LakeFS, with no error" in { + val filePath = uniquePath("already-aborted") + initSession(filePath) + val record = fetchSession(filePath) + // Abort the multipart out-of-band; the DB row stays behind (simulates a crash between + // LakeFS abort and row deletion, or a previous partially-failed cleanup round). + LakeFSStorageClient.abortPresignedMultipartUploads( + repoName, + filePath, + record.getUploadId, + record.getPhysicalAddress + ) + fetchSession(filePath) should not be null + + val report = job.runCleanupOnce(farFuture) + + report.sessionsDeleted shouldEqual 1 + report.errors shouldEqual 0 + fetchSession(filePath) shouldBe null + } + + // =========================================================================== + // 11. Per-item failures are counted and never abort the batch + // =========================================================================== Review Comment: Let's also test failures at different steps while cleaning a single item, such as no DB record found, no file found, and a timeout, to verify that the resulting state is either rolled back or cleaned up in the next round. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
