umustafi commented on code in PR #3715:
URL: https://github.com/apache/gobblin/pull/3715#discussion_r1267236668


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -219,84 +258,142 @@ else if (leaseValidityStatus == 2) {
         // Use our event to acquire lease, check for previous db 
eventTimestamp and leaseAcquisitionTimestamp
         String formattedAcquireLeaseIfMatchingAllStatement =
             
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT, 
this.leaseArbiterTableName);
-        ResultSet rs = withPreparedStatement(
-            formattedAcquireLeaseIfMatchingAllStatement + "; " + 
formattedSelectAfterInsertStatement,
-            updateStatement -> {
-              completeUpdatePreparedStatement(updateStatement, flowAction, 
eventTimeMillis, true,
+        int numRowsUpdated = 
withPreparedStatement(formattedAcquireLeaseIfMatchingAllStatement,
+            insertStatement -> {
+              completeUpdatePreparedStatement(insertStatement, flowAction, 
true,
                   true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
-              return updateStatement.executeQuery();
+              return insertStatement.executeUpdate();
             }, true);
-        return handleResultFromAttemptedLeaseObtainment(rs, flowAction, 
eventTimeMillis);
+        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, 
Optional.of(dbCurrentTimestamp));
       } // No longer leasing this event
-        // CASE 6: Same event, no longer leasing event in db: terminate
         if (isWithinEpsilon) {
+          log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 5: 
Same event, no longer leasing event in db: "
+              + "terminate", flowAction, dbCurrentTimestamp.getTime());
           return new NoLongerLeasingStatus();
         }
-        // CASE 7: Distinct event, no longer leasing event in db
+        log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 6: 
Distinct event, no longer leasing event in "
+            + "db", flowAction, dbCurrentTimestamp.getTime());
         // Use our event to acquire lease, check for previous db 
eventTimestamp and NULL leaseAcquisitionTimestamp
         String formattedAcquireLeaseIfFinishedStatement =
             
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT, 
this.leaseArbiterTableName);
-        ResultSet rs = withPreparedStatement(
-            formattedAcquireLeaseIfFinishedStatement + "; " + 
formattedSelectAfterInsertStatement,
-            updateStatement -> {
-              completeUpdatePreparedStatement(updateStatement, flowAction, 
eventTimeMillis, true,
+        int numRowsUpdated = 
withPreparedStatement(formattedAcquireLeaseIfFinishedStatement,
+            insertStatement -> {
+              completeUpdatePreparedStatement(insertStatement, flowAction, 
true,
                   false, dbEventTimestamp, null);
-              return updateStatement.executeQuery();
+              return insertStatement.executeUpdate();
             }, true);
-        return handleResultFromAttemptedLeaseObtainment(rs, flowAction, 
eventTimeMillis);
+        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, 
Optional.of(dbCurrentTimestamp));
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }
   }
 
