umustafi commented on code in PR #3715:
URL: https://github.com/apache/gobblin/pull/3715#discussion_r1261814592


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -219,84 +247,90 @@ else if (leaseValidityStatus == 2) {
         // Use our event to acquire lease, check for previous db 
eventTimestamp and leaseAcquisitionTimestamp
         String formattedAcquireLeaseIfMatchingAllStatement =
             
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT, 
this.leaseArbiterTableName);
-        ResultSet rs = withPreparedStatement(
-            formattedAcquireLeaseIfMatchingAllStatement + "; " + 
formattedSelectAfterInsertStatement,
-            updateStatement -> {
-              completeUpdatePreparedStatement(updateStatement, flowAction, 
eventTimeMillis, true,
+        int numRowsUpdated = 
withPreparedStatement(formattedAcquireLeaseIfMatchingAllStatement,
+            insertStatement -> {
+              completeUpdatePreparedStatement(insertStatement, flowAction, 
true,
                   true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
-              return updateStatement.executeQuery();
+              return insertStatement.executeUpdate();
             }, true);
-        return handleResultFromAttemptedLeaseObtainment(rs, flowAction, 
eventTimeMillis);
+        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction);
       } // No longer leasing this event
-        // CASE 6: Same event, no longer leasing event in db: terminate
+        // CASE 5: Same event, no longer leasing event in db: terminate
         if (isWithinEpsilon) {
           return new NoLongerLeasingStatus();
         }
-        // CASE 7: Distinct event, no longer leasing event in db
+        // CASE 6: Distinct event, no longer leasing event in db
         // Use our event to acquire lease, check for previous db 
eventTimestamp and NULL leaseAcquisitionTimestamp
         String formattedAcquireLeaseIfFinishedStatement =
             
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT, 
this.leaseArbiterTableName);
-        ResultSet rs = withPreparedStatement(
-            formattedAcquireLeaseIfFinishedStatement + "; " + 
formattedSelectAfterInsertStatement,
-            updateStatement -> {
-              completeUpdatePreparedStatement(updateStatement, flowAction, 
eventTimeMillis, true,
+        int numRowsUpdated = 
withPreparedStatement(formattedAcquireLeaseIfFinishedStatement,
+            insertStatement -> {
+              completeUpdatePreparedStatement(insertStatement, flowAction, 
true,
                   false, dbEventTimestamp, null);
-              return updateStatement.executeQuery();
+              return insertStatement.executeUpdate();
             }, true);
-        return handleResultFromAttemptedLeaseObtainment(rs, flowAction, 
eventTimeMillis);
+        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction);
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }
   }
 
   /**
-   * Attempt lease by insert or update following a read based on the condition 
the state of the table has not changed
-   * since the read. Parse the result to return the corresponding status based 
on successful insert/update or not.
-   * @param resultSet
-   * @param eventTimeMillis
-   * @return LeaseAttemptStatus
+   * Parse result of attempted insert/update to obtain a lease for a
+   * {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction} event by 
selecting values corresponding to that
+   * event from the table to return the corresponding status based on 
successful insert/update or not.
    * @throws SQLException
    * @throws IOException
    */
-  protected LeaseAttemptStatus 
handleResultFromAttemptedLeaseObtainment(ResultSet resultSet,
-      DagActionStore.DagAction flowAction, long eventTimeMillis)
+  protected LeaseAttemptStatus evaluateStatusAfterLeaseAttempt(int 
numRowsUpdated,
+      DagActionStore.DagAction flowAction)
       throws SQLException, IOException {
-    if (!resultSet.next()) {
-      throw new IOException("Expected num rows and lease_acquisition_timestamp 
returned from query but received nothing");
-    }
-    int numRowsUpdated = resultSet.getInt(1);
-    long leaseAcquisitionTimeMillis = resultSet.getTimestamp(2).getTime();
-    int dbLinger = resultSet.getInt(3);
+    // Fetch values in row after attempted insert
+    String formattedSelectAfterInsertStatement =
+        String.format(SELECT_AFTER_INSERT_STATEMENT, 
this.leaseArbiterTableName, this.constantsTableName);

Review Comment:
   The table name shouldn't change over the instance's lifetime, since changing 
it would require re-initialization through the config. I now define this 
particular formatted statement once as a class variable, and did the same for 
the other SELECT statement that is re-used. The CREATE statements aren't 
re-used, so it doesn't make sense to make them class variables.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to