xuang7 commented on code in PR #4181:
URL: https://github.com/apache/texera/pull/4181#discussion_r2749021371
##########
file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala:
##########
@@ -72,10 +68,20 @@ import
org.apache.texera.dao.jooq.generated.tables.DatasetUploadSessionPart.DATA
import org.jooq.exception.DataAccessException
import software.amazon.awssdk.services.s3.model.UploadPartResponse
import org.apache.commons.io.FilenameUtils
+import
org.apache.texera.dao.jooq.generated.tables.records.DatasetUploadSessionRecord
+import
org.apache.texera.dao.jooq.generated.tables.DatasetUploadSession.DATASET_UPLOAD_SESSION
+import
org.apache.texera.dao.jooq.generated.tables.DatasetUploadSessionPart.DATASET_UPLOAD_SESSION_PART
+import org.jooq.exception.DataAccessException
+import software.amazon.awssdk.services.s3.model.UploadPartResponse
+import org.apache.commons.io.FilenameUtils
import
org.apache.texera.service.util.LakeFSExceptionHandler.withLakeFSErrorHandling
+import
org.apache.texera.dao.jooq.generated.tables.records.DatasetUploadSessionRecord
import java.sql.SQLException
import scala.util.Try
+import java.sql.SQLException
+import java.time.OffsetDateTime
+import scala.util.Try
Review Comment:
There are some duplicated and unused imports that can be removed.
##########
frontend/src/app/common/formly/collab-wrapper/collab-wrapper/collab-wrapper.component.css:
##########
@@ -25,7 +25,7 @@
outline: transparent;
overflow: visible;
position: relative;
- : 1pt;
+ :1pt;
Review Comment:
Seems like this change isn't relevant to the PR.
##########
file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala:
##########
@@ -1557,111 +1581,229 @@ class DatasetResource {
s"Computed numParts=$numPartsLong is out of range
1..$MAXIMUM_NUM_OF_MULTIPART_S3_PARTS"
)
}
- val numPartsValue: Int = numPartsLong.toInt
+ val computedNumParts: Int = numPartsLong.toInt
- // S3 multipart constraint: all non-final parts must be >= 5MiB.
- // If we have >1 parts, then partSizeBytesValue is the non-final part
size.
- if (numPartsValue > 1 && partSizeBytesValue <
MINIMUM_NUM_OF_MULTIPART_S3_PART) {
+ if (computedNumParts > 1 && partSizeBytesValue <
MINIMUM_NUM_OF_MULTIPART_S3_PART) {
throw new BadRequestException(
s"partSizeBytes=$partSizeBytesValue is too small. " +
s"All non-final parts must be >= $MINIMUM_NUM_OF_MULTIPART_S3_PART
bytes."
)
}
-
- // Reject if a session already exists
- val exists = ctx.fetchExists(
- ctx
- .selectOne()
- .from(DATASET_UPLOAD_SESSION)
+ var session: DatasetUploadSessionRecord = null
+ var rows: Result[Record2[Integer, String]] = null
+ try {
+ session = ctx
+ .selectFrom(DATASET_UPLOAD_SESSION)
.where(
DATASET_UPLOAD_SESSION.UID
.eq(uid)
.and(DATASET_UPLOAD_SESSION.DID.eq(did))
.and(DATASET_UPLOAD_SESSION.FILE_PATH.eq(filePath))
)
- )
- if (exists) {
- throw new WebApplicationException(
- "Upload already in progress for this filePath",
- Response.Status.CONFLICT
- )
- }
-
- val presign = withLakeFSErrorHandling {
- LakeFSStorageClient.initiatePresignedMultipartUploads(
- repositoryName,
- filePath,
- numPartsValue
- )
+ .forUpdate()
+ .noWait()
+ .fetchOne()
+ if (session != null) {
+ //Gain parts lock
+ rows = ctx
+ .select(DATASET_UPLOAD_SESSION_PART.PART_NUMBER,
DATASET_UPLOAD_SESSION_PART.ETAG)
+ .from(DATASET_UPLOAD_SESSION_PART)
+ .where(DATASET_UPLOAD_SESSION_PART.UPLOAD_ID.eq(session.getUploadId))
+ .forUpdate()
+ .noWait()
+ .fetch()
+ val dbFileSize = session.getFileSizeBytes
+ val dbPartSize = session.getPartSizeBytes
+ val dbNumParts = session.getNumPartsRequested
+ val createdAt: OffsetDateTime = session.getCreatedAt
+
+ val isExpired =
+ createdAt
+ .plusHours(PHYSICAL_ADDRESS_EXPIRATION_TIME_HRS.toLong)
+ .isBefore(OffsetDateTime.now(createdAt.getOffset)) // or
OffsetDateTime.now()
+
+ val conflictConfig =
+ dbFileSize != fileSizeBytesValue ||
+ dbPartSize != partSizeBytesValue ||
+ dbNumParts != computedNumParts ||
+ isExpired ||
+ Option(restart).exists(_.orElse(false))
+
+ if (conflictConfig) {
+ // Parts will be deleted automatically (ON DELETE CASCADE)
+ ctx
+ .deleteFrom(DATASET_UPLOAD_SESSION)
+ .where(DATASET_UPLOAD_SESSION.UPLOAD_ID.eq(session.getUploadId))
+ .execute()
+
+ try {
+ LakeFSStorageClient.abortPresignedMultipartUploads(
+ repositoryName,
+ filePath,
+ session.getUploadId,
+ session.getPhysicalAddress
+ )
+ } catch { case _: Throwable => () }
+ session = null
+ rows = null
+ }
+ }
+ } catch {
+ case e: DataAccessException
+ if Option(e.getCause)
+ .collect { case s: SQLException => s.getSQLState }
+ .contains("55P03") =>
+ throw new WebApplicationException(
+ "Another client is uploading this file",
+ Response.Status.CONFLICT
+ )
}
- val uploadIdStr = presign.getUploadId
- val physicalAddr = presign.getPhysicalAddress
-
- try {
- val rowsInserted = ctx
- .insertInto(DATASET_UPLOAD_SESSION)
- .set(DATASET_UPLOAD_SESSION.FILE_PATH, filePath)
- .set(DATASET_UPLOAD_SESSION.DID, did)
- .set(DATASET_UPLOAD_SESSION.UID, uid)
- .set(DATASET_UPLOAD_SESSION.UPLOAD_ID, uploadIdStr)
- .set(DATASET_UPLOAD_SESSION.PHYSICAL_ADDRESS, physicalAddr)
- .set(DATASET_UPLOAD_SESSION.NUM_PARTS_REQUESTED,
Integer.valueOf(numPartsValue))
- .set(DATASET_UPLOAD_SESSION.FILE_SIZE_BYTES,
java.lang.Long.valueOf(fileSizeBytesValue))
- .set(DATASET_UPLOAD_SESSION.PART_SIZE_BYTES,
java.lang.Long.valueOf(partSizeBytesValue))
- .onDuplicateKeyIgnore()
- .execute()
-
- if (rowsInserted != 1) {
- LakeFSStorageClient.abortPresignedMultipartUploads(
+ if (session == null) {
+ val presign = withLakeFSErrorHandling {
+ LakeFSStorageClient.initiatePresignedMultipartUploads(
repositoryName,
filePath,
- uploadIdStr,
- physicalAddr
- )
- throw new WebApplicationException(
- "Upload already in progress for this filePath",
- Response.Status.CONFLICT
+ computedNumParts
)
}
- // Pre-create part rows 1..numPartsValue with empty ETag.
- // This makes per-part locking cheap and deterministic.
+ val uploadIdStr = presign.getUploadId
+ val physicalAddr = presign.getPhysicalAddress
- val partNumberSeries = DSL.generateSeries(1,
numPartsValue).asTable("gs", "pn")
- val partNumberField = partNumberSeries.field("pn", classOf[Integer])
+ try {
+ val rowsInserted = ctx
+ .insertInto(DATASET_UPLOAD_SESSION)
+ .set(DATASET_UPLOAD_SESSION.FILE_PATH, filePath)
+ .set(DATASET_UPLOAD_SESSION.DID, did)
+ .set(DATASET_UPLOAD_SESSION.UID, uid)
+ .set(DATASET_UPLOAD_SESSION.UPLOAD_ID, uploadIdStr)
+ .set(DATASET_UPLOAD_SESSION.PHYSICAL_ADDRESS, physicalAddr)
+ .set(DATASET_UPLOAD_SESSION.NUM_PARTS_REQUESTED,
Integer.valueOf(computedNumParts))
+ .set(DATASET_UPLOAD_SESSION.FILE_SIZE_BYTES,
java.lang.Long.valueOf(fileSizeBytesValue))
+ .set(DATASET_UPLOAD_SESSION.PART_SIZE_BYTES,
java.lang.Long.valueOf(partSizeBytesValue))
+ .onDuplicateKeyIgnore()
+ .execute()
+
+ if (rowsInserted == 1) {
+ val partNumberSeries =
+ DSL.generateSeries(1, computedNumParts).asTable("gs",
"partNumberField")
+ val partNumberField = partNumberSeries.field("partNumberField",
classOf[Integer])
- ctx
- .insertInto(
- DATASET_UPLOAD_SESSION_PART,
- DATASET_UPLOAD_SESSION_PART.UPLOAD_ID,
- DATASET_UPLOAD_SESSION_PART.PART_NUMBER,
- DATASET_UPLOAD_SESSION_PART.ETAG
- )
- .select(
ctx
+ .insertInto(
+ DATASET_UPLOAD_SESSION_PART,
+ DATASET_UPLOAD_SESSION_PART.UPLOAD_ID,
+ DATASET_UPLOAD_SESSION_PART.PART_NUMBER,
+ DATASET_UPLOAD_SESSION_PART.ETAG
+ )
.select(
- inl(uploadIdStr),
- partNumberField,
- inl("") // placeholder empty etag
+ ctx
+ .select(
+ inl(uploadIdStr),
+ partNumberField,
+ inl("")
+ )
+ .from(partNumberSeries)
)
- .from(partNumberSeries)
- )
- .execute()
+ .execute()
+
+ session = ctx
+ .selectFrom(DATASET_UPLOAD_SESSION)
+ .where(
+ DATASET_UPLOAD_SESSION.UID
+ .eq(uid)
+ .and(DATASET_UPLOAD_SESSION.DID.eq(did))
+ .and(DATASET_UPLOAD_SESSION.FILE_PATH.eq(filePath))
+ )
+ .fetchOne()
+ } else {
+ try {
+ LakeFSStorageClient.abortPresignedMultipartUploads(
+ repositoryName,
+ filePath,
+ uploadIdStr,
+ physicalAddr
+ )
+ } catch { case _: Throwable => () }
+
+ session = ctx
+ .selectFrom(DATASET_UPLOAD_SESSION)
+ .where(
+ DATASET_UPLOAD_SESSION.UID
+ .eq(uid)
+ .and(DATASET_UPLOAD_SESSION.DID.eq(did))
+ .and(DATASET_UPLOAD_SESSION.FILE_PATH.eq(filePath))
+ )
+ .fetchOne()
+ }
+ } catch {
+ case e: Exception =>
+ try {
+ LakeFSStorageClient.abortPresignedMultipartUploads(
+ repositoryName,
+ filePath,
+ uploadIdStr,
+ physicalAddr
+ )
+ } catch { case _: Throwable => () }
+ throw e
+ }
+ }
- Response.ok().build()
- } catch {
- case e: Exception =>
+ if (session == null) {
+ throw new WebApplicationException(
+ "Failed to create or locate upload session",
+ Response.Status.INTERNAL_SERVER_ERROR
+ )
+ }
+
+ val dbNumParts = session.getNumPartsRequested
+
+ val uploadId = session.getUploadId
+ val nParts = dbNumParts
+
+ // CHANGED: lock rows with NOWAIT; if any row is locked by another
uploader -> 409
+ if (rows == null) {
+ rows =
try {
- LakeFSStorageClient.abortPresignedMultipartUploads(
- repositoryName,
- filePath,
- uploadIdStr,
- physicalAddr
- )
- } catch { case _: Throwable => () }
- throw e
+ ctx
+ .select(DATASET_UPLOAD_SESSION_PART.PART_NUMBER,
DATASET_UPLOAD_SESSION_PART.ETAG)
+ .from(DATASET_UPLOAD_SESSION_PART)
+ .where(DATASET_UPLOAD_SESSION_PART.UPLOAD_ID.eq(uploadId))
+ .forUpdate()
+ .noWait()
+ .fetch()
+ } catch {
+ case e: DataAccessException
+ if Option(e.getCause)
+ .collect { case s: SQLException => s.getSQLState }
+ .contains("55P03") =>
+ throw new WebApplicationException(
+ "Another client is uploading parts for this file",
+ Response.Status.CONFLICT
+ )
+ }
}
+
+ // CHANGED: compute missingParts + completedPartsCount from the SAME
query result
+ val missingPartsSmartSorted = rows.asScala
Review Comment:
This variable could be renamed, since it performs filtering and the result isn't actually sorted.
##########
frontend/src/app/dashboard/component/user/files-uploader/files-uploader.component.ts:
##########
@@ -32,98 +37,229 @@ import { UntilDestroy, untilDestroyed } from
"@ngneat/until-destroy";
styleUrls: ["./files-uploader.component.scss"],
})
export class FilesUploaderComponent {
- @Input()
- showUploadAlert: boolean = false;
+ @Input() showUploadAlert: boolean = false;
- @Output()
- uploadedFiles = new EventEmitter<FileUploadItem[]>();
+ @Output() uploadedFiles = new EventEmitter<FileUploadItem[]>();
newUploadFileTreeNodes: DatasetFileNode[] = [];
fileUploadingFinished: boolean = false;
- // four types: "success", "info", "warning" and "error"
fileUploadBannerType: "error" | "success" | "info" | "warning" = "success";
fileUploadBannerMessage: string = "";
singleFileUploadMaxSizeMiB: number = 20;
constructor(
private notificationService: NotificationService,
- private adminSettingsService: AdminSettingsService
+ private adminSettingsService: AdminSettingsService,
+ private datasetService: DatasetService,
+ @Optional() @Host() private parent: DatasetDetailComponent,
+ private modal: NzModalService
) {
this.adminSettingsService
.getSetting("single_file_upload_max_size_mib")
.pipe(untilDestroyed(this))
.subscribe(value => (this.singleFileUploadMaxSizeMiB = parseInt(value)));
}
- hideBanner() {
+ private formatBytes(n: number): string {
+ const mib = n / (1024 * 1024);
+ if (mib >= 1024) return `${(mib / 1024).toFixed(2)} GiB`;
+ if (mib >= 1) return `${mib.toFixed(2)} MiB`;
+ return `${Math.max(1, Math.round(n / 1024))} KiB`;
+ }
Review Comment:
There's an existing size formatter util
(frontend/src/app/common/util/size-formatter.util.ts); it might be better to
use that one for consistency.
##########
frontend/src/app/dashboard/component/user/files-uploader/files-uploader.component.ts:
##########
@@ -32,98 +37,229 @@ import { UntilDestroy, untilDestroyed } from
"@ngneat/until-destroy";
styleUrls: ["./files-uploader.component.scss"],
})
export class FilesUploaderComponent {
- @Input()
- showUploadAlert: boolean = false;
+ @Input() showUploadAlert: boolean = false;
- @Output()
- uploadedFiles = new EventEmitter<FileUploadItem[]>();
+ @Output() uploadedFiles = new EventEmitter<FileUploadItem[]>();
newUploadFileTreeNodes: DatasetFileNode[] = [];
fileUploadingFinished: boolean = false;
- // four types: "success", "info", "warning" and "error"
fileUploadBannerType: "error" | "success" | "info" | "warning" = "success";
fileUploadBannerMessage: string = "";
singleFileUploadMaxSizeMiB: number = 20;
constructor(
private notificationService: NotificationService,
- private adminSettingsService: AdminSettingsService
+ private adminSettingsService: AdminSettingsService,
+ private datasetService: DatasetService,
+ @Optional() @Host() private parent: DatasetDetailComponent,
+ private modal: NzModalService
) {
this.adminSettingsService
.getSetting("single_file_upload_max_size_mib")
.pipe(untilDestroyed(this))
.subscribe(value => (this.singleFileUploadMaxSizeMiB = parseInt(value)));
}
- hideBanner() {
+ private formatBytes(n: number): string {
+ const mib = n / (1024 * 1024);
+ if (mib >= 1024) return `${(mib / 1024).toFixed(2)} GiB`;
+ if (mib >= 1) return `${mib.toFixed(2)} MiB`;
+ return `${Math.max(1, Math.round(n / 1024))} KiB`;
+ }
+
+ private markForceRestart(item: FileUploadItem): void {
+ // uploader should call backend init with type=forceRestart when this is
set
+ (item as any).restart = true;
+ }
+
+ private askResumeOrSkip(
+ item: FileUploadItem,
+ showForAll: boolean
+ ): Promise<"resume" | "resumeAll" | "restart" | "restartAll"> {
+ return new Promise(resolve => {
+ const fileName = item.name.split("/").pop() || item.name;
+ const sizeStr = this.formatBytes(item.file.size);
+
+ const ref = this.modal.create({
+ nzTitle: "Conflicting File",
+ nzMaskClosable: false,
+ nzClosable: false,
+ nzContent: `
+<div>
Review Comment:
nzContent is currently built via a template string with raw HTML
interpolation. It might be better to use an Angular ng-template (or a small
component) for nzContent.
##########
frontend/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.ts:
##########
@@ -426,7 +428,8 @@ export class DatasetDetailComponent implements OnInit {
file.name,
file.file,
this.chunkSizeMiB * 1024 * 1024,
- this.maxConcurrentChunks
+ this.maxConcurrentChunks,
+ Boolean((file as any).restart)
Review Comment:
We could add an optional restart?: boolean field to FileUploadItem instead of casting to any here.
##########
frontend/src/app/dashboard/component/user/files-uploader/files-uploader.component.ts:
##########
@@ -32,98 +37,229 @@ import { UntilDestroy, untilDestroyed } from
"@ngneat/until-destroy";
styleUrls: ["./files-uploader.component.scss"],
})
export class FilesUploaderComponent {
- @Input()
- showUploadAlert: boolean = false;
+ @Input() showUploadAlert: boolean = false;
- @Output()
- uploadedFiles = new EventEmitter<FileUploadItem[]>();
+ @Output() uploadedFiles = new EventEmitter<FileUploadItem[]>();
newUploadFileTreeNodes: DatasetFileNode[] = [];
fileUploadingFinished: boolean = false;
- // four types: "success", "info", "warning" and "error"
fileUploadBannerType: "error" | "success" | "info" | "warning" = "success";
fileUploadBannerMessage: string = "";
singleFileUploadMaxSizeMiB: number = 20;
constructor(
private notificationService: NotificationService,
- private adminSettingsService: AdminSettingsService
+ private adminSettingsService: AdminSettingsService,
+ private datasetService: DatasetService,
+ @Optional() @Host() private parent: DatasetDetailComponent,
+ private modal: NzModalService
) {
this.adminSettingsService
.getSetting("single_file_upload_max_size_mib")
.pipe(untilDestroyed(this))
.subscribe(value => (this.singleFileUploadMaxSizeMiB = parseInt(value)));
}
- hideBanner() {
+ private formatBytes(n: number): string {
+ const mib = n / (1024 * 1024);
+ if (mib >= 1024) return `${(mib / 1024).toFixed(2)} GiB`;
+ if (mib >= 1) return `${mib.toFixed(2)} MiB`;
+ return `${Math.max(1, Math.round(n / 1024))} KiB`;
+ }
+
+ private markForceRestart(item: FileUploadItem): void {
+ // uploader should call backend init with type=forceRestart when this is
set
+ (item as any).restart = true;
Review Comment:
We could add an optional restart?: boolean field to FileUploadItem and set
it directly.
##########
frontend/src/app/workspace/component/menu/coeditor-user-icon/coeditor-user-icon.component.css:
##########
@@ -16,4 +16,3 @@
* specific language governing permissions and limitations
* under the License.
*/
-
Review Comment:
Seems like this change isn't relevant to the PR.
##########
file-service/src/main/scala/org/apache/texera/service/resource/DatasetResource.scala:
##########
@@ -1487,11 +1495,38 @@ class DatasetResource {
dataset
}
+ private def listMultipartUploads(did: Integer, requesterUid: Int): Response
= {
+ withTransaction(context) { ctx =>
+ if (!userHasWriteAccess(ctx, did, requesterUid)) {
+ throw new ForbiddenException(ERR_USER_HAS_NO_ACCESS_TO_DATASET_MESSAGE)
+ }
+
+ val filePaths =
+ ctx
+ .selectDistinct(DATASET_UPLOAD_SESSION.FILE_PATH)
+ .from(DATASET_UPLOAD_SESSION)
+ .where(DATASET_UPLOAD_SESSION.DID.eq(did))
+ .and(
+ DSL.condition(
+ "created_at > current_timestamp - (? * interval '1 hour')",
Review Comment:
init and list seem to be checking expiry using different time sources (app
now() and DB current_timestamp). It might be better to unify this so both use
the same source, to avoid inconsistent expiry decisions.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]