chenlica commented on code in PR #4136:
URL: https://github.com/apache/texera/pull/4136#discussion_r2659324158


##########
common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/util/LakeFSStorageClient.scala:
##########
@@ -358,4 +359,47 @@ object LakeFSStorageClient {
 
     branchesApi.resetBranch(repoName, branchName, resetCreation).execute()
   }
+
+  /**
+    * Parse a physical address URI of the form "<scheme>://<bucket>/<key...>" 
into (bucket, key).
+    *
+    * Expected examples:
+    *   - "s3://my-bucket/path/to/file.csv"

Review Comment:
   Texera users don't see MinIO or S3.  Why do we expose this concept of 
"bucket" related to S3 and MinIO?
   



##########
file-service/src/test/scala/org/apache/texera/service/MockLakeFS.scala:
##########
@@ -87,11 +99,44 @@ trait MockLakeFS extends ForAllTestContainer with 
BeforeAndAfterAll { self: Suit
 
   def lakefsBaseUrl: String = 
s"http://${lakefs.host}:${lakefs.mappedPort(8000)}"
   def minioEndpoint: String = s"http://${minio.host}:${minio.mappedPort(9000)}"
+  def lakefsApiBasePath: String = s"$lakefsBaseUrl/api/v1"
+
+  // ---- Clients (lazy so they initialize after containers are started) ----
+
+  lazy val lakefsApiClient: ApiClient = {
+    val apiClient = new ApiClient()
+    apiClient.setBasePath(lakefsApiBasePath)
+    // basic-auth for lakeFS API uses accessKey as username, secretKey as 
password
+    apiClient.setUsername(lakefsAccessKeyID)
+    apiClient.setPassword(lakefsSecretAccessKey)
+    apiClient
+  }
+
+  lazy val repositoriesApi: RepositoriesApi = new 
RepositoriesApi(lakefsApiClient)
+
+  /**
+    * S3 client pointed at MinIO.
+    *
+    * Notes:
+    * - Region can be any value for MinIO, but MUST match what your signing 
expects.
+    *   so we use that.
+    * - Path-style is important: http://host:port/bucket/key
+    */
+  lazy val s3Client: S3Client = {
+    val creds = AwsBasicCredentials.create("texera_minio", "password")

Review Comment:
   Why do we show these presumably sensitive values in the codebase?



##########
frontend/src/app/dashboard/component/user/user-dataset/user-dataset-explorer/dataset-detail.component.ts:
##########
@@ -558,21 +562,54 @@ export class DatasetDetailComponent implements OnInit {
       this.onUploadComplete();
     }
 
-    this.datasetService
-      .finalizeMultipartUpload(
-        this.ownerEmail,
-        this.datasetName,
-        task.filePath,
-        task.uploadId,
-        [],
-        task.physicalAddress,
-        true // abort flag
-      )
-      .pipe(untilDestroyed(this))
-      .subscribe(() => {
-        this.notificationService.info(`${task.filePath} uploading has been 
terminated`);
-      });
-    // Remove the aborted task immediately
+    let doneCalled = false;
+    const done = () => {
+      if (doneCalled) {
+        return;
+      }
+      doneCalled = true;
+      if (onAborted) {
+        onAborted();
+      }
+    };
+
+    const abortWithRetry = (attempt: number) => {
+      this.datasetService
+        .finalizeMultipartUpload(
+          this.ownerEmail,
+          this.datasetName,
+          task.filePath,
+          true // abort flag
+        )
+        .pipe(untilDestroyed(this))
+        .subscribe({
+          next: () => {
+            this.notificationService.info(`${task.filePath} uploading has been 
terminated`);
+            done();
+          },
+          error: (res: unknown) => {
+            const err = res as HttpErrorResponse;
+
+            // Already gone, treat as done
+            if (err.status === 404) {
+              done();
+              return;
+            }
+
+            // Backend is still finalizing/aborting; retry with a tiny backoff
+            if (err.status === 409 && attempt < 10) {

Review Comment:
   When we use constants, it would be good to give them meaningful names, 
e.g., by defining named variables. Also, can we make "10" customizable?



##########
sql/updates/17.sql:
##########
@@ -0,0 +1,66 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--   http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied.  See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- ============================================
+-- 1. Connect to the texera_db database
+-- ============================================
+\c texera_db
+
+SET search_path TO texera_db;
+
+-- ============================================
+-- 2. Update the table schema
+-- ============================================
+BEGIN;
+
+-- 1. Drop old tables (if exist)
+DROP TABLE IF EXISTS dataset_upload_session CASCADE;

Review Comment:
   The table name "dataset_upload_session" is a prefix of the table name 
"dataset_upload_session_part".  Can we rename the former to make it more 
distinct from the latter?



##########
file-service/src/test/scala/org/apache/texera/service/MockLakeFS.scala:
##########
@@ -87,11 +99,44 @@ trait MockLakeFS extends ForAllTestContainer with 
BeforeAndAfterAll { self: Suit
 
   def lakefsBaseUrl: String = 
s"http://${lakefs.host}:${lakefs.mappedPort(8000)}"
   def minioEndpoint: String = s"http://${minio.host}:${minio.mappedPort(9000)}"
+  def lakefsApiBasePath: String = s"$lakefsBaseUrl/api/v1"

Review Comment:
   What's the meaning of "v1"?  What if we have "v2" and "v3" later?



##########
file-service/src/test/scala/org/apache/texera/service/MockLakeFS.scala:
##########
@@ -87,11 +99,44 @@ trait MockLakeFS extends ForAllTestContainer with 
BeforeAndAfterAll { self: Suit
 
   def lakefsBaseUrl: String = 
s"http://${lakefs.host}:${lakefs.mappedPort(8000)}"
   def minioEndpoint: String = s"http://${minio.host}:${minio.mappedPort(9000)}"
+  def lakefsApiBasePath: String = s"$lakefsBaseUrl/api/v1"
+
+  // ---- Clients (lazy so they initialize after containers are started) ----
+
+  lazy val lakefsApiClient: ApiClient = {
+    val apiClient = new ApiClient()
+    apiClient.setBasePath(lakefsApiBasePath)
+    // basic-auth for lakeFS API uses accessKey as username, secretKey as 
password
+    apiClient.setUsername(lakefsAccessKeyID)
+    apiClient.setPassword(lakefsSecretAccessKey)
+    apiClient
+  }
+
+  lazy val repositoriesApi: RepositoriesApi = new 
RepositoriesApi(lakefsApiClient)
+
+  /**
+    * S3 client pointed at MinIO.
+    *
+    * Notes:
+    * - Region can be any value for MinIO, but MUST match what your signing 
expects.
+    *   so we use that.
+    * - Path-style is important: http://host:port/bucket/key
+    */
+  lazy val s3Client: S3Client = {
+    val creds = AwsBasicCredentials.create("texera_minio", "password")
+    S3Client
+      .builder()
+      .endpointOverride(URI.create(StorageConfig.s3Endpoint)) // set in 
afterStart()
+      .region(Region.US_WEST_2)

Review Comment:
   Why do we hardcode "US_WEST_2" here?  Can we make it customizable?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to