+  protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws 
IOException {
+    try {
+      // Extract values from result set
+      Timestamp dbEventTimestamp = resultSet.getTimestamp("event_timestamp");
+      Timestamp dbLeaseAcquisitionTimestamp = 
resultSet.getTimestamp("lease_acquisition_timestamp");
+      boolean withinEpsilon = resultSet.getBoolean("is_within_epsilon");
+      int leaseValidityStatus = resultSet.getInt("lease_validity_status");
+      int dbLinger = resultSet.getInt("linger");
+      Timestamp dbCurrentTimestamp = 
resultSet.getTimestamp("CURRENT_TIMESTAMP");
+      return new GetEventInfoResult(dbEventTimestamp, 
dbLeaseAcquisitionTimestamp, withinEpsilon, leaseValidityStatus,
+          dbLinger, dbCurrentTimestamp);
+    } catch (SQLException e) {
+      throw new IOException(e);
+    } finally {
+      if (resultSet != null) {
+        try {
+          resultSet.close();
+        } catch (SQLException e) {
+          throw new IOException(e);
+        }
+      }
+    }
+  }
+
+  protected SelectInfoResult createSelectInfoResult(ResultSet resultSet) 
throws IOException {
+      try {
+        if (!resultSet.next()) {
+          log.error("Expected num rows and lease_acquisition_timestamp 
returned from query but received nothing");

Review Comment:
   Oh good catch, I wan't this code path to terminate so instead will through 
an IO Error. 



##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MysqlMultiActiveLeaseArbiter.java:
##########
@@ -219,84 +258,142 @@ else if (leaseValidityStatus == 2) {
         // Use our event to acquire lease, check for previous db 
eventTimestamp and leaseAcquisitionTimestamp
         String formattedAcquireLeaseIfMatchingAllStatement =
             
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_MATCHING_ALL_COLS_STATEMENT, 
this.leaseArbiterTableName);
-        ResultSet rs = withPreparedStatement(
-            formattedAcquireLeaseIfMatchingAllStatement + "; " + 
formattedSelectAfterInsertStatement,
-            updateStatement -> {
-              completeUpdatePreparedStatement(updateStatement, flowAction, 
eventTimeMillis, true,
+        int numRowsUpdated = 
withPreparedStatement(formattedAcquireLeaseIfMatchingAllStatement,
+            insertStatement -> {
+              completeUpdatePreparedStatement(insertStatement, flowAction, 
true,
                   true, dbEventTimestamp, dbLeaseAcquisitionTimestamp);
-              return updateStatement.executeQuery();
+              return insertStatement.executeUpdate();
             }, true);
-        return handleResultFromAttemptedLeaseObtainment(rs, flowAction, 
eventTimeMillis);
+        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, 
Optional.of(dbCurrentTimestamp));
       } // No longer leasing this event
-        // CASE 6: Same event, no longer leasing event in db: terminate
         if (isWithinEpsilon) {
+          log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 5: 
Same event, no longer leasing event in db: "
+              + "terminate", flowAction, dbCurrentTimestamp.getTime());
           return new NoLongerLeasingStatus();
         }
-        // CASE 7: Distinct event, no longer leasing event in db
+        log.debug("tryAcquireLease for [{}, eventTimestamp: {}] - CASE 6: 
Distinct event, no longer leasing event in "
+            + "db", flowAction, dbCurrentTimestamp.getTime());
         // Use our event to acquire lease, check for previous db 
eventTimestamp and NULL leaseAcquisitionTimestamp
         String formattedAcquireLeaseIfFinishedStatement =
             
String.format(CONDITIONALLY_ACQUIRE_LEASE_IF_FINISHED_LEASING_STATEMENT, 
this.leaseArbiterTableName);
-        ResultSet rs = withPreparedStatement(
-            formattedAcquireLeaseIfFinishedStatement + "; " + 
formattedSelectAfterInsertStatement,
-            updateStatement -> {
-              completeUpdatePreparedStatement(updateStatement, flowAction, 
eventTimeMillis, true,
+        int numRowsUpdated = 
withPreparedStatement(formattedAcquireLeaseIfFinishedStatement,
+            insertStatement -> {
+              completeUpdatePreparedStatement(insertStatement, flowAction, 
true,
                   false, dbEventTimestamp, null);
-              return updateStatement.executeQuery();
+              return insertStatement.executeUpdate();
             }, true);
-        return handleResultFromAttemptedLeaseObtainment(rs, flowAction, 
eventTimeMillis);
+        return evaluateStatusAfterLeaseAttempt(numRowsUpdated, flowAction, 
Optional.of(dbCurrentTimestamp));
     } catch (SQLException e) {
       throw new RuntimeException(e);
     }
   }
 
+  protected GetEventInfoResult createGetInfoResult(ResultSet resultSet) throws 
IOException {
+    try {
+      // Extract values from result set
+      Timestamp dbEventTimestamp = resultSet.getTimestamp("event_timestamp");
+      Timestamp dbLeaseAcquisitionTimestamp = 
resultSet.getTimestamp("lease_acquisition_timestamp");
+      boolean withinEpsilon = resultSet.getBoolean("is_within_epsilon");
+      int leaseValidityStatus = resultSet.getInt("lease_validity_status");
+      int dbLinger = resultSet.getInt("linger");
+      Timestamp dbCurrentTimestamp = 
resultSet.getTimestamp("CURRENT_TIMESTAMP");
+      return new GetEventInfoResult(dbEventTimestamp, 
dbLeaseAcquisitionTimestamp, withinEpsilon, leaseValidityStatus,
+          dbLinger, dbCurrentTimestamp);
+    } catch (SQLException e) {
+      throw new IOException(e);
+    } finally {
+      if (resultSet != null) {
+        try {
+          resultSet.close();
+        } catch (SQLException e) {
+          throw new IOException(e);
+        }
+      }
+    }
+  }
+
+  protected SelectInfoResult createSelectInfoResult(ResultSet resultSet) 
throws IOException {
+      try {
+        if (!resultSet.next()) {
+          log.error("Expected num rows and lease_acquisition_timestamp 
returned from query but received nothing");

Review Comment:
   Oh good catch, I want this code path to terminate so instead will through an 
IO Error. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to