amogh-jahagirdar commented on code in PR #15126:
URL: https://github.com/apache/iceberg/pull/15126#discussion_r2743679465


##########
core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java:
##########
@@ -4394,6 +4402,102 @@ private void expectNotModifiedResponseForLoadTable(
             any());
   }
 
+  /**
+   * Test concurrent appends on multiple branches simultaneously to verify proper handling of
+   * sequence number conflicts.
+   *
+   * <p>Creates 5 different branches on the table, then performs 10 parallel append commits on each
+   * branch at the same time (50 total concurrent operations). This verifies that: 1. Sequence
+   * number conflicts are caught by AssertLastSequenceNumber requirement 2. Conflicts result in
+   * CommitFailedException (retryable) not ValidationException (non-retryable) 3. The REST catalog
+   * properly handles concurrent modifications across different branches
+   */
+  @Test
+  public void testConcurrentAppendsOnMultipleBranches() {
+    int numBranches = 5;
+    int commitsPerBranch = 10;
+    int totalConcurrentWrites = numBranches * commitsPerBranch;
+
+    RESTCatalog restCatalog = catalog();
+
+    Namespace ns = Namespace.of("concurrent_test");
+    TableIdentifier tableIdent = TableIdentifier.of(ns, "test_table");
+
+    restCatalog.createNamespace(ns);
+    Table table = restCatalog.buildTable(tableIdent, SCHEMA).withPartitionSpec(SPEC).create();
+
+    // Add initial data to the main branch
+    table.newFastAppend().appendFile(FILE_A).commit();
+
+    // Create 5 branches from the main branch
+    String[] branchNames = new String[numBranches];
+    for (int i = 0; i < numBranches; i++) {
+      branchNames[i] = "branch-" + i;
+      table.manageSnapshots().createBranch(branchNames[i]).commit();
+    }
+
+    // Refresh to get updated metadata with all branches
+    restCatalog.loadTable(tableIdent);
+
+    AtomicInteger successCount = new AtomicInteger(0);
+    AtomicInteger validationFailureCount = new AtomicInteger(0);
+
+    ExecutorService executor =
+        MoreExecutors.getExitingExecutorService(
+            (ThreadPoolExecutor) Executors.newFixedThreadPool(totalConcurrentWrites));
+
+    Tasks.range(totalConcurrentWrites)
+        .executeWith(executor)
+        .suppressFailureWhenFinished()
+        .onFailure(
+            (taskIndex, exception) -> {
+              // Check if sequence number validation error (indicates fix not working)
+              if (exception instanceof BadRequestException
+                  && exception.getMessage().contains("Cannot add snapshot with sequence number")) {
+                validationFailureCount.incrementAndGet();
+              } else if (exception instanceof ValidationException) {
+                validationFailureCount.incrementAndGet();
+              }
+              // CommitFailedException is expected - this is the correct retryable behavior
+            })
+        .run(
+            taskIndex -> {
+              int branchIdx = taskIndex / commitsPerBranch;
+              int commitIdx = taskIndex % commitsPerBranch;
+              String branchName = branchNames[branchIdx];
+
+              // Each thread loads the table independently
+              Table localTable = restCatalog.loadTable(tableIdent);
+
+              // Create a unique file for this commit
+              DataFile newFile =
+                  DataFiles.builder(SPEC)
+                      .withPath(
+                          String.format(
+                              "/path/to/branch-%d-commit-%d.parquet", branchIdx, commitIdx))
+                      .withFileSizeInBytes(15)
+                      .withPartitionPath(String.format("id_bucket=%d", branchIdx % 16))
+                      .withRecordCount(3)
+                      .build();
+
+              // Append to the specific branch
+              localTable.newFastAppend().appendFile(newFile).toBranch(branchName).commit();
+
+              successCount.incrementAndGet();
+            });
+
+    // Verify the fix: with AssertLastSequenceNumber, there should be NO validation failures
+    // All concurrent conflicts should be caught as CommitFailedException (retryable)
+    assertThat(validationFailureCount.get())
+        .as(
+            "With the fix, sequence number conflicts should be caught by AssertLastSequenceNumber "
+                + "and throw CommitFailedException (retryable), not ValidationException")
+        .isEqualTo(0);
+
+    // At least some should succeed (commits that don't conflict or succeed after retry)
+    assertThat(successCount.get()).as("At least some appends should succeed").isGreaterThan(0);

Review Comment:
   I think this test should make stronger assertions. We could use an
   AtomicInteger as a barrier to synchronize rounds of commits and cause
   conflicts deterministically. At the end we'd then be able to assert a
   deterministic number of failures (I'd probably organize it so the barrier
   causes 1 conflict per branch per round). Check out
   https://github.com/apache/iceberg/blob/main/core/src/test/java/org/apache/iceberg/jdbc/TestJdbcTableConcurrency.java#L130
   for another example of this pattern.
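
   To make the suggestion concrete, here is a minimal JDK-only sketch of the
   barrier idea. It is not the code from TestJdbcTableConcurrency and it does
   not touch Iceberg at all: the `version` counter and its compare-and-set are
   a hypothetical stand-in for an optimistic table commit, and the class and
   method names (`BarrierRoundsSketch`, `run`, `awaitAtLeast`) are made up for
   illustration. Every writer reads the same base version before anyone
   commits, so exactly one commit per round wins and the rest conflict.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierRoundsSketch {

  // Poll until the shared counter reaches the target (a simple AtomicInteger barrier).
  private static void awaitAtLeast(AtomicInteger counter, int target)
      throws InterruptedException {
    while (counter.get() < target) {
      Thread.sleep(1);
    }
  }

  // Returns {successes, conflicts} after all writers finish all rounds.
  static int[] run(int writers, int rounds) throws Exception {
    // Hypothetical stand-in for the table's last sequence number.
    AtomicInteger version = new AtomicInteger(0);
    AtomicInteger readBarrier = new AtomicInteger(0);
    AtomicInteger commitBarrier = new AtomicInteger(0);
    AtomicInteger successes = new AtomicInteger(0);
    AtomicInteger conflicts = new AtomicInteger(0);

    // One thread per writer; fewer threads would deadlock on the barriers.
    ExecutorService pool = Executors.newFixedThreadPool(writers);
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (int w = 0; w < writers; w++) {
        futures.add(
            pool.submit(
                () -> {
                  for (int round = 0; round < rounds; round++) {
                    // Every writer captures the same base before anyone commits.
                    int base = version.get();
                    readBarrier.incrementAndGet();
                    awaitAtLeast(readBarrier, (round + 1) * writers);

                    // Optimistic commit: only the first writer in the round wins.
                    if (version.compareAndSet(base, base + 1)) {
                      successes.incrementAndGet();
                    } else {
                      conflicts.incrementAndGet();
                    }

                    // Nobody starts the next round until all attempts are done.
                    commitBarrier.incrementAndGet();
                    awaitAtLeast(commitBarrier, (round + 1) * writers);
                  }
                  return null;
                }));
      }
      for (Future<?> future : futures) {
        future.get();
      }
    } finally {
      pool.shutdownNow();
    }
    return new int[] {successes.get(), conflicts.get()};
  }

  public static void main(String[] args) throws Exception {
    int[] result = run(2, 5);
    System.out.println("successes=" + result[0] + " conflicts=" + result[1]);
  }
}
```

   With two writers per round this gives one conflict per round, so the final
   counts can be asserted exactly (`rounds` successes and
   `rounds * (writers - 1)` conflicts). In the actual test, each branch would
   presumably get its own pair of barriers, and client-side commit retries
   would have to be accounted for in the expected counts.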



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.