phet commented on code in PR #3721:
URL: https://github.com/apache/gobblin/pull/3721#discussion_r1272742265


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -101,8 +102,8 @@ protected interface CheckedFunction<T, R> {
   private static final String CREATE_CONSTANTS_TABLE_STATEMENT = "CREATE TABLE 
IF NOT EXISTS %s "
       + "(primary_key INT, epsilon INT, linger INT, PRIMARY KEY 
(primary_key))";
   // Only insert epsilon and linger values from config if this table does not 
contain a pre-existing values already.
-  private static final String UPSERT_CONSTANTS_TABLE_STATEMENT = "INSERT INTO 
%s (primary_key, epsilon, linger) "
-      + "VALUES(1, ?, ?) ON DUPLICATE KEY UPDATE epsilon=VALUES(epsilon), 
linger=VALUES(linger)";
+  private static final String INSERT_CONSTANTS_TABLE_STATEMENT = "INSERT INTO 
%s (primary_key, epsilon, linger) "
+      + "VALUES(1, ?, ?)";

Review Comment:
   sorry if I'm missing something, but, if not using `ON DUPLICATE KEY 
UPDATE`... how are `epsilon` and/or `linger` ever updated?



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -212,11 +220,19 @@ public LeaseAttemptStatus 
tryAcquireLease(DagActionStore.DagAction flowAction, l
         log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 1: no 
existing row for this flow action, then go"
                 + " ahead and insert", flowAction, eventTimeMillis);
         String formattedAcquireLeaseNewRowStatement =
-            String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT, 
this.leaseArbiterTableName);
+            String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT, 
this.leaseArbiterTableName,
+                this.leaseArbiterTableName);
         int numRowsUpdated = 
withPreparedStatement(formattedAcquireLeaseNewRowStatement,
             insertStatement -> {
               completeInsertPreparedStatement(insertStatement, flowAction);
-              return insertStatement.executeUpdate();
+              try {
+                return insertStatement.executeUpdate();
+              } catch (SQLIntegrityConstraintViolationException e) {
+                if (!e.getMessage().contains("Duplicate entry")) {
+                  throw e;
+                }
+              }
+              return 0;

Review Comment:
   feels like this belongs inside the `catch`, no? (even possibly as an `else` 
of the dupe entry)



##########
gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -143,4 +159,128 @@ public void testAcquireLeaseSingleParticipant() throws 
Exception {
     Assert.assertTrue(sixthObtainedStatus.getEventTimestamp()
         <= sixthObtainedStatus.getLeaseAcquisitionTimestamp());
   }
+
+  /*
+     Tests CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT to ensure an 
insertion is not attempted unless the table
+     state remains the same as the prior read, which expects no row matching 
the primary key in the table
+     Note: this isolates and tests CASE 1 in which another participant could 
have acquired the lease between the time
+     the read was done and subsequent write was carried out
+  */
+  @Test //(dependsOnMethods = "testAcquireLeaseSingleParticipant")
+  public void testConditionallyAcquireLeaseIfNewRow() throws IOException {
+    // Inserting the first time should update 1 row
+    int numRowsUpdated = 
this.mysqlMultiActiveLeaseArbiter.withPreparedStatement(formattedAcquireLeaseNewRowStatement,
+        insertStatement -> {
+          completeInsertPreparedStatement(insertStatement, resumeDagAction);
+          return insertStatement.executeUpdate();
+        }, true);
+    Assert.assertEquals(numRowsUpdated, 1);

Review Comment:
   please explain what's being tested here ITO the class under test.  I would 
expect the test to call one encapsulated method of `MysqlMALeaseArbiter`'s API, 
but here is seems like you're combining bits and pieces of it (like 
`withPreparedStatement`), which arguably should be `private`/`protected`.
   
   generally the public interface is up for testing... maybe with verification 
for a SMALL number of `@VisibleForTesting` methods.
   
   perhaps I'm just not understanding the connection between this test and the 
overall public API...



##########
gobblin-runtime/src/test/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiterTest.java:
##########
@@ -143,4 +159,128 @@ public void testAcquireLeaseSingleParticipant() throws 
Exception {
     Assert.assertTrue(sixthObtainedStatus.getEventTimestamp()
         <= sixthObtainedStatus.getLeaseAcquisitionTimestamp());
   }
+
+  /*
+     Tests CONDITIONALLY_ACQUIRE_LEASE_IF_NEW_ROW_STATEMENT to ensure an 
insertion is not attempted unless the table
+     state remains the same as the prior read, which expects no row matching 
the primary key in the table
+     Note: this isolates and tests CASE 1 in which another participant could 
have acquired the lease between the time
+     the read was done and subsequent write was carried out
+  */
+  @Test //(dependsOnMethods = "testAcquireLeaseSingleParticipant")

Review Comment:
   remove comment?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